1use alloc::{sync::Arc, vec::Vec};
2use alloy_eip7928::bal::DecodedBal;
3use alloy_eips::NumHash;
4use alloy_primitives::{BlockHash, BlockNumber, Bytes, Sealed};
5use reth_storage_errors::provider::ProviderResult;
6use revm_database::state::bal::Bal as RevmBal;
7
8pub type SealedBal = Sealed<Bytes>;
10
11#[derive(Clone, Debug, PartialEq, Eq)]
13pub struct BalNotification {
14 pub num_hash: NumHash,
16 pub bal: SealedBal,
18}
19
20impl BalNotification {
21 pub const fn new(num_hash: NumHash, bal: SealedBal) -> Self {
23 Self { num_hash, bal }
24 }
25}
26
27#[cfg(feature = "std")]
28pub use self::subscriptions::BalNotificationStream;
29
30#[cfg(feature = "std")]
31mod subscriptions {
32 use super::BalNotification;
33
34 pub type BalNotificationStream = reth_tokio_util::EventStream<BalNotification>;
36}
37
38#[auto_impl::auto_impl(&, Arc, Box)]
44pub trait BalStore: Send + Sync + 'static {
45 fn insert(&self, num_hash: NumHash, bal: SealedBal) -> ProviderResult<()>;
50
51 fn insert_many(&self, entries: Vec<(NumHash, SealedBal)>) -> ProviderResult<()> {
55 for (num_hash, bal) in entries {
56 self.insert(num_hash, bal)?;
57 }
58 Ok(())
59 }
60
61 fn flush(&self) -> ProviderResult<()> {
65 Ok(())
66 }
67
68 fn prune(&self, tip: BlockNumber) -> ProviderResult<usize>;
72
73 fn get_by_hashes(&self, block_hashes: &[BlockHash]) -> ProviderResult<Vec<Option<Bytes>>>;
77
78 fn get_by_hash(&self, block_hash: BlockHash) -> ProviderResult<Option<Bytes>> {
80 Ok(self.get_by_hashes(&[block_hash])?.into_iter().next().flatten())
81 }
82
83 fn get_decoded_by_hash(&self, block_hash: BlockHash) -> ProviderResult<Option<DecodedBal>> {
85 self.get_by_hash(block_hash)?
86 .map(DecodedBal::from_rlp_bytes)
87 .transpose()
88 .map_err(Into::into)
89 }
90
91 fn revm_bal_by_hash(
93 &self,
94 block_hash: BlockHash,
95 ) -> ProviderResult<Option<DecodedBal<RevmBal>>> {
96 self.get_decoded_by_hash(block_hash)?
97 .map(|decoded| {
98 decoded.try_map(|bal| {
99 RevmBal::try_from(Vec::from(bal))
100 .map_err(reth_storage_errors::provider::ProviderError::other)
101 })
102 })
103 .transpose()
104 }
105
106 fn get_by_hashes_with_limit(
112 &self,
113 block_hashes: &[BlockHash],
114 limit: GetBlockAccessListLimit,
115 ) -> ProviderResult<Vec<Option<Bytes>>> {
116 let mut out = Vec::new();
117 self.append_by_hashes_with_limit(block_hashes, limit, &mut out)?;
118 out.shrink_to_fit();
119 Ok(out)
120 }
121
122 fn append_by_hashes_with_limit(
126 &self,
127 block_hashes: &[BlockHash],
128 limit: GetBlockAccessListLimit,
129 out: &mut Vec<Option<Bytes>>,
130 ) -> ProviderResult<()> {
131 let mut size = 0;
132 for bal in self.get_by_hashes(block_hashes)? {
133 size += bal.as_ref().map_or(1, |bytes| bytes.len());
134 out.push(bal);
135
136 if limit.exceeds(size) {
137 break
138 }
139 }
140 Ok(())
141 }
142
143 #[cfg(feature = "std")]
148 fn bal_stream(&self) -> BalNotificationStream;
149}
150
151#[derive(Debug, Clone, Copy, Eq, PartialEq)]
153pub enum GetBlockAccessListLimit {
154 None,
156 ResponseSizeSoftLimit(usize),
158}
159
160impl GetBlockAccessListLimit {
161 #[inline]
163 pub const fn exceeds(&self, size: usize) -> bool {
164 match self {
165 Self::None => false,
166 Self::ResponseSizeSoftLimit(limit) => size > *limit,
167 }
168 }
169}
170
171#[derive(Clone)]
173pub struct BalStoreHandle {
174 inner: Arc<dyn BalStore>,
175}
176
177impl BalStoreHandle {
178 pub fn new(inner: impl BalStore) -> Self {
180 Self { inner: Arc::new(inner) }
181 }
182
183 pub fn noop() -> Self {
185 Self::new(NoopBalStore)
186 }
187
188 #[inline]
190 pub fn insert(&self, num_hash: NumHash, bal: SealedBal) -> ProviderResult<()> {
191 self.inner.insert(num_hash, bal)
192 }
193
194 #[inline]
196 pub fn insert_many(&self, entries: Vec<(NumHash, SealedBal)>) -> ProviderResult<()> {
197 self.inner.insert_many(entries)
198 }
199
200 #[inline]
202 pub fn flush(&self) -> ProviderResult<()> {
203 self.inner.flush()
204 }
205
206 #[inline]
208 pub fn prune(&self, tip: BlockNumber) -> ProviderResult<usize> {
209 self.inner.prune(tip)
210 }
211
212 #[inline]
214 pub fn get_by_hashes(&self, block_hashes: &[BlockHash]) -> ProviderResult<Vec<Option<Bytes>>> {
215 self.inner.get_by_hashes(block_hashes)
216 }
217
218 #[inline]
220 pub fn get_by_hash(&self, block_hash: BlockHash) -> ProviderResult<Option<Bytes>> {
221 self.inner.get_by_hash(block_hash)
222 }
223
224 #[inline]
226 pub fn get_decoded_by_hash(&self, block_hash: BlockHash) -> ProviderResult<Option<DecodedBal>> {
227 self.inner.get_decoded_by_hash(block_hash)
228 }
229
230 #[inline]
232 pub fn revm_bal_by_hash(
233 &self,
234 block_hash: BlockHash,
235 ) -> ProviderResult<Option<DecodedBal<RevmBal>>> {
236 self.inner.revm_bal_by_hash(block_hash)
237 }
238
239 #[inline]
242 pub fn get_by_hashes_with_limit(
243 &self,
244 block_hashes: &[BlockHash],
245 limit: GetBlockAccessListLimit,
246 ) -> ProviderResult<Vec<Option<Bytes>>> {
247 self.inner.get_by_hashes_with_limit(block_hashes, limit)
248 }
249
250 #[inline]
252 pub fn append_by_hashes_with_limit(
253 &self,
254 block_hashes: &[BlockHash],
255 limit: GetBlockAccessListLimit,
256 out: &mut Vec<Option<Bytes>>,
257 ) -> ProviderResult<()> {
258 self.inner.append_by_hashes_with_limit(block_hashes, limit, out)
259 }
260
261 #[cfg(feature = "std")]
263 #[inline]
264 pub fn bal_stream(&self) -> BalNotificationStream {
265 self.inner.bal_stream()
266 }
267}
268
269impl Default for BalStoreHandle {
270 fn default() -> Self {
271 Self::noop()
272 }
273}
274
275impl core::fmt::Debug for BalStoreHandle {
276 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
277 f.debug_struct("BalStoreHandle").finish_non_exhaustive()
278 }
279}
280
281#[auto_impl::auto_impl(&, Arc)]
283pub trait BalProvider {
284 fn bal_store(&self) -> &BalStoreHandle;
286}
287
288#[derive(Debug, Default, Clone, Copy)]
290pub struct NoopBalStore;
291
292impl BalStore for NoopBalStore {
293 fn insert(&self, _num_hash: NumHash, _bal: SealedBal) -> ProviderResult<()> {
294 Ok(())
295 }
296
297 fn insert_many(&self, _entries: Vec<(NumHash, SealedBal)>) -> ProviderResult<()> {
298 Ok(())
299 }
300
301 fn prune(&self, _tip: BlockNumber) -> ProviderResult<usize> {
302 Ok(0)
303 }
304
305 fn get_by_hashes(&self, block_hashes: &[BlockHash]) -> ProviderResult<Vec<Option<Bytes>>> {
306 Ok(block_hashes.iter().map(|_| None).collect())
307 }
308
309 fn append_by_hashes_with_limit(
310 &self,
311 block_hashes: &[BlockHash],
312 limit: GetBlockAccessListLimit,
313 out: &mut Vec<Option<Bytes>>,
314 ) -> ProviderResult<()> {
315 let mut size = 0;
316 for _ in block_hashes {
317 size += 1;
318 out.push(None);
319
320 if limit.exceeds(size) {
321 break
322 }
323 }
324 Ok(())
325 }
326
327 #[cfg(feature = "std")]
328 fn bal_stream(&self) -> BalNotificationStream {
329 reth_tokio_util::EventSender::new(1).new_listener()
330 }
331}
332
333#[cfg(test)]
334mod tests {
335 use super::*;
336 use alloy_primitives::B256;
337 #[cfg(feature = "std")]
338 use tokio_stream::StreamExt;
339
340 const EMPTY_LIST_CODE: u8 = 0xc0;
341
342 #[test]
343 fn noop_store_returns_empty_results() {
344 let store = BalStoreHandle::default();
345 let hashes = [B256::random(), B256::random()];
346
347 let by_hash = store.get_by_hashes(&hashes).unwrap();
348
349 assert_eq!(by_hash, vec![None, None]);
350 assert!(store.get_by_hash(B256::random()).unwrap().is_none());
351 assert_eq!(store.prune(10).unwrap(), 0);
352 }
353
354 #[test]
355 fn noop_store_flush_is_noop() {
356 let store = BalStoreHandle::default();
357
358 store.flush().unwrap();
359 }
360
361 #[test]
362 fn noop_store_decoded_lookup_returns_none() {
363 let store = BalStoreHandle::default();
364
365 assert!(store.get_decoded_by_hash(B256::random()).unwrap().is_none());
366 }
367
368 #[test]
369 fn decoded_lookup_decodes_raw_bal() {
370 let hash = B256::random();
371 let raw_bal = Bytes::from_static(&[EMPTY_LIST_CODE]);
372 let store = BalStoreHandle::new(TestBalStore { hash, raw_bal: raw_bal.clone() });
373
374 assert_eq!(store.get_by_hash(hash).unwrap(), Some(raw_bal.clone()));
375
376 let decoded = store.get_decoded_by_hash(hash).unwrap().unwrap();
377
378 assert_eq!(decoded.as_raw(), &raw_bal);
379
380 let revm_bal = store.revm_bal_by_hash(hash).unwrap().unwrap();
381
382 assert_eq!(revm_bal.as_raw(), &raw_bal);
383 assert!(revm_bal.as_bal().accounts.is_empty());
384 }
385
386 #[test]
387 fn noop_store_limited_lookup_returns_prefix() {
388 let store = BalStoreHandle::default();
389 let hashes = [B256::random(), B256::random(), B256::random()];
390
391 let limited = store
392 .get_by_hashes_with_limit(&hashes, GetBlockAccessListLimit::ResponseSizeSoftLimit(1))
393 .unwrap();
394
395 assert_eq!(limited, vec![None, None]);
396 }
397
398 #[test]
399 fn block_access_list_limit() {
400 let limit_none = GetBlockAccessListLimit::None;
401 assert!(!limit_none.exceeds(usize::MAX));
402
403 let size_limit_2mb = GetBlockAccessListLimit::ResponseSizeSoftLimit(2 * 1024 * 1024);
404 assert!(!size_limit_2mb.exceeds(1024 * 1024));
405 assert!(!size_limit_2mb.exceeds(2 * 1024 * 1024));
406 assert!(size_limit_2mb.exceeds(3 * 1024 * 1024));
407 }
408
409 #[cfg(feature = "std")]
410 #[tokio::test]
411 async fn noop_store_stream_is_empty() {
412 let store = BalStoreHandle::default();
413 let mut stream = store.bal_stream();
414
415 assert!(stream.next().await.is_none());
416 }
417
418 #[derive(Debug)]
419 struct TestBalStore {
420 hash: B256,
421 raw_bal: Bytes,
422 }
423
424 impl BalStore for TestBalStore {
425 fn insert(&self, _num_hash: NumHash, _bal: SealedBal) -> ProviderResult<()> {
426 Ok(())
427 }
428
429 fn prune(&self, _tip: BlockNumber) -> ProviderResult<usize> {
430 Ok(0)
431 }
432
433 fn get_by_hashes(&self, block_hashes: &[BlockHash]) -> ProviderResult<Vec<Option<Bytes>>> {
434 Ok(block_hashes
435 .iter()
436 .map(|hash| (*hash == self.hash).then(|| self.raw_bal.clone()))
437 .collect())
438 }
439
440 #[cfg(feature = "std")]
441 fn bal_stream(&self) -> BalNotificationStream {
442 reth_tokio_util::EventSender::new(1).new_listener()
443 }
444 }
445}