reth_transaction_pool/blobstore/
mem.rs1use crate::blobstore::{BlobStore, BlobStoreCleanupStat, BlobStoreError, BlobStoreSize};
2use alloy_eips::{
3 eip4844::{BlobAndProofV1, BlobAndProofV2, BlobCellsAndProofsV1},
4 eip7594::{BlobCellMask, BlobTransactionSidecarVariant, Cell},
5};
6use alloy_primitives::{map::B256Map, B128, B256};
7use parking_lot::RwLock;
8use std::sync::Arc;
9
10#[derive(Clone, Debug, Default, PartialEq)]
12pub struct InMemoryBlobStore {
13 inner: Arc<InMemoryBlobStoreInner>,
14}
15
16impl InMemoryBlobStore {
17 fn get_by_versioned_hashes_eip7594(
23 &self,
24 versioned_hashes: &[B256],
25 ) -> Vec<Option<BlobAndProofV2>> {
26 let mut result = vec![None; versioned_hashes.len()];
27 let mut missing_count = result.len();
28 for blob_sidecar in self.inner.store.read().values() {
29 if let Some(blob_sidecar) = blob_sidecar.as_eip7594() {
30 for (hash_idx, match_result) in
31 blob_sidecar.match_versioned_hashes(versioned_hashes)
32 {
33 let slot = &mut result[hash_idx];
34 if slot.is_none() {
35 missing_count -= 1;
36 }
37 *slot = Some(match_result);
38 }
39 }
40
41 if missing_count == 0 {
43 if result.iter().all(|blob| blob.is_some()) {
45 break;
46 }
47 }
48 }
49 result
50 }
51
52 fn get_by_versioned_hashes_cells_eip7594(
54 &self,
55 versioned_hashes: &[B256],
56 indices_bitarray: B128,
57 ) -> Result<Vec<Option<BlobCellsAndProofsV1>>, BlobStoreError> {
58 let cell_mask = BlobCellMask::new(indices_bitarray);
59 let mut result = vec![None; versioned_hashes.len()];
60 let mut missing_count = result.len();
61 let blob_sidecars = self.inner.store.read().values().cloned().collect::<Vec<_>>();
62 for blob_sidecar in blob_sidecars {
63 if let Some(blob_sidecar) = blob_sidecar.as_eip7594() {
64 for (hash_idx, match_result) in blob_sidecar
65 .match_versioned_hashes_cells(versioned_hashes, cell_mask)
66 .map_err(|err| BlobStoreError::Other(Box::new(err)))?
67 {
68 let slot = &mut result[hash_idx];
69 if slot.is_none() {
70 missing_count -= 1;
71 }
72 *slot = Some(match_result);
73 }
74 }
75
76 if missing_count == 0 && result.iter().all(Option::is_some) {
77 break;
78 }
79 }
80 Ok(result)
81 }
82}
83
84#[derive(Debug, Default)]
85struct InMemoryBlobStoreInner {
86 store: RwLock<B256Map<Arc<BlobTransactionSidecarVariant>>>,
88 size_tracker: BlobStoreSize,
89}
90
91impl PartialEq for InMemoryBlobStoreInner {
92 fn eq(&self, other: &Self) -> bool {
93 self.store.read().eq(&*other.store.read())
94 }
95}
96
97impl BlobStore for InMemoryBlobStore {
98 fn insert(&self, tx: B256, data: BlobTransactionSidecarVariant) -> Result<(), BlobStoreError> {
99 let mut store = self.inner.store.write();
100 self.inner.size_tracker.add_size(insert_size(&mut store, tx, data));
101 self.inner.size_tracker.update_len(store.len());
102 Ok(())
103 }
104
105 fn insert_all(
106 &self,
107 txs: Vec<(B256, BlobTransactionSidecarVariant)>,
108 ) -> Result<(), BlobStoreError> {
109 if txs.is_empty() {
110 return Ok(())
111 }
112 let mut store = self.inner.store.write();
113 let mut total_add = 0;
114 for (tx, data) in txs {
115 let add = insert_size(&mut store, tx, data);
116 total_add += add;
117 }
118 self.inner.size_tracker.add_size(total_add);
119 self.inner.size_tracker.update_len(store.len());
120 Ok(())
121 }
122
123 fn delete(&self, tx: B256) -> Result<(), BlobStoreError> {
124 let mut store = self.inner.store.write();
125 let sub = remove_size(&mut store, &tx);
126 self.inner.size_tracker.sub_size(sub);
127 self.inner.size_tracker.update_len(store.len());
128 Ok(())
129 }
130
131 fn delete_all(&self, txs: Vec<B256>) -> Result<(), BlobStoreError> {
132 if txs.is_empty() {
133 return Ok(())
134 }
135 let mut store = self.inner.store.write();
136 let mut total_sub = 0;
137 for tx in txs {
138 total_sub += remove_size(&mut store, &tx);
139 }
140 self.inner.size_tracker.sub_size(total_sub);
141 self.inner.size_tracker.update_len(store.len());
142 Ok(())
143 }
144
145 fn cleanup(&self) -> BlobStoreCleanupStat {
146 BlobStoreCleanupStat::default()
147 }
148
149 fn get(&self, tx: B256) -> Result<Option<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
151 Ok(self.inner.store.read().get(&tx).cloned())
152 }
153
154 fn contains(&self, tx: B256) -> Result<bool, BlobStoreError> {
155 Ok(self.inner.store.read().contains_key(&tx))
156 }
157
158 fn get_all(
159 &self,
160 txs: Vec<B256>,
161 ) -> Result<Vec<(B256, Arc<BlobTransactionSidecarVariant>)>, BlobStoreError> {
162 let store = self.inner.store.read();
163 Ok(txs.into_iter().filter_map(|tx| store.get(&tx).map(|item| (tx, item.clone()))).collect())
164 }
165
166 fn get_exact(
167 &self,
168 txs: Vec<B256>,
169 ) -> Result<Vec<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
170 if txs.is_empty() {
171 return Ok(Vec::new());
172 }
173 let store = self.inner.store.read();
174 txs.into_iter()
175 .map(|tx| store.get(&tx).cloned().ok_or(BlobStoreError::MissingSidecar(tx)))
176 .collect()
177 }
178
179 fn get_by_versioned_hashes_v1(
180 &self,
181 versioned_hashes: &[B256],
182 ) -> Result<Vec<Option<BlobAndProofV1>>, BlobStoreError> {
183 let mut result = vec![None; versioned_hashes.len()];
184 for blob_sidecar in self.inner.store.read().values() {
185 if let Some(blob_sidecar) = blob_sidecar.as_eip4844() {
186 for (hash_idx, match_result) in
187 blob_sidecar.match_versioned_hashes(versioned_hashes)
188 {
189 result[hash_idx] = Some(match_result);
190 }
191 }
192
193 if result.iter().all(|blob| blob.is_some()) {
195 break;
196 }
197 }
198 Ok(result)
199 }
200
201 fn get_by_versioned_hashes_v2(
202 &self,
203 versioned_hashes: &[B256],
204 ) -> Result<Option<Vec<BlobAndProofV2>>, BlobStoreError> {
205 let result = self.get_by_versioned_hashes_eip7594(versioned_hashes);
206 if result.iter().all(|blob| blob.is_some()) {
207 Ok(Some(result.into_iter().map(Option::unwrap).collect()))
208 } else {
209 Ok(None)
210 }
211 }
212
213 fn get_by_versioned_hashes_v3(
214 &self,
215 versioned_hashes: &[B256],
216 ) -> Result<Vec<Option<BlobAndProofV2>>, BlobStoreError> {
217 Ok(self.get_by_versioned_hashes_eip7594(versioned_hashes))
218 }
219
220 fn get_by_versioned_hashes_v4(
221 &self,
222 versioned_hashes: &[B256],
223 indices_bitarray: B128,
224 ) -> Result<Vec<Option<BlobCellsAndProofsV1>>, BlobStoreError> {
225 self.get_by_versioned_hashes_cells_eip7594(versioned_hashes, indices_bitarray)
226 }
227
228 fn get_cells(
229 &self,
230 tx: B256,
231 indices_bitarray: B128,
232 ) -> Result<Option<Vec<Cell>>, BlobStoreError> {
233 let Some(sidecar) = self.get(tx)? else {
234 return Ok(None);
235 };
236
237 let Some(sidecar) = sidecar.as_eip7594() else {
238 return Ok(None);
239 };
240
241 let versioned_hashes = sidecar.versioned_hashes().collect::<Vec<_>>();
242
243 let matches =
244 self.get_by_versioned_hashes_cells_eip7594(&versioned_hashes, indices_bitarray)?;
245
246 let mut cells = Vec::new();
247
248 for matched in matches {
249 let Some(matched) = matched else {
250 return Ok(None);
251 };
252
253 for cell in matched.blob_cells {
254 let Some(cell) = cell else {
255 return Ok(None);
256 };
257
258 cells.push(cell);
259 }
260 }
261
262 Ok(Some(cells))
263 }
264
265 fn data_size_hint(&self) -> Option<usize> {
266 Some(self.inner.size_tracker.data_size())
267 }
268
269 fn blobs_len(&self) -> usize {
270 self.inner.size_tracker.blobs_len()
271 }
272}
273
274#[inline]
276fn remove_size(store: &mut B256Map<Arc<BlobTransactionSidecarVariant>>, tx: &B256) -> usize {
277 store.remove(tx).map(|rem| rem.size()).unwrap_or_default()
278}
279
280#[inline]
284fn insert_size(
285 store: &mut B256Map<Arc<BlobTransactionSidecarVariant>>,
286 tx: B256,
287 blob: BlobTransactionSidecarVariant,
288) -> usize {
289 let add = blob.size();
290 store.insert(tx, Arc::new(blob));
291 add
292}
293
294#[cfg(test)]
295mod tests {
296 use super::*;
297 use alloy_eips::{
298 eip4844::{kzg_to_versioned_hash, Blob, BlobAndProofV2, Bytes48},
299 eip7594::{
300 BlobTransactionSidecarEip7594, BlobTransactionSidecarVariant, CELLS_PER_EXT_BLOB,
301 },
302 };
303
304 fn eip7594_single_blob_sidecar() -> (BlobTransactionSidecarVariant, B256, BlobAndProofV2) {
305 let blob = Blob::default();
306 let commitment = Bytes48::default();
307 let cell_proofs = vec![Bytes48::default(); CELLS_PER_EXT_BLOB];
308
309 let versioned_hash = kzg_to_versioned_hash(commitment.as_slice());
310
311 let expected =
312 BlobAndProofV2 { blob: Box::new(Blob::default()), proofs: cell_proofs.clone() };
313 let sidecar = BlobTransactionSidecarEip7594::new(vec![blob], vec![commitment], cell_proofs);
314
315 (BlobTransactionSidecarVariant::Eip7594(sidecar), versioned_hash, expected)
316 }
317
318 #[test]
319 fn mem_get_blobs_v3_returns_partial_results() {
320 let store = InMemoryBlobStore::default();
321
322 let (sidecar, versioned_hash, expected) = eip7594_single_blob_sidecar();
323 store.insert(B256::random(), sidecar).unwrap();
324
325 assert_ne!(versioned_hash, B256::ZERO);
326
327 let request = vec![versioned_hash, B256::ZERO];
328 let v2 = store.get_by_versioned_hashes_v2(&request).unwrap();
329 assert!(v2.is_none(), "v2 must return null if any requested blob is missing");
330
331 let v3 = store.get_by_versioned_hashes_v3(&request).unwrap();
332 assert_eq!(v3, vec![Some(expected), None]);
333 }
334
335 #[test]
336 fn mem_get_blobs_v4_returns_requested_cells() {
337 let store = InMemoryBlobStore::default();
338
339 let (sidecar, versioned_hash, _) = eip7594_single_blob_sidecar();
340 store.insert(B256::random(), sidecar).unwrap();
341
342 let indices_bitarray = B128::from((1u128 << 0) | (1u128 << 7));
343 let request = vec![versioned_hash, B256::ZERO];
344
345 let v4 = store.get_by_versioned_hashes_v4(&request, indices_bitarray).unwrap();
346 assert_eq!(v4.len(), request.len());
347 assert!(v4[1].is_none());
348
349 let cells_and_proofs = v4[0].as_ref().unwrap();
350 assert_eq!(cells_and_proofs.blob_cells.len(), 2);
351 assert_eq!(cells_and_proofs.proofs.len(), 2);
352 assert!(cells_and_proofs.blob_cells.iter().all(Option::is_some));
353 assert_eq!(cells_and_proofs.proofs, vec![Some(Bytes48::default()); 2]);
354 }
355}