Skip to main content

reth_transaction_pool/blobstore/
mem.rs

1use crate::blobstore::{BlobStore, BlobStoreCleanupStat, BlobStoreError, BlobStoreSize};
2use alloy_eips::{
3    eip4844::{BlobAndProofV1, BlobAndProofV2, BlobCellsAndProofsV1},
4    eip7594::{BlobCellMask, BlobTransactionSidecarVariant, Cell},
5};
6use alloy_primitives::{map::B256Map, B128, B256};
7use parking_lot::RwLock;
8use std::sync::Arc;
9
10/// An in-memory blob store.
11#[derive(Clone, Debug, Default, PartialEq)]
12pub struct InMemoryBlobStore {
13    inner: Arc<InMemoryBlobStoreInner>,
14}
15
16impl InMemoryBlobStore {
17    /// Look up EIP-7594 blobs by their versioned hashes.
18    ///
19    /// This returns a result vector with the **same length and order** as the input
20    /// `versioned_hashes`. Each element is `Some(BlobAndProofV2)` if the blob is available, or
21    /// `None` if it is missing or an older sidecar version.
22    fn get_by_versioned_hashes_eip7594(
23        &self,
24        versioned_hashes: &[B256],
25    ) -> Vec<Option<BlobAndProofV2>> {
26        let mut result = vec![None; versioned_hashes.len()];
27        let mut missing_count = result.len();
28        for blob_sidecar in self.inner.store.read().values() {
29            if let Some(blob_sidecar) = blob_sidecar.as_eip7594() {
30                for (hash_idx, match_result) in
31                    blob_sidecar.match_versioned_hashes(versioned_hashes)
32                {
33                    let slot = &mut result[hash_idx];
34                    if slot.is_none() {
35                        missing_count -= 1;
36                    }
37                    *slot = Some(match_result);
38                }
39            }
40
41            // Return early if all blobs are found.
42            if missing_count == 0 {
43                // since versioned_hashes may have duplicates, we double check here
44                if result.iter().all(|blob| blob.is_some()) {
45                    break;
46                }
47            }
48        }
49        result
50    }
51
52    /// Look up EIP-7594 blob cells by their versioned hashes.
53    fn get_by_versioned_hashes_cells_eip7594(
54        &self,
55        versioned_hashes: &[B256],
56        indices_bitarray: B128,
57    ) -> Result<Vec<Option<BlobCellsAndProofsV1>>, BlobStoreError> {
58        let cell_mask = BlobCellMask::new(indices_bitarray);
59        let mut result = vec![None; versioned_hashes.len()];
60        let mut missing_count = result.len();
61        let blob_sidecars = self.inner.store.read().values().cloned().collect::<Vec<_>>();
62        for blob_sidecar in blob_sidecars {
63            if let Some(blob_sidecar) = blob_sidecar.as_eip7594() {
64                for (hash_idx, match_result) in blob_sidecar
65                    .match_versioned_hashes_cells(versioned_hashes, cell_mask)
66                    .map_err(|err| BlobStoreError::Other(Box::new(err)))?
67                {
68                    let slot = &mut result[hash_idx];
69                    if slot.is_none() {
70                        missing_count -= 1;
71                    }
72                    *slot = Some(match_result);
73                }
74            }
75
76            if missing_count == 0 && result.iter().all(Option::is_some) {
77                break;
78            }
79        }
80        Ok(result)
81    }
82}
83
84#[derive(Debug, Default)]
85struct InMemoryBlobStoreInner {
86    /// Storage for all blob data.
87    store: RwLock<B256Map<Arc<BlobTransactionSidecarVariant>>>,
88    size_tracker: BlobStoreSize,
89}
90
91impl PartialEq for InMemoryBlobStoreInner {
92    fn eq(&self, other: &Self) -> bool {
93        self.store.read().eq(&*other.store.read())
94    }
95}
96
97impl BlobStore for InMemoryBlobStore {
98    fn insert(&self, tx: B256, data: BlobTransactionSidecarVariant) -> Result<(), BlobStoreError> {
99        let mut store = self.inner.store.write();
100        self.inner.size_tracker.add_size(insert_size(&mut store, tx, data));
101        self.inner.size_tracker.update_len(store.len());
102        Ok(())
103    }
104
105    fn insert_all(
106        &self,
107        txs: Vec<(B256, BlobTransactionSidecarVariant)>,
108    ) -> Result<(), BlobStoreError> {
109        if txs.is_empty() {
110            return Ok(())
111        }
112        let mut store = self.inner.store.write();
113        let mut total_add = 0;
114        for (tx, data) in txs {
115            let add = insert_size(&mut store, tx, data);
116            total_add += add;
117        }
118        self.inner.size_tracker.add_size(total_add);
119        self.inner.size_tracker.update_len(store.len());
120        Ok(())
121    }
122
123    fn delete(&self, tx: B256) -> Result<(), BlobStoreError> {
124        let mut store = self.inner.store.write();
125        let sub = remove_size(&mut store, &tx);
126        self.inner.size_tracker.sub_size(sub);
127        self.inner.size_tracker.update_len(store.len());
128        Ok(())
129    }
130
131    fn delete_all(&self, txs: Vec<B256>) -> Result<(), BlobStoreError> {
132        if txs.is_empty() {
133            return Ok(())
134        }
135        let mut store = self.inner.store.write();
136        let mut total_sub = 0;
137        for tx in txs {
138            total_sub += remove_size(&mut store, &tx);
139        }
140        self.inner.size_tracker.sub_size(total_sub);
141        self.inner.size_tracker.update_len(store.len());
142        Ok(())
143    }
144
145    fn cleanup(&self) -> BlobStoreCleanupStat {
146        BlobStoreCleanupStat::default()
147    }
148
149    // Retrieves the decoded blob data for the given transaction hash.
150    fn get(&self, tx: B256) -> Result<Option<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
151        Ok(self.inner.store.read().get(&tx).cloned())
152    }
153
154    fn contains(&self, tx: B256) -> Result<bool, BlobStoreError> {
155        Ok(self.inner.store.read().contains_key(&tx))
156    }
157
158    fn get_all(
159        &self,
160        txs: Vec<B256>,
161    ) -> Result<Vec<(B256, Arc<BlobTransactionSidecarVariant>)>, BlobStoreError> {
162        let store = self.inner.store.read();
163        Ok(txs.into_iter().filter_map(|tx| store.get(&tx).map(|item| (tx, item.clone()))).collect())
164    }
165
166    fn get_exact(
167        &self,
168        txs: Vec<B256>,
169    ) -> Result<Vec<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
170        if txs.is_empty() {
171            return Ok(Vec::new());
172        }
173        let store = self.inner.store.read();
174        txs.into_iter()
175            .map(|tx| store.get(&tx).cloned().ok_or(BlobStoreError::MissingSidecar(tx)))
176            .collect()
177    }
178
179    fn get_by_versioned_hashes_v1(
180        &self,
181        versioned_hashes: &[B256],
182    ) -> Result<Vec<Option<BlobAndProofV1>>, BlobStoreError> {
183        let mut result = vec![None; versioned_hashes.len()];
184        for blob_sidecar in self.inner.store.read().values() {
185            if let Some(blob_sidecar) = blob_sidecar.as_eip4844() {
186                for (hash_idx, match_result) in
187                    blob_sidecar.match_versioned_hashes(versioned_hashes)
188                {
189                    result[hash_idx] = Some(match_result);
190                }
191            }
192
193            // Return early if all blobs are found.
194            if result.iter().all(|blob| blob.is_some()) {
195                break;
196            }
197        }
198        Ok(result)
199    }
200
201    fn get_by_versioned_hashes_v2(
202        &self,
203        versioned_hashes: &[B256],
204    ) -> Result<Option<Vec<BlobAndProofV2>>, BlobStoreError> {
205        let result = self.get_by_versioned_hashes_eip7594(versioned_hashes);
206        if result.iter().all(|blob| blob.is_some()) {
207            Ok(Some(result.into_iter().map(Option::unwrap).collect()))
208        } else {
209            Ok(None)
210        }
211    }
212
213    fn get_by_versioned_hashes_v3(
214        &self,
215        versioned_hashes: &[B256],
216    ) -> Result<Vec<Option<BlobAndProofV2>>, BlobStoreError> {
217        Ok(self.get_by_versioned_hashes_eip7594(versioned_hashes))
218    }
219
220    fn get_by_versioned_hashes_v4(
221        &self,
222        versioned_hashes: &[B256],
223        indices_bitarray: B128,
224    ) -> Result<Vec<Option<BlobCellsAndProofsV1>>, BlobStoreError> {
225        self.get_by_versioned_hashes_cells_eip7594(versioned_hashes, indices_bitarray)
226    }
227
228    fn get_cells(
229        &self,
230        tx: B256,
231        indices_bitarray: B128,
232    ) -> Result<Option<Vec<Cell>>, BlobStoreError> {
233        let Some(sidecar) = self.get(tx)? else {
234            return Ok(None);
235        };
236
237        let Some(sidecar) = sidecar.as_eip7594() else {
238            return Ok(None);
239        };
240
241        let versioned_hashes = sidecar.versioned_hashes().collect::<Vec<_>>();
242
243        let matches =
244            self.get_by_versioned_hashes_cells_eip7594(&versioned_hashes, indices_bitarray)?;
245
246        let mut cells = Vec::new();
247
248        for matched in matches {
249            let Some(matched) = matched else {
250                return Ok(None);
251            };
252
253            for cell in matched.blob_cells {
254                let Some(cell) = cell else {
255                    return Ok(None);
256                };
257
258                cells.push(cell);
259            }
260        }
261
262        Ok(Some(cells))
263    }
264
265    fn data_size_hint(&self) -> Option<usize> {
266        Some(self.inner.size_tracker.data_size())
267    }
268
269    fn blobs_len(&self) -> usize {
270        self.inner.size_tracker.blobs_len()
271    }
272}
273
274/// Removes the given blob from the store and returns the size of the blob that was removed.
275#[inline]
276fn remove_size(store: &mut B256Map<Arc<BlobTransactionSidecarVariant>>, tx: &B256) -> usize {
277    store.remove(tx).map(|rem| rem.size()).unwrap_or_default()
278}
279
280/// Inserts the given blob into the store and returns the size of the blob that was added.
281///
282/// We don't need to handle the size updates for replacements because transactions are unique.
283#[inline]
284fn insert_size(
285    store: &mut B256Map<Arc<BlobTransactionSidecarVariant>>,
286    tx: B256,
287    blob: BlobTransactionSidecarVariant,
288) -> usize {
289    let add = blob.size();
290    store.insert(tx, Arc::new(blob));
291    add
292}
293
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_eips::{
        eip4844::{kzg_to_versioned_hash, Blob, BlobAndProofV2, Bytes48},
        eip7594::{
            BlobTransactionSidecarEip7594, BlobTransactionSidecarVariant, CELLS_PER_EXT_BLOB,
        },
    };

    /// Builds an EIP-7594 sidecar with one default blob.
    ///
    /// Returns the sidecar, the versioned hash of its commitment, and the blob-and-proofs
    /// value a lookup of that hash is expected to produce.
    fn eip7594_single_blob_sidecar() -> (BlobTransactionSidecarVariant, B256, BlobAndProofV2) {
        let commitment = Bytes48::default();
        let versioned_hash = kzg_to_versioned_hash(commitment.as_slice());

        let proofs = vec![Bytes48::default(); CELLS_PER_EXT_BLOB];
        let expected = BlobAndProofV2 { blob: Box::new(Blob::default()), proofs: proofs.clone() };

        let sidecar = BlobTransactionSidecarVariant::Eip7594(BlobTransactionSidecarEip7594::new(
            vec![Blob::default()],
            vec![commitment],
            proofs,
        ));

        (sidecar, versioned_hash, expected)
    }

    /// v2 is all-or-nothing, while v3 reports the blobs it could find.
    #[test]
    fn mem_get_blobs_v3_returns_partial_results() {
        let (sidecar, known_hash, expected) = eip7594_single_blob_sidecar();
        assert_ne!(known_hash, B256::ZERO);

        let store = InMemoryBlobStore::default();
        store.insert(B256::random(), sidecar).unwrap();

        // One hash that is stored and one that is not.
        let request = [known_hash, B256::ZERO];

        let all_or_nothing = store.get_by_versioned_hashes_v2(&request).unwrap();
        assert!(all_or_nothing.is_none(), "v2 must return null if any requested blob is missing");

        let partial = store.get_by_versioned_hashes_v3(&request).unwrap();
        assert_eq!(partial, vec![Some(expected), None]);
    }

    /// v4 returns exactly the cells selected by the bit array for every blob it could find.
    #[test]
    fn mem_get_blobs_v4_returns_requested_cells() {
        let (sidecar, known_hash, _) = eip7594_single_blob_sidecar();

        let store = InMemoryBlobStore::default();
        store.insert(B256::random(), sidecar).unwrap();

        // Select cell 0 and cell 7.
        let indices_bitarray = B128::from((1u128 << 0) | (1u128 << 7));
        let request = [known_hash, B256::ZERO];

        let answer = store.get_by_versioned_hashes_v4(&request, indices_bitarray).unwrap();
        assert_eq!(answer.len(), request.len());
        assert!(answer[1].is_none());

        let hit = answer[0].as_ref().unwrap();
        assert_eq!(hit.blob_cells.len(), 2);
        assert_eq!(hit.proofs.len(), 2);
        assert!(hit.blob_cells.iter().all(Option::is_some));
        assert_eq!(hit.proofs, vec![Some(Bytes48::default()); 2]);
    }
}
355}