reth_transaction_pool/blobstore/
mem.rs1use crate::blobstore::{BlobStore, BlobStoreCleanupStat, BlobStoreError, BlobStoreSize};
2use alloy_eips::{
3 eip4844::{BlobAndProofV1, BlobAndProofV2},
4 eip7594::BlobTransactionSidecarVariant,
5};
6use alloy_primitives::{map::B256Map, B256};
7use parking_lot::RwLock;
8use std::sync::Arc;
9
10#[derive(Clone, Debug, Default, PartialEq)]
12pub struct InMemoryBlobStore {
13 inner: Arc<InMemoryBlobStoreInner>,
14}
15
16impl InMemoryBlobStore {
17 fn get_by_versioned_hashes_eip7594(
23 &self,
24 versioned_hashes: &[B256],
25 ) -> Vec<Option<BlobAndProofV2>> {
26 let mut result = vec![None; versioned_hashes.len()];
27 let mut missing_count = result.len();
28 for (_tx_hash, blob_sidecar) in self.inner.store.read().iter() {
29 if let Some(blob_sidecar) = blob_sidecar.as_eip7594() {
30 for (hash_idx, match_result) in
31 blob_sidecar.match_versioned_hashes(versioned_hashes)
32 {
33 let slot = &mut result[hash_idx];
34 if slot.is_none() {
35 missing_count -= 1;
36 }
37 *slot = Some(match_result);
38 }
39 }
40
41 if missing_count == 0 {
43 if result.iter().all(|blob| blob.is_some()) {
45 break;
46 }
47 }
48 }
49 result
50 }
51}
52
53#[derive(Debug, Default)]
54struct InMemoryBlobStoreInner {
55 store: RwLock<B256Map<Arc<BlobTransactionSidecarVariant>>>,
57 size_tracker: BlobStoreSize,
58}
59
60impl PartialEq for InMemoryBlobStoreInner {
61 fn eq(&self, other: &Self) -> bool {
62 self.store.read().eq(&*other.store.read())
63 }
64}
65
66impl BlobStore for InMemoryBlobStore {
67 fn insert(&self, tx: B256, data: BlobTransactionSidecarVariant) -> Result<(), BlobStoreError> {
68 let mut store = self.inner.store.write();
69 self.inner.size_tracker.add_size(insert_size(&mut store, tx, data));
70 self.inner.size_tracker.update_len(store.len());
71 Ok(())
72 }
73
74 fn insert_all(
75 &self,
76 txs: Vec<(B256, BlobTransactionSidecarVariant)>,
77 ) -> Result<(), BlobStoreError> {
78 if txs.is_empty() {
79 return Ok(())
80 }
81 let mut store = self.inner.store.write();
82 let mut total_add = 0;
83 for (tx, data) in txs {
84 let add = insert_size(&mut store, tx, data);
85 total_add += add;
86 }
87 self.inner.size_tracker.add_size(total_add);
88 self.inner.size_tracker.update_len(store.len());
89 Ok(())
90 }
91
92 fn delete(&self, tx: B256) -> Result<(), BlobStoreError> {
93 let mut store = self.inner.store.write();
94 let sub = remove_size(&mut store, &tx);
95 self.inner.size_tracker.sub_size(sub);
96 self.inner.size_tracker.update_len(store.len());
97 Ok(())
98 }
99
100 fn delete_all(&self, txs: Vec<B256>) -> Result<(), BlobStoreError> {
101 if txs.is_empty() {
102 return Ok(())
103 }
104 let mut store = self.inner.store.write();
105 let mut total_sub = 0;
106 for tx in txs {
107 total_sub += remove_size(&mut store, &tx);
108 }
109 self.inner.size_tracker.sub_size(total_sub);
110 self.inner.size_tracker.update_len(store.len());
111 Ok(())
112 }
113
114 fn cleanup(&self) -> BlobStoreCleanupStat {
115 BlobStoreCleanupStat::default()
116 }
117
118 fn get(&self, tx: B256) -> Result<Option<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
120 Ok(self.inner.store.read().get(&tx).cloned())
121 }
122
123 fn contains(&self, tx: B256) -> Result<bool, BlobStoreError> {
124 Ok(self.inner.store.read().contains_key(&tx))
125 }
126
127 fn get_all(
128 &self,
129 txs: Vec<B256>,
130 ) -> Result<Vec<(B256, Arc<BlobTransactionSidecarVariant>)>, BlobStoreError> {
131 let store = self.inner.store.read();
132 Ok(txs.into_iter().filter_map(|tx| store.get(&tx).map(|item| (tx, item.clone()))).collect())
133 }
134
135 fn get_exact(
136 &self,
137 txs: Vec<B256>,
138 ) -> Result<Vec<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
139 if txs.is_empty() {
140 return Ok(Vec::new());
141 }
142 let store = self.inner.store.read();
143 txs.into_iter()
144 .map(|tx| store.get(&tx).cloned().ok_or(BlobStoreError::MissingSidecar(tx)))
145 .collect()
146 }
147
148 fn get_by_versioned_hashes_v1(
149 &self,
150 versioned_hashes: &[B256],
151 ) -> Result<Vec<Option<BlobAndProofV1>>, BlobStoreError> {
152 let mut result = vec![None; versioned_hashes.len()];
153 for (_tx_hash, blob_sidecar) in self.inner.store.read().iter() {
154 if let Some(blob_sidecar) = blob_sidecar.as_eip4844() {
155 for (hash_idx, match_result) in
156 blob_sidecar.match_versioned_hashes(versioned_hashes)
157 {
158 result[hash_idx] = Some(match_result);
159 }
160 }
161
162 if result.iter().all(|blob| blob.is_some()) {
164 break;
165 }
166 }
167 Ok(result)
168 }
169
170 fn get_by_versioned_hashes_v2(
171 &self,
172 versioned_hashes: &[B256],
173 ) -> Result<Option<Vec<BlobAndProofV2>>, BlobStoreError> {
174 let result = self.get_by_versioned_hashes_eip7594(versioned_hashes);
175 if result.iter().all(|blob| blob.is_some()) {
176 Ok(Some(result.into_iter().map(Option::unwrap).collect()))
177 } else {
178 Ok(None)
179 }
180 }
181
182 fn get_by_versioned_hashes_v3(
183 &self,
184 versioned_hashes: &[B256],
185 ) -> Result<Vec<Option<BlobAndProofV2>>, BlobStoreError> {
186 Ok(self.get_by_versioned_hashes_eip7594(versioned_hashes))
187 }
188
189 fn data_size_hint(&self) -> Option<usize> {
190 Some(self.inner.size_tracker.data_size())
191 }
192
193 fn blobs_len(&self) -> usize {
194 self.inner.size_tracker.blobs_len()
195 }
196}
197
198#[inline]
200fn remove_size(store: &mut B256Map<Arc<BlobTransactionSidecarVariant>>, tx: &B256) -> usize {
201 store.remove(tx).map(|rem| rem.size()).unwrap_or_default()
202}
203
204#[inline]
208fn insert_size(
209 store: &mut B256Map<Arc<BlobTransactionSidecarVariant>>,
210 tx: B256,
211 blob: BlobTransactionSidecarVariant,
212) -> usize {
213 let add = blob.size();
214 store.insert(tx, Arc::new(blob));
215 add
216}
217
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_eips::{
        eip4844::{kzg_to_versioned_hash, Blob, BlobAndProofV2, Bytes48},
        eip7594::{
            BlobTransactionSidecarEip7594, BlobTransactionSidecarVariant, CELLS_PER_EXT_BLOB,
        },
    };

    /// Builds an EIP-7594 sidecar with a single default blob.
    ///
    /// Returns the sidecar, the versioned hash derived from its (default) commitment, and the
    /// blob-and-proofs value a lookup for that hash is expected to yield.
    fn eip7594_single_blob_sidecar() -> (BlobTransactionSidecarVariant, B256, BlobAndProofV2) {
        let commitment = Bytes48::default();
        let proofs = vec![Bytes48::default(); CELLS_PER_EXT_BLOB];
        let hash = kzg_to_versioned_hash(commitment.as_slice());

        let want = BlobAndProofV2 { blob: Box::new(Blob::default()), proofs: proofs.clone() };
        let sidecar =
            BlobTransactionSidecarEip7594::new(vec![Blob::default()], vec![commitment], proofs);

        (BlobTransactionSidecarVariant::Eip7594(sidecar), hash, want)
    }

    /// With one known and one unknown hash in the request, v2 must yield nothing at all while
    /// v3 must yield the known blob and a `None` placeholder for the unknown one.
    #[test]
    fn mem_get_blobs_v3_returns_partial_results() {
        let blob_store = InMemoryBlobStore::default();

        let (sidecar, known_hash, want) = eip7594_single_blob_sidecar();
        blob_store.insert(B256::random(), sidecar).unwrap();

        // The zero hash is used as the "unknown" entry below, so it must differ from the known
        // one.
        assert_ne!(known_hash, B256::ZERO);

        let hashes = vec![known_hash, B256::ZERO];

        let all_or_nothing = blob_store.get_by_versioned_hashes_v2(&hashes).unwrap();
        assert!(all_or_nothing.is_none(), "v2 must return null if any requested blob is missing");

        let partial = blob_store.get_by_versioned_hashes_v3(&hashes).unwrap();
        assert_eq!(partial, vec![Some(want), None]);
    }
}