reth_transaction_pool/blobstore/
mem.rs1use crate::blobstore::{BlobStore, BlobStoreCleanupStat, BlobStoreError, BlobStoreSize};
2use alloy_eips::{
3 eip4844::{BlobAndProofV1, BlobAndProofV2},
4 eip7594::BlobTransactionSidecarVariant,
5};
6use alloy_primitives::B256;
7use parking_lot::RwLock;
8use std::{collections::HashMap, sync::Arc};
9
10#[derive(Clone, Debug, Default, PartialEq)]
12pub struct InMemoryBlobStore {
13 inner: Arc<InMemoryBlobStoreInner>,
14}
15
16impl InMemoryBlobStore {
17 fn get_by_versioned_hashes_eip7594(
23 &self,
24 versioned_hashes: &[B256],
25 ) -> Vec<Option<BlobAndProofV2>> {
26 let mut result = vec![None; versioned_hashes.len()];
27 for (_tx_hash, blob_sidecar) in self.inner.store.read().iter() {
28 if let Some(blob_sidecar) = blob_sidecar.as_eip7594() {
29 for (hash_idx, match_result) in
30 blob_sidecar.match_versioned_hashes(versioned_hashes)
31 {
32 result[hash_idx] = Some(match_result);
33 }
34 }
35
36 if result.iter().all(|blob| blob.is_some()) {
38 break;
39 }
40 }
41 result
42 }
43}
44
45#[derive(Debug, Default)]
46struct InMemoryBlobStoreInner {
47 store: RwLock<HashMap<B256, Arc<BlobTransactionSidecarVariant>>>,
49 size_tracker: BlobStoreSize,
50}
51
52impl PartialEq for InMemoryBlobStoreInner {
53 fn eq(&self, other: &Self) -> bool {
54 self.store.read().eq(&other.store.read())
55 }
56}
57
58impl BlobStore for InMemoryBlobStore {
59 fn insert(&self, tx: B256, data: BlobTransactionSidecarVariant) -> Result<(), BlobStoreError> {
60 let mut store = self.inner.store.write();
61 self.inner.size_tracker.add_size(insert_size(&mut store, tx, data));
62 self.inner.size_tracker.update_len(store.len());
63 Ok(())
64 }
65
66 fn insert_all(
67 &self,
68 txs: Vec<(B256, BlobTransactionSidecarVariant)>,
69 ) -> Result<(), BlobStoreError> {
70 if txs.is_empty() {
71 return Ok(())
72 }
73 let mut store = self.inner.store.write();
74 let mut total_add = 0;
75 for (tx, data) in txs {
76 let add = insert_size(&mut store, tx, data);
77 total_add += add;
78 }
79 self.inner.size_tracker.add_size(total_add);
80 self.inner.size_tracker.update_len(store.len());
81 Ok(())
82 }
83
84 fn delete(&self, tx: B256) -> Result<(), BlobStoreError> {
85 let mut store = self.inner.store.write();
86 let sub = remove_size(&mut store, &tx);
87 self.inner.size_tracker.sub_size(sub);
88 self.inner.size_tracker.update_len(store.len());
89 Ok(())
90 }
91
92 fn delete_all(&self, txs: Vec<B256>) -> Result<(), BlobStoreError> {
93 if txs.is_empty() {
94 return Ok(())
95 }
96 let mut store = self.inner.store.write();
97 let mut total_sub = 0;
98 for tx in txs {
99 total_sub += remove_size(&mut store, &tx);
100 }
101 self.inner.size_tracker.sub_size(total_sub);
102 self.inner.size_tracker.update_len(store.len());
103 Ok(())
104 }
105
106 fn cleanup(&self) -> BlobStoreCleanupStat {
107 BlobStoreCleanupStat::default()
108 }
109
110 fn get(&self, tx: B256) -> Result<Option<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
112 Ok(self.inner.store.read().get(&tx).cloned())
113 }
114
115 fn contains(&self, tx: B256) -> Result<bool, BlobStoreError> {
116 Ok(self.inner.store.read().contains_key(&tx))
117 }
118
119 fn get_all(
120 &self,
121 txs: Vec<B256>,
122 ) -> Result<Vec<(B256, Arc<BlobTransactionSidecarVariant>)>, BlobStoreError> {
123 let store = self.inner.store.read();
124 Ok(txs.into_iter().filter_map(|tx| store.get(&tx).map(|item| (tx, item.clone()))).collect())
125 }
126
127 fn get_exact(
128 &self,
129 txs: Vec<B256>,
130 ) -> Result<Vec<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
131 if txs.is_empty() {
132 return Ok(Vec::new());
133 }
134 let store = self.inner.store.read();
135 txs.into_iter()
136 .map(|tx| store.get(&tx).cloned().ok_or(BlobStoreError::MissingSidecar(tx)))
137 .collect()
138 }
139
140 fn get_by_versioned_hashes_v1(
141 &self,
142 versioned_hashes: &[B256],
143 ) -> Result<Vec<Option<BlobAndProofV1>>, BlobStoreError> {
144 let mut result = vec![None; versioned_hashes.len()];
145 for (_tx_hash, blob_sidecar) in self.inner.store.read().iter() {
146 if let Some(blob_sidecar) = blob_sidecar.as_eip4844() {
147 for (hash_idx, match_result) in
148 blob_sidecar.match_versioned_hashes(versioned_hashes)
149 {
150 result[hash_idx] = Some(match_result);
151 }
152 }
153
154 if result.iter().all(|blob| blob.is_some()) {
156 break;
157 }
158 }
159 Ok(result)
160 }
161
162 fn get_by_versioned_hashes_v2(
163 &self,
164 versioned_hashes: &[B256],
165 ) -> Result<Option<Vec<BlobAndProofV2>>, BlobStoreError> {
166 let result = self.get_by_versioned_hashes_eip7594(versioned_hashes);
167 if result.iter().all(|blob| blob.is_some()) {
168 Ok(Some(result.into_iter().map(Option::unwrap).collect()))
169 } else {
170 Ok(None)
171 }
172 }
173
174 fn get_by_versioned_hashes_v3(
175 &self,
176 versioned_hashes: &[B256],
177 ) -> Result<Vec<Option<BlobAndProofV2>>, BlobStoreError> {
178 Ok(self.get_by_versioned_hashes_eip7594(versioned_hashes))
179 }
180
181 fn data_size_hint(&self) -> Option<usize> {
182 Some(self.inner.size_tracker.data_size())
183 }
184
185 fn blobs_len(&self) -> usize {
186 self.inner.size_tracker.blobs_len()
187 }
188}
189
190#[inline]
192fn remove_size(store: &mut HashMap<B256, Arc<BlobTransactionSidecarVariant>>, tx: &B256) -> usize {
193 store.remove(tx).map(|rem| rem.size()).unwrap_or_default()
194}
195
196#[inline]
200fn insert_size(
201 store: &mut HashMap<B256, Arc<BlobTransactionSidecarVariant>>,
202 tx: B256,
203 blob: BlobTransactionSidecarVariant,
204) -> usize {
205 let add = blob.size();
206 store.insert(tx, Arc::new(blob));
207 add
208}
209
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_eips::{
        eip4844::{kzg_to_versioned_hash, Blob, BlobAndProofV2, Bytes48},
        eip7594::{
            BlobTransactionSidecarEip7594, BlobTransactionSidecarVariant, CELLS_PER_EXT_BLOB,
        },
    };

    /// Builds an EIP-7594 sidecar holding one default blob, one default commitment and
    /// `CELLS_PER_EXT_BLOB` default cell proofs.
    ///
    /// Returns the sidecar, the versioned hash derived from its commitment, and the
    /// blob-and-proofs value a lookup of that hash is expected to produce.
    fn eip7594_single_blob_sidecar() -> (BlobTransactionSidecarVariant, B256, BlobAndProofV2) {
        let commitment = Bytes48::default();
        let proofs = vec![Bytes48::default(); CELLS_PER_EXT_BLOB];
        let hash = kzg_to_versioned_hash(commitment.as_slice());

        let expected = BlobAndProofV2 { blob: Box::new(Blob::default()), proofs: proofs.clone() };
        let sidecar = BlobTransactionSidecarVariant::Eip7594(BlobTransactionSidecarEip7594::new(
            vec![Blob::default()],
            vec![commitment],
            proofs,
        ));

        (sidecar, hash, expected)
    }

    /// With one known and one unknown hash requested, v2 must yield nothing at all while v3
    /// must yield the known blob and a `None` slot for the unknown hash.
    #[test]
    fn mem_get_blobs_v3_returns_partial_results() {
        let (sidecar, present_hash, expected) = eip7594_single_blob_sidecar();
        // `B256::ZERO` stands in for the missing blob, so it must differ from the stored hash.
        assert_ne!(present_hash, B256::ZERO);

        let store = InMemoryBlobStore::default();
        store.insert(B256::random(), sidecar).unwrap();

        let hashes = [present_hash, B256::ZERO];

        let all_or_nothing = store.get_by_versioned_hashes_v2(&hashes).unwrap();
        assert!(
            all_or_nothing.is_none(),
            "v2 must return null if any requested blob is missing"
        );

        let partial = store.get_by_versioned_hashes_v3(&hashes).unwrap();
        assert_eq!(partial, vec![Some(expected), None]);
    }
}