1use crate::blobstore::{BlobStore, BlobStoreCleanupStat, BlobStoreError, BlobStoreSize};
4use alloy_eips::{
5 eip4844::{BlobAndProofV1, BlobAndProofV2},
6 eip7594::BlobTransactionSidecarVariant,
7 eip7840::BlobParams,
8 merge::EPOCH_SLOTS,
9};
10use alloy_primitives::{map::B256Set, TxHash, B256};
11use parking_lot::{Mutex, RwLock};
12use schnellru::{ByLength, LruMap};
13use std::{fmt, fs, io, path::PathBuf, sync::Arc};
14use tracing::{debug, trace};
15
/// Default capacity (number of entries) of the in-memory sidecar cache.
///
/// Used as `max_cached_entries` by [`DiskFileBlobStoreConfig::default`].
pub const DEFAULT_MAX_CACHED_BLOBS: u32 = 100;
18
19const VERSIONED_HASH_TO_TX_HASH_CACHE_SIZE: u64 =
24 BlobParams::bpo2().max_blobs_per_tx * BlobParams::bpo2().max_blob_count * EPOCH_SLOTS * 16;
25
26#[derive(Clone, Debug)]
32pub struct DiskFileBlobStore {
33 inner: Arc<DiskFileBlobStoreInner>,
34}
35
36impl DiskFileBlobStore {
37 pub fn open(
39 blob_dir: impl Into<PathBuf>,
40 opts: DiskFileBlobStoreConfig,
41 ) -> Result<Self, DiskFileBlobStoreError> {
42 let blob_dir = blob_dir.into();
43 let DiskFileBlobStoreConfig { max_cached_entries, .. } = opts;
44 let inner = DiskFileBlobStoreInner::new(blob_dir, max_cached_entries);
45
46 inner.delete_all()?;
48 inner.create_blob_dir()?;
49
50 Ok(Self { inner: Arc::new(inner) })
51 }
52
53 #[cfg(test)]
54 fn is_cached(&self, tx: &B256) -> bool {
55 self.inner.blob_cache.lock().get(tx).is_some()
56 }
57
58 #[cfg(test)]
59 fn clear_cache(&self) {
60 self.inner.blob_cache.lock().clear()
61 }
62
63 fn get_by_versioned_hashes_eip7594(
72 &self,
73 versioned_hashes: &[B256],
74 ) -> Result<Vec<Option<BlobAndProofV2>>, BlobStoreError> {
75 let mut result = vec![None; versioned_hashes.len()];
78 let mut missing_count = result.len();
79 for (_tx_hash, blob_sidecar) in self.inner.blob_cache.lock().iter() {
81 if let Some(blob_sidecar) = blob_sidecar.as_eip7594() {
82 for (hash_idx, match_result) in
83 blob_sidecar.match_versioned_hashes(versioned_hashes)
84 {
85 let slot = &mut result[hash_idx];
86 if slot.is_none() {
87 missing_count -= 1;
88 }
89 *slot = Some(match_result);
90 }
91 }
92
93 if missing_count == 0 {
95 if result.iter().all(|blob| blob.is_some()) {
97 return Ok(result);
98 }
99 }
100 }
101
102 let mut missing_tx_hashes = Vec::new();
104
105 {
106 let mut versioned_to_txhashes = self.inner.versioned_hashes_to_txhash.lock();
107 for (idx, _) in
108 result.iter().enumerate().filter(|(_, blob_and_proof)| blob_and_proof.is_none())
109 {
110 let versioned_hash = versioned_hashes[idx];
112 if let Some(tx_hash) = versioned_to_txhashes.get(&versioned_hash).copied() {
113 missing_tx_hashes.push(tx_hash);
114 }
115 }
116 }
117
118 if !missing_tx_hashes.is_empty() {
120 let blobs_from_disk = self.inner.read_many_decoded(missing_tx_hashes);
121 for (_, blob_sidecar) in blobs_from_disk {
122 if let Some(blob_sidecar) = blob_sidecar.as_eip7594() {
123 for (hash_idx, match_result) in
124 blob_sidecar.match_versioned_hashes(versioned_hashes)
125 {
126 if result[hash_idx].is_none() {
127 result[hash_idx] = Some(match_result);
128 }
129 }
130 }
131 }
132 }
133
134 Ok(result)
135 }
136}
137
138impl BlobStore for DiskFileBlobStore {
139 fn insert(&self, tx: B256, data: BlobTransactionSidecarVariant) -> Result<(), BlobStoreError> {
140 self.inner.insert_one(tx, data)
141 }
142
143 fn insert_all(
144 &self,
145 txs: Vec<(B256, BlobTransactionSidecarVariant)>,
146 ) -> Result<(), BlobStoreError> {
147 if txs.is_empty() {
148 return Ok(())
149 }
150 self.inner.insert_many(txs)
151 }
152
153 fn delete(&self, tx: B256) -> Result<(), BlobStoreError> {
154 if self.inner.contains(tx)? {
155 self.inner.txs_to_delete.write().insert(tx);
156 }
157 Ok(())
158 }
159
160 fn delete_all(&self, txs: Vec<B256>) -> Result<(), BlobStoreError> {
161 if txs.is_empty() {
162 return Ok(())
163 }
164 let txs = self.inner.retain_existing(txs)?;
165 self.inner.txs_to_delete.write().extend(txs);
166 Ok(())
167 }
168
169 fn cleanup(&self) -> BlobStoreCleanupStat {
170 let txs_to_delete = std::mem::take(&mut *self.inner.txs_to_delete.write());
171 let mut stat = BlobStoreCleanupStat::default();
172 let mut subsize = 0;
173 debug!(target:"txpool::blob", num_blobs=%txs_to_delete.len(), "Removing blobs from disk");
174 for tx in txs_to_delete {
175 let path = self.inner.blob_disk_file(tx);
176 let filesize = fs::metadata(&path).map_or(0, |meta| meta.len());
177 match fs::remove_file(&path) {
178 Ok(_) => {
179 stat.delete_succeed += 1;
180 subsize += filesize;
181 }
182 Err(e) => {
183 stat.delete_failed += 1;
184 let err = DiskFileBlobStoreError::DeleteFile(tx, path, e);
185 debug!(target:"txpool::blob", %err);
186 }
187 };
188 }
189 self.inner.size_tracker.sub_size(subsize as usize);
190 self.inner.size_tracker.sub_len(stat.delete_succeed);
191 stat
192 }
193
194 fn get(&self, tx: B256) -> Result<Option<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
195 self.inner.get_one(tx)
196 }
197
198 fn contains(&self, tx: B256) -> Result<bool, BlobStoreError> {
199 self.inner.contains(tx)
200 }
201
202 fn get_all(
203 &self,
204 txs: Vec<B256>,
205 ) -> Result<Vec<(B256, Arc<BlobTransactionSidecarVariant>)>, BlobStoreError> {
206 if txs.is_empty() {
207 return Ok(Vec::new())
208 }
209 self.inner.get_all(txs)
210 }
211
212 fn get_exact(
213 &self,
214 txs: Vec<B256>,
215 ) -> Result<Vec<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
216 if txs.is_empty() {
217 return Ok(Vec::new())
218 }
219 self.inner.get_exact(txs)
220 }
221
222 fn get_by_versioned_hashes_v1(
223 &self,
224 versioned_hashes: &[B256],
225 ) -> Result<Vec<Option<BlobAndProofV1>>, BlobStoreError> {
226 let mut result = vec![None; versioned_hashes.len()];
228
229 for (_tx_hash, blob_sidecar) in self.inner.blob_cache.lock().iter() {
231 if let Some(blob_sidecar) = blob_sidecar.as_eip4844() {
232 for (hash_idx, match_result) in
233 blob_sidecar.match_versioned_hashes(versioned_hashes)
234 {
235 result[hash_idx] = Some(match_result);
236 }
237 }
238
239 if result.iter().all(|blob| blob.is_some()) {
241 return Ok(result);
242 }
243 }
244
245 let mut missing_tx_hashes = Vec::new();
248
249 {
250 let mut versioned_to_txhashes = self.inner.versioned_hashes_to_txhash.lock();
251 for (idx, _) in
252 result.iter().enumerate().filter(|(_, blob_and_proof)| blob_and_proof.is_none())
253 {
254 let versioned_hash = versioned_hashes[idx];
256 if let Some(tx_hash) = versioned_to_txhashes.get(&versioned_hash).copied() {
257 missing_tx_hashes.push(tx_hash);
258 }
259 }
260 }
261
262 if !missing_tx_hashes.is_empty() {
264 let blobs_from_disk = self.inner.read_many_decoded(missing_tx_hashes);
265 for (_, blob_sidecar) in blobs_from_disk {
266 if let Some(blob_sidecar) = blob_sidecar.as_eip4844() {
267 for (hash_idx, match_result) in
268 blob_sidecar.match_versioned_hashes(versioned_hashes)
269 {
270 if result[hash_idx].is_none() {
271 result[hash_idx] = Some(match_result);
272 }
273 }
274 }
275 }
276 }
277
278 Ok(result)
279 }
280
281 fn get_by_versioned_hashes_v2(
282 &self,
283 versioned_hashes: &[B256],
284 ) -> Result<Option<Vec<BlobAndProofV2>>, BlobStoreError> {
285 let result = self.get_by_versioned_hashes_eip7594(versioned_hashes)?;
286
287 if result.iter().all(|blob| blob.is_some()) {
289 Ok(Some(result.into_iter().map(Option::unwrap).collect()))
290 } else {
291 Ok(None)
292 }
293 }
294
295 fn get_by_versioned_hashes_v3(
296 &self,
297 versioned_hashes: &[B256],
298 ) -> Result<Vec<Option<BlobAndProofV2>>, BlobStoreError> {
299 self.get_by_versioned_hashes_eip7594(versioned_hashes)
300 }
301
302 fn data_size_hint(&self) -> Option<usize> {
303 Some(self.inner.size_tracker.data_size())
304 }
305
306 fn blobs_len(&self) -> usize {
307 self.inner.size_tracker.blobs_len()
308 }
309}
310
311struct DiskFileBlobStoreInner {
312 blob_dir: PathBuf,
313 blob_cache: Mutex<LruMap<TxHash, Arc<BlobTransactionSidecarVariant>, ByLength>>,
314 size_tracker: BlobStoreSize,
315 file_lock: RwLock<()>,
316 txs_to_delete: RwLock<B256Set>,
317 versioned_hashes_to_txhash: Mutex<LruMap<B256, B256>>,
322}
323
324impl DiskFileBlobStoreInner {
325 fn new(blob_dir: PathBuf, max_length: u32) -> Self {
327 Self {
328 blob_dir,
329 blob_cache: Mutex::new(LruMap::new(ByLength::new(max_length))),
330 size_tracker: Default::default(),
331 file_lock: Default::default(),
332 txs_to_delete: Default::default(),
333 versioned_hashes_to_txhash: Mutex::new(LruMap::new(ByLength::new(
334 VERSIONED_HASH_TO_TX_HASH_CACHE_SIZE as u32,
335 ))),
336 }
337 }
338
339 fn create_blob_dir(&self) -> Result<(), DiskFileBlobStoreError> {
341 debug!(target:"txpool::blob", blob_dir = ?self.blob_dir, "Creating blob store");
342 fs::create_dir_all(&self.blob_dir)
343 .map_err(|e| DiskFileBlobStoreError::Open(self.blob_dir.clone(), e))
344 }
345
346 fn delete_all(&self) -> Result<(), DiskFileBlobStoreError> {
348 match fs::remove_dir_all(&self.blob_dir) {
349 Ok(_) => {
350 debug!(target:"txpool::blob", blob_dir = ?self.blob_dir, "Removed blob store directory");
351 }
352 Err(err) if err.kind() == io::ErrorKind::NotFound => {}
353 Err(err) => return Err(DiskFileBlobStoreError::Open(self.blob_dir.clone(), err)),
354 }
355 Ok(())
356 }
357
358 fn insert_one(
360 &self,
361 tx: B256,
362 data: BlobTransactionSidecarVariant,
363 ) -> Result<(), BlobStoreError> {
364 let mut buf = Vec::with_capacity(data.rlp_encoded_fields_length());
365 data.rlp_encode_fields(&mut buf);
366
367 {
368 let mut map = self.versioned_hashes_to_txhash.lock();
370 data.versioned_hashes().for_each(|hash| {
371 map.insert(hash, tx);
372 });
373 }
374
375 self.blob_cache.lock().insert(tx, Arc::new(data));
376
377 let size = self.write_one_encoded(tx, &buf)?;
378
379 self.size_tracker.add_size(size);
380 self.size_tracker.inc_len(1);
381 Ok(())
382 }
383
384 fn insert_many(
386 &self,
387 txs: Vec<(B256, BlobTransactionSidecarVariant)>,
388 ) -> Result<(), BlobStoreError> {
389 let raw = txs
390 .iter()
391 .map(|(tx, data)| {
392 let mut buf = Vec::with_capacity(data.rlp_encoded_fields_length());
393 data.rlp_encode_fields(&mut buf);
394 (self.blob_disk_file(*tx), buf)
395 })
396 .collect::<Vec<_>>();
397
398 {
399 let mut map = self.versioned_hashes_to_txhash.lock();
401 for (tx, data) in &txs {
402 data.versioned_hashes().for_each(|hash| {
403 map.insert(hash, *tx);
404 });
405 }
406 }
407
408 {
409 let mut cache = self.blob_cache.lock();
411 for (tx, data) in txs {
412 cache.insert(tx, Arc::new(data));
413 }
414 }
415
416 let mut add = 0;
417 let mut num = 0;
418 {
419 let _lock = self.file_lock.write();
420 for (path, data) in raw {
421 if path.exists() {
422 debug!(target:"txpool::blob", ?path, "Blob already exists");
423 } else if let Err(err) = fs::write(&path, &data) {
424 debug!(target:"txpool::blob", %err, ?path, "Failed to write blob file");
425 } else {
426 add += data.len();
427 num += 1;
428 }
429 }
430 }
431 self.size_tracker.add_size(add);
432 self.size_tracker.inc_len(num);
433
434 Ok(())
435 }
436
437 fn contains(&self, tx: B256) -> Result<bool, BlobStoreError> {
439 if self.blob_cache.lock().get(&tx).is_some() {
440 return Ok(true)
441 }
442 Ok(self.blob_disk_file(tx).is_file())
444 }
445
446 fn retain_existing(&self, txs: Vec<B256>) -> Result<Vec<B256>, BlobStoreError> {
448 let (in_cache, not_in_cache): (Vec<B256>, Vec<B256>) = {
449 let mut cache = self.blob_cache.lock();
450 txs.into_iter().partition(|tx| cache.get(tx).is_some())
451 };
452
453 let mut existing = in_cache;
454 for tx in not_in_cache {
455 if self.blob_disk_file(tx).is_file() {
456 existing.push(tx);
457 }
458 }
459
460 Ok(existing)
461 }
462
463 fn get_one(
465 &self,
466 tx: B256,
467 ) -> Result<Option<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
468 if let Some(blob) = self.blob_cache.lock().get(&tx) {
469 return Ok(Some(blob.clone()))
470 }
471
472 if let Some(blob) = self.read_one(tx)? {
473 let blob_arc = Arc::new(blob);
474 self.blob_cache.lock().insert(tx, blob_arc.clone());
475 return Ok(Some(blob_arc))
476 }
477
478 Ok(None)
479 }
480
481 #[inline]
483 fn blob_disk_file(&self, tx: B256) -> PathBuf {
484 self.blob_dir.join(format!("{tx:x}"))
485 }
486
487 #[inline]
489 fn read_one(&self, tx: B256) -> Result<Option<BlobTransactionSidecarVariant>, BlobStoreError> {
490 let path = self.blob_disk_file(tx);
491 let data = {
492 let _lock = self.file_lock.read();
493 match fs::read(&path) {
494 Ok(data) => data,
495 Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(None),
496 Err(e) => {
497 return Err(BlobStoreError::Other(Box::new(DiskFileBlobStoreError::ReadFile(
498 tx, path, e,
499 ))))
500 }
501 }
502 };
503 BlobTransactionSidecarVariant::rlp_decode_fields(&mut data.as_slice())
504 .map(Some)
505 .map_err(BlobStoreError::DecodeError)
506 }
507
508 fn read_many_decoded(&self, txs: Vec<TxHash>) -> Vec<(TxHash, BlobTransactionSidecarVariant)> {
512 self.read_many_raw(txs)
513 .into_iter()
514 .filter_map(|(tx, data)| {
515 BlobTransactionSidecarVariant::rlp_decode_fields(&mut data.as_slice())
516 .map(|sidecar| (tx, sidecar))
517 .ok()
518 })
519 .collect()
520 }
521
522 #[inline]
526 fn read_many_raw(&self, txs: Vec<TxHash>) -> Vec<(TxHash, Vec<u8>)> {
527 let mut res = Vec::with_capacity(txs.len());
528 let _lock = self.file_lock.read();
529 for tx in txs {
530 let path = self.blob_disk_file(tx);
531 match fs::read(&path) {
532 Ok(data) => {
533 res.push((tx, data));
534 }
535 Err(err) => {
536 debug!(target:"txpool::blob", %err, ?tx, "Failed to read blob file");
537 }
538 };
539 }
540 res
541 }
542
543 #[inline]
545 fn write_one_encoded(&self, tx: B256, data: &[u8]) -> Result<usize, DiskFileBlobStoreError> {
546 trace!(target:"txpool::blob", "[{:?}] writing blob file", tx);
547 let mut add = 0;
548 let path = self.blob_disk_file(tx);
549 {
550 let _lock = self.file_lock.write();
551 if !path.exists() {
552 fs::write(&path, data)
553 .map_err(|e| DiskFileBlobStoreError::WriteFile(tx, path, e))?;
554 add = data.len();
555 }
556 }
557 Ok(add)
558 }
559
560 #[inline]
565 fn get_all(
566 &self,
567 txs: Vec<B256>,
568 ) -> Result<Vec<(B256, Arc<BlobTransactionSidecarVariant>)>, BlobStoreError> {
569 let mut res = Vec::with_capacity(txs.len());
570 let mut cache_miss = Vec::new();
571 {
572 let mut cache = self.blob_cache.lock();
573 for tx in txs {
574 if let Some(blob) = cache.get(&tx) {
575 res.push((tx, blob.clone()));
576 } else {
577 cache_miss.push(tx)
578 }
579 }
580 }
581 if cache_miss.is_empty() {
582 return Ok(res)
583 }
584 let from_disk = self.read_many_decoded(cache_miss);
585 if from_disk.is_empty() {
586 return Ok(res)
587 }
588 let from_disk = from_disk
589 .into_iter()
590 .map(|(tx, data)| {
591 let data = Arc::new(data);
592 res.push((tx, data.clone()));
593 (tx, data)
594 })
595 .collect::<Vec<_>>();
596
597 let mut cache = self.blob_cache.lock();
598 for (tx, data) in from_disk {
599 cache.insert(tx, data);
600 }
601
602 Ok(res)
603 }
604
605 #[inline]
609 fn get_exact(
610 &self,
611 txs: Vec<B256>,
612 ) -> Result<Vec<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
613 txs.into_iter()
614 .map(|tx| self.get_one(tx)?.ok_or(BlobStoreError::MissingSidecar(tx)))
615 .collect()
616 }
617}
618
619impl fmt::Debug for DiskFileBlobStoreInner {
620 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
621 f.debug_struct("DiskFileBlobStoreInner")
622 .field("blob_dir", &self.blob_dir)
623 .field("cached_blobs", &self.blob_cache.try_lock().map(|lock| lock.len()))
624 .field("txs_to_delete", &self.txs_to_delete.try_read())
625 .finish()
626 }
627}
628
629#[derive(Debug, thiserror::Error)]
631pub enum DiskFileBlobStoreError {
632 #[error("failed to open blobstore at {0}: {1}")]
634 Open(PathBuf, io::Error),
636 #[error("[{0}] failed to read blob file at {1}: {2}")]
638 ReadFile(TxHash, PathBuf, io::Error),
640 #[error("[{0}] failed to write blob file at {1}: {2}")]
642 WriteFile(TxHash, PathBuf, io::Error),
644 #[error("[{0}] failed to delete blob file at {1}: {2}")]
646 DeleteFile(TxHash, PathBuf, io::Error),
648}
649
650impl From<DiskFileBlobStoreError> for BlobStoreError {
651 fn from(value: DiskFileBlobStoreError) -> Self {
652 Self::Other(Box::new(value))
653 }
654}
655
656#[derive(Debug, Clone)]
658pub struct DiskFileBlobStoreConfig {
659 pub max_cached_entries: u32,
661 pub open: OpenDiskFileBlobStore,
663}
664
665impl Default for DiskFileBlobStoreConfig {
666 fn default() -> Self {
667 Self { max_cached_entries: DEFAULT_MAX_CACHED_BLOBS, open: Default::default() }
668 }
669}
670
671impl DiskFileBlobStoreConfig {
672 pub const fn with_max_cached_entries(mut self, max_cached_entries: u32) -> Self {
674 self.max_cached_entries = max_cached_entries;
675 self
676 }
677}
678
/// Mode that selects how a pre-existing blob directory is handled when opening the store.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum OpenDiskFileBlobStore {
    /// The default mode.
    // NOTE(review): presumably "wipe whatever is on disk" — confirm against the call sites.
    #[default]
    Clear,
    /// Alternative mode.
    // NOTE(review): presumably "keep the existing files and rebuild the bookkeeping from them";
    // nothing in this file implements it — confirm.
    ReIndex,
}
688
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_consensus::BlobTransactionSidecar;
    use alloy_eips::{
        eip4844::{kzg_to_versioned_hash, Blob, BlobAndProofV2, Bytes48},
        eip7594::{
            BlobTransactionSidecarEip7594, BlobTransactionSidecarVariant, CELLS_PER_EXT_BLOB,
        },
    };
    use std::sync::atomic::Ordering;

    /// Opens a store in a fresh temporary directory.
    ///
    /// The directory guard is returned as well so that it outlives the store.
    fn tmp_store() -> (DiskFileBlobStore, tempfile::TempDir) {
        let dir = tempfile::tempdir().unwrap();
        let store = DiskFileBlobStore::open(dir.path(), Default::default()).unwrap();
        (store, dir)
    }

    /// An EIP-4844 sidecar without any blobs, commitments or proofs.
    fn empty_sidecar() -> BlobTransactionSidecarVariant {
        BlobTransactionSidecarVariant::Eip4844(BlobTransactionSidecar {
            blobs: vec![],
            commitments: vec![],
            proofs: vec![],
        })
    }

    /// Generates `num` empty sidecars, each paired with a random transaction hash.
    fn rng_blobs(num: usize) -> Vec<(TxHash, BlobTransactionSidecarVariant)> {
        let mut rng = rand::rng();
        let mut blobs = Vec::with_capacity(num);
        for _ in 0..num {
            blobs.push((TxHash::random_with(&mut rng), empty_sidecar()));
        }
        blobs
    }

    /// Builds an EIP-7594 sidecar with one default blob.
    ///
    /// Returns the sidecar, the versioned hash of its commitment and the blob-and-proof value
    /// a lookup by that hash is expected to produce.
    fn eip7594_single_blob_sidecar() -> (BlobTransactionSidecarVariant, B256, BlobAndProofV2) {
        let commitment = Bytes48::default();
        let versioned_hash = kzg_to_versioned_hash(commitment.as_slice());
        let cell_proofs = vec![Bytes48::default(); CELLS_PER_EXT_BLOB];

        let expected =
            BlobAndProofV2 { blob: Box::new(Blob::default()), proofs: cell_proofs.clone() };
        let sidecar =
            BlobTransactionSidecarEip7594::new(vec![Blob::default()], vec![commitment], cell_proofs);

        (BlobTransactionSidecarVariant::Eip7594(sidecar), versioned_hash, expected)
    }

    #[test]
    fn disk_insert_all_get_all() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(10);
        let all_hashes: Vec<_> = blobs.iter().map(|(tx, _)| *tx).collect();
        store.insert_all(blobs.clone()).unwrap();

        // Every inserted sidecar is cached and round-trips unchanged.
        for (tx, blob) in &blobs {
            assert!(store.is_cached(tx));
            let b = store.get(*tx).unwrap().map(Arc::unwrap_or_clone).unwrap();
            assert_eq!(b, *blob);
        }

        for (tx, blob) in store.get_all(all_hashes.clone()).unwrap() {
            assert!(blobs.contains(&(tx, Arc::unwrap_or_clone(blob))), "missing blob {tx:?}");
        }

        // Mark everything for deletion, drop the cache and remove the files.
        assert!(store.contains(all_hashes[0]).unwrap());
        store.delete_all(all_hashes.clone()).unwrap();
        assert!(store.inner.txs_to_delete.read().contains(&all_hashes[0]));
        store.clear_cache();
        store.cleanup();

        // Nothing is left, neither in the cache nor on disk.
        assert!(store.get(blobs[0].0).unwrap().is_none());
        assert!(store.get_all(all_hashes.clone()).unwrap().is_empty());
        assert!(!store.contains(all_hashes[0]).unwrap());
        assert!(store.get_exact(all_hashes).is_err());

        assert_eq!(store.data_size_hint(), Some(0));
        assert_eq!(store.inner.size_tracker.num_blobs.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn disk_insert_and_retrieve() {
        let (store, _dir) = tmp_store();

        let (tx, blob) = rng_blobs(1).pop().unwrap();
        store.insert(tx, blob.clone()).unwrap();

        assert!(store.is_cached(&tx));
        let retrieved_blob = store.get(tx).unwrap().map(Arc::unwrap_or_clone).unwrap();
        assert_eq!(retrieved_blob, blob);
    }

    #[test]
    fn disk_delete_blob() {
        let (store, _dir) = tmp_store();

        let (tx, blob) = rng_blobs(1).pop().unwrap();
        store.insert(tx, blob).unwrap();
        assert!(store.is_cached(&tx));

        store.delete(tx).unwrap();
        assert!(store.inner.txs_to_delete.read().contains(&tx));
        store.cleanup();

        // Cleanup removed the file, but the cache was not cleared, so the sidecar is still
        // served from memory.
        let result = store.get(tx).unwrap();
        assert_eq!(result, Some(Arc::new(empty_sidecar())));
    }

    #[test]
    fn disk_insert_all_and_delete_all() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(5);
        let txs: Vec<_> = blobs.iter().map(|(tx, _)| *tx).collect();
        store.insert_all(blobs.clone()).unwrap();

        for (tx, _) in &blobs {
            assert!(store.is_cached(tx));
        }

        store.delete_all(txs.clone()).unwrap();
        store.cleanup();

        // The cache was not cleared, so every sidecar is still served from memory.
        for tx in txs {
            let result = store.get(tx).unwrap();
            assert_eq!(result, Some(Arc::new(empty_sidecar())));
        }
    }

    #[test]
    fn disk_get_all_blobs() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(3);
        let txs: Vec<_> = blobs.iter().map(|(tx, _)| *tx).collect();
        store.insert_all(blobs.clone()).unwrap();

        for (tx, blob) in store.get_all(txs.clone()).unwrap() {
            assert!(blobs.contains(&(tx, Arc::unwrap_or_clone(blob))));
        }

        store.delete_all(txs).unwrap();
        store.cleanup();
    }

    #[test]
    fn disk_get_exact_blobs_success() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(3);
        let txs: Vec<_> = blobs.iter().map(|(tx, _)| *tx).collect();
        store.insert_all(blobs.clone()).unwrap();

        // `get_exact` preserves the input order, so the results line up with `blobs`.
        let retrieved_blobs = store.get_exact(txs).unwrap();
        for (retrieved_blob, (_, original_blob)) in retrieved_blobs.into_iter().zip(blobs) {
            assert_eq!(Arc::unwrap_or_clone(retrieved_blob), original_blob);
        }
    }

    #[test]
    fn disk_get_exact_blobs_failure() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(2);
        let known_tx = blobs[0].0;
        store.insert_all(blobs).unwrap();

        // One known and one unknown transaction: the whole request must fail.
        let missing_tx = TxHash::random();
        let result = store.get_exact(vec![known_tx, missing_tx]);
        assert!(result.is_err());
    }

    #[test]
    fn disk_data_size_hint() {
        let (store, _dir) = tmp_store();
        assert_eq!(store.data_size_hint(), Some(0));

        store.insert_all(rng_blobs(2)).unwrap();
        assert!(store.data_size_hint().unwrap() > 0);
    }

    #[test]
    fn disk_cleanup_stat() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(3);
        let txs: Vec<_> = blobs.iter().map(|(tx, _)| *tx).collect();
        store.insert_all(blobs).unwrap();

        store.delete_all(txs).unwrap();
        let stat = store.cleanup();
        assert_eq!(stat.delete_succeed, 3);
        assert_eq!(stat.delete_failed, 0);
    }

    #[test]
    fn disk_get_blobs_v3_returns_partial_results() {
        let (store, _dir) = tmp_store();

        let (sidecar, versioned_hash, expected) = eip7594_single_blob_sidecar();
        store.insert(TxHash::random(), sidecar).unwrap();

        // The zero hash serves as the "unknown" entry, so it must differ from the real one.
        assert_ne!(versioned_hash, B256::ZERO);
        let request = vec![versioned_hash, B256::ZERO];

        let v2 = store.get_by_versioned_hashes_v2(&request).unwrap();
        assert!(v2.is_none(), "v2 must return null if any requested blob is missing");

        let v3 = store.get_by_versioned_hashes_v3(&request).unwrap();
        assert_eq!(v3, vec![Some(expected), None]);
    }

    #[test]
    fn disk_get_blobs_v3_can_fallback_to_disk() {
        let (store, _dir) = tmp_store();

        let (sidecar, versioned_hash, expected) = eip7594_single_blob_sidecar();
        store.insert(TxHash::random(), sidecar).unwrap();
        // With the cache emptied, the lookup has to go through the versioned-hash index and disk.
        store.clear_cache();

        let v3 = store.get_by_versioned_hashes_v3(&[versioned_hash]).unwrap();
        assert_eq!(v3, vec![Some(expected)]);
    }
}