1use std::{
2 fs::File,
3 ops::RangeInclusive,
4 path::{Path, PathBuf},
5};
6
7use crate::wal::{WalError, WalResult};
8use reth_ethereum_primitives::EthPrimitives;
9use reth_exex_types::ExExNotification;
10use reth_node_api::NodePrimitives;
11use reth_tracing::tracing::debug;
12use tracing::instrument;
13
/// File extension (without the leading dot) of the files that hold WAL notifications.
static FILE_EXTENSION: &str = "wal";
15
/// Directory-backed storage for WAL notifications.
///
/// Every notification is kept in its own file inside the storage directory. The file is named
/// `<id>.wal` and contains the notification encoded with `rmp_serde` (MessagePack).
#[derive(Debug, Clone)]
pub struct Storage<N: NodePrimitives = EthPrimitives> {
    /// Directory that holds the WAL files.
    path: PathBuf,
    /// Marker for the node primitives type `N` of the notifications read and written here.
    _pd: std::marker::PhantomData<N>,
}
26
27impl<N> Storage<N>
28where
29 N: NodePrimitives,
30{
31 pub(super) fn new(path: impl AsRef<Path>) -> WalResult<Self> {
34 reth_fs_util::create_dir_all(&path)?;
35
36 Ok(Self { path: path.as_ref().to_path_buf(), _pd: std::marker::PhantomData })
37 }
38
39 fn file_path(&self, id: u32) -> PathBuf {
40 self.path.join(format!("{id}.{FILE_EXTENSION}"))
41 }
42
43 fn parse_filename(filename: &str) -> WalResult<u32> {
44 filename
45 .strip_suffix(".wal")
46 .and_then(|s| s.parse().ok())
47 .ok_or_else(|| WalError::Parse(filename.to_string()))
48 }
49
50 #[instrument(skip(self))]
56 fn remove_notification(&self, file_id: u32) -> Option<u64> {
57 let path = self.file_path(file_id);
58 let size = path.metadata().ok()?.len();
59
60 match reth_fs_util::remove_file(self.file_path(file_id)) {
61 Ok(()) => {
62 debug!(target: "exex::wal::storage", "Notification was removed from the storage");
63 Some(size)
64 }
65 Err(err) => {
66 debug!(target: "exex::wal::storage", ?err, "Failed to remove notification from the storage");
67 None
68 }
69 }
70 }
71
72 pub(super) fn files_range(&self) -> WalResult<Option<RangeInclusive<u32>>> {
76 let mut min_id = None;
77 let mut max_id = None;
78
79 for entry in reth_fs_util::read_dir(&self.path)? {
80 let entry = entry.map_err(|err| WalError::DirEntry(self.path.clone(), err))?;
81
82 if entry.path().extension() == Some(FILE_EXTENSION.as_ref()) {
83 let file_name = entry.file_name();
84 let file_id = Self::parse_filename(&file_name.to_string_lossy())?;
85
86 min_id = min_id.map_or(Some(file_id), |min_id: u32| Some(min_id.min(file_id)));
87 max_id = max_id.map_or(Some(file_id), |max_id: u32| Some(max_id.max(file_id)));
88 }
89 }
90
91 Ok(min_id.zip(max_id).map(|(min_id, max_id)| min_id..=max_id))
92 }
93
94 pub(super) fn remove_notifications(
100 &self,
101 file_ids: impl IntoIterator<Item = u32>,
102 ) -> WalResult<(usize, u64)> {
103 let mut deleted_total = 0;
104 let mut deleted_size = 0;
105
106 for id in file_ids {
107 if let Some(size) = self.remove_notification(id) {
108 deleted_total += 1;
109 deleted_size += size;
110 }
111 }
112
113 Ok((deleted_total, deleted_size))
114 }
115
116 pub(super) fn iter_notifications(
117 &self,
118 range: RangeInclusive<u32>,
119 ) -> impl Iterator<Item = WalResult<(u32, u64, ExExNotification<N>)>> + '_ {
120 range.map(move |id| {
121 let (notification, size) =
122 self.read_notification(id)?.ok_or(WalError::FileNotFound(id))?;
123
124 Ok((id, size, notification))
125 })
126 }
127
128 #[instrument(skip(self))]
130 pub(super) fn read_notification(
131 &self,
132 file_id: u32,
133 ) -> WalResult<Option<(ExExNotification<N>, u64)>> {
134 let file_path = self.file_path(file_id);
135 debug!(target: "exex::wal::storage", ?file_path, "Reading notification from WAL");
136
137 let mut file = match File::open(&file_path) {
138 Ok(file) => file,
139 Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None),
140 Err(err) => return Err(reth_fs_util::FsPathError::open(err, &file_path).into()),
141 };
142 let size = file.metadata().map_err(|err| WalError::FileMetadata(file_id, err))?.len();
143
144 let notification: reth_exex_types::serde_bincode_compat::ExExNotification<'_, N> =
146 rmp_serde::decode::from_read(&mut file)
147 .map_err(|err| WalError::Decode(file_id, file_path, err))?;
148
149 Ok(Some((notification.into(), size)))
150 }
151
152 #[instrument(skip(self, notification))]
158 pub(super) fn write_notification(
159 &self,
160 file_id: u32,
161 notification: &ExExNotification<N>,
162 ) -> WalResult<u64> {
163 let file_path = self.file_path(file_id);
164 debug!(target: "exex::wal::storage", ?file_path, "Writing notification to WAL");
165
166 let notification =
168 reth_exex_types::serde_bincode_compat::ExExNotification::<N>::from(notification);
169
170 reth_fs_util::atomic_write_file(&file_path, |file| {
171 rmp_serde::encode::write(file, ¬ification)
172 })?;
173
174 Ok(file_path.metadata().map_err(|err| WalError::FileMetadata(file_id, err))?.len())
175 }
176}
177
#[cfg(test)]
mod tests {
    use super::Storage;
    use alloy_consensus::BlockHeader;
    use alloy_primitives::{
        map::{HashMap, HashSet},
        B256, U256,
    };
    use reth_exex_types::ExExNotification;
    use reth_primitives_traits::Account;
    use reth_provider::Chain;
    use reth_testing_utils::generators::{self, random_block};
    use reth_trie_common::{
        updates::{StorageTrieUpdates, TrieUpdates},
        BranchNodeCompact, HashedPostState, HashedStorage, LazyTrieData, Nibbles,
    };
    use std::{collections::BTreeMap, fs::File, sync::Arc};

    /// A notification written to the storage must be read back unchanged, and both operations
    /// must report the same file size.
    #[test]
    fn test_roundtrip() -> eyre::Result<()> {
        let mut rng = generators::rng();

        let temp_dir = tempfile::tempdir()?;
        let storage: Storage = Storage::new(&temp_dir)?;

        let old_block = random_block(&mut rng, 0, Default::default()).try_recover()?;
        let new_block = random_block(&mut rng, 0, Default::default()).try_recover()?;

        let notification = ExExNotification::ChainReorged {
            new: Arc::new(Chain::new(vec![new_block], Default::default(), BTreeMap::new())),
            old: Arc::new(Chain::new(vec![old_block], Default::default(), BTreeMap::new())),
        };

        // Do a round trip: serialize into the WAL file and deserialize it back.
        let file_id = 0;
        let written_size = storage.write_notification(file_id, &notification)?;
        let deserialized_notification = storage.read_notification(file_id)?;
        assert_eq!(deserialized_notification, Some((notification, written_size)));

        Ok(())
    }

    /// Generates the `test-data/new_format.wal` fixture inside the crate directory.
    ///
    /// Marked `#[ignore]`, so it only runs when ignored tests are requested explicitly.
    #[test]
    #[ignore]
    fn generate_test_wal() -> eyre::Result<()> {
        let notification = get_test_notification_data()?;

        // Encode the same way `Storage::write_notification` does.
        let notification_compat =
            reth_exex_types::serde_bincode_compat::ExExNotification::from(&notification);
        let encoded = rmp_serde::encode::to_vec(&notification_compat)?;

        let test_data_dir = std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join("test-data");
        std::fs::create_dir_all(&test_data_dir)?;

        let output_path = test_data_dir.join("new_format.wal");
        std::fs::write(&output_path, &encoded)?;

        println!("Generated WAL file at: {}", output_path.display());
        println!("File size: {} bytes", encoded.len());
        println!("✓ WAL file created successfully!");

        Ok(())
    }

    /// Builds a fixed `ChainCommitted` notification with a single default block and a small
    /// amount of trie data attached to it.
    fn get_test_notification_data(
    ) -> eyre::Result<ExExNotification<reth_ethereum_primitives::EthPrimitives>> {
        use reth_ethereum_primitives::Block;
        use reth_primitives_traits::Block as _;

        let block = Block::default().seal_slow().try_recover()?;
        // Read the number before `block` is moved into the chain.
        let block_number = block.header().number();

        let hashed_address = B256::from([1; 32]);
        let storage_key = B256::from([2; 32]);

        let trie_updates = TrieUpdates {
            account_nodes: HashMap::from_iter([
                (Nibbles::from_nibbles_unchecked([0x01]), BranchNodeCompact::default()),
                (Nibbles::from_nibbles_unchecked([0x02]), BranchNodeCompact::default()),
            ]),
            removed_nodes: HashSet::from_iter([Nibbles::from_nibbles_unchecked([0x03])]),
            storage_tries: HashMap::from_iter([(
                hashed_address,
                StorageTrieUpdates {
                    is_deleted: false,
                    storage_nodes: HashMap::from_iter([(
                        Nibbles::from_nibbles_unchecked([0x04]),
                        BranchNodeCompact::default(),
                    )]),
                    removed_nodes: Default::default(),
                },
            )]),
        };

        let hashed_state = HashedPostState {
            accounts: HashMap::from_iter([(
                hashed_address,
                Some(Account { nonce: 1, ..Default::default() }),
            )]),
            storages: HashMap::from_iter([(
                hashed_address,
                HashedStorage {
                    wiped: false,
                    storage: HashMap::from_iter([(storage_key, U256::from(101))]),
                },
            )]),
        };

        let trie_data = LazyTrieData::ready(
            Arc::new(hashed_state.into_sorted()),
            Arc::new(trie_updates.into_sorted()),
        );

        let notification: ExExNotification<reth_ethereum_primitives::EthPrimitives> =
            ExExNotification::ChainCommitted {
                new: Arc::new(Chain::new(
                    vec![block],
                    Default::default(),
                    BTreeMap::from([(block_number, trie_data)]),
                )),
            };
        Ok(notification)
    }

    /// `files_range` must report `None` for an empty directory and otherwise span exactly the
    /// WAL files, ignoring files with other extensions.
    #[test]
    fn test_files_range() -> eyre::Result<()> {
        let temp_dir = tempfile::tempdir()?;
        let storage: Storage = Storage::new(&temp_dir)?;

        // No WAL files yet.
        assert_eq!(storage.files_range()?, None);

        // Create WAL files.
        for id in 1..=3 {
            File::create(storage.file_path(id))?;
        }

        // Create non-WAL files that should be ignored.
        for name in ["0.tmp", "4.tmp"] {
            File::create(temp_dir.path().join(name))?;
        }

        // Only the `.wal` files contribute to the range.
        assert_eq!(storage.files_range()?, Some(1..=3));

        Ok(())
    }
}