reth_transaction_pool/blobstore/
mem.rs

1use crate::blobstore::{BlobStore, BlobStoreCleanupStat, BlobStoreError, BlobStoreSize};
2use alloy_eips::{
3    eip4844::{BlobAndProofV1, BlobAndProofV2},
4    eip7594::BlobTransactionSidecarVariant,
5};
6use alloy_primitives::B256;
7use parking_lot::RwLock;
8use std::{collections::HashMap, sync::Arc};
9
/// An in-memory blob store.
///
/// The state is held behind an [`Arc`], so cloning an [`InMemoryBlobStore`] produces another
/// handle to the same underlying storage rather than an independent copy.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct InMemoryBlobStore {
    /// Shared state: the sidecar map together with its size tracker.
    inner: Arc<InMemoryBlobStoreInner>,
}
15
16impl InMemoryBlobStore {
17    /// Look up EIP-7594 blobs by their versioned hashes.
18    ///
19    /// This returns a result vector with the **same length and order** as the input
20    /// `versioned_hashes`. Each element is `Some(BlobAndProofV2)` if the blob is available, or
21    /// `None` if it is missing or an older sidecar version.
22    fn get_by_versioned_hashes_eip7594(
23        &self,
24        versioned_hashes: &[B256],
25    ) -> Vec<Option<BlobAndProofV2>> {
26        let mut result = vec![None; versioned_hashes.len()];
27        let mut missing_count = result.len();
28        for (_tx_hash, blob_sidecar) in self.inner.store.read().iter() {
29            if let Some(blob_sidecar) = blob_sidecar.as_eip7594() {
30                for (hash_idx, match_result) in
31                    blob_sidecar.match_versioned_hashes(versioned_hashes)
32                {
33                    result[hash_idx] = Some(match_result);
34                    missing_count -= 1;
35                }
36            }
37
38            // Return early if all blobs are found.
39            if missing_count == 0 {
40                // since versioned_hashes may have duplicates, we double check here
41                if result.iter().all(|blob| blob.is_some()) {
42                    break;
43                }
44            }
45        }
46        result
47    }
48}
49
/// The state shared by all handles of an in-memory blob store.
#[derive(Debug, Default)]
struct InMemoryBlobStoreInner {
    /// Storage for all blob data.
    ///
    /// Maps the `B256` key a sidecar was inserted under to the reference-counted sidecar.
    store: RwLock<HashMap<B256, Arc<BlobTransactionSidecarVariant>>>,
    /// Size bookkeeping for `store`: the insert/delete paths feed it the sizes of added and
    /// removed sidecars and the current number of entries.
    size_tracker: BlobStoreSize,
}
56
57impl PartialEq for InMemoryBlobStoreInner {
58    fn eq(&self, other: &Self) -> bool {
59        self.store.read().eq(&*other.store.read())
60    }
61}
62
63impl BlobStore for InMemoryBlobStore {
64    fn insert(&self, tx: B256, data: BlobTransactionSidecarVariant) -> Result<(), BlobStoreError> {
65        let mut store = self.inner.store.write();
66        self.inner.size_tracker.add_size(insert_size(&mut store, tx, data));
67        self.inner.size_tracker.update_len(store.len());
68        Ok(())
69    }
70
71    fn insert_all(
72        &self,
73        txs: Vec<(B256, BlobTransactionSidecarVariant)>,
74    ) -> Result<(), BlobStoreError> {
75        if txs.is_empty() {
76            return Ok(())
77        }
78        let mut store = self.inner.store.write();
79        let mut total_add = 0;
80        for (tx, data) in txs {
81            let add = insert_size(&mut store, tx, data);
82            total_add += add;
83        }
84        self.inner.size_tracker.add_size(total_add);
85        self.inner.size_tracker.update_len(store.len());
86        Ok(())
87    }
88
89    fn delete(&self, tx: B256) -> Result<(), BlobStoreError> {
90        let mut store = self.inner.store.write();
91        let sub = remove_size(&mut store, &tx);
92        self.inner.size_tracker.sub_size(sub);
93        self.inner.size_tracker.update_len(store.len());
94        Ok(())
95    }
96
97    fn delete_all(&self, txs: Vec<B256>) -> Result<(), BlobStoreError> {
98        if txs.is_empty() {
99            return Ok(())
100        }
101        let mut store = self.inner.store.write();
102        let mut total_sub = 0;
103        for tx in txs {
104            total_sub += remove_size(&mut store, &tx);
105        }
106        self.inner.size_tracker.sub_size(total_sub);
107        self.inner.size_tracker.update_len(store.len());
108        Ok(())
109    }
110
111    fn cleanup(&self) -> BlobStoreCleanupStat {
112        BlobStoreCleanupStat::default()
113    }
114
115    // Retrieves the decoded blob data for the given transaction hash.
116    fn get(&self, tx: B256) -> Result<Option<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
117        Ok(self.inner.store.read().get(&tx).cloned())
118    }
119
120    fn contains(&self, tx: B256) -> Result<bool, BlobStoreError> {
121        Ok(self.inner.store.read().contains_key(&tx))
122    }
123
124    fn get_all(
125        &self,
126        txs: Vec<B256>,
127    ) -> Result<Vec<(B256, Arc<BlobTransactionSidecarVariant>)>, BlobStoreError> {
128        let store = self.inner.store.read();
129        Ok(txs.into_iter().filter_map(|tx| store.get(&tx).map(|item| (tx, item.clone()))).collect())
130    }
131
132    fn get_exact(
133        &self,
134        txs: Vec<B256>,
135    ) -> Result<Vec<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
136        if txs.is_empty() {
137            return Ok(Vec::new());
138        }
139        let store = self.inner.store.read();
140        txs.into_iter()
141            .map(|tx| store.get(&tx).cloned().ok_or(BlobStoreError::MissingSidecar(tx)))
142            .collect()
143    }
144
145    fn get_by_versioned_hashes_v1(
146        &self,
147        versioned_hashes: &[B256],
148    ) -> Result<Vec<Option<BlobAndProofV1>>, BlobStoreError> {
149        let mut result = vec![None; versioned_hashes.len()];
150        for (_tx_hash, blob_sidecar) in self.inner.store.read().iter() {
151            if let Some(blob_sidecar) = blob_sidecar.as_eip4844() {
152                for (hash_idx, match_result) in
153                    blob_sidecar.match_versioned_hashes(versioned_hashes)
154                {
155                    result[hash_idx] = Some(match_result);
156                }
157            }
158
159            // Return early if all blobs are found.
160            if result.iter().all(|blob| blob.is_some()) {
161                break;
162            }
163        }
164        Ok(result)
165    }
166
167    fn get_by_versioned_hashes_v2(
168        &self,
169        versioned_hashes: &[B256],
170    ) -> Result<Option<Vec<BlobAndProofV2>>, BlobStoreError> {
171        let result = self.get_by_versioned_hashes_eip7594(versioned_hashes);
172        if result.iter().all(|blob| blob.is_some()) {
173            Ok(Some(result.into_iter().map(Option::unwrap).collect()))
174        } else {
175            Ok(None)
176        }
177    }
178
179    fn get_by_versioned_hashes_v3(
180        &self,
181        versioned_hashes: &[B256],
182    ) -> Result<Vec<Option<BlobAndProofV2>>, BlobStoreError> {
183        Ok(self.get_by_versioned_hashes_eip7594(versioned_hashes))
184    }
185
186    fn data_size_hint(&self) -> Option<usize> {
187        Some(self.inner.size_tracker.data_size())
188    }
189
190    fn blobs_len(&self) -> usize {
191        self.inner.size_tracker.blobs_len()
192    }
193}
194
195/// Removes the given blob from the store and returns the size of the blob that was removed.
196#[inline]
197fn remove_size(store: &mut HashMap<B256, Arc<BlobTransactionSidecarVariant>>, tx: &B256) -> usize {
198    store.remove(tx).map(|rem| rem.size()).unwrap_or_default()
199}
200
201/// Inserts the given blob into the store and returns the size of the blob that was added.
202///
203/// We don't need to handle the size updates for replacements because transactions are unique.
204#[inline]
205fn insert_size(
206    store: &mut HashMap<B256, Arc<BlobTransactionSidecarVariant>>,
207    tx: B256,
208    blob: BlobTransactionSidecarVariant,
209) -> usize {
210    let add = blob.size();
211    store.insert(tx, Arc::new(blob));
212    add
213}
214
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_eips::{
        eip4844::{kzg_to_versioned_hash, Blob, BlobAndProofV2, Bytes48},
        eip7594::{
            BlobTransactionSidecarEip7594, BlobTransactionSidecarVariant, CELLS_PER_EXT_BLOB,
        },
    };

    /// Builds an EIP-7594 sidecar with one default blob, and returns it together with the
    /// versioned hash of its commitment and the `BlobAndProofV2` a lookup is expected to yield.
    fn eip7594_single_blob_sidecar() -> (BlobTransactionSidecarVariant, B256, BlobAndProofV2) {
        let commitment = Bytes48::default();
        let proofs = vec![Bytes48::default(); CELLS_PER_EXT_BLOB];

        let versioned_hash = kzg_to_versioned_hash(commitment.as_slice());
        let expected = BlobAndProofV2 { blob: Box::new(Blob::default()), proofs: proofs.clone() };

        let sidecar = BlobTransactionSidecarVariant::Eip7594(BlobTransactionSidecarEip7594::new(
            vec![Blob::default()],
            vec![commitment],
            proofs,
        ));

        (sidecar, versioned_hash, expected)
    }

    /// With one requested blob present and one absent, v2 yields nothing while v3 yields the
    /// present blob and a `None` for the absent one.
    #[test]
    fn mem_get_blobs_v3_returns_partial_results() {
        let (sidecar, present_hash, expected) = eip7594_single_blob_sidecar();
        let missing_hash = B256::ZERO;
        assert_ne!(present_hash, missing_hash);

        let store = InMemoryBlobStore::default();
        store.insert(B256::random(), sidecar).unwrap();

        let request = [present_hash, missing_hash];

        let v2 = store.get_by_versioned_hashes_v2(&request).unwrap();
        assert!(v2.is_none(), "v2 must return null if any requested blob is missing");

        let v3 = store.get_by_versioned_hashes_v3(&request).unwrap();
        assert_eq!(v3, vec![Some(expected), None]);
    }
}