Skip to main content

reth_storage_api/
bal.rs

1use alloc::{sync::Arc, vec::Vec};
2use alloy_eip7928::bal::DecodedBal;
3pub use alloy_eip7928::bal::RawBal;
4use alloy_eips::NumHash;
5use alloy_primitives::{BlockHash, BlockNumber, Bytes};
6use reth_storage_errors::provider::ProviderResult;
7use revm_database::state::bal::Bal as RevmBal;
8
9/// Notification emitted when a new BAL is inserted into the store.
10#[derive(Clone, Debug, PartialEq, Eq)]
11pub struct BalNotification {
12    /// Number and hash of the block the BAL belongs to.
13    pub num_hash: NumHash,
14    /// Raw BAL RLP payload.
15    pub bal: RawBal,
16}
17
18impl BalNotification {
19    /// Creates a new [`BalNotification`].
20    pub const fn new(num_hash: NumHash, bal: RawBal) -> Self {
21        Self { num_hash, bal }
22    }
23}
24
25#[cfg(feature = "std")]
26pub use self::subscriptions::BalNotificationStream;
27
#[cfg(feature = "std")]
mod subscriptions {
    use super::BalNotification;

    /// Stream type handed out to subscribers of BAL insert events.
    ///
    /// Each item yielded by the stream is a [`BalNotification`].
    pub type BalNotificationStream = reth_tokio_util::EventStream<BalNotification>;
}
35
36/// Store for Block Access Lists (BALs).
37///
38/// This abstraction intentionally does not prescribe where BALs live. Implementations may keep
39/// recent BALs in memory, read canonical BALs from static files, or compose multiple tiers behind
40/// a single interface.
41#[auto_impl::auto_impl(&, Arc, Box)]
42pub trait BalStore: Send + Sync + 'static {
43    /// Insert the BAL for the given block.
44    ///
45    /// Implementations may buffer inserts. Call [`Self::flush`] when pending BALs need to be made
46    /// durable.
47    fn insert(&self, num_hash: NumHash, bal: RawBal) -> ProviderResult<()>;
48
49    /// Insert multiple BALs.
50    ///
51    /// The default implementation preserves the behavior of repeated [`Self::insert`] calls.
52    fn insert_many(&self, entries: Vec<(NumHash, RawBal)>) -> ProviderResult<()> {
53        for (num_hash, bal) in entries {
54            self.insert(num_hash, bal)?;
55        }
56        Ok(())
57    }
58
59    /// Flushes any pending BALs to the backing store.
60    ///
61    /// In-memory implementations may treat this as a no-op.
62    fn flush(&self) -> ProviderResult<()> {
63        Ok(())
64    }
65
66    /// Prunes expired BALs according to the store's retention policy and the given chain tip.
67    ///
68    /// Returns the number of BALs pruned.
69    fn prune(&self, tip: BlockNumber) -> ProviderResult<usize>;
70
71    /// Fetch BALs for the given block hashes.
72    ///
73    /// The returned vector must align with `block_hashes`.
74    fn get_by_hashes(&self, block_hashes: &[BlockHash]) -> ProviderResult<Vec<Option<Bytes>>>;
75
76    /// Fetches the BAL for the given block hash.
77    fn get_by_hash(&self, block_hash: BlockHash) -> ProviderResult<Option<Bytes>> {
78        Ok(self.get_by_hashes(&[block_hash])?.into_iter().next().flatten())
79    }
80
81    /// Fetches and decodes the BAL for the given block hash.
82    fn get_decoded_by_hash(&self, block_hash: BlockHash) -> ProviderResult<Option<DecodedBal>> {
83        self.get_by_hash(block_hash)?
84            .map(DecodedBal::from_rlp_bytes)
85            .transpose()
86            .map_err(Into::into)
87    }
88
89    /// Fetches the BAL for the given block hash in revm representation.
90    fn revm_bal_by_hash(
91        &self,
92        block_hash: BlockHash,
93    ) -> ProviderResult<Option<DecodedBal<Arc<RevmBal>>>> {
94        self.get_decoded_by_hash(block_hash)?
95            .map(|decoded| {
96                decoded.try_map(|bal| {
97                    RevmBal::try_from(Vec::from(bal))
98                        .map(Arc::new)
99                        .map_err(reth_storage_errors::provider::ProviderError::other)
100                })
101            })
102            .transpose()
103    }
104
105    /// Fetch BAL response entries for the given block hashes, stopping after the soft limit is
106    /// exceeded.
107    ///
108    /// Entries are returned in request order. Unavailable BALs are represented as `None`. The
109    /// limit is soft: the entry that exceeds the limit is included.
110    fn get_by_hashes_with_limit(
111        &self,
112        block_hashes: &[BlockHash],
113        limit: GetBlockAccessListLimit,
114    ) -> ProviderResult<Vec<Option<Bytes>>> {
115        let mut out = Vec::new();
116        self.append_by_hashes_with_limit(block_hashes, limit, &mut out)?;
117        out.shrink_to_fit();
118        Ok(out)
119    }
120
121    /// Extends the given vector with BAL response entries for the given hashes.
122    ///
123    /// This adheres to the expected behavior of [`Self::get_by_hashes_with_limit`].
124    fn append_by_hashes_with_limit(
125        &self,
126        block_hashes: &[BlockHash],
127        limit: GetBlockAccessListLimit,
128        out: &mut Vec<Option<Bytes>>,
129    ) -> ProviderResult<()> {
130        let mut size = 0;
131        for bal in self.get_by_hashes(block_hashes)? {
132            size += bal.as_ref().map_or(1, |bytes| bytes.len());
133            out.push(bal);
134
135            if limit.exceeds(size) {
136                break
137            }
138        }
139        Ok(())
140    }
141
142    /// Returns a stream of BAL insert notifications.
143    ///
144    /// Notifications are emitted only after a BAL has been successfully inserted into the store.
145    /// They do not imply canonicality.
146    #[cfg(feature = "std")]
147    fn bal_stream(&self) -> BalNotificationStream;
148}
149
/// Limit applied by [`BalStore::get_by_hashes_with_limit`].
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum GetBlockAccessListLimit {
    /// Unlimited: every requested BAL is returned.
    None,
    /// Caps the size of the returned BALs at the given value, for example 2MB.
    ResponseSizeSoftLimit(usize),
}

impl GetBlockAccessListLimit {
    /// Reports whether `size` is strictly greater than the limit.
    ///
    /// [`Self::None`] is never exceeded. A `size` equal to the soft limit does not exceed it.
    #[inline]
    pub const fn exceeds(&self, size: usize) -> bool {
        match *self {
            Self::ResponseSizeSoftLimit(max) => size > max,
            Self::None => false,
        }
    }
}
169
170/// Clone-friendly façade around a BAL store implementation.
171#[derive(Clone)]
172pub struct BalStoreHandle {
173    inner: Arc<dyn BalStore>,
174}
175
176impl BalStoreHandle {
177    /// Creates a new [`BalStoreHandle`] from the given implementation.
178    pub fn new(inner: impl BalStore) -> Self {
179        Self { inner: Arc::new(inner) }
180    }
181
182    /// Creates a [`BalStoreHandle`] backed by [`NoopBalStore`].
183    pub fn noop() -> Self {
184        Self::new(NoopBalStore)
185    }
186
187    /// Insert the BAL for the given block.
188    #[inline]
189    pub fn insert(&self, num_hash: NumHash, bal: RawBal) -> ProviderResult<()> {
190        self.inner.insert(num_hash, bal)
191    }
192
193    /// Insert multiple BALs.
194    #[inline]
195    pub fn insert_many(&self, entries: Vec<(NumHash, RawBal)>) -> ProviderResult<()> {
196        self.inner.insert_many(entries)
197    }
198
199    /// Flushes any pending BALs to the backing store.
200    #[inline]
201    pub fn flush(&self) -> ProviderResult<()> {
202        self.inner.flush()
203    }
204
205    /// Prunes expired BALs according to the store's retention policy and the given chain tip.
206    #[inline]
207    pub fn prune(&self, tip: BlockNumber) -> ProviderResult<usize> {
208        self.inner.prune(tip)
209    }
210
211    /// Fetch BALs for the given block hashes.
212    #[inline]
213    pub fn get_by_hashes(&self, block_hashes: &[BlockHash]) -> ProviderResult<Vec<Option<Bytes>>> {
214        self.inner.get_by_hashes(block_hashes)
215    }
216
217    /// Fetches the BAL for the given block hash.
218    #[inline]
219    pub fn get_by_hash(&self, block_hash: BlockHash) -> ProviderResult<Option<Bytes>> {
220        self.inner.get_by_hash(block_hash)
221    }
222
223    /// Fetches and decodes the BAL for the given block hash.
224    #[inline]
225    pub fn get_decoded_by_hash(&self, block_hash: BlockHash) -> ProviderResult<Option<DecodedBal>> {
226        self.inner.get_decoded_by_hash(block_hash)
227    }
228
229    /// Fetches the BAL for the given block hash in revm representation.
230    #[inline]
231    pub fn revm_bal_by_hash(
232        &self,
233        block_hash: BlockHash,
234    ) -> ProviderResult<Option<DecodedBal<Arc<RevmBal>>>> {
235        self.inner.revm_bal_by_hash(block_hash)
236    }
237
238    /// Fetch BAL response entries for the given block hashes, stopping after the soft limit is
239    /// exceeded.
240    #[inline]
241    pub fn get_by_hashes_with_limit(
242        &self,
243        block_hashes: &[BlockHash],
244        limit: GetBlockAccessListLimit,
245    ) -> ProviderResult<Vec<Option<Bytes>>> {
246        self.inner.get_by_hashes_with_limit(block_hashes, limit)
247    }
248
249    /// Extends the given vector with BAL response entries for the given hashes.
250    #[inline]
251    pub fn append_by_hashes_with_limit(
252        &self,
253        block_hashes: &[BlockHash],
254        limit: GetBlockAccessListLimit,
255        out: &mut Vec<Option<Bytes>>,
256    ) -> ProviderResult<()> {
257        self.inner.append_by_hashes_with_limit(block_hashes, limit, out)
258    }
259
260    /// Returns a stream of BAL insert notifications.
261    #[cfg(feature = "std")]
262    #[inline]
263    pub fn bal_stream(&self) -> BalNotificationStream {
264        self.inner.bal_stream()
265    }
266}
267
268impl Default for BalStoreHandle {
269    fn default() -> Self {
270        Self::noop()
271    }
272}
273
274impl core::fmt::Debug for BalStoreHandle {
275    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
276        f.debug_struct("BalStoreHandle").finish_non_exhaustive()
277    }
278}
279
280/// Provider-side access to BAL storage.
281#[auto_impl::auto_impl(&, Arc)]
282pub trait BalProvider {
283    /// Returns the configured BAL store handle.
284    fn bal_store(&self) -> &BalStoreHandle;
285}
286
/// BAL store that does nothing; it is the default wiring target until a concrete implementation
/// is injected.
#[derive(Debug, Default, Clone, Copy)]
pub struct NoopBalStore;
290
291impl BalStore for NoopBalStore {
292    fn insert(&self, _num_hash: NumHash, _bal: RawBal) -> ProviderResult<()> {
293        Ok(())
294    }
295
296    fn insert_many(&self, _entries: Vec<(NumHash, RawBal)>) -> ProviderResult<()> {
297        Ok(())
298    }
299
300    fn prune(&self, _tip: BlockNumber) -> ProviderResult<usize> {
301        Ok(0)
302    }
303
304    fn get_by_hashes(&self, block_hashes: &[BlockHash]) -> ProviderResult<Vec<Option<Bytes>>> {
305        Ok(block_hashes.iter().map(|_| None).collect())
306    }
307
308    fn append_by_hashes_with_limit(
309        &self,
310        block_hashes: &[BlockHash],
311        limit: GetBlockAccessListLimit,
312        out: &mut Vec<Option<Bytes>>,
313    ) -> ProviderResult<()> {
314        let mut size = 0;
315        for _ in block_hashes {
316            size += 1;
317            out.push(None);
318
319            if limit.exceeds(size) {
320                break
321            }
322        }
323        Ok(())
324    }
325
326    #[cfg(feature = "std")]
327    fn bal_stream(&self) -> BalNotificationStream {
328        reth_tokio_util::EventSender::new(1).new_listener()
329    }
330}
331
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_primitives::B256;
    #[cfg(feature = "std")]
    use tokio_stream::StreamExt;

    /// RLP encoding of an empty list, used as the smallest BAL payload in these tests.
    const EMPTY_LIST_CODE: u8 = 0xc0;

    /// The no-op store reports every BAL as missing and prunes nothing.
    #[test]
    fn noop_store_returns_empty_results() {
        let handle = BalStoreHandle::default();
        let requested = [B256::random(), B256::random()];

        let results = handle.get_by_hashes(&requested).unwrap();
        assert_eq!(results, vec![None, None]);

        let single = handle.get_by_hash(B256::random()).unwrap();
        assert!(single.is_none());

        let pruned = handle.prune(10).unwrap();
        assert_eq!(pruned, 0);
    }

    /// Flushing the no-op store succeeds.
    #[test]
    fn noop_store_flush_is_noop() {
        let handle = BalStoreHandle::default();

        handle.flush().unwrap();
    }

    /// A decoded lookup against the no-op store yields `None`.
    #[test]
    fn noop_store_decoded_lookup_returns_none() {
        let handle = BalStoreHandle::default();

        let decoded = handle.get_decoded_by_hash(B256::random()).unwrap();
        assert!(decoded.is_none());
    }

    /// Raw, decoded and revm lookups all agree on the stored payload.
    #[test]
    fn decoded_lookup_decodes_raw_bal() {
        let hash = B256::random();
        let raw_bal = Bytes::from_static(&[EMPTY_LIST_CODE]);
        let handle = BalStoreHandle::new(TestBalStore { hash, raw_bal: raw_bal.clone() });

        let raw = handle.get_by_hash(hash).unwrap();
        assert_eq!(raw, Some(raw_bal.clone()));

        let decoded = handle.get_decoded_by_hash(hash).unwrap().unwrap();
        assert_eq!(decoded.as_raw(), &raw_bal);

        let revm_bal = handle.revm_bal_by_hash(hash).unwrap().unwrap();
        assert_eq!(revm_bal.as_raw(), &raw_bal);
        assert!(revm_bal.as_bal().accounts.is_empty());
    }

    /// With a soft limit of 1, the second entry exceeds the limit and is the last one returned.
    #[test]
    fn noop_store_limited_lookup_returns_prefix() {
        let handle = BalStoreHandle::default();
        let requested = [B256::random(), B256::random(), B256::random()];
        let limit = GetBlockAccessListLimit::ResponseSizeSoftLimit(1);

        let limited = handle.get_by_hashes_with_limit(&requested, limit).unwrap();

        assert_eq!(limited, vec![None, None]);
    }

    /// `exceeds` is never true for `None` and is a strict comparison for the soft limit.
    #[test]
    fn block_access_list_limit() {
        let unlimited = GetBlockAccessListLimit::None;
        assert!(!unlimited.exceeds(usize::MAX));

        let two_mb = GetBlockAccessListLimit::ResponseSizeSoftLimit(2 * 1024 * 1024);
        assert!(!two_mb.exceeds(1024 * 1024));
        assert!(!two_mb.exceeds(2 * 1024 * 1024));
        assert!(two_mb.exceeds(3 * 1024 * 1024));
    }

    /// The no-op store's notification stream ends without yielding anything.
    #[cfg(feature = "std")]
    #[tokio::test]
    async fn noop_store_stream_is_empty() {
        let handle = BalStoreHandle::default();
        let mut notifications = handle.bal_stream();

        let first = notifications.next().await;
        assert!(first.is_none());
    }

    /// Store that knows exactly one BAL, keyed by `hash`.
    #[derive(Debug)]
    struct TestBalStore {
        hash: B256,
        raw_bal: Bytes,
    }

    impl BalStore for TestBalStore {
        fn insert(&self, _num_hash: NumHash, _bal: RawBal) -> ProviderResult<()> {
            Ok(())
        }

        fn prune(&self, _tip: BlockNumber) -> ProviderResult<usize> {
            Ok(0)
        }

        fn get_by_hashes(&self, block_hashes: &[BlockHash]) -> ProviderResult<Vec<Option<Bytes>>> {
            let mut found = Vec::with_capacity(block_hashes.len());
            for requested in block_hashes {
                // Only the single known hash resolves to a payload.
                if *requested == self.hash {
                    found.push(Some(self.raw_bal.clone()));
                } else {
                    found.push(None);
                }
            }
            Ok(found)
        }

        #[cfg(feature = "std")]
        fn bal_stream(&self) -> BalNotificationStream {
            reth_tokio_util::EventSender::new(1).new_listener()
        }
    }
}