1use alloc::{sync::Arc, vec::Vec};
2use alloy_eip7928::bal::DecodedBal;
3pub use alloy_eip7928::bal::RawBal;
4use alloy_eips::NumHash;
5use alloy_primitives::{BlockHash, BlockNumber, Bytes};
6use reth_storage_errors::provider::ProviderResult;
7use revm_database::state::bal::Bal as RevmBal;
8
9#[derive(Clone, Debug, PartialEq, Eq)]
11pub struct BalNotification {
12 pub num_hash: NumHash,
14 pub bal: RawBal,
16}
17
18impl BalNotification {
19 pub const fn new(num_hash: NumHash, bal: RawBal) -> Self {
21 Self { num_hash, bal }
22 }
23}
24
25#[cfg(feature = "std")]
26pub use self::subscriptions::BalNotificationStream;
27
#[cfg(feature = "std")]
mod subscriptions {
    /// Stream of [`BalNotification`](super::BalNotification) values.
    ///
    /// Alias for the `reth_tokio_util` event stream specialised to BAL notifications.
    pub type BalNotificationStream = reth_tokio_util::EventStream<super::BalNotification>;
}
35
36#[auto_impl::auto_impl(&, Arc, Box)]
42pub trait BalStore: Send + Sync + 'static {
43 fn insert(&self, num_hash: NumHash, bal: RawBal) -> ProviderResult<()>;
48
49 fn insert_many(&self, entries: Vec<(NumHash, RawBal)>) -> ProviderResult<()> {
53 for (num_hash, bal) in entries {
54 self.insert(num_hash, bal)?;
55 }
56 Ok(())
57 }
58
59 fn flush(&self) -> ProviderResult<()> {
63 Ok(())
64 }
65
66 fn prune(&self, tip: BlockNumber) -> ProviderResult<usize>;
70
71 fn get_by_hashes(&self, block_hashes: &[BlockHash]) -> ProviderResult<Vec<Option<Bytes>>>;
75
76 fn get_by_hash(&self, block_hash: BlockHash) -> ProviderResult<Option<Bytes>> {
78 Ok(self.get_by_hashes(&[block_hash])?.into_iter().next().flatten())
79 }
80
81 fn get_decoded_by_hash(&self, block_hash: BlockHash) -> ProviderResult<Option<DecodedBal>> {
83 self.get_by_hash(block_hash)?
84 .map(DecodedBal::from_rlp_bytes)
85 .transpose()
86 .map_err(Into::into)
87 }
88
89 fn revm_bal_by_hash(
91 &self,
92 block_hash: BlockHash,
93 ) -> ProviderResult<Option<DecodedBal<Arc<RevmBal>>>> {
94 self.get_decoded_by_hash(block_hash)?
95 .map(|decoded| {
96 decoded.try_map(|bal| {
97 RevmBal::try_from(Vec::from(bal))
98 .map(Arc::new)
99 .map_err(reth_storage_errors::provider::ProviderError::other)
100 })
101 })
102 .transpose()
103 }
104
105 fn get_by_hashes_with_limit(
111 &self,
112 block_hashes: &[BlockHash],
113 limit: GetBlockAccessListLimit,
114 ) -> ProviderResult<Vec<Option<Bytes>>> {
115 let mut out = Vec::new();
116 self.append_by_hashes_with_limit(block_hashes, limit, &mut out)?;
117 out.shrink_to_fit();
118 Ok(out)
119 }
120
121 fn append_by_hashes_with_limit(
125 &self,
126 block_hashes: &[BlockHash],
127 limit: GetBlockAccessListLimit,
128 out: &mut Vec<Option<Bytes>>,
129 ) -> ProviderResult<()> {
130 let mut size = 0;
131 for bal in self.get_by_hashes(block_hashes)? {
132 size += bal.as_ref().map_or(1, |bytes| bytes.len());
133 out.push(bal);
134
135 if limit.exceeds(size) {
136 break
137 }
138 }
139 Ok(())
140 }
141
142 #[cfg(feature = "std")]
147 fn bal_stream(&self) -> BalNotificationStream;
148}
149
/// Limit applied when collecting block access lists for a response.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum GetBlockAccessListLimit {
    /// No limit: [`GetBlockAccessListLimit::exceeds`] always returns `false`.
    None,
    /// Soft limit on the accumulated response size.
    ///
    /// The limit counts as exceeded only once the size is strictly greater than the wrapped
    /// value.
    ResponseSizeSoftLimit(usize),
}

impl GetBlockAccessListLimit {
    /// Returns `true` if `size` is strictly greater than the configured limit.
    ///
    /// Always returns `false` for [`GetBlockAccessListLimit::None`].
    #[inline]
    pub const fn exceeds(&self, size: usize) -> bool {
        if let Self::ResponseSizeSoftLimit(cap) = *self {
            cap < size
        } else {
            false
        }
    }
}
169
170#[derive(Clone)]
172pub struct BalStoreHandle {
173 inner: Arc<dyn BalStore>,
174}
175
176impl BalStoreHandle {
177 pub fn new(inner: impl BalStore) -> Self {
179 Self { inner: Arc::new(inner) }
180 }
181
182 pub fn noop() -> Self {
184 Self::new(NoopBalStore)
185 }
186
187 #[inline]
189 pub fn insert(&self, num_hash: NumHash, bal: RawBal) -> ProviderResult<()> {
190 self.inner.insert(num_hash, bal)
191 }
192
193 #[inline]
195 pub fn insert_many(&self, entries: Vec<(NumHash, RawBal)>) -> ProviderResult<()> {
196 self.inner.insert_many(entries)
197 }
198
199 #[inline]
201 pub fn flush(&self) -> ProviderResult<()> {
202 self.inner.flush()
203 }
204
205 #[inline]
207 pub fn prune(&self, tip: BlockNumber) -> ProviderResult<usize> {
208 self.inner.prune(tip)
209 }
210
211 #[inline]
213 pub fn get_by_hashes(&self, block_hashes: &[BlockHash]) -> ProviderResult<Vec<Option<Bytes>>> {
214 self.inner.get_by_hashes(block_hashes)
215 }
216
217 #[inline]
219 pub fn get_by_hash(&self, block_hash: BlockHash) -> ProviderResult<Option<Bytes>> {
220 self.inner.get_by_hash(block_hash)
221 }
222
223 #[inline]
225 pub fn get_decoded_by_hash(&self, block_hash: BlockHash) -> ProviderResult<Option<DecodedBal>> {
226 self.inner.get_decoded_by_hash(block_hash)
227 }
228
229 #[inline]
231 pub fn revm_bal_by_hash(
232 &self,
233 block_hash: BlockHash,
234 ) -> ProviderResult<Option<DecodedBal<Arc<RevmBal>>>> {
235 self.inner.revm_bal_by_hash(block_hash)
236 }
237
238 #[inline]
241 pub fn get_by_hashes_with_limit(
242 &self,
243 block_hashes: &[BlockHash],
244 limit: GetBlockAccessListLimit,
245 ) -> ProviderResult<Vec<Option<Bytes>>> {
246 self.inner.get_by_hashes_with_limit(block_hashes, limit)
247 }
248
249 #[inline]
251 pub fn append_by_hashes_with_limit(
252 &self,
253 block_hashes: &[BlockHash],
254 limit: GetBlockAccessListLimit,
255 out: &mut Vec<Option<Bytes>>,
256 ) -> ProviderResult<()> {
257 self.inner.append_by_hashes_with_limit(block_hashes, limit, out)
258 }
259
260 #[cfg(feature = "std")]
262 #[inline]
263 pub fn bal_stream(&self) -> BalNotificationStream {
264 self.inner.bal_stream()
265 }
266}
267
268impl Default for BalStoreHandle {
269 fn default() -> Self {
270 Self::noop()
271 }
272}
273
274impl core::fmt::Debug for BalStoreHandle {
275 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
276 f.debug_struct("BalStoreHandle").finish_non_exhaustive()
277 }
278}
279
280#[auto_impl::auto_impl(&, Arc)]
282pub trait BalProvider {
283 fn bal_store(&self) -> &BalStoreHandle;
285}
286
/// [`BalStore`] implementation that stores nothing: writes are accepted and discarded, and
/// every lookup reports a missing BAL.
#[derive(Clone, Copy, Debug, Default)]
pub struct NoopBalStore;
290
291impl BalStore for NoopBalStore {
292 fn insert(&self, _num_hash: NumHash, _bal: RawBal) -> ProviderResult<()> {
293 Ok(())
294 }
295
296 fn insert_many(&self, _entries: Vec<(NumHash, RawBal)>) -> ProviderResult<()> {
297 Ok(())
298 }
299
300 fn prune(&self, _tip: BlockNumber) -> ProviderResult<usize> {
301 Ok(0)
302 }
303
304 fn get_by_hashes(&self, block_hashes: &[BlockHash]) -> ProviderResult<Vec<Option<Bytes>>> {
305 Ok(block_hashes.iter().map(|_| None).collect())
306 }
307
308 fn append_by_hashes_with_limit(
309 &self,
310 block_hashes: &[BlockHash],
311 limit: GetBlockAccessListLimit,
312 out: &mut Vec<Option<Bytes>>,
313 ) -> ProviderResult<()> {
314 let mut size = 0;
315 for _ in block_hashes {
316 size += 1;
317 out.push(None);
318
319 if limit.exceeds(size) {
320 break
321 }
322 }
323 Ok(())
324 }
325
326 #[cfg(feature = "std")]
327 fn bal_stream(&self) -> BalNotificationStream {
328 reth_tokio_util::EventSender::new(1).new_listener()
329 }
330}
331
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_primitives::B256;
    #[cfg(feature = "std")]
    use tokio_stream::StreamExt;

    /// Single byte used as the raw BAL fixture: the RLP encoding of an empty list.
    const EMPTY_LIST_CODE: u8 = 0xc0;

    /// The default (noop) handle reports every BAL as missing and prunes nothing.
    #[test]
    fn noop_store_returns_empty_results() {
        let handle = BalStoreHandle::default();
        let requested = [B256::random(), B256::random()];

        let found = handle.get_by_hashes(&requested).unwrap();

        assert_eq!(found, vec![None, None]);
        assert!(handle.get_by_hash(B256::random()).unwrap().is_none());
        assert_eq!(handle.prune(10).unwrap(), 0);
    }

    /// Flushing the noop store succeeds.
    #[test]
    fn noop_store_flush_is_noop() {
        let handle = BalStoreHandle::default();

        handle.flush().unwrap();
    }

    /// Decoded lookups on the noop store yield `None` instead of an error.
    #[test]
    fn noop_store_decoded_lookup_returns_none() {
        let handle = BalStoreHandle::default();

        assert!(handle.get_decoded_by_hash(B256::random()).unwrap().is_none());
    }

    /// A stored empty-list BAL comes back as raw bytes, as a decoded BAL and as a revm BAL.
    #[test]
    fn decoded_lookup_decodes_raw_bal() {
        let hash = B256::random();
        let raw_bal = Bytes::from_static(&[EMPTY_LIST_CODE]);
        let handle = BalStoreHandle::new(TestBalStore { hash, raw_bal: raw_bal.clone() });

        // Raw lookup returns exactly the stored bytes.
        assert_eq!(handle.get_by_hash(hash).unwrap(), Some(raw_bal.clone()));

        // The decoded form keeps the raw bytes.
        let decoded = handle.get_decoded_by_hash(hash).unwrap().unwrap();
        assert_eq!(decoded.as_raw(), &raw_bal);

        // The revm form keeps the raw bytes and contains no accounts.
        let revm_bal = handle.revm_bal_by_hash(hash).unwrap().unwrap();
        assert_eq!(revm_bal.as_raw(), &raw_bal);
        assert!(revm_bal.as_bal().accounts.is_empty());
    }

    /// With a soft limit of 1, the second one-byte placeholder crosses the limit, so two of the
    /// three requested entries are returned.
    #[test]
    fn noop_store_limited_lookup_returns_prefix() {
        let handle = BalStoreHandle::default();
        let requested = [B256::random(), B256::random(), B256::random()];
        let limit = GetBlockAccessListLimit::ResponseSizeSoftLimit(1);

        let limited = handle.get_by_hashes_with_limit(&requested, limit).unwrap();

        assert_eq!(limited, vec![None, None]);
    }

    /// `exceeds` is never true without a limit and is a strict comparison with one.
    #[test]
    fn block_access_list_limit() {
        let limit_none = GetBlockAccessListLimit::None;
        assert!(!limit_none.exceeds(usize::MAX));

        let size_limit_2mb = GetBlockAccessListLimit::ResponseSizeSoftLimit(2 * 1024 * 1024);
        assert!(!size_limit_2mb.exceeds(1024 * 1024));
        assert!(!size_limit_2mb.exceeds(2 * 1024 * 1024));
        assert!(size_limit_2mb.exceeds(3 * 1024 * 1024));
    }

    /// The noop store's stream ends immediately without yielding a notification.
    #[cfg(feature = "std")]
    #[tokio::test]
    async fn noop_store_stream_is_empty() {
        let handle = BalStoreHandle::default();
        let mut notifications = handle.bal_stream();

        assert!(notifications.next().await.is_none());
    }

    /// Test store that knows exactly one block hash and its raw BAL.
    #[derive(Debug)]
    struct TestBalStore {
        /// The only hash for which a BAL is returned.
        hash: B256,
        /// Raw BAL bytes returned for `hash`.
        raw_bal: Bytes,
    }

    impl BalStore for TestBalStore {
        fn insert(&self, _num_hash: NumHash, _bal: RawBal) -> ProviderResult<()> {
            Ok(())
        }

        fn prune(&self, _tip: BlockNumber) -> ProviderResult<usize> {
            Ok(0)
        }

        fn get_by_hashes(&self, block_hashes: &[BlockHash]) -> ProviderResult<Vec<Option<Bytes>>> {
            let lookup = |candidate: &BlockHash| {
                if *candidate == self.hash {
                    Some(self.raw_bal.clone())
                } else {
                    None
                }
            };
            Ok(block_hashes.iter().map(lookup).collect())
        }

        #[cfg(feature = "std")]
        fn bal_stream(&self) -> BalNotificationStream {
            let sender = reth_tokio_util::EventSender::new(1);
            sender.new_listener()
        }
    }
}