1#![allow(dead_code)]
2
3mod cache;
4pub use cache::BlockCache;
5mod storage;
6use reth_ethereum_primitives::EthPrimitives;
7use reth_node_api::NodePrimitives;
8pub use storage::Storage;
9mod metrics;
10use metrics::Metrics;
11mod error;
12pub use error::{WalError, WalResult};
13
14use std::{
15 path::Path,
16 sync::{
17 atomic::{AtomicU32, Ordering},
18 Arc,
19 },
20};
21
22use alloy_eips::BlockNumHash;
23use alloy_primitives::B256;
24use parking_lot::{RwLock, RwLockReadGuard};
25use reth_exex_types::ExExNotification;
26use reth_tracing::tracing::{debug, instrument};
27
28#[derive(Debug, Clone)]
39pub struct Wal<N: NodePrimitives = EthPrimitives> {
40 inner: Arc<WalInner<N>>,
41}
42
43impl<N> Wal<N>
44where
45 N: NodePrimitives,
46{
47 pub fn new(directory: impl AsRef<Path>) -> WalResult<Self> {
49 Ok(Self { inner: Arc::new(WalInner::new(directory)?) })
50 }
51
52 pub fn handle(&self) -> WalHandle<N> {
54 WalHandle { wal: self.inner.clone() }
55 }
56
57 pub fn commit(&self, notification: &ExExNotification<N>) -> WalResult<()> {
59 self.inner.commit(notification)
60 }
61
62 pub fn finalize(&self, to_block: BlockNumHash) -> WalResult<()> {
67 self.inner.finalize(to_block)
68 }
69
70 pub fn iter_notifications(
72 &self,
73 ) -> WalResult<Box<dyn Iterator<Item = WalResult<ExExNotification<N>>> + '_>> {
74 self.inner.iter_notifications()
75 }
76
77 pub fn num_blocks(&self) -> usize {
79 self.inner.block_cache().num_blocks()
80 }
81}
82
83#[derive(Debug)]
85struct WalInner<N: NodePrimitives> {
86 next_file_id: AtomicU32,
87 storage: Storage<N>,
89 block_cache: RwLock<BlockCache>,
91 metrics: Metrics,
92}
93
94impl<N> WalInner<N>
95where
96 N: NodePrimitives,
97{
98 fn new(directory: impl AsRef<Path>) -> WalResult<Self> {
99 let mut wal = Self {
100 next_file_id: AtomicU32::new(0),
101 storage: Storage::new(directory)?,
102 block_cache: RwLock::new(BlockCache::default()),
103 metrics: Metrics::default(),
104 };
105 wal.fill_block_cache()?;
106 Ok(wal)
107 }
108
109 fn block_cache(&self) -> RwLockReadGuard<'_, BlockCache> {
110 self.block_cache.read()
111 }
112
113 #[instrument(skip(self))]
115 fn fill_block_cache(&mut self) -> WalResult<()> {
116 let Some(files_range) = self.storage.files_range()? else { return Ok(()) };
117 self.next_file_id.store(files_range.end() + 1, Ordering::Relaxed);
118
119 let mut block_cache = self.block_cache.write();
120 let mut notifications_size = 0;
121
122 for entry in self.storage.iter_notifications(files_range) {
123 let (file_id, size, notification) = entry?;
124
125 notifications_size += size;
126
127 let committed_chain = notification.committed_chain();
128 let reverted_chain = notification.reverted_chain();
129
130 debug!(
131 target: "exex::wal",
132 ?file_id,
133 reverted_block_range = ?reverted_chain.as_ref().map(|chain| chain.range()),
134 committed_block_range = ?committed_chain.as_ref().map(|chain| chain.range()),
135 "Inserting block cache entries"
136 );
137
138 block_cache.insert_notification_blocks_with_file_id(file_id, ¬ification);
139 }
140
141 self.update_metrics(&block_cache, notifications_size as i64);
142
143 Ok(())
144 }
145
146 #[instrument(skip_all, fields(
147 reverted_block_range = ?notification.reverted_chain().as_ref().map(|chain| chain.range()),
148 committed_block_range = ?notification.committed_chain().as_ref().map(|chain| chain.range())
149 ))]
150 fn commit(&self, notification: &ExExNotification<N>) -> WalResult<()> {
151 let mut block_cache = self.block_cache.write();
152
153 let file_id = self.next_file_id.fetch_add(1, Ordering::Relaxed);
154 let size = self.storage.write_notification(file_id, notification)?;
155
156 debug!(target: "exex::wal", ?file_id, "Inserting notification blocks into the block cache");
157 block_cache.insert_notification_blocks_with_file_id(file_id, notification);
158
159 self.update_metrics(&block_cache, size as i64);
160
161 Ok(())
162 }
163
164 #[instrument(skip(self))]
165 fn finalize(&self, to_block: BlockNumHash) -> WalResult<()> {
166 let mut block_cache = self.block_cache.write();
167 let file_ids = block_cache.remove_before(to_block.number);
168
169 if file_ids.is_empty() {
171 debug!(target: "exex::wal", "No notifications were finalized from the storage");
172 return Ok(())
173 }
174
175 let (removed_notifications, removed_size) = self.storage.remove_notifications(file_ids)?;
176 debug!(target: "exex::wal", ?removed_notifications, ?removed_size, "Storage was finalized");
177
178 self.update_metrics(&block_cache, -(removed_size as i64));
179
180 Ok(())
181 }
182
183 fn update_metrics(&self, block_cache: &BlockCache, size_delta: i64) {
184 self.metrics.size_bytes.increment(size_delta as f64);
185 self.metrics.notifications_count.set(block_cache.notification_max_blocks.len() as f64);
186 self.metrics.committed_blocks_count.set(block_cache.committed_blocks.len() as f64);
187
188 if let Some(lowest_committed_block_height) = block_cache.lowest_committed_block_height {
189 self.metrics.lowest_committed_block_height.set(lowest_committed_block_height as f64);
190 }
191
192 if let Some(highest_committed_block_height) = block_cache.highest_committed_block_height {
193 self.metrics.highest_committed_block_height.set(highest_committed_block_height as f64);
194 }
195 }
196
197 fn iter_notifications(
199 &self,
200 ) -> WalResult<Box<dyn Iterator<Item = WalResult<ExExNotification<N>>> + '_>> {
201 let Some(range) = self.storage.files_range()? else {
202 return Ok(Box::new(std::iter::empty()))
203 };
204
205 Ok(Box::new(self.storage.iter_notifications(range).map(|entry| Ok(entry?.2))))
206 }
207}
208
209#[derive(Debug)]
211pub struct WalHandle<N: NodePrimitives> {
212 wal: Arc<WalInner<N>>,
213}
214
215impl<N> WalHandle<N>
216where
217 N: NodePrimitives,
218{
219 pub fn get_committed_notification_by_block_hash(
221 &self,
222 block_hash: &B256,
223 ) -> WalResult<Option<ExExNotification<N>>> {
224 let Some(file_id) = self.wal.block_cache().get_file_id_by_committed_block_hash(block_hash)
225 else {
226 return Ok(None)
227 };
228
229 self.wal
230 .storage
231 .read_notification(file_id)
232 .map(|entry| entry.map(|(notification, _)| notification))
233 }
234}
235
#[cfg(test)]
mod tests {
    use crate::wal::{cache::CachedBlock, error::WalResult, Wal};
    use alloy_primitives::B256;
    use itertools::Itertools;
    use reth_exex_types::ExExNotification;
    use reth_provider::Chain;
    use reth_testing_utils::generators::{
        self, random_block, random_block_range, BlockParams, BlockRangeParams,
    };
    use std::sync::Arc;

    /// Expands to the `(hash, file_id, CachedBlock)` tuple expected in the committed-blocks
    /// cache for `$block` stored under `$file_id`.
    macro_rules! committed_entry {
        ($block:expr, $file_id:expr) => {
            (
                $block.hash(),
                $file_id,
                CachedBlock {
                    block: ($block.number, $block.hash()).into(),
                    parent_hash: $block.parent_hash,
                },
            )
        };
    }

    /// Reads every notification currently held by the WAL storage, or an empty list when the
    /// storage reports no files.
    fn read_notifications(wal: &Wal) -> WalResult<Vec<ExExNotification>> {
        let Some(range) = wal.inner.storage.files_range()? else { return Ok(Vec::new()) };

        wal.inner
            .storage
            .iter_notifications(range)
            .map(|entry| entry.map(|(_file_id, _size, notification)| notification))
            .collect()
    }

    /// Orders cache entries by block number and then by block hash.
    fn sort_committed_blocks(
        committed_blocks: Vec<(B256, u32, CachedBlock)>,
    ) -> Vec<(B256, u32, CachedBlock)> {
        committed_blocks
            .into_iter()
            .sorted_by_key(|(_, _, cached)| (cached.block.number, cached.block.hash))
            .collect()
    }

    #[test]
    fn test_wal() -> eyre::Result<()> {
        reth_tracing::init_test_tracing();

        let mut rng = generators::rng();

        // A WAL opened on a fresh directory starts with an empty block cache.
        let temp_dir = tempfile::tempdir()?;
        let wal = Wal::new(&temp_dir)?;
        assert!(wal.inner.block_cache().is_empty());

        // Blocks 0..=3 plus alternative blocks at heights 1 and 2.
        let blocks = random_block_range(&mut rng, 0..=3, BlockRangeParams::default())
            .into_iter()
            .map(|block| block.try_recover())
            .collect::<Result<Vec<_>, _>>()?;
        let alt_block_1 = random_block(
            &mut rng,
            1,
            BlockParams { parent: Some(blocks[0].hash()), ..Default::default() },
        )
        .try_recover()?;
        let alt_block_2 = random_block(
            &mut rng,
            2,
            BlockParams { parent: Some(blocks[1].hash()), ..Default::default() },
        )
        .try_recover()?;

        // The four notifications written to the WAL, in commit order:
        // 1. commit of blocks 0 and 1
        // 2. revert of block 1
        // 3. commit of the alternative block 1 and block 2
        // 4. reorg replacing block 2 with the alternative block 2 and block 3
        let commit_1 = ExExNotification::ChainCommitted {
            new: Arc::new(Chain::new(
                vec![blocks[0].clone(), blocks[1].clone()],
                Default::default(),
                None,
            )),
        };
        let revert = ExExNotification::ChainReverted {
            old: Arc::new(Chain::new(vec![blocks[1].clone()], Default::default(), None)),
        };
        let commit_2 = ExExNotification::ChainCommitted {
            new: Arc::new(Chain::new(
                vec![alt_block_1.clone(), blocks[2].clone()],
                Default::default(),
                None,
            )),
        };
        let reorg = ExExNotification::ChainReorged {
            old: Arc::new(Chain::new(vec![blocks[2].clone()], Default::default(), None)),
            new: Arc::new(Chain::new(
                vec![alt_block_2.clone(), blocks[3].clone()],
                Default::default(),
                None,
            )),
        };

        // Step 1: the first commit is stored under file ID 0.
        let commit_1_file_id = 0;
        let commit_1_cache_entry = (blocks[1].number, commit_1_file_id);
        let commit_1_committed = vec![
            committed_entry!(blocks[0], commit_1_file_id),
            committed_entry!(blocks[1], commit_1_file_id),
        ];
        wal.commit(&commit_1)?;
        assert_eq!(wal.inner.block_cache().blocks_sorted(), [commit_1_cache_entry]);
        assert_eq!(wal.inner.block_cache().committed_blocks_sorted(), commit_1_committed);
        assert_eq!(read_notifications(&wal)?, vec![commit_1.clone()]);

        // Step 2: the revert is stored under file ID 1 and leaves the committed blocks as is.
        let revert_file_id = 1;
        let revert_cache_entry = (blocks[1].number, revert_file_id);
        wal.commit(&revert)?;
        assert_eq!(
            wal.inner.block_cache().blocks_sorted(),
            [revert_cache_entry, commit_1_cache_entry]
        );
        assert_eq!(wal.inner.block_cache().committed_blocks_sorted(), commit_1_committed);
        assert_eq!(read_notifications(&wal)?, vec![commit_1.clone(), revert.clone()]);

        // Step 3: the second commit is stored under file ID 2.
        let commit_2_file_id = 2;
        let commit_2_cache_entry = (blocks[2].number, commit_2_file_id);
        let commit_2_committed = vec![
            committed_entry!(alt_block_1, commit_2_file_id),
            committed_entry!(blocks[2], commit_2_file_id),
        ];
        wal.commit(&commit_2)?;
        assert_eq!(
            wal.inner.block_cache().blocks_sorted(),
            [commit_2_cache_entry, revert_cache_entry, commit_1_cache_entry]
        );
        let expected_committed = sort_committed_blocks(
            [commit_1_committed.clone(), commit_2_committed.clone()].concat(),
        );
        assert_eq!(wal.inner.block_cache().committed_blocks_sorted(), expected_committed);
        assert_eq!(
            read_notifications(&wal)?,
            vec![commit_1.clone(), revert.clone(), commit_2.clone()]
        );

        // Step 4: the reorg is stored under file ID 3.
        let reorg_file_id = 3;
        let reorg_cache_entry = (blocks[3].number, reorg_file_id);
        let reorg_committed = vec![
            committed_entry!(alt_block_2, reorg_file_id),
            committed_entry!(blocks[3], reorg_file_id),
        ];
        wal.commit(&reorg)?;
        assert_eq!(
            wal.inner.block_cache().blocks_sorted(),
            [reorg_cache_entry, commit_2_cache_entry, revert_cache_entry, commit_1_cache_entry]
        );
        let expected_committed = sort_committed_blocks(
            [commit_1_committed, commit_2_committed.clone(), reorg_committed.clone()].concat(),
        );
        assert_eq!(wal.inner.block_cache().committed_blocks_sorted(), expected_committed);
        assert_eq!(
            read_notifications(&wal)?,
            vec![commit_1, revert, commit_2.clone(), reorg.clone()]
        );

        // Step 5: finalizing at the alternative block 1 leaves only the notifications stored
        // under file IDs 2 and 3.
        wal.finalize((alt_block_1.number, alt_block_1.hash()).into())?;
        assert_eq!(
            wal.inner.block_cache().blocks_sorted(),
            [reorg_cache_entry, commit_2_cache_entry]
        );
        let expected_committed = sort_committed_blocks(
            [commit_2_committed.clone(), reorg_committed.clone()].concat(),
        );
        assert_eq!(wal.inner.block_cache().committed_blocks_sorted(), expected_committed);
        assert_eq!(read_notifications(&wal)?, vec![commit_2.clone(), reorg.clone()]);

        // Step 6: reopening the WAL on the same directory restores the same cache and storage.
        let reopened = Wal::new(&temp_dir)?;
        assert_eq!(
            reopened.inner.block_cache().blocks_sorted(),
            [reorg_cache_entry, commit_2_cache_entry]
        );
        let expected_committed =
            sort_committed_blocks([commit_2_committed, reorg_committed].concat());
        assert_eq!(reopened.inner.block_cache().committed_blocks_sorted(), expected_committed);
        assert_eq!(read_notifications(&reopened)?, vec![commit_2, reorg]);

        Ok(())
    }
}