reth_transaction_pool/blobstore/
mem.rs

1use crate::blobstore::{BlobStore, BlobStoreCleanupStat, BlobStoreError, BlobStoreSize};
2use alloy_eips::{
3    eip4844::{BlobAndProofV1, BlobAndProofV2},
4    eip7594::BlobTransactionSidecarVariant,
5};
6use alloy_primitives::B256;
7use parking_lot::RwLock;
8use std::{collections::HashMap, sync::Arc};
9
10/// An in-memory blob store.
11#[derive(Clone, Debug, Default, PartialEq)]
12pub struct InMemoryBlobStore {
13    inner: Arc<InMemoryBlobStoreInner>,
14}
15
16impl InMemoryBlobStore {
17    /// Look up EIP-7594 blobs by their versioned hashes.
18    ///
19    /// This returns a result vector with the **same length and order** as the input
20    /// `versioned_hashes`. Each element is `Some(BlobAndProofV2)` if the blob is available, or
21    /// `None` if it is missing or an older sidecar version.
22    fn get_by_versioned_hashes_eip7594(
23        &self,
24        versioned_hashes: &[B256],
25    ) -> Vec<Option<BlobAndProofV2>> {
26        let mut result = vec![None; versioned_hashes.len()];
27        for (_tx_hash, blob_sidecar) in self.inner.store.read().iter() {
28            if let Some(blob_sidecar) = blob_sidecar.as_eip7594() {
29                for (hash_idx, match_result) in
30                    blob_sidecar.match_versioned_hashes(versioned_hashes)
31                {
32                    result[hash_idx] = Some(match_result);
33                }
34            }
35
36            // Return early if all blobs are found.
37            if result.iter().all(|blob| blob.is_some()) {
38                break;
39            }
40        }
41        result
42    }
43}
44
45#[derive(Debug, Default)]
46struct InMemoryBlobStoreInner {
47    /// Storage for all blob data.
48    store: RwLock<HashMap<B256, Arc<BlobTransactionSidecarVariant>>>,
49    size_tracker: BlobStoreSize,
50}
51
52impl PartialEq for InMemoryBlobStoreInner {
53    fn eq(&self, other: &Self) -> bool {
54        self.store.read().eq(&other.store.read())
55    }
56}
57
58impl BlobStore for InMemoryBlobStore {
59    fn insert(&self, tx: B256, data: BlobTransactionSidecarVariant) -> Result<(), BlobStoreError> {
60        let mut store = self.inner.store.write();
61        self.inner.size_tracker.add_size(insert_size(&mut store, tx, data));
62        self.inner.size_tracker.update_len(store.len());
63        Ok(())
64    }
65
66    fn insert_all(
67        &self,
68        txs: Vec<(B256, BlobTransactionSidecarVariant)>,
69    ) -> Result<(), BlobStoreError> {
70        if txs.is_empty() {
71            return Ok(())
72        }
73        let mut store = self.inner.store.write();
74        let mut total_add = 0;
75        for (tx, data) in txs {
76            let add = insert_size(&mut store, tx, data);
77            total_add += add;
78        }
79        self.inner.size_tracker.add_size(total_add);
80        self.inner.size_tracker.update_len(store.len());
81        Ok(())
82    }
83
84    fn delete(&self, tx: B256) -> Result<(), BlobStoreError> {
85        let mut store = self.inner.store.write();
86        let sub = remove_size(&mut store, &tx);
87        self.inner.size_tracker.sub_size(sub);
88        self.inner.size_tracker.update_len(store.len());
89        Ok(())
90    }
91
92    fn delete_all(&self, txs: Vec<B256>) -> Result<(), BlobStoreError> {
93        if txs.is_empty() {
94            return Ok(())
95        }
96        let mut store = self.inner.store.write();
97        let mut total_sub = 0;
98        for tx in txs {
99            total_sub += remove_size(&mut store, &tx);
100        }
101        self.inner.size_tracker.sub_size(total_sub);
102        self.inner.size_tracker.update_len(store.len());
103        Ok(())
104    }
105
106    fn cleanup(&self) -> BlobStoreCleanupStat {
107        BlobStoreCleanupStat::default()
108    }
109
110    // Retrieves the decoded blob data for the given transaction hash.
111    fn get(&self, tx: B256) -> Result<Option<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
112        Ok(self.inner.store.read().get(&tx).cloned())
113    }
114
115    fn contains(&self, tx: B256) -> Result<bool, BlobStoreError> {
116        Ok(self.inner.store.read().contains_key(&tx))
117    }
118
119    fn get_all(
120        &self,
121        txs: Vec<B256>,
122    ) -> Result<Vec<(B256, Arc<BlobTransactionSidecarVariant>)>, BlobStoreError> {
123        let store = self.inner.store.read();
124        Ok(txs.into_iter().filter_map(|tx| store.get(&tx).map(|item| (tx, item.clone()))).collect())
125    }
126
127    fn get_exact(
128        &self,
129        txs: Vec<B256>,
130    ) -> Result<Vec<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
131        if txs.is_empty() {
132            return Ok(Vec::new());
133        }
134        let store = self.inner.store.read();
135        txs.into_iter()
136            .map(|tx| store.get(&tx).cloned().ok_or(BlobStoreError::MissingSidecar(tx)))
137            .collect()
138    }
139
140    fn get_by_versioned_hashes_v1(
141        &self,
142        versioned_hashes: &[B256],
143    ) -> Result<Vec<Option<BlobAndProofV1>>, BlobStoreError> {
144        let mut result = vec![None; versioned_hashes.len()];
145        for (_tx_hash, blob_sidecar) in self.inner.store.read().iter() {
146            if let Some(blob_sidecar) = blob_sidecar.as_eip4844() {
147                for (hash_idx, match_result) in
148                    blob_sidecar.match_versioned_hashes(versioned_hashes)
149                {
150                    result[hash_idx] = Some(match_result);
151                }
152            }
153
154            // Return early if all blobs are found.
155            if result.iter().all(|blob| blob.is_some()) {
156                break;
157            }
158        }
159        Ok(result)
160    }
161
162    fn get_by_versioned_hashes_v2(
163        &self,
164        versioned_hashes: &[B256],
165    ) -> Result<Option<Vec<BlobAndProofV2>>, BlobStoreError> {
166        let result = self.get_by_versioned_hashes_eip7594(versioned_hashes);
167        if result.iter().all(|blob| blob.is_some()) {
168            Ok(Some(result.into_iter().map(Option::unwrap).collect()))
169        } else {
170            Ok(None)
171        }
172    }
173
174    fn get_by_versioned_hashes_v3(
175        &self,
176        versioned_hashes: &[B256],
177    ) -> Result<Vec<Option<BlobAndProofV2>>, BlobStoreError> {
178        Ok(self.get_by_versioned_hashes_eip7594(versioned_hashes))
179    }
180
181    fn data_size_hint(&self) -> Option<usize> {
182        Some(self.inner.size_tracker.data_size())
183    }
184
185    fn blobs_len(&self) -> usize {
186        self.inner.size_tracker.blobs_len()
187    }
188}
189
190/// Removes the given blob from the store and returns the size of the blob that was removed.
191#[inline]
192fn remove_size(store: &mut HashMap<B256, Arc<BlobTransactionSidecarVariant>>, tx: &B256) -> usize {
193    store.remove(tx).map(|rem| rem.size()).unwrap_or_default()
194}
195
196/// Inserts the given blob into the store and returns the size of the blob that was added.
197///
198/// We don't need to handle the size updates for replacements because transactions are unique.
199#[inline]
200fn insert_size(
201    store: &mut HashMap<B256, Arc<BlobTransactionSidecarVariant>>,
202    tx: B256,
203    blob: BlobTransactionSidecarVariant,
204) -> usize {
205    let add = blob.size();
206    store.insert(tx, Arc::new(blob));
207    add
208}
209
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_eips::{
        eip4844::{kzg_to_versioned_hash, Blob, BlobAndProofV2, Bytes48},
        eip7594::{
            BlobTransactionSidecarEip7594, BlobTransactionSidecarVariant, CELLS_PER_EXT_BLOB,
        },
    };

    /// Builds an EIP-7594 sidecar holding one default blob, together with the versioned hash of
    /// its commitment and the `BlobAndProofV2` a lookup for that hash is expected to return.
    fn eip7594_single_blob_sidecar() -> (BlobTransactionSidecarVariant, B256, BlobAndProofV2) {
        let commitment = Bytes48::default();
        let versioned_hash = kzg_to_versioned_hash(commitment.as_slice());
        let cell_proofs = vec![Bytes48::default(); CELLS_PER_EXT_BLOB];

        let expected =
            BlobAndProofV2 { blob: Box::new(Blob::default()), proofs: cell_proofs.clone() };

        let sidecar = BlobTransactionSidecarVariant::Eip7594(BlobTransactionSidecarEip7594::new(
            vec![Blob::default()],
            vec![commitment],
            cell_proofs,
        ));

        (sidecar, versioned_hash, expected)
    }

    #[test]
    fn mem_get_blobs_v3_returns_partial_results() {
        let (sidecar, known_hash, expected) = eip7594_single_blob_sidecar();

        let store = InMemoryBlobStore::default();
        store.insert(B256::random(), sidecar).unwrap();

        // The zero hash stands in for a blob the store does not have, so it must differ from
        // the hash of the blob that was just inserted.
        assert_ne!(known_hash, B256::ZERO);
        let request = vec![known_hash, B256::ZERO];

        // v2 is all-or-nothing: one missing blob voids the whole response.
        let all_or_nothing = store.get_by_versioned_hashes_v2(&request).unwrap();
        assert!(all_or_nothing.is_none(), "v2 must return null if any requested blob is missing");

        // v3 reports each requested hash individually.
        let partial = store.get_by_versioned_hashes_v3(&request).unwrap();
        assert_eq!(partial, vec![Some(expected), None]);
    }
}
250}