Skip to main content

reth_transaction_pool/blobstore/
mem.rs

1use crate::blobstore::{BlobStore, BlobStoreCleanupStat, BlobStoreError, BlobStoreSize};
2use alloy_eips::{
3    eip4844::{BlobAndProofV1, BlobAndProofV2, BlobCellsAndProofsV1},
4    eip7594::{BlobCellMask, BlobTransactionSidecarVariant, Cell},
5};
6use alloy_primitives::{map::B256Map, B128, B256};
7use parking_lot::RwLock;
8use std::sync::Arc;
9
10/// An in-memory blob store.
11#[derive(Clone, Debug, Default, PartialEq)]
12pub struct InMemoryBlobStore {
13    inner: Arc<InMemoryBlobStoreInner>,
14}
15
16impl InMemoryBlobStore {
17    /// Look up EIP-7594 blobs by their versioned hashes.
18    ///
19    /// This returns a result vector with the **same length and order** as the input
20    /// `versioned_hashes`. Each element is `Some(BlobAndProofV2)` if the blob is available, or
21    /// `None` if it is missing or an older sidecar version.
22    fn get_by_versioned_hashes_eip7594(
23        &self,
24        versioned_hashes: &[B256],
25    ) -> Vec<Option<BlobAndProofV2>> {
26        let mut result = vec![None; versioned_hashes.len()];
27        let mut missing_count = result.len();
28        for blob_sidecar in self.inner.store.read().values() {
29            if let Some(blob_sidecar) = blob_sidecar.as_eip7594() {
30                for (hash_idx, match_result) in
31                    blob_sidecar.match_versioned_hashes(versioned_hashes)
32                {
33                    let slot = &mut result[hash_idx];
34                    if slot.is_none() {
35                        missing_count -= 1;
36                    }
37                    *slot = Some(match_result);
38                }
39            }
40
41            // Return early if all blobs are found.
42            if missing_count == 0 {
43                // since versioned_hashes may have duplicates, we double check here
44                if result.iter().all(|blob| blob.is_some()) {
45                    break;
46                }
47            }
48        }
49        result
50    }
51
52    /// Look up EIP-7594 blob cells by their versioned hashes.
53    fn get_by_versioned_hashes_cells_eip7594(
54        &self,
55        versioned_hashes: &[B256],
56        indices_bitarray: B128,
57    ) -> Result<Vec<Option<BlobCellsAndProofsV1>>, BlobStoreError> {
58        let cell_mask = BlobCellMask::new(indices_bitarray);
59        let mut result = vec![None; versioned_hashes.len()];
60        let mut missing_count = result.len();
61        let blob_sidecars = self.inner.store.read().values().cloned().collect::<Vec<_>>();
62        for blob_sidecar in blob_sidecars {
63            if let Some(blob_sidecar) = blob_sidecar.as_eip7594() {
64                for (hash_idx, match_result) in blob_sidecar
65                    .match_versioned_hashes_cells(versioned_hashes, cell_mask)
66                    .map_err(|err| BlobStoreError::Other(Box::new(err)))?
67                {
68                    let slot = &mut result[hash_idx];
69                    if slot.is_none() {
70                        missing_count -= 1;
71                    }
72                    *slot = Some(match_result);
73                }
74            }
75
76            if missing_count == 0 && result.iter().all(Option::is_some) {
77                break;
78            }
79        }
80        Ok(result)
81    }
82}
83
84#[derive(Debug, Default)]
85struct InMemoryBlobStoreInner {
86    /// Storage for all blob data.
87    store: RwLock<B256Map<Arc<BlobTransactionSidecarVariant>>>,
88    size_tracker: BlobStoreSize,
89}
90
91impl PartialEq for InMemoryBlobStoreInner {
92    fn eq(&self, other: &Self) -> bool {
93        self.store.read().eq(&*other.store.read())
94    }
95}
96
97impl BlobStore for InMemoryBlobStore {
98    fn insert(&self, tx: B256, data: BlobTransactionSidecarVariant) -> Result<(), BlobStoreError> {
99        let mut store = self.inner.store.write();
100        self.inner.size_tracker.add_size(insert_size(&mut store, tx, data));
101        self.inner.size_tracker.update_len(store.len());
102        Ok(())
103    }
104
105    fn insert_all(
106        &self,
107        txs: Vec<(B256, BlobTransactionSidecarVariant)>,
108    ) -> Result<(), BlobStoreError> {
109        if txs.is_empty() {
110            return Ok(())
111        }
112        let mut store = self.inner.store.write();
113        let mut total_add = 0;
114        for (tx, data) in txs {
115            let add = insert_size(&mut store, tx, data);
116            total_add += add;
117        }
118        self.inner.size_tracker.add_size(total_add);
119        self.inner.size_tracker.update_len(store.len());
120        Ok(())
121    }
122
123    fn delete(&self, tx: B256) -> Result<(), BlobStoreError> {
124        let mut store = self.inner.store.write();
125        let sub = remove_size(&mut store, &tx);
126        self.inner.size_tracker.sub_size(sub);
127        self.inner.size_tracker.update_len(store.len());
128        Ok(())
129    }
130
131    fn delete_all(&self, txs: Vec<B256>) -> Result<(), BlobStoreError> {
132        if txs.is_empty() {
133            return Ok(())
134        }
135        let mut store = self.inner.store.write();
136        let mut total_sub = 0;
137        for tx in txs {
138            total_sub += remove_size(&mut store, &tx);
139        }
140        self.inner.size_tracker.sub_size(total_sub);
141        self.inner.size_tracker.update_len(store.len());
142        Ok(())
143    }
144
145    fn cleanup(&self) -> BlobStoreCleanupStat {
146        BlobStoreCleanupStat::default()
147    }
148
149    // Retrieves the decoded blob data for the given transaction hash.
150    fn get(&self, tx: B256) -> Result<Option<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
151        Ok(self.inner.store.read().get(&tx).cloned())
152    }
153
154    fn contains(&self, tx: B256) -> Result<bool, BlobStoreError> {
155        Ok(self.inner.store.read().contains_key(&tx))
156    }
157
158    fn get_all(
159        &self,
160        txs: Vec<B256>,
161    ) -> Result<Vec<(B256, Arc<BlobTransactionSidecarVariant>)>, BlobStoreError> {
162        let store = self.inner.store.read();
163        Ok(txs.into_iter().filter_map(|tx| store.get(&tx).map(|item| (tx, item.clone()))).collect())
164    }
165
166    fn get_exact(
167        &self,
168        txs: Vec<B256>,
169    ) -> Result<Vec<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
170        if txs.is_empty() {
171            return Ok(Vec::new());
172        }
173        let store = self.inner.store.read();
174        txs.into_iter()
175            .map(|tx| store.get(&tx).cloned().ok_or(BlobStoreError::MissingSidecar(tx)))
176            .collect()
177    }
178
179    fn get_by_versioned_hashes_v1(
180        &self,
181        versioned_hashes: &[B256],
182    ) -> Result<Vec<Option<BlobAndProofV1>>, BlobStoreError> {
183        let mut result = vec![None; versioned_hashes.len()];
184        for blob_sidecar in self.inner.store.read().values() {
185            if let Some(blob_sidecar) = blob_sidecar.as_eip4844() {
186                for (hash_idx, match_result) in
187                    blob_sidecar.match_versioned_hashes(versioned_hashes)
188                {
189                    result[hash_idx] = Some(match_result);
190                }
191            }
192
193            // Return early if all blobs are found.
194            if result.iter().all(|blob| blob.is_some()) {
195                break;
196            }
197        }
198        Ok(result)
199    }
200
201    fn get_by_versioned_hashes_v2(
202        &self,
203        versioned_hashes: &[B256],
204    ) -> Result<Option<Vec<BlobAndProofV2>>, BlobStoreError> {
205        let result = self.get_by_versioned_hashes_eip7594(versioned_hashes);
206        if result.iter().all(|blob| blob.is_some()) {
207            Ok(Some(result.into_iter().map(Option::unwrap).collect()))
208        } else {
209            Ok(None)
210        }
211    }
212
213    fn get_by_versioned_hashes_v3(
214        &self,
215        versioned_hashes: &[B256],
216    ) -> Result<Vec<Option<BlobAndProofV2>>, BlobStoreError> {
217        Ok(self.get_by_versioned_hashes_eip7594(versioned_hashes))
218    }
219
220    fn get_by_versioned_hashes_v4(
221        &self,
222        versioned_hashes: &[B256],
223        indices_bitarray: B128,
224    ) -> Result<Vec<Option<BlobCellsAndProofsV1>>, BlobStoreError> {
225        self.get_by_versioned_hashes_cells_eip7594(versioned_hashes, indices_bitarray)
226    }
227
228    fn get_cells(
229        &self,
230        tx: B256,
231        indices_bitarray: B128,
232    ) -> Result<Option<Vec<Cell>>, BlobStoreError> {
233        let Some(sidecar) = self.get(tx)? else {
234            return Ok(None);
235        };
236
237        let Some(sidecar) = sidecar.as_eip7594() else {
238            return Ok(None);
239        };
240
241        sidecar
242            .compute_matching_cells(BlobCellMask::new(indices_bitarray))
243            .map(Some)
244            .map_err(|err| BlobStoreError::Other(Box::new(err)))
245    }
246
247    fn data_size_hint(&self) -> Option<usize> {
248        Some(self.inner.size_tracker.data_size())
249    }
250
251    fn blobs_len(&self) -> usize {
252        self.inner.size_tracker.blobs_len()
253    }
254}
255
256/// Removes the given blob from the store and returns the size of the blob that was removed.
257#[inline]
258fn remove_size(store: &mut B256Map<Arc<BlobTransactionSidecarVariant>>, tx: &B256) -> usize {
259    store.remove(tx).map(|rem| rem.size()).unwrap_or_default()
260}
261
262/// Inserts the given blob into the store and returns the size of the blob that was added.
263///
264/// We don't need to handle the size updates for replacements because transactions are unique.
265#[inline]
266fn insert_size(
267    store: &mut B256Map<Arc<BlobTransactionSidecarVariant>>,
268    tx: B256,
269    blob: BlobTransactionSidecarVariant,
270) -> usize {
271    let add = blob.size();
272    store.insert(tx, Arc::new(blob));
273    add
274}
275
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_eips::{
        eip4844::{kzg_to_versioned_hash, Blob, BlobAndProofV2, Bytes48},
        eip7594::{
            BlobTransactionSidecarEip7594, BlobTransactionSidecarVariant, CELLS_PER_EXT_BLOB,
        },
    };

    /// Builds an EIP-7594 sidecar holding a single default blob.
    ///
    /// Returns the sidecar, the versioned hash derived from its (default) commitment, and the
    /// `BlobAndProofV2` a lookup for that hash is expected to produce.
    fn eip7594_single_blob_sidecar() -> (BlobTransactionSidecarVariant, B256, BlobAndProofV2) {
        let commitment = Bytes48::default();
        let proofs = vec![Bytes48::default(); CELLS_PER_EXT_BLOB];
        let hash = kzg_to_versioned_hash(commitment.as_slice());

        let blob_and_proof =
            BlobAndProofV2 { blob: Box::new(Blob::default()), proofs: proofs.clone() };
        let variant = BlobTransactionSidecarVariant::Eip7594(BlobTransactionSidecarEip7594::new(
            vec![Blob::default()],
            vec![commitment],
            proofs,
        ));

        (variant, hash, blob_and_proof)
    }

    /// Bit array with bits 0 and 7 set, i.e. a request for cells 0 and 7.
    fn cells_0_and_7() -> B128 {
        B128::from((1u128 << 0) | (1u128 << 7))
    }

    /// v2 is all-or-nothing, v3 reports the found blob and `None` for the missing one.
    #[test]
    fn mem_get_blobs_v3_returns_partial_results() {
        let (sidecar, known_hash, expected) = eip7594_single_blob_sidecar();
        let store = InMemoryBlobStore::default();
        store.insert(B256::random(), sidecar).unwrap();

        // `B256::ZERO` stands in for the missing blob, so it must differ from the stored hash.
        assert_ne!(known_hash, B256::ZERO);
        let request = vec![known_hash, B256::ZERO];

        let all_or_nothing = store.get_by_versioned_hashes_v2(&request).unwrap();
        assert!(
            all_or_nothing.is_none(),
            "v2 must return null if any requested blob is missing"
        );

        let partial = store.get_by_versioned_hashes_v3(&request).unwrap();
        assert_eq!(partial, vec![Some(expected), None]);
    }

    /// v4 returns exactly the requested cells (and proofs) for found blobs.
    #[test]
    fn mem_get_blobs_v4_returns_requested_cells() {
        let (sidecar, known_hash, _) = eip7594_single_blob_sidecar();
        let store = InMemoryBlobStore::default();
        store.insert(B256::random(), sidecar).unwrap();

        let request = vec![known_hash, B256::ZERO];
        let response = store.get_by_versioned_hashes_v4(&request, cells_0_and_7()).unwrap();

        assert_eq!(response.len(), request.len());
        assert!(response[1].is_none());

        let hit = response[0].as_ref().unwrap();
        assert_eq!(hit.blob_cells.len(), 2);
        assert_eq!(hit.proofs.len(), 2);
        assert!(hit.blob_cells.iter().all(Option::is_some));
        assert_eq!(hit.proofs, vec![Some(Bytes48::default()); 2]);
    }

    /// `get_cells` by transaction hash agrees with the v4 lookup by versioned hash.
    #[test]
    fn mem_get_cells_returns_requested_cells() {
        let (sidecar, known_hash, _) = eip7594_single_blob_sidecar();
        let store = InMemoryBlobStore::default();
        let tx_hash = B256::random();
        store.insert(tx_hash, sidecar).unwrap();

        let mask = cells_0_and_7();

        let mut by_hash = store.get_by_versioned_hashes_v4(&[known_hash], mask).unwrap();
        let cells_and_proofs = by_hash.pop().unwrap().unwrap();
        let expected =
            cells_and_proofs.blob_cells.into_iter().collect::<Option<Vec<_>>>().unwrap();

        assert_eq!(store.get_cells(tx_hash, mask).unwrap(), Some(expected));
    }
}
360}