reth_exex/wal/
mod.rs

1#![allow(dead_code)]
2
3mod cache;
4pub use cache::BlockCache;
5mod storage;
6use reth_ethereum_primitives::EthPrimitives;
7use reth_node_api::NodePrimitives;
8pub use storage::Storage;
9mod metrics;
10use metrics::Metrics;
11mod error;
12pub use error::{WalError, WalResult};
13
14use std::{
15    path::Path,
16    sync::{
17        atomic::{AtomicU32, Ordering},
18        Arc,
19    },
20};
21
22use alloy_eips::BlockNumHash;
23use alloy_primitives::B256;
24use parking_lot::{RwLock, RwLockReadGuard};
25use reth_exex_types::ExExNotification;
26use reth_tracing::tracing::{debug, instrument};
27
28/// WAL is a write-ahead log (WAL) that stores the notifications sent to ExExes.
29///
30/// WAL is backed by a directory of binary files represented by [`Storage`] and a block cache
31/// represented by [`BlockCache`]. The role of the block cache is to avoid walking the WAL directory
32/// and decoding notifications every time we want to iterate or finalize the WAL.
33///
34/// The expected mode of operation is as follows:
35/// 1. On every new canonical chain notification, call [`Wal::commit`].
36/// 2. When the chain is finalized, call [`Wal::finalize`] to prevent the infinite growth of the
37///    WAL.
38#[derive(Debug, Clone)]
39pub struct Wal<N: NodePrimitives = EthPrimitives> {
40    inner: Arc<WalInner<N>>,
41}
42
43impl<N> Wal<N>
44where
45    N: NodePrimitives,
46{
47    /// Creates a new instance of [`Wal`].
48    pub fn new(directory: impl AsRef<Path>) -> WalResult<Self> {
49        Ok(Self { inner: Arc::new(WalInner::new(directory)?) })
50    }
51
52    /// Returns a read-only handle to the WAL.
53    pub fn handle(&self) -> WalHandle<N> {
54        WalHandle { wal: self.inner.clone() }
55    }
56
57    /// Commits the notification to WAL.
58    pub fn commit(&self, notification: &ExExNotification<N>) -> WalResult<()> {
59        self.inner.commit(notification)
60    }
61
62    /// Finalizes the WAL up to the given canonical block, inclusive.
63    ///
64    /// The caller should check that all ExExes are on the canonical chain and will not need any
65    /// blocks from the WAL below the provided block, inclusive.
66    pub fn finalize(&self, to_block: BlockNumHash) -> WalResult<()> {
67        self.inner.finalize(to_block)
68    }
69
70    /// Returns an iterator over all notifications in the WAL.
71    pub fn iter_notifications(
72        &self,
73    ) -> WalResult<Box<dyn Iterator<Item = WalResult<ExExNotification<N>>> + '_>> {
74        self.inner.iter_notifications()
75    }
76
77    /// Returns the number of blocks in the WAL.
78    pub fn num_blocks(&self) -> usize {
79        self.inner.block_cache().num_blocks()
80    }
81}
82
83/// Inner type for the WAL.
84#[derive(Debug)]
85struct WalInner<N: NodePrimitives> {
86    next_file_id: AtomicU32,
87    /// The underlying WAL storage backed by a file.
88    storage: Storage<N>,
89    /// WAL block cache. See [`cache::BlockCache`] docs for more details.
90    block_cache: RwLock<BlockCache>,
91    metrics: Metrics,
92}
93
94impl<N> WalInner<N>
95where
96    N: NodePrimitives,
97{
98    fn new(directory: impl AsRef<Path>) -> WalResult<Self> {
99        let mut wal = Self {
100            next_file_id: AtomicU32::new(0),
101            storage: Storage::new(directory)?,
102            block_cache: RwLock::new(BlockCache::default()),
103            metrics: Metrics::default(),
104        };
105        wal.fill_block_cache()?;
106        Ok(wal)
107    }
108
109    fn block_cache(&self) -> RwLockReadGuard<'_, BlockCache> {
110        self.block_cache.read()
111    }
112
113    /// Fills the block cache with the notifications from the storage.
114    #[instrument(skip(self))]
115    fn fill_block_cache(&mut self) -> WalResult<()> {
116        let Some(files_range) = self.storage.files_range()? else { return Ok(()) };
117        self.next_file_id.store(files_range.end() + 1, Ordering::Relaxed);
118
119        let mut block_cache = self.block_cache.write();
120        let mut notifications_size = 0;
121
122        for entry in self.storage.iter_notifications(files_range) {
123            let (file_id, size, notification) = entry?;
124
125            notifications_size += size;
126
127            let committed_chain = notification.committed_chain();
128            let reverted_chain = notification.reverted_chain();
129
130            debug!(
131                target: "exex::wal",
132                ?file_id,
133                reverted_block_range = ?reverted_chain.as_ref().map(|chain| chain.range()),
134                committed_block_range = ?committed_chain.as_ref().map(|chain| chain.range()),
135                "Inserting block cache entries"
136            );
137
138            block_cache.insert_notification_blocks_with_file_id(file_id, &notification);
139        }
140
141        self.update_metrics(&block_cache, notifications_size as i64);
142
143        Ok(())
144    }
145
146    #[instrument(skip_all, fields(
147        reverted_block_range = ?notification.reverted_chain().as_ref().map(|chain| chain.range()),
148        committed_block_range = ?notification.committed_chain().as_ref().map(|chain| chain.range())
149    ))]
150    fn commit(&self, notification: &ExExNotification<N>) -> WalResult<()> {
151        let mut block_cache = self.block_cache.write();
152
153        let file_id = self.next_file_id.fetch_add(1, Ordering::Relaxed);
154        let size = self.storage.write_notification(file_id, notification)?;
155
156        debug!(target: "exex::wal", ?file_id, "Inserting notification blocks into the block cache");
157        block_cache.insert_notification_blocks_with_file_id(file_id, notification);
158
159        self.update_metrics(&block_cache, size as i64);
160
161        Ok(())
162    }
163
164    #[instrument(skip(self))]
165    fn finalize(&self, to_block: BlockNumHash) -> WalResult<()> {
166        let mut block_cache = self.block_cache.write();
167        let file_ids = block_cache.remove_before(to_block.number);
168
169        // Remove notifications from the storage.
170        if file_ids.is_empty() {
171            debug!(target: "exex::wal", "No notifications were finalized from the storage");
172            return Ok(())
173        }
174
175        let (removed_notifications, removed_size) = self.storage.remove_notifications(file_ids)?;
176        debug!(target: "exex::wal", ?removed_notifications, ?removed_size, "Storage was finalized");
177
178        self.update_metrics(&block_cache, -(removed_size as i64));
179
180        Ok(())
181    }
182
183    fn update_metrics(&self, block_cache: &BlockCache, size_delta: i64) {
184        self.metrics.size_bytes.increment(size_delta as f64);
185        self.metrics.notifications_count.set(block_cache.notification_max_blocks.len() as f64);
186        self.metrics.committed_blocks_count.set(block_cache.committed_blocks.len() as f64);
187
188        if let Some(lowest_committed_block_height) = block_cache.lowest_committed_block_height {
189            self.metrics.lowest_committed_block_height.set(lowest_committed_block_height as f64);
190        }
191
192        if let Some(highest_committed_block_height) = block_cache.highest_committed_block_height {
193            self.metrics.highest_committed_block_height.set(highest_committed_block_height as f64);
194        }
195    }
196
197    /// Returns an iterator over all notifications in the WAL.
198    fn iter_notifications(
199        &self,
200    ) -> WalResult<Box<dyn Iterator<Item = WalResult<ExExNotification<N>>> + '_>> {
201        let Some(range) = self.storage.files_range()? else {
202            return Ok(Box::new(std::iter::empty()))
203        };
204
205        Ok(Box::new(self.storage.iter_notifications(range).map(|entry| Ok(entry?.2))))
206    }
207}
208
209/// A read-only handle to the WAL that can be shared.
210#[derive(Debug)]
211pub struct WalHandle<N: NodePrimitives> {
212    wal: Arc<WalInner<N>>,
213}
214
215impl<N> WalHandle<N>
216where
217    N: NodePrimitives,
218{
219    /// Returns the notification for the given committed block hash if it exists.
220    pub fn get_committed_notification_by_block_hash(
221        &self,
222        block_hash: &B256,
223    ) -> WalResult<Option<ExExNotification<N>>> {
224        let Some(file_id) = self.wal.block_cache().get_file_id_by_committed_block_hash(block_hash)
225        else {
226            return Ok(None)
227        };
228
229        self.wal
230            .storage
231            .read_notification(file_id)
232            .map(|entry| entry.map(|(notification, _)| notification))
233    }
234}
235
#[cfg(test)]
mod tests {
    use crate::wal::{cache::CachedBlock, error::WalResult, Wal};
    use alloy_primitives::B256;
    use itertools::Itertools;
    use reth_exex_types::ExExNotification;
    use reth_provider::Chain;
    use reth_testing_utils::generators::{
        self, random_block, random_block_range, BlockParams, BlockRangeParams,
    };
    use std::sync::Arc;

    /// Reads all notifications directly from the WAL storage, bypassing the block cache, by
    /// iterating over the storage's current file range.
    ///
    /// Returns an empty vector when the storage has no files.
    fn read_notifications(wal: &Wal) -> WalResult<Vec<ExExNotification>> {
        wal.inner.storage.files_range()?.map_or(Ok(Vec::new()), |range| {
            wal.inner
                .storage
                .iter_notifications(range)
                .map(|entry| entry.map(|(_, _, n)| n))
                .collect()
        })
    }

    /// Sorts expected committed-block cache entries by `(block number, block hash)`.
    ///
    /// NOTE(review): the assertions below assume this matches the ordering produced by
    /// `BlockCache::committed_blocks_sorted` — confirm against the cache implementation.
    fn sort_committed_blocks(
        committed_blocks: Vec<(B256, u32, CachedBlock)>,
    ) -> Vec<(B256, u32, CachedBlock)> {
        committed_blocks
            .into_iter()
            .sorted_by_key(|(_, _, block)| (block.block.number, block.block.hash))
            .collect()
    }

    /// End-to-end test of the WAL.
    ///
    /// Commits four notifications (commit, revert, commit, reorg) and checks the block cache and
    /// the storage after each one. It then finalizes, and finally re-opens the WAL to check that
    /// the block cache is rebuilt from storage.
    #[test]
    fn test_wal() -> eyre::Result<()> {
        reth_tracing::init_test_tracing();

        let mut rng = generators::rng();

        // Create an instance of the WAL in a temporary directory
        let temp_dir = tempfile::tempdir()?;
        let wal = Wal::new(&temp_dir)?;
        assert!(wal.inner.block_cache().is_empty());

        // Create 4 canonical blocks (numbers 0 to 3) and two reorged blocks: one with number 1
        // (child of canonical block 0) and one with number 2 (child of canonical block 1)
        let blocks = random_block_range(&mut rng, 0..=3, BlockRangeParams::default())
            .into_iter()
            .map(|block| block.try_recover())
            .collect::<Result<Vec<_>, _>>()?;
        let block_1_reorged = random_block(
            &mut rng,
            1,
            BlockParams { parent: Some(blocks[0].hash()), ..Default::default() },
        )
        .try_recover()?;
        let block_2_reorged = random_block(
            &mut rng,
            2,
            BlockParams { parent: Some(blocks[1].hash()), ..Default::default() },
        )
        .try_recover()?;

        // Create notifications for the above blocks.
        // 1. Committed notification for blocks with number 0 and 1
        // 2. Reverted notification for block with number 1
        // 3. Committed notification for the reorged block with number 1 and block with number 2
        // 4. Reorged notification for block with number 2 that was reverted, and blocks with number
        //    2 and 3 that were committed
        let committed_notification_1 = ExExNotification::ChainCommitted {
            new: Arc::new(Chain::new(
                vec![blocks[0].clone(), blocks[1].clone()],
                Default::default(),
                None,
            )),
        };
        let reverted_notification = ExExNotification::ChainReverted {
            old: Arc::new(Chain::new(vec![blocks[1].clone()], Default::default(), None)),
        };
        let committed_notification_2 = ExExNotification::ChainCommitted {
            new: Arc::new(Chain::new(
                vec![block_1_reorged.clone(), blocks[2].clone()],
                Default::default(),
                None,
            )),
        };
        let reorged_notification = ExExNotification::ChainReorged {
            old: Arc::new(Chain::new(vec![blocks[2].clone()], Default::default(), None)),
            new: Arc::new(Chain::new(
                vec![block_2_reorged.clone(), blocks[3].clone()],
                Default::default(),
                None,
            )),
        };

        // Commit notifications, verify that the block cache is updated and the notifications are
        // written to WAL.

        // First notification (commit block 0, 1)
        // File ids are expected to be assigned sequentially starting from 0 in a fresh WAL.
        let file_id = 0;
        let committed_notification_1_cache_blocks = (blocks[1].number, file_id);
        let committed_notification_1_cache_committed_blocks = vec![
            (
                blocks[0].hash(),
                file_id,
                CachedBlock {
                    block: (blocks[0].number, blocks[0].hash()).into(),
                    parent_hash: blocks[0].parent_hash,
                },
            ),
            (
                blocks[1].hash(),
                file_id,
                CachedBlock {
                    block: (blocks[1].number, blocks[1].hash()).into(),
                    parent_hash: blocks[1].parent_hash,
                },
            ),
        ];
        wal.commit(&committed_notification_1)?;
        assert_eq!(
            wal.inner.block_cache().blocks_sorted(),
            [committed_notification_1_cache_blocks]
        );
        assert_eq!(
            wal.inner.block_cache().committed_blocks_sorted(),
            committed_notification_1_cache_committed_blocks
        );
        assert_eq!(read_notifications(&wal)?, vec![committed_notification_1.clone()]);

        // Second notification (revert block 1)
        wal.commit(&reverted_notification)?;
        let file_id = 1;
        let reverted_notification_cache_blocks = (blocks[1].number, file_id);
        // A pure revert adds an entry to the block list but no committed blocks.
        assert_eq!(
            wal.inner.block_cache().blocks_sorted(),
            [reverted_notification_cache_blocks, committed_notification_1_cache_blocks]
        );
        assert_eq!(
            wal.inner.block_cache().committed_blocks_sorted(),
            committed_notification_1_cache_committed_blocks
        );
        assert_eq!(
            read_notifications(&wal)?,
            vec![committed_notification_1.clone(), reverted_notification.clone()]
        );

        // Third notification (commit block 1, 2)
        wal.commit(&committed_notification_2)?;
        let file_id = 2;
        let committed_notification_2_cache_blocks = (blocks[2].number, file_id);
        let committed_notification_2_cache_committed_blocks = vec![
            (
                block_1_reorged.hash(),
                file_id,
                CachedBlock {
                    block: (block_1_reorged.number, block_1_reorged.hash()).into(),
                    parent_hash: block_1_reorged.parent_hash,
                },
            ),
            (
                blocks[2].hash(),
                file_id,
                CachedBlock {
                    block: (blocks[2].number, blocks[2].hash()).into(),
                    parent_hash: blocks[2].parent_hash,
                },
            ),
        ];
        assert_eq!(
            wal.inner.block_cache().blocks_sorted(),
            [
                committed_notification_2_cache_blocks,
                reverted_notification_cache_blocks,
                committed_notification_1_cache_blocks,
            ]
        );
        assert_eq!(
            wal.inner.block_cache().committed_blocks_sorted(),
            sort_committed_blocks(
                [
                    committed_notification_1_cache_committed_blocks.clone(),
                    committed_notification_2_cache_committed_blocks.clone()
                ]
                .concat()
            )
        );
        assert_eq!(
            read_notifications(&wal)?,
            vec![
                committed_notification_1.clone(),
                reverted_notification.clone(),
                committed_notification_2.clone()
            ]
        );

        // Fourth notification (revert block 2, commit block 2, 3)
        wal.commit(&reorged_notification)?;
        let file_id = 3;
        let reorged_notification_cache_blocks = (blocks[3].number, file_id);
        let reorged_notification_cache_committed_blocks = vec![
            (
                block_2_reorged.hash(),
                file_id,
                CachedBlock {
                    block: (block_2_reorged.number, block_2_reorged.hash()).into(),
                    parent_hash: block_2_reorged.parent_hash,
                },
            ),
            (
                blocks[3].hash(),
                file_id,
                CachedBlock {
                    block: (blocks[3].number, blocks[3].hash()).into(),
                    parent_hash: blocks[3].parent_hash,
                },
            ),
        ];
        assert_eq!(
            wal.inner.block_cache().blocks_sorted(),
            [
                reorged_notification_cache_blocks,
                committed_notification_2_cache_blocks,
                reverted_notification_cache_blocks,
                committed_notification_1_cache_blocks,
            ]
        );
        assert_eq!(
            wal.inner.block_cache().committed_blocks_sorted(),
            sort_committed_blocks(
                [
                    committed_notification_1_cache_committed_blocks,
                    committed_notification_2_cache_committed_blocks.clone(),
                    reorged_notification_cache_committed_blocks.clone()
                ]
                .concat()
            )
        );
        assert_eq!(
            read_notifications(&wal)?,
            vec![
                committed_notification_1,
                reverted_notification,
                committed_notification_2.clone(),
                reorged_notification.clone()
            ]
        );

        // Now, finalize the WAL up to the reorged block 1. That block was in the third
        // notification, which also had block 2 committed. A notification can't be split into two
        // parts, so the whole notification is preserved in both the block cache and the storage,
        // and only the notifications before it are deleted.
        wal.finalize((block_1_reorged.number, block_1_reorged.hash()).into())?;
        assert_eq!(
            wal.inner.block_cache().blocks_sorted(),
            [reorged_notification_cache_blocks, committed_notification_2_cache_blocks]
        );
        assert_eq!(
            wal.inner.block_cache().committed_blocks_sorted(),
            sort_committed_blocks(
                [
                    committed_notification_2_cache_committed_blocks.clone(),
                    reorged_notification_cache_committed_blocks.clone()
                ]
                .concat()
            )
        );
        assert_eq!(
            read_notifications(&wal)?,
            vec![committed_notification_2.clone(), reorged_notification.clone()]
        );

        // Re-open the WAL and verify that the cache population works correctly.
        // The previous `wal` binding is shadowed here, not dropped; both instances point at the
        // same directory.
        let wal = Wal::new(&temp_dir)?;
        assert_eq!(
            wal.inner.block_cache().blocks_sorted(),
            [reorged_notification_cache_blocks, committed_notification_2_cache_blocks]
        );
        assert_eq!(
            wal.inner.block_cache().committed_blocks_sorted(),
            sort_committed_blocks(
                [
                    committed_notification_2_cache_committed_blocks,
                    reorged_notification_cache_committed_blocks
                ]
                .concat()
            )
        );
        assert_eq!(read_notifications(&wal)?, vec![committed_notification_2, reorged_notification]);

        Ok(())
    }
}