reth_transaction_pool/blobstore/
mem.rs1use crate::blobstore::{BlobStore, BlobStoreCleanupStat, BlobStoreError, BlobStoreSize};
2use alloy_eips::{
3 eip4844::{BlobAndProofV1, BlobAndProofV2},
4 eip7594::BlobTransactionSidecarVariant,
5};
6use alloy_primitives::B256;
7use parking_lot::RwLock;
8use std::{collections::HashMap, sync::Arc};
9
10#[derive(Clone, Debug, Default, PartialEq)]
12pub struct InMemoryBlobStore {
13 inner: Arc<InMemoryBlobStoreInner>,
14}
15
16impl InMemoryBlobStore {
17 fn get_by_versioned_hashes_eip7594(
23 &self,
24 versioned_hashes: &[B256],
25 ) -> Vec<Option<BlobAndProofV2>> {
26 let mut result = vec![None; versioned_hashes.len()];
27 let mut missing_count = result.len();
28 for (_tx_hash, blob_sidecar) in self.inner.store.read().iter() {
29 if let Some(blob_sidecar) = blob_sidecar.as_eip7594() {
30 for (hash_idx, match_result) in
31 blob_sidecar.match_versioned_hashes(versioned_hashes)
32 {
33 result[hash_idx] = Some(match_result);
34 missing_count -= 1;
35 }
36 }
37
38 if missing_count == 0 {
40 if result.iter().all(|blob| blob.is_some()) {
42 break;
43 }
44 }
45 }
46 result
47 }
48}
49
50#[derive(Debug, Default)]
51struct InMemoryBlobStoreInner {
52 store: RwLock<HashMap<B256, Arc<BlobTransactionSidecarVariant>>>,
54 size_tracker: BlobStoreSize,
55}
56
57impl PartialEq for InMemoryBlobStoreInner {
58 fn eq(&self, other: &Self) -> bool {
59 self.store.read().eq(&*other.store.read())
60 }
61}
62
63impl BlobStore for InMemoryBlobStore {
64 fn insert(&self, tx: B256, data: BlobTransactionSidecarVariant) -> Result<(), BlobStoreError> {
65 let mut store = self.inner.store.write();
66 self.inner.size_tracker.add_size(insert_size(&mut store, tx, data));
67 self.inner.size_tracker.update_len(store.len());
68 Ok(())
69 }
70
71 fn insert_all(
72 &self,
73 txs: Vec<(B256, BlobTransactionSidecarVariant)>,
74 ) -> Result<(), BlobStoreError> {
75 if txs.is_empty() {
76 return Ok(())
77 }
78 let mut store = self.inner.store.write();
79 let mut total_add = 0;
80 for (tx, data) in txs {
81 let add = insert_size(&mut store, tx, data);
82 total_add += add;
83 }
84 self.inner.size_tracker.add_size(total_add);
85 self.inner.size_tracker.update_len(store.len());
86 Ok(())
87 }
88
89 fn delete(&self, tx: B256) -> Result<(), BlobStoreError> {
90 let mut store = self.inner.store.write();
91 let sub = remove_size(&mut store, &tx);
92 self.inner.size_tracker.sub_size(sub);
93 self.inner.size_tracker.update_len(store.len());
94 Ok(())
95 }
96
97 fn delete_all(&self, txs: Vec<B256>) -> Result<(), BlobStoreError> {
98 if txs.is_empty() {
99 return Ok(())
100 }
101 let mut store = self.inner.store.write();
102 let mut total_sub = 0;
103 for tx in txs {
104 total_sub += remove_size(&mut store, &tx);
105 }
106 self.inner.size_tracker.sub_size(total_sub);
107 self.inner.size_tracker.update_len(store.len());
108 Ok(())
109 }
110
111 fn cleanup(&self) -> BlobStoreCleanupStat {
112 BlobStoreCleanupStat::default()
113 }
114
115 fn get(&self, tx: B256) -> Result<Option<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
117 Ok(self.inner.store.read().get(&tx).cloned())
118 }
119
120 fn contains(&self, tx: B256) -> Result<bool, BlobStoreError> {
121 Ok(self.inner.store.read().contains_key(&tx))
122 }
123
124 fn get_all(
125 &self,
126 txs: Vec<B256>,
127 ) -> Result<Vec<(B256, Arc<BlobTransactionSidecarVariant>)>, BlobStoreError> {
128 let store = self.inner.store.read();
129 Ok(txs.into_iter().filter_map(|tx| store.get(&tx).map(|item| (tx, item.clone()))).collect())
130 }
131
132 fn get_exact(
133 &self,
134 txs: Vec<B256>,
135 ) -> Result<Vec<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
136 if txs.is_empty() {
137 return Ok(Vec::new());
138 }
139 let store = self.inner.store.read();
140 txs.into_iter()
141 .map(|tx| store.get(&tx).cloned().ok_or(BlobStoreError::MissingSidecar(tx)))
142 .collect()
143 }
144
145 fn get_by_versioned_hashes_v1(
146 &self,
147 versioned_hashes: &[B256],
148 ) -> Result<Vec<Option<BlobAndProofV1>>, BlobStoreError> {
149 let mut result = vec![None; versioned_hashes.len()];
150 for (_tx_hash, blob_sidecar) in self.inner.store.read().iter() {
151 if let Some(blob_sidecar) = blob_sidecar.as_eip4844() {
152 for (hash_idx, match_result) in
153 blob_sidecar.match_versioned_hashes(versioned_hashes)
154 {
155 result[hash_idx] = Some(match_result);
156 }
157 }
158
159 if result.iter().all(|blob| blob.is_some()) {
161 break;
162 }
163 }
164 Ok(result)
165 }
166
167 fn get_by_versioned_hashes_v2(
168 &self,
169 versioned_hashes: &[B256],
170 ) -> Result<Option<Vec<BlobAndProofV2>>, BlobStoreError> {
171 let result = self.get_by_versioned_hashes_eip7594(versioned_hashes);
172 if result.iter().all(|blob| blob.is_some()) {
173 Ok(Some(result.into_iter().map(Option::unwrap).collect()))
174 } else {
175 Ok(None)
176 }
177 }
178
179 fn get_by_versioned_hashes_v3(
180 &self,
181 versioned_hashes: &[B256],
182 ) -> Result<Vec<Option<BlobAndProofV2>>, BlobStoreError> {
183 Ok(self.get_by_versioned_hashes_eip7594(versioned_hashes))
184 }
185
186 fn data_size_hint(&self) -> Option<usize> {
187 Some(self.inner.size_tracker.data_size())
188 }
189
190 fn blobs_len(&self) -> usize {
191 self.inner.size_tracker.blobs_len()
192 }
193}
194
195#[inline]
197fn remove_size(store: &mut HashMap<B256, Arc<BlobTransactionSidecarVariant>>, tx: &B256) -> usize {
198 store.remove(tx).map(|rem| rem.size()).unwrap_or_default()
199}
200
201#[inline]
205fn insert_size(
206 store: &mut HashMap<B256, Arc<BlobTransactionSidecarVariant>>,
207 tx: B256,
208 blob: BlobTransactionSidecarVariant,
209) -> usize {
210 let add = blob.size();
211 store.insert(tx, Arc::new(blob));
212 add
213}
214
215#[cfg(test)]
216mod tests {
217 use super::*;
218 use alloy_eips::{
219 eip4844::{kzg_to_versioned_hash, Blob, BlobAndProofV2, Bytes48},
220 eip7594::{
221 BlobTransactionSidecarEip7594, BlobTransactionSidecarVariant, CELLS_PER_EXT_BLOB,
222 },
223 };
224
225 fn eip7594_single_blob_sidecar() -> (BlobTransactionSidecarVariant, B256, BlobAndProofV2) {
226 let blob = Blob::default();
227 let commitment = Bytes48::default();
228 let cell_proofs = vec![Bytes48::default(); CELLS_PER_EXT_BLOB];
229
230 let versioned_hash = kzg_to_versioned_hash(commitment.as_slice());
231
232 let expected =
233 BlobAndProofV2 { blob: Box::new(Blob::default()), proofs: cell_proofs.clone() };
234 let sidecar = BlobTransactionSidecarEip7594::new(vec![blob], vec![commitment], cell_proofs);
235
236 (BlobTransactionSidecarVariant::Eip7594(sidecar), versioned_hash, expected)
237 }
238
239 #[test]
240 fn mem_get_blobs_v3_returns_partial_results() {
241 let store = InMemoryBlobStore::default();
242
243 let (sidecar, versioned_hash, expected) = eip7594_single_blob_sidecar();
244 store.insert(B256::random(), sidecar).unwrap();
245
246 assert_ne!(versioned_hash, B256::ZERO);
247
248 let request = vec![versioned_hash, B256::ZERO];
249 let v2 = store.get_by_versioned_hashes_v2(&request).unwrap();
250 assert!(v2.is_none(), "v2 must return null if any requested blob is missing");
251
252 let v3 = store.get_by_versioned_hashes_v3(&request).unwrap();
253 assert_eq!(v3, vec![Some(expected), None]);
254 }
255}