reth_exex/wal/
cache.rs

1use std::{
2    cmp::Reverse,
3    collections::{BinaryHeap, HashSet},
4};
5
6use alloy_consensus::BlockHeader;
7use alloy_eips::BlockNumHash;
8use alloy_primitives::{map::FbHashMap, BlockNumber, B256};
9use reth_exex_types::ExExNotification;
10use reth_node_api::NodePrimitives;
11
12/// The block cache of the WAL.
13///
14/// This cache is needed to avoid walking the WAL directory every time we want to find a
15/// notification corresponding to a block or a block corresponding to a hash.
16#[derive(Debug, Default)]
17pub struct BlockCache {
18    /// A min heap of `(Block Number, File ID)` tuples.
19    ///
20    /// Contains one highest block in notification. In a notification with both committed and
21    /// reverted chain, the highest block is chosen between both chains.
22    pub(super) notification_max_blocks: BinaryHeap<Reverse<(BlockNumber, u32)>>,
23    /// A mapping of committed blocks `Block Hash -> Block`.
24    ///
25    /// For each [`ExExNotification::ChainCommitted`] notification, there will be an entry per
26    /// block.
27    pub(super) committed_blocks: FbHashMap<32, (u32, CachedBlock)>,
28    /// Block height of the lowest committed block currently in the cache.
29    pub(super) lowest_committed_block_height: Option<BlockNumber>,
30    /// Block height of the highest committed block currently in the cache.
31    pub(super) highest_committed_block_height: Option<BlockNumber>,
32}
33
34impl BlockCache {
35    /// Returns `true` if the cache is empty.
36    pub(super) fn is_empty(&self) -> bool {
37        self.notification_max_blocks.is_empty()
38    }
39
40    /// Returns the number of blocks in the cache.
41    pub(super) fn num_blocks(&self) -> usize {
42        self.committed_blocks.len()
43    }
44
45    /// Removes all files from the cache that has notifications with a tip block less than or equal
46    /// to the given block number.
47    ///
48    /// # Returns
49    ///
50    /// A set of file IDs that were removed.
51    pub(super) fn remove_before(&mut self, block_number: BlockNumber) -> HashSet<u32> {
52        let mut file_ids = HashSet::default();
53
54        while let Some(block @ Reverse((max_block, file_id))) =
55            self.notification_max_blocks.peek().copied()
56        {
57            if max_block <= block_number {
58                let popped_block = self.notification_max_blocks.pop().unwrap();
59                debug_assert_eq!(popped_block, block);
60                file_ids.insert(file_id);
61            } else {
62                break
63            }
64        }
65
66        let (mut lowest_committed_block_height, mut highest_committed_block_height) = (None, None);
67        self.committed_blocks.retain(|_, (file_id, block)| {
68            let retain = !file_ids.contains(file_id);
69
70            if retain {
71                lowest_committed_block_height = Some(
72                    lowest_committed_block_height
73                        .map_or(block.block.number, |lowest| block.block.number.min(lowest)),
74                );
75                highest_committed_block_height = Some(
76                    highest_committed_block_height
77                        .map_or(block.block.number, |highest| block.block.number.max(highest)),
78                );
79            }
80
81            retain
82        });
83        self.lowest_committed_block_height = lowest_committed_block_height;
84        self.highest_committed_block_height = highest_committed_block_height;
85
86        file_ids
87    }
88
89    /// Returns the file ID for the notification containing the given committed block hash, if it
90    /// exists.
91    pub(super) fn get_file_id_by_committed_block_hash(&self, block_hash: &B256) -> Option<u32> {
92        self.committed_blocks.get(block_hash).map(|entry| entry.0)
93    }
94
95    /// Inserts the blocks from the notification into the cache with the given file ID.
96    pub(super) fn insert_notification_blocks_with_file_id<N: NodePrimitives>(
97        &mut self,
98        file_id: u32,
99        notification: &ExExNotification<N>,
100    ) {
101        let reverted_chain = notification.reverted_chain();
102        let committed_chain = notification.committed_chain();
103
104        let max_block =
105            reverted_chain.iter().chain(&committed_chain).map(|chain| chain.tip().number()).max();
106        if let Some(max_block) = max_block {
107            self.notification_max_blocks.push(Reverse((max_block, file_id)));
108        }
109
110        if let Some(committed_chain) = &committed_chain {
111            for block in committed_chain.blocks().values() {
112                let cached_block = CachedBlock {
113                    block: (block.number(), block.hash()).into(),
114                    parent_hash: block.parent_hash(),
115                };
116                self.committed_blocks.insert(block.hash(), (file_id, cached_block));
117            }
118
119            self.highest_committed_block_height = Some(committed_chain.tip().number());
120        }
121    }
122
123    #[cfg(test)]
124    pub(super) fn blocks_sorted(&self) -> Vec<(BlockNumber, u32)> {
125        self.notification_max_blocks
126            .clone()
127            .into_sorted_vec()
128            .into_iter()
129            .map(|entry| entry.0)
130            .collect()
131    }
132
133    #[cfg(test)]
134    pub(super) fn committed_blocks_sorted(&self) -> Vec<(B256, u32, CachedBlock)> {
135        use itertools::Itertools;
136
137        self.committed_blocks
138            .iter()
139            .map(|(hash, (file_id, block))| (*hash, *file_id, *block))
140            .sorted_by_key(|(_, _, block)| (block.block.number, block.block.hash))
141            .collect()
142    }
143}
144
145#[derive(Debug, Clone, Copy, PartialEq, Eq)]
146pub(super) struct CachedBlock {
147    /// The block number and hash of the block.
148    pub(super) block: BlockNumHash,
149    /// The hash of the parent block.
150    pub(super) parent_hash: B256,
151}