Skip to main content

reth_storage_api/
bal.rs

1use alloc::{sync::Arc, vec::Vec};
2use alloy_eip7928::bal::DecodedBal;
3use alloy_eips::NumHash;
4use alloy_primitives::{BlockHash, BlockNumber, Bytes, Sealed};
5use reth_storage_errors::provider::ProviderResult;
6use revm_database::state::bal::Bal as RevmBal;
7
8/// Raw BAL RLP bytes sealed by the BAL hash.
9pub type SealedBal = Sealed<Bytes>;
10
11/// Notification emitted when a new BAL is inserted into the store.
12#[derive(Clone, Debug, PartialEq, Eq)]
13pub struct BalNotification {
14    /// Number and hash of the block the BAL belongs to.
15    pub num_hash: NumHash,
16    /// Raw BAL RLP payload sealed by the BAL hash.
17    pub bal: SealedBal,
18}
19
20impl BalNotification {
21    /// Creates a new [`BalNotification`].
22    pub const fn new(num_hash: NumHash, bal: SealedBal) -> Self {
23        Self { num_hash, bal }
24    }
25}
26
27#[cfg(feature = "std")]
28pub use self::subscriptions::BalNotificationStream;
29
30#[cfg(feature = "std")]
31mod subscriptions {
32    use super::BalNotification;
33
34    /// A stream of [`BalNotification`]s.
35    pub type BalNotificationStream = reth_tokio_util::EventStream<BalNotification>;
36}
37
38/// Store for Block Access Lists (BALs).
39///
40/// This abstraction intentionally does not prescribe where BALs live. Implementations may keep
41/// recent BALs in memory, read canonical BALs from static files, or compose multiple tiers behind
42/// a single interface.
43#[auto_impl::auto_impl(&, Arc, Box)]
44pub trait BalStore: Send + Sync + 'static {
45    /// Insert the BAL for the given block.
46    ///
47    /// Implementations may buffer inserts. Call [`Self::flush`] when pending BALs need to be made
48    /// durable.
49    fn insert(&self, num_hash: NumHash, bal: SealedBal) -> ProviderResult<()>;
50
51    /// Insert multiple BALs.
52    ///
53    /// The default implementation preserves the behavior of repeated [`Self::insert`] calls.
54    fn insert_many(&self, entries: Vec<(NumHash, SealedBal)>) -> ProviderResult<()> {
55        for (num_hash, bal) in entries {
56            self.insert(num_hash, bal)?;
57        }
58        Ok(())
59    }
60
61    /// Flushes any pending BALs to the backing store.
62    ///
63    /// In-memory implementations may treat this as a no-op.
64    fn flush(&self) -> ProviderResult<()> {
65        Ok(())
66    }
67
68    /// Prunes expired BALs according to the store's retention policy and the given chain tip.
69    ///
70    /// Returns the number of BALs pruned.
71    fn prune(&self, tip: BlockNumber) -> ProviderResult<usize>;
72
73    /// Fetch BALs for the given block hashes.
74    ///
75    /// The returned vector must align with `block_hashes`.
76    fn get_by_hashes(&self, block_hashes: &[BlockHash]) -> ProviderResult<Vec<Option<Bytes>>>;
77
78    /// Fetches the BAL for the given block hash.
79    fn get_by_hash(&self, block_hash: BlockHash) -> ProviderResult<Option<Bytes>> {
80        Ok(self.get_by_hashes(&[block_hash])?.into_iter().next().flatten())
81    }
82
83    /// Fetches and decodes the BAL for the given block hash.
84    fn get_decoded_by_hash(&self, block_hash: BlockHash) -> ProviderResult<Option<DecodedBal>> {
85        self.get_by_hash(block_hash)?
86            .map(DecodedBal::from_rlp_bytes)
87            .transpose()
88            .map_err(Into::into)
89    }
90
91    /// Fetches the BAL for the given block hash in revm representation.
92    fn revm_bal_by_hash(
93        &self,
94        block_hash: BlockHash,
95    ) -> ProviderResult<Option<DecodedBal<RevmBal>>> {
96        self.get_decoded_by_hash(block_hash)?
97            .map(|decoded| {
98                decoded.try_map(|bal| {
99                    RevmBal::try_from(Vec::from(bal))
100                        .map_err(reth_storage_errors::provider::ProviderError::other)
101                })
102            })
103            .transpose()
104    }
105
106    /// Fetch BAL response entries for the given block hashes, stopping after the soft limit is
107    /// exceeded.
108    ///
109    /// Entries are returned in request order. Unavailable BALs are represented as `None`. The
110    /// limit is soft: the entry that exceeds the limit is included.
111    fn get_by_hashes_with_limit(
112        &self,
113        block_hashes: &[BlockHash],
114        limit: GetBlockAccessListLimit,
115    ) -> ProviderResult<Vec<Option<Bytes>>> {
116        let mut out = Vec::new();
117        self.append_by_hashes_with_limit(block_hashes, limit, &mut out)?;
118        out.shrink_to_fit();
119        Ok(out)
120    }
121
122    /// Extends the given vector with BAL response entries for the given hashes.
123    ///
124    /// This adheres to the expected behavior of [`Self::get_by_hashes_with_limit`].
125    fn append_by_hashes_with_limit(
126        &self,
127        block_hashes: &[BlockHash],
128        limit: GetBlockAccessListLimit,
129        out: &mut Vec<Option<Bytes>>,
130    ) -> ProviderResult<()> {
131        let mut size = 0;
132        for bal in self.get_by_hashes(block_hashes)? {
133            size += bal.as_ref().map_or(1, |bytes| bytes.len());
134            out.push(bal);
135
136            if limit.exceeds(size) {
137                break
138            }
139        }
140        Ok(())
141    }
142
143    /// Returns a stream of BAL insert notifications.
144    ///
145    /// Notifications are emitted only after a BAL has been successfully inserted into the store.
146    /// They do not imply canonicality.
147    #[cfg(feature = "std")]
148    fn bal_stream(&self) -> BalNotificationStream;
149}
150
151/// The limit to enforce for [`BalStore::get_by_hashes_with_limit`].
152#[derive(Debug, Clone, Copy, Eq, PartialEq)]
153pub enum GetBlockAccessListLimit {
154    /// No limit, return all BALs.
155    None,
156    /// Enforce a size limit on the returned BALs, for example 2MB.
157    ResponseSizeSoftLimit(usize),
158}
159
160impl GetBlockAccessListLimit {
161    /// Returns true if the given size exceeds the limit.
162    #[inline]
163    pub const fn exceeds(&self, size: usize) -> bool {
164        match self {
165            Self::None => false,
166            Self::ResponseSizeSoftLimit(limit) => size > *limit,
167        }
168    }
169}
170
171/// Clone-friendly façade around a BAL store implementation.
172#[derive(Clone)]
173pub struct BalStoreHandle {
174    inner: Arc<dyn BalStore>,
175}
176
177impl BalStoreHandle {
178    /// Creates a new [`BalStoreHandle`] from the given implementation.
179    pub fn new(inner: impl BalStore) -> Self {
180        Self { inner: Arc::new(inner) }
181    }
182
183    /// Creates a [`BalStoreHandle`] backed by [`NoopBalStore`].
184    pub fn noop() -> Self {
185        Self::new(NoopBalStore)
186    }
187
188    /// Insert the BAL for the given block.
189    #[inline]
190    pub fn insert(&self, num_hash: NumHash, bal: SealedBal) -> ProviderResult<()> {
191        self.inner.insert(num_hash, bal)
192    }
193
194    /// Insert multiple BALs.
195    #[inline]
196    pub fn insert_many(&self, entries: Vec<(NumHash, SealedBal)>) -> ProviderResult<()> {
197        self.inner.insert_many(entries)
198    }
199
200    /// Flushes any pending BALs to the backing store.
201    #[inline]
202    pub fn flush(&self) -> ProviderResult<()> {
203        self.inner.flush()
204    }
205
206    /// Prunes expired BALs according to the store's retention policy and the given chain tip.
207    #[inline]
208    pub fn prune(&self, tip: BlockNumber) -> ProviderResult<usize> {
209        self.inner.prune(tip)
210    }
211
212    /// Fetch BALs for the given block hashes.
213    #[inline]
214    pub fn get_by_hashes(&self, block_hashes: &[BlockHash]) -> ProviderResult<Vec<Option<Bytes>>> {
215        self.inner.get_by_hashes(block_hashes)
216    }
217
218    /// Fetches the BAL for the given block hash.
219    #[inline]
220    pub fn get_by_hash(&self, block_hash: BlockHash) -> ProviderResult<Option<Bytes>> {
221        self.inner.get_by_hash(block_hash)
222    }
223
224    /// Fetches and decodes the BAL for the given block hash.
225    #[inline]
226    pub fn get_decoded_by_hash(&self, block_hash: BlockHash) -> ProviderResult<Option<DecodedBal>> {
227        self.inner.get_decoded_by_hash(block_hash)
228    }
229
230    /// Fetches the BAL for the given block hash in revm representation.
231    #[inline]
232    pub fn revm_bal_by_hash(
233        &self,
234        block_hash: BlockHash,
235    ) -> ProviderResult<Option<DecodedBal<RevmBal>>> {
236        self.inner.revm_bal_by_hash(block_hash)
237    }
238
239    /// Fetch BAL response entries for the given block hashes, stopping after the soft limit is
240    /// exceeded.
241    #[inline]
242    pub fn get_by_hashes_with_limit(
243        &self,
244        block_hashes: &[BlockHash],
245        limit: GetBlockAccessListLimit,
246    ) -> ProviderResult<Vec<Option<Bytes>>> {
247        self.inner.get_by_hashes_with_limit(block_hashes, limit)
248    }
249
250    /// Extends the given vector with BAL response entries for the given hashes.
251    #[inline]
252    pub fn append_by_hashes_with_limit(
253        &self,
254        block_hashes: &[BlockHash],
255        limit: GetBlockAccessListLimit,
256        out: &mut Vec<Option<Bytes>>,
257    ) -> ProviderResult<()> {
258        self.inner.append_by_hashes_with_limit(block_hashes, limit, out)
259    }
260
261    /// Returns a stream of BAL insert notifications.
262    #[cfg(feature = "std")]
263    #[inline]
264    pub fn bal_stream(&self) -> BalNotificationStream {
265        self.inner.bal_stream()
266    }
267}
268
269impl Default for BalStoreHandle {
270    fn default() -> Self {
271        Self::noop()
272    }
273}
274
275impl core::fmt::Debug for BalStoreHandle {
276    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
277        f.debug_struct("BalStoreHandle").finish_non_exhaustive()
278    }
279}
280
281/// Provider-side access to BAL storage.
282#[auto_impl::auto_impl(&, Arc)]
283pub trait BalProvider {
284    /// Returns the configured BAL store handle.
285    fn bal_store(&self) -> &BalStoreHandle;
286}
287
288/// No-op BAL store used as the default wiring target until a concrete implementation is injected.
289#[derive(Debug, Default, Clone, Copy)]
290pub struct NoopBalStore;
291
292impl BalStore for NoopBalStore {
293    fn insert(&self, _num_hash: NumHash, _bal: SealedBal) -> ProviderResult<()> {
294        Ok(())
295    }
296
297    fn insert_many(&self, _entries: Vec<(NumHash, SealedBal)>) -> ProviderResult<()> {
298        Ok(())
299    }
300
301    fn prune(&self, _tip: BlockNumber) -> ProviderResult<usize> {
302        Ok(0)
303    }
304
305    fn get_by_hashes(&self, block_hashes: &[BlockHash]) -> ProviderResult<Vec<Option<Bytes>>> {
306        Ok(block_hashes.iter().map(|_| None).collect())
307    }
308
309    fn append_by_hashes_with_limit(
310        &self,
311        block_hashes: &[BlockHash],
312        limit: GetBlockAccessListLimit,
313        out: &mut Vec<Option<Bytes>>,
314    ) -> ProviderResult<()> {
315        let mut size = 0;
316        for _ in block_hashes {
317            size += 1;
318            out.push(None);
319
320            if limit.exceeds(size) {
321                break
322            }
323        }
324        Ok(())
325    }
326
327    #[cfg(feature = "std")]
328    fn bal_stream(&self) -> BalNotificationStream {
329        reth_tokio_util::EventSender::new(1).new_listener()
330    }
331}
332
333#[cfg(test)]
334mod tests {
335    use super::*;
336    use alloy_primitives::B256;
337    #[cfg(feature = "std")]
338    use tokio_stream::StreamExt;
339
340    const EMPTY_LIST_CODE: u8 = 0xc0;
341
342    #[test]
343    fn noop_store_returns_empty_results() {
344        let store = BalStoreHandle::default();
345        let hashes = [B256::random(), B256::random()];
346
347        let by_hash = store.get_by_hashes(&hashes).unwrap();
348
349        assert_eq!(by_hash, vec![None, None]);
350        assert!(store.get_by_hash(B256::random()).unwrap().is_none());
351        assert_eq!(store.prune(10).unwrap(), 0);
352    }
353
354    #[test]
355    fn noop_store_flush_is_noop() {
356        let store = BalStoreHandle::default();
357
358        store.flush().unwrap();
359    }
360
361    #[test]
362    fn noop_store_decoded_lookup_returns_none() {
363        let store = BalStoreHandle::default();
364
365        assert!(store.get_decoded_by_hash(B256::random()).unwrap().is_none());
366    }
367
368    #[test]
369    fn decoded_lookup_decodes_raw_bal() {
370        let hash = B256::random();
371        let raw_bal = Bytes::from_static(&[EMPTY_LIST_CODE]);
372        let store = BalStoreHandle::new(TestBalStore { hash, raw_bal: raw_bal.clone() });
373
374        assert_eq!(store.get_by_hash(hash).unwrap(), Some(raw_bal.clone()));
375
376        let decoded = store.get_decoded_by_hash(hash).unwrap().unwrap();
377
378        assert_eq!(decoded.as_raw(), &raw_bal);
379
380        let revm_bal = store.revm_bal_by_hash(hash).unwrap().unwrap();
381
382        assert_eq!(revm_bal.as_raw(), &raw_bal);
383        assert!(revm_bal.as_bal().accounts.is_empty());
384    }
385
386    #[test]
387    fn noop_store_limited_lookup_returns_prefix() {
388        let store = BalStoreHandle::default();
389        let hashes = [B256::random(), B256::random(), B256::random()];
390
391        let limited = store
392            .get_by_hashes_with_limit(&hashes, GetBlockAccessListLimit::ResponseSizeSoftLimit(1))
393            .unwrap();
394
395        assert_eq!(limited, vec![None, None]);
396    }
397
398    #[test]
399    fn block_access_list_limit() {
400        let limit_none = GetBlockAccessListLimit::None;
401        assert!(!limit_none.exceeds(usize::MAX));
402
403        let size_limit_2mb = GetBlockAccessListLimit::ResponseSizeSoftLimit(2 * 1024 * 1024);
404        assert!(!size_limit_2mb.exceeds(1024 * 1024));
405        assert!(!size_limit_2mb.exceeds(2 * 1024 * 1024));
406        assert!(size_limit_2mb.exceeds(3 * 1024 * 1024));
407    }
408
409    #[cfg(feature = "std")]
410    #[tokio::test]
411    async fn noop_store_stream_is_empty() {
412        let store = BalStoreHandle::default();
413        let mut stream = store.bal_stream();
414
415        assert!(stream.next().await.is_none());
416    }
417
418    #[derive(Debug)]
419    struct TestBalStore {
420        hash: B256,
421        raw_bal: Bytes,
422    }
423
424    impl BalStore for TestBalStore {
425        fn insert(&self, _num_hash: NumHash, _bal: SealedBal) -> ProviderResult<()> {
426            Ok(())
427        }
428
429        fn prune(&self, _tip: BlockNumber) -> ProviderResult<usize> {
430            Ok(0)
431        }
432
433        fn get_by_hashes(&self, block_hashes: &[BlockHash]) -> ProviderResult<Vec<Option<Bytes>>> {
434            Ok(block_hashes
435                .iter()
436                .map(|hash| (*hash == self.hash).then(|| self.raw_bal.clone()))
437                .collect())
438        }
439
440        #[cfg(feature = "std")]
441        fn bal_stream(&self) -> BalNotificationStream {
442            reth_tokio_util::EventSender::new(1).new_listener()
443        }
444    }
445}