Skip to main content

reth_transaction_pool/blobstore/
mem.rs

1use crate::blobstore::{BlobStore, BlobStoreCleanupStat, BlobStoreError, BlobStoreSize};
2use alloy_eips::{
3    eip4844::{BlobAndProofV1, BlobAndProofV2},
4    eip7594::BlobTransactionSidecarVariant,
5};
6use alloy_primitives::{map::B256Map, B256};
7use parking_lot::RwLock;
8use std::sync::Arc;
9
10/// An in-memory blob store.
11#[derive(Clone, Debug, Default, PartialEq)]
12pub struct InMemoryBlobStore {
13    inner: Arc<InMemoryBlobStoreInner>,
14}
15
16impl InMemoryBlobStore {
17    /// Look up EIP-7594 blobs by their versioned hashes.
18    ///
19    /// This returns a result vector with the **same length and order** as the input
20    /// `versioned_hashes`. Each element is `Some(BlobAndProofV2)` if the blob is available, or
21    /// `None` if it is missing or an older sidecar version.
22    fn get_by_versioned_hashes_eip7594(
23        &self,
24        versioned_hashes: &[B256],
25    ) -> Vec<Option<BlobAndProofV2>> {
26        let mut result = vec![None; versioned_hashes.len()];
27        let mut missing_count = result.len();
28        for (_tx_hash, blob_sidecar) in self.inner.store.read().iter() {
29            if let Some(blob_sidecar) = blob_sidecar.as_eip7594() {
30                for (hash_idx, match_result) in
31                    blob_sidecar.match_versioned_hashes(versioned_hashes)
32                {
33                    let slot = &mut result[hash_idx];
34                    if slot.is_none() {
35                        missing_count -= 1;
36                    }
37                    *slot = Some(match_result);
38                }
39            }
40
41            // Return early if all blobs are found.
42            if missing_count == 0 {
43                // since versioned_hashes may have duplicates, we double check here
44                if result.iter().all(|blob| blob.is_some()) {
45                    break;
46                }
47            }
48        }
49        result
50    }
51}
52
53#[derive(Debug, Default)]
54struct InMemoryBlobStoreInner {
55    /// Storage for all blob data.
56    store: RwLock<B256Map<Arc<BlobTransactionSidecarVariant>>>,
57    size_tracker: BlobStoreSize,
58}
59
60impl PartialEq for InMemoryBlobStoreInner {
61    fn eq(&self, other: &Self) -> bool {
62        self.store.read().eq(&*other.store.read())
63    }
64}
65
66impl BlobStore for InMemoryBlobStore {
67    fn insert(&self, tx: B256, data: BlobTransactionSidecarVariant) -> Result<(), BlobStoreError> {
68        let mut store = self.inner.store.write();
69        self.inner.size_tracker.add_size(insert_size(&mut store, tx, data));
70        self.inner.size_tracker.update_len(store.len());
71        Ok(())
72    }
73
74    fn insert_all(
75        &self,
76        txs: Vec<(B256, BlobTransactionSidecarVariant)>,
77    ) -> Result<(), BlobStoreError> {
78        if txs.is_empty() {
79            return Ok(())
80        }
81        let mut store = self.inner.store.write();
82        let mut total_add = 0;
83        for (tx, data) in txs {
84            let add = insert_size(&mut store, tx, data);
85            total_add += add;
86        }
87        self.inner.size_tracker.add_size(total_add);
88        self.inner.size_tracker.update_len(store.len());
89        Ok(())
90    }
91
92    fn delete(&self, tx: B256) -> Result<(), BlobStoreError> {
93        let mut store = self.inner.store.write();
94        let sub = remove_size(&mut store, &tx);
95        self.inner.size_tracker.sub_size(sub);
96        self.inner.size_tracker.update_len(store.len());
97        Ok(())
98    }
99
100    fn delete_all(&self, txs: Vec<B256>) -> Result<(), BlobStoreError> {
101        if txs.is_empty() {
102            return Ok(())
103        }
104        let mut store = self.inner.store.write();
105        let mut total_sub = 0;
106        for tx in txs {
107            total_sub += remove_size(&mut store, &tx);
108        }
109        self.inner.size_tracker.sub_size(total_sub);
110        self.inner.size_tracker.update_len(store.len());
111        Ok(())
112    }
113
114    fn cleanup(&self) -> BlobStoreCleanupStat {
115        BlobStoreCleanupStat::default()
116    }
117
118    // Retrieves the decoded blob data for the given transaction hash.
119    fn get(&self, tx: B256) -> Result<Option<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
120        Ok(self.inner.store.read().get(&tx).cloned())
121    }
122
123    fn contains(&self, tx: B256) -> Result<bool, BlobStoreError> {
124        Ok(self.inner.store.read().contains_key(&tx))
125    }
126
127    fn get_all(
128        &self,
129        txs: Vec<B256>,
130    ) -> Result<Vec<(B256, Arc<BlobTransactionSidecarVariant>)>, BlobStoreError> {
131        let store = self.inner.store.read();
132        Ok(txs.into_iter().filter_map(|tx| store.get(&tx).map(|item| (tx, item.clone()))).collect())
133    }
134
135    fn get_exact(
136        &self,
137        txs: Vec<B256>,
138    ) -> Result<Vec<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
139        if txs.is_empty() {
140            return Ok(Vec::new());
141        }
142        let store = self.inner.store.read();
143        txs.into_iter()
144            .map(|tx| store.get(&tx).cloned().ok_or(BlobStoreError::MissingSidecar(tx)))
145            .collect()
146    }
147
148    fn get_by_versioned_hashes_v1(
149        &self,
150        versioned_hashes: &[B256],
151    ) -> Result<Vec<Option<BlobAndProofV1>>, BlobStoreError> {
152        let mut result = vec![None; versioned_hashes.len()];
153        for (_tx_hash, blob_sidecar) in self.inner.store.read().iter() {
154            if let Some(blob_sidecar) = blob_sidecar.as_eip4844() {
155                for (hash_idx, match_result) in
156                    blob_sidecar.match_versioned_hashes(versioned_hashes)
157                {
158                    result[hash_idx] = Some(match_result);
159                }
160            }
161
162            // Return early if all blobs are found.
163            if result.iter().all(|blob| blob.is_some()) {
164                break;
165            }
166        }
167        Ok(result)
168    }
169
170    fn get_by_versioned_hashes_v2(
171        &self,
172        versioned_hashes: &[B256],
173    ) -> Result<Option<Vec<BlobAndProofV2>>, BlobStoreError> {
174        let result = self.get_by_versioned_hashes_eip7594(versioned_hashes);
175        if result.iter().all(|blob| blob.is_some()) {
176            Ok(Some(result.into_iter().map(Option::unwrap).collect()))
177        } else {
178            Ok(None)
179        }
180    }
181
182    fn get_by_versioned_hashes_v3(
183        &self,
184        versioned_hashes: &[B256],
185    ) -> Result<Vec<Option<BlobAndProofV2>>, BlobStoreError> {
186        Ok(self.get_by_versioned_hashes_eip7594(versioned_hashes))
187    }
188
189    fn data_size_hint(&self) -> Option<usize> {
190        Some(self.inner.size_tracker.data_size())
191    }
192
193    fn blobs_len(&self) -> usize {
194        self.inner.size_tracker.blobs_len()
195    }
196}
197
198/// Removes the given blob from the store and returns the size of the blob that was removed.
199#[inline]
200fn remove_size(store: &mut B256Map<Arc<BlobTransactionSidecarVariant>>, tx: &B256) -> usize {
201    store.remove(tx).map(|rem| rem.size()).unwrap_or_default()
202}
203
204/// Inserts the given blob into the store and returns the size of the blob that was added.
205///
206/// We don't need to handle the size updates for replacements because transactions are unique.
207#[inline]
208fn insert_size(
209    store: &mut B256Map<Arc<BlobTransactionSidecarVariant>>,
210    tx: B256,
211    blob: BlobTransactionSidecarVariant,
212) -> usize {
213    let add = blob.size();
214    store.insert(tx, Arc::new(blob));
215    add
216}
217
218#[cfg(test)]
219mod tests {
220    use super::*;
221    use alloy_eips::{
222        eip4844::{kzg_to_versioned_hash, Blob, BlobAndProofV2, Bytes48},
223        eip7594::{
224            BlobTransactionSidecarEip7594, BlobTransactionSidecarVariant, CELLS_PER_EXT_BLOB,
225        },
226    };
227
228    fn eip7594_single_blob_sidecar() -> (BlobTransactionSidecarVariant, B256, BlobAndProofV2) {
229        let blob = Blob::default();
230        let commitment = Bytes48::default();
231        let cell_proofs = vec![Bytes48::default(); CELLS_PER_EXT_BLOB];
232
233        let versioned_hash = kzg_to_versioned_hash(commitment.as_slice());
234
235        let expected =
236            BlobAndProofV2 { blob: Box::new(Blob::default()), proofs: cell_proofs.clone() };
237        let sidecar = BlobTransactionSidecarEip7594::new(vec![blob], vec![commitment], cell_proofs);
238
239        (BlobTransactionSidecarVariant::Eip7594(sidecar), versioned_hash, expected)
240    }
241
242    #[test]
243    fn mem_get_blobs_v3_returns_partial_results() {
244        let store = InMemoryBlobStore::default();
245
246        let (sidecar, versioned_hash, expected) = eip7594_single_blob_sidecar();
247        store.insert(B256::random(), sidecar).unwrap();
248
249        assert_ne!(versioned_hash, B256::ZERO);
250
251        let request = vec![versioned_hash, B256::ZERO];
252        let v2 = store.get_by_versioned_hashes_v2(&request).unwrap();
253        assert!(v2.is_none(), "v2 must return null if any requested blob is missing");
254
255        let v3 = store.get_by_versioned_hashes_v3(&request).unwrap();
256        assert_eq!(v3, vec![Some(expected), None]);
257    }
258}