Skip to main content

reth_exex/wal/
storage.rs

1use std::{
2    fs::File,
3    ops::RangeInclusive,
4    path::{Path, PathBuf},
5};
6
7use crate::wal::{WalError, WalResult};
8use reth_ethereum_primitives::EthPrimitives;
9use reth_exex_types::ExExNotification;
10use reth_node_api::NodePrimitives;
11use reth_tracing::tracing::debug;
12use tracing::instrument;
13
/// File extension (without the leading dot) carried by notification files in the WAL directory.
static FILE_EXTENSION: &str = "wal";
15
/// The underlying WAL storage backed by a directory of files.
///
/// Each notification is represented by a single file that contains a MessagePack-encoded
/// notification.
#[derive(Debug, Clone)]
pub struct Storage<N: NodePrimitives = EthPrimitives> {
    /// The path to the directory that holds the WAL files.
    path: PathBuf,
    /// Marker for the node primitives type `N` of the notifications handled by this storage;
    /// no value of `N` is stored.
    _pd: std::marker::PhantomData<N>,
}
26
27impl<N> Storage<N>
28where
29    N: NodePrimitives,
30{
31    /// Creates a new instance of [`Storage`] backed by the file at the given path and creates
32    /// it doesn't exist.
33    pub(super) fn new(path: impl AsRef<Path>) -> WalResult<Self> {
34        reth_fs_util::create_dir_all(&path)?;
35
36        Ok(Self { path: path.as_ref().to_path_buf(), _pd: std::marker::PhantomData })
37    }
38
39    fn file_path(&self, id: u32) -> PathBuf {
40        self.path.join(format!("{id}.{FILE_EXTENSION}"))
41    }
42
43    fn parse_filename(filename: &str) -> WalResult<u32> {
44        filename
45            .strip_suffix(".wal")
46            .and_then(|s| s.parse().ok())
47            .ok_or_else(|| WalError::Parse(filename.to_string()))
48    }
49
50    /// Removes notification for the given file ID from the storage.
51    ///
52    /// # Returns
53    ///
54    /// The size of the file that was removed in bytes, if any.
55    #[instrument(skip(self))]
56    fn remove_notification(&self, file_id: u32) -> Option<u64> {
57        let path = self.file_path(file_id);
58        let size = path.metadata().ok()?.len();
59
60        match reth_fs_util::remove_file(self.file_path(file_id)) {
61            Ok(()) => {
62                debug!(target: "exex::wal::storage", "Notification was removed from the storage");
63                Some(size)
64            }
65            Err(err) => {
66                debug!(target: "exex::wal::storage", ?err, "Failed to remove notification from the storage");
67                None
68            }
69        }
70    }
71
72    /// Returns the range of file IDs in the storage.
73    ///
74    /// If there are no files in the storage, returns `None`.
75    pub(super) fn files_range(&self) -> WalResult<Option<RangeInclusive<u32>>> {
76        let mut min_id = None;
77        let mut max_id = None;
78
79        for entry in reth_fs_util::read_dir(&self.path)? {
80            let entry = entry.map_err(|err| WalError::DirEntry(self.path.clone(), err))?;
81
82            if entry.path().extension() == Some(FILE_EXTENSION.as_ref()) {
83                let file_name = entry.file_name();
84                let file_id = Self::parse_filename(&file_name.to_string_lossy())?;
85
86                min_id = min_id.map_or(Some(file_id), |min_id: u32| Some(min_id.min(file_id)));
87                max_id = max_id.map_or(Some(file_id), |max_id: u32| Some(max_id.max(file_id)));
88            }
89        }
90
91        Ok(min_id.zip(max_id).map(|(min_id, max_id)| min_id..=max_id))
92    }
93
94    /// Removes notifications from the storage according to the given list of file IDs.
95    ///
96    /// # Returns
97    ///
98    /// Number of removed notifications and the total size of the removed files in bytes.
99    pub(super) fn remove_notifications(
100        &self,
101        file_ids: impl IntoIterator<Item = u32>,
102    ) -> WalResult<(usize, u64)> {
103        let mut deleted_total = 0;
104        let mut deleted_size = 0;
105
106        for id in file_ids {
107            if let Some(size) = self.remove_notification(id) {
108                deleted_total += 1;
109                deleted_size += size;
110            }
111        }
112
113        Ok((deleted_total, deleted_size))
114    }
115
116    pub(super) fn iter_notifications(
117        &self,
118        range: RangeInclusive<u32>,
119    ) -> impl Iterator<Item = WalResult<(u32, u64, ExExNotification<N>)>> + '_ {
120        range.map(move |id| {
121            let (notification, size) =
122                self.read_notification(id)?.ok_or(WalError::FileNotFound(id))?;
123
124            Ok((id, size, notification))
125        })
126    }
127
128    /// Reads the notification from the file with the given ID.
129    #[instrument(skip(self))]
130    pub(super) fn read_notification(
131        &self,
132        file_id: u32,
133    ) -> WalResult<Option<(ExExNotification<N>, u64)>> {
134        let file_path = self.file_path(file_id);
135        debug!(target: "exex::wal::storage", ?file_path, "Reading notification from WAL");
136
137        let mut file = match File::open(&file_path) {
138            Ok(file) => file,
139            Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None),
140            Err(err) => return Err(reth_fs_util::FsPathError::open(err, &file_path).into()),
141        };
142        let size = file.metadata().map_err(|err| WalError::FileMetadata(file_id, err))?.len();
143
144        // Deserialize using the bincode- and msgpack-compatible serde wrapper
145        let notification: reth_exex_types::serde_bincode_compat::ExExNotification<'_, N> =
146            rmp_serde::decode::from_read(&mut file)
147                .map_err(|err| WalError::Decode(file_id, file_path, err))?;
148
149        Ok(Some((notification.into(), size)))
150    }
151
152    /// Writes the notification to the file with the given ID.
153    ///
154    /// # Returns
155    ///
156    /// The size of the file that was written in bytes.
157    #[instrument(skip(self, notification))]
158    pub(super) fn write_notification(
159        &self,
160        file_id: u32,
161        notification: &ExExNotification<N>,
162    ) -> WalResult<u64> {
163        let file_path = self.file_path(file_id);
164        debug!(target: "exex::wal::storage", ?file_path, "Writing notification to WAL");
165
166        // Serialize using the bincode- and msgpack-compatible serde wrapper
167        let notification =
168            reth_exex_types::serde_bincode_compat::ExExNotification::<N>::from(notification);
169
170        reth_fs_util::atomic_write_file(&file_path, |file| {
171            rmp_serde::encode::write(file, &notification)
172        })?;
173
174        Ok(file_path.metadata().map_err(|err| WalError::FileMetadata(file_id, err))?.len())
175    }
176}
177
178#[cfg(test)]
179mod tests {
180    use super::Storage;
181    use alloy_consensus::BlockHeader;
182    use alloy_primitives::{
183        map::{HashMap, HashSet},
184        B256, U256,
185    };
186    use reth_exex_types::ExExNotification;
187    use reth_primitives_traits::Account;
188    use reth_provider::Chain;
189    use reth_testing_utils::generators::{self, random_block};
190    use reth_trie_common::{
191        updates::{StorageTrieUpdates, TrieUpdates},
192        BranchNodeCompact, HashedPostState, HashedStorage, LazyTrieData, Nibbles,
193    };
194    use std::{collections::BTreeMap, fs::File, sync::Arc};
195
196    // wal with 1 block and tx (old 3-field format)
197    // <https://github.com/paradigmxyz/reth/issues/15012>
198    #[test]
199    fn decode_notification_wal() {
200        let wal = include_bytes!("../../test-data/28.wal");
201        let notification: reth_exex_types::serde_bincode_compat::ExExNotification<
202            '_,
203            reth_ethereum_primitives::EthPrimitives,
204        > = rmp_serde::decode::from_slice(wal.as_slice()).unwrap();
205        let notification: ExExNotification = notification.into();
206        match notification {
207            ExExNotification::ChainCommitted { new } => {
208                assert_eq!(new.blocks().len(), 1);
209                assert_eq!(new.tip().transaction_count(), 1);
210            }
211            _ => panic!("unexpected notification"),
212        }
213    }
214
215    // wal with 1 block and tx (new 4-field format with trie updates and hashed state)
216    #[test]
217    fn decode_notification_wal_new_format() {
218        let wal = include_bytes!("../../test-data/new_format.wal");
219        let notification: reth_exex_types::serde_bincode_compat::ExExNotification<
220            '_,
221            reth_ethereum_primitives::EthPrimitives,
222        > = rmp_serde::decode::from_slice(wal.as_slice()).unwrap();
223        let notification: ExExNotification = notification.into();
224
225        // Get expected data
226        let expected_notification = get_test_notification_data().unwrap();
227        assert_eq!(
228            &notification, &expected_notification,
229            "Decoded notification should match expected static data"
230        );
231    }
232
233    #[test]
234    fn test_roundtrip() -> eyre::Result<()> {
235        let mut rng = generators::rng();
236
237        let temp_dir = tempfile::tempdir()?;
238        let storage: Storage = Storage::new(&temp_dir)?;
239
240        let old_block = random_block(&mut rng, 0, Default::default()).try_recover()?;
241        let new_block = random_block(&mut rng, 0, Default::default()).try_recover()?;
242
243        let notification = ExExNotification::ChainReorged {
244            new: Arc::new(Chain::new(vec![new_block], Default::default(), BTreeMap::new())),
245            old: Arc::new(Chain::new(vec![old_block], Default::default(), BTreeMap::new())),
246        };
247
248        // Do a round trip serialization and deserialization
249        let file_id = 0;
250        storage.write_notification(file_id, &notification)?;
251        let deserialized_notification = storage.read_notification(file_id)?;
252        assert_eq!(
253            deserialized_notification.map(|(notification, _)| notification),
254            Some(notification)
255        );
256
257        Ok(())
258    }
259
260    /// Generate a new WAL file for testing.
261    ///
262    /// Run this test with `--ignored` to generate a new test WAL file:
263    /// ```sh
264    /// cargo test -p reth-exex generate_test_wal -- --ignored --nocapture
265    /// ```
266    #[test]
267    #[ignore]
268    fn generate_test_wal() -> eyre::Result<()> {
269        use std::io::Write;
270
271        let notification = get_test_notification_data()?;
272
273        // Serialize the notification
274        let notification_compat =
275            reth_exex_types::serde_bincode_compat::ExExNotification::from(&notification);
276        let encoded = rmp_serde::encode::to_vec(&notification_compat)?;
277
278        // Write to test-data directory
279        let test_data_dir = std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join("test-data");
280        std::fs::create_dir_all(&test_data_dir)?;
281
282        let output_path = test_data_dir.join("new_format.wal");
283        let mut file = File::create(&output_path)?;
284        file.write_all(&encoded)?;
285
286        println!("Generated WAL file at: {}", output_path.display());
287        println!("File size: {} bytes", encoded.len());
288        println!("✓ WAL file created successfully!");
289
290        Ok(())
291    }
292
293    /// Helper function to generate deterministic test data for WAL tests
294    fn get_test_notification_data(
295    ) -> eyre::Result<ExExNotification<reth_ethereum_primitives::EthPrimitives>> {
296        use reth_ethereum_primitives::Block;
297        use reth_primitives_traits::Block as _;
298
299        // Create a block with a transaction
300        let block = Block::default().seal_slow().try_recover()?;
301        let block_number = block.header().number();
302
303        let hashed_address = B256::from([1; 32]);
304        let storage_key = B256::from([2; 32]);
305
306        let trie_updates = TrieUpdates {
307            account_nodes: HashMap::from_iter([
308                (Nibbles::from_nibbles_unchecked([0x01]), BranchNodeCompact::default()),
309                (Nibbles::from_nibbles_unchecked([0x02]), BranchNodeCompact::default()),
310            ]),
311            removed_nodes: HashSet::from_iter([Nibbles::from_nibbles_unchecked([0x03])]),
312            storage_tries: HashMap::from_iter([(
313                hashed_address,
314                StorageTrieUpdates {
315                    is_deleted: false,
316                    storage_nodes: HashMap::from_iter([(
317                        Nibbles::from_nibbles_unchecked([0x04]),
318                        BranchNodeCompact::default(),
319                    )]),
320                    removed_nodes: Default::default(),
321                },
322            )]),
323        };
324
325        let hashed_state = HashedPostState {
326            accounts: HashMap::from_iter([(
327                hashed_address,
328                Some(Account { nonce: 1, ..Default::default() }),
329            )]),
330            storages: HashMap::from_iter([(
331                hashed_address,
332                HashedStorage {
333                    wiped: false,
334                    storage: HashMap::from_iter([(storage_key, U256::from(101))]),
335                },
336            )]),
337        };
338
339        let trie_data = LazyTrieData::ready(
340            Arc::new(hashed_state.into_sorted()),
341            Arc::new(trie_updates.into_sorted()),
342        );
343
344        let notification: ExExNotification<reth_ethereum_primitives::EthPrimitives> =
345            ExExNotification::ChainCommitted {
346                new: Arc::new(Chain::new(
347                    vec![block],
348                    Default::default(),
349                    BTreeMap::from([(block_number, trie_data)]),
350                )),
351            };
352        Ok(notification)
353    }
354
355    #[test]
356    fn test_files_range() -> eyre::Result<()> {
357        let temp_dir = tempfile::tempdir()?;
358        let storage: Storage = Storage::new(&temp_dir)?;
359
360        // Create WAL files
361        File::create(storage.file_path(1))?;
362        File::create(storage.file_path(2))?;
363        File::create(storage.file_path(3))?;
364
365        // Create non-WAL files that should be ignored
366        File::create(temp_dir.path().join("0.tmp"))?;
367        File::create(temp_dir.path().join("4.tmp"))?;
368
369        // Check files range
370        assert_eq!(storage.files_range()?, Some(1..=3));
371
372        Ok(())
373    }
374}