reth_transaction_pool/blobstore/
mem.rs
1use crate::blobstore::{BlobStore, BlobStoreCleanupStat, BlobStoreError, BlobStoreSize};
2use alloy_eips::eip4844::{BlobAndProofV1, BlobTransactionSidecar};
3use alloy_primitives::B256;
4use parking_lot::RwLock;
5use std::{collections::HashMap, sync::Arc};
6
7#[derive(Clone, Debug, Default, PartialEq)]
9pub struct InMemoryBlobStore {
10 inner: Arc<InMemoryBlobStoreInner>,
11}
12
13#[derive(Debug, Default)]
14struct InMemoryBlobStoreInner {
15 store: RwLock<HashMap<B256, Arc<BlobTransactionSidecar>>>,
17 size_tracker: BlobStoreSize,
18}
19
20impl PartialEq for InMemoryBlobStoreInner {
21 fn eq(&self, other: &Self) -> bool {
22 self.store.read().eq(&other.store.read())
23 }
24}
25
26impl BlobStore for InMemoryBlobStore {
27 fn insert(&self, tx: B256, data: BlobTransactionSidecar) -> Result<(), BlobStoreError> {
28 let mut store = self.inner.store.write();
29 self.inner.size_tracker.add_size(insert_size(&mut store, tx, data));
30 self.inner.size_tracker.update_len(store.len());
31 Ok(())
32 }
33
34 fn insert_all(&self, txs: Vec<(B256, BlobTransactionSidecar)>) -> Result<(), BlobStoreError> {
35 if txs.is_empty() {
36 return Ok(())
37 }
38 let mut store = self.inner.store.write();
39 let mut total_add = 0;
40 for (tx, data) in txs {
41 let add = insert_size(&mut store, tx, data);
42 total_add += add;
43 }
44 self.inner.size_tracker.add_size(total_add);
45 self.inner.size_tracker.update_len(store.len());
46 Ok(())
47 }
48
49 fn delete(&self, tx: B256) -> Result<(), BlobStoreError> {
50 let mut store = self.inner.store.write();
51 let sub = remove_size(&mut store, &tx);
52 self.inner.size_tracker.sub_size(sub);
53 self.inner.size_tracker.update_len(store.len());
54 Ok(())
55 }
56
57 fn delete_all(&self, txs: Vec<B256>) -> Result<(), BlobStoreError> {
58 if txs.is_empty() {
59 return Ok(())
60 }
61 let mut store = self.inner.store.write();
62 let mut total_sub = 0;
63 for tx in txs {
64 total_sub += remove_size(&mut store, &tx);
65 }
66 self.inner.size_tracker.sub_size(total_sub);
67 self.inner.size_tracker.update_len(store.len());
68 Ok(())
69 }
70
71 fn cleanup(&self) -> BlobStoreCleanupStat {
72 BlobStoreCleanupStat::default()
73 }
74
75 fn get(&self, tx: B256) -> Result<Option<Arc<BlobTransactionSidecar>>, BlobStoreError> {
77 Ok(self.inner.store.read().get(&tx).cloned())
78 }
79
80 fn contains(&self, tx: B256) -> Result<bool, BlobStoreError> {
81 Ok(self.inner.store.read().contains_key(&tx))
82 }
83
84 fn get_all(
85 &self,
86 txs: Vec<B256>,
87 ) -> Result<Vec<(B256, Arc<BlobTransactionSidecar>)>, BlobStoreError> {
88 let store = self.inner.store.read();
89 Ok(txs.into_iter().filter_map(|tx| store.get(&tx).map(|item| (tx, item.clone()))).collect())
90 }
91
92 fn get_exact(
93 &self,
94 txs: Vec<B256>,
95 ) -> Result<Vec<Arc<BlobTransactionSidecar>>, BlobStoreError> {
96 let store = self.inner.store.read();
97 Ok(txs.into_iter().filter_map(|tx| store.get(&tx).cloned()).collect())
98 }
99
100 fn get_by_versioned_hashes(
101 &self,
102 versioned_hashes: &[B256],
103 ) -> Result<Vec<Option<BlobAndProofV1>>, BlobStoreError> {
104 let mut result = vec![None; versioned_hashes.len()];
105 for (_tx_hash, blob_sidecar) in self.inner.store.read().iter() {
106 for (hash_idx, match_result) in blob_sidecar.match_versioned_hashes(versioned_hashes) {
107 result[hash_idx] = Some(match_result);
108 }
109
110 if result.iter().all(|blob| blob.is_some()) {
112 break;
113 }
114 }
115 Ok(result)
116 }
117
118 fn data_size_hint(&self) -> Option<usize> {
119 Some(self.inner.size_tracker.data_size())
120 }
121
122 fn blobs_len(&self) -> usize {
123 self.inner.size_tracker.blobs_len()
124 }
125}
126
127#[inline]
129fn remove_size(store: &mut HashMap<B256, Arc<BlobTransactionSidecar>>, tx: &B256) -> usize {
130 store.remove(tx).map(|rem| rem.size()).unwrap_or_default()
131}
132
133#[inline]
137fn insert_size(
138 store: &mut HashMap<B256, Arc<BlobTransactionSidecar>>,
139 tx: B256,
140 blob: BlobTransactionSidecar,
141) -> usize {
142 let add = blob.size();
143 store.insert(tx, Arc::new(blob));
144 add
145}