use crate::blobstore::{BlobStore, BlobStoreCleanupStat, BlobStoreError, BlobStoreSize};
use alloy_eips::{
    eip4844::{BlobAndProofV1, BlobAndProofV2},
    eip7594::BlobTransactionSidecarVariant,
    eip7840::BlobParams,
    merge::EPOCH_SLOTS,
};
use alloy_primitives::{TxHash, B256};
use parking_lot::{Mutex, RwLock};
use schnellru::{ByLength, LruMap};
use std::{collections::HashSet, fmt, fs, io, path::PathBuf, sync::Arc};
use tracing::{debug, trace};

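/// How many [`BlobTransactionSidecarVariant`]s to keep in the in-memory blob cache.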
pub const DEFAULT_MAX_CACHED_BLOBS: u32 = 100;

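/// Capacity of the LRU cache that maps versioned blob hashes to transaction hashes, derived
/// from the BPO2 blob limits so it can cover many epochs' worth of blob hashes.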
const VERSIONED_HASH_TO_TX_HASH_CACHE_SIZE: u64 =
    BlobParams::bpo2().max_blobs_per_tx * BlobParams::bpo2().max_blob_count * EPOCH_SLOTS * 16;

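/// A blob store that persists blob sidecars as individual files on disk, fronted by an
/// in-memory LRU cache.
///
/// Deletion is deferred: [`BlobStore::delete`] only queues a transaction's blob for removal,
/// and the file is deleted on the next [`BlobStore::cleanup`] call.
///
/// A minimal usage sketch (the directory path, `tx_hash`, and `sidecar` are illustrative):
///
/// ```ignore
/// let store = DiskFileBlobStore::open("/tmp/blobs", Default::default())?;
/// store.insert(tx_hash, sidecar)?;
/// assert!(store.contains(tx_hash)?);
/// ```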
#[derive(Clone, Debug)]
pub struct DiskFileBlobStore {
    inner: Arc<DiskFileBlobStoreInner>,
}

impl DiskFileBlobStore {
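    /// Opens and initializes a new disk file blob store according to the given options.
    ///
    /// Note that this wipes any blob files left over from a previous run before recreating
    /// the directory, so the store always starts empty.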
    pub fn open(
        blob_dir: impl Into<PathBuf>,
        opts: DiskFileBlobStoreConfig,
    ) -> Result<Self, DiskFileBlobStoreError> {
        let blob_dir = blob_dir.into();
        let DiskFileBlobStoreConfig { max_cached_entries, .. } = opts;
        let inner = DiskFileBlobStoreInner::new(blob_dir, max_cached_entries);

        // Start from a clean slate: remove any leftover blob files, then recreate the dir.
        inner.delete_all()?;
        inner.create_blob_dir()?;

        Ok(Self { inner: Arc::new(inner) })
    }

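    /// Returns true if the blob sidecar for the given transaction hash is currently held in
    /// the in-memory cache.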
    #[cfg(test)]
    fn is_cached(&self, tx: &B256) -> bool {
        self.inner.blob_cache.lock().get(tx).is_some()
    }

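    /// Clears the in-memory blob cache.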
    #[cfg(test)]
    fn clear_cache(&self) {
        self.inner.blob_cache.lock().clear()
    }
}

impl BlobStore for DiskFileBlobStore {
    fn insert(&self, tx: B256, data: BlobTransactionSidecarVariant) -> Result<(), BlobStoreError> {
        self.inner.insert_one(tx, data)
    }

    fn insert_all(
        &self,
        txs: Vec<(B256, BlobTransactionSidecarVariant)>,
    ) -> Result<(), BlobStoreError> {
        if txs.is_empty() {
            return Ok(())
        }
        self.inner.insert_many(txs)
    }

    fn delete(&self, tx: B256) -> Result<(), BlobStoreError> {
        // Deletion is deferred: the hash is only queued here, and the file is removed by the
        // next `cleanup` call.
        if self.inner.contains(tx)? {
            self.inner.txs_to_delete.write().insert(tx);
        }
        Ok(())
    }

    fn delete_all(&self, txs: Vec<B256>) -> Result<(), BlobStoreError> {
        // Only queue hashes for which a blob actually exists.
        let txs = self.inner.retain_existing(txs)?;
        self.inner.txs_to_delete.write().extend(txs);
        Ok(())
    }

    fn cleanup(&self) -> BlobStoreCleanupStat {
        // Take the queued hashes so the lock is released before touching the file system.
        let txs_to_delete = std::mem::take(&mut *self.inner.txs_to_delete.write());
        let mut stat = BlobStoreCleanupStat::default();
        let mut subsize = 0;
        debug!(target:"txpool::blob", num_blobs=%txs_to_delete.len(), "Removing blobs from disk");
        for tx in txs_to_delete {
            let path = self.inner.blob_disk_file(tx);
            let filesize = fs::metadata(&path).map_or(0, |meta| meta.len());
            match fs::remove_file(&path) {
                Ok(_) => {
                    stat.delete_succeed += 1;
                    subsize += filesize;
                }
                Err(e) => {
                    stat.delete_failed += 1;
                    let err = DiskFileBlobStoreError::DeleteFile(tx, path, e);
                    debug!(target:"txpool::blob", %err);
                }
            };
        }
        // Account for the removed files in the size tracker.
        self.inner.size_tracker.sub_size(subsize as usize);
        self.inner.size_tracker.sub_len(stat.delete_succeed);
        stat
    }

    fn get(&self, tx: B256) -> Result<Option<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
        self.inner.get_one(tx)
    }

    fn contains(&self, tx: B256) -> Result<bool, BlobStoreError> {
        self.inner.contains(tx)
    }

    fn get_all(
        &self,
        txs: Vec<B256>,
    ) -> Result<Vec<(B256, Arc<BlobTransactionSidecarVariant>)>, BlobStoreError> {
        if txs.is_empty() {
            return Ok(Vec::new())
        }
        self.inner.get_all(txs)
    }

    fn get_exact(
        &self,
        txs: Vec<B256>,
    ) -> Result<Vec<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
        if txs.is_empty() {
            return Ok(Vec::new())
        }
        self.inner.get_exact(txs)
    }

    fn get_by_versioned_hashes_v1(
        &self,
        versioned_hashes: &[B256],
    ) -> Result<Vec<Option<BlobAndProofV1>>, BlobStoreError> {
        // The response must contain one entry per requested hash, in order.
        let mut result = vec![None; versioned_hashes.len()];

        // Scan the in-memory cache for matching sidecars first.
        for (_tx_hash, blob_sidecar) in self.inner.blob_cache.lock().iter() {
            if let Some(blob_sidecar) = blob_sidecar.as_eip4844() {
                for (hash_idx, match_result) in
                    blob_sidecar.match_versioned_hashes(versioned_hashes)
                {
                    result[hash_idx] = Some(match_result);
                }
            }

            // Return early if all requested hashes have been resolved.
            if result.iter().all(|blob| blob.is_some()) {
                return Ok(result);
            }
        }

        // Look up the transactions that may contain the still-missing blobs.
        let mut missing_tx_hashes = Vec::new();

        {
            let mut versioned_to_txhashes = self.inner.versioned_hashes_to_txhash.lock();
            for (idx, _) in
                result.iter().enumerate().filter(|(_, blob_and_proof)| blob_and_proof.is_none())
            {
                let versioned_hash = versioned_hashes[idx];
                if let Some(tx_hash) = versioned_to_txhashes.get(&versioned_hash).copied() {
                    missing_tx_hashes.push(tx_hash);
                }
            }
        }

        // Read the candidate sidecars from disk and fill in the remaining entries.
        if !missing_tx_hashes.is_empty() {
            let blobs_from_disk = self.inner.read_many_decoded(missing_tx_hashes);
            for (_, blob_sidecar) in blobs_from_disk {
                if let Some(blob_sidecar) = blob_sidecar.as_eip4844() {
                    for (hash_idx, match_result) in
                        blob_sidecar.match_versioned_hashes(versioned_hashes)
                    {
                        if result[hash_idx].is_none() {
                            result[hash_idx] = Some(match_result);
                        }
                    }
                }
            }
        }

        Ok(result)
    }

    fn get_by_versioned_hashes_v2(
        &self,
        versioned_hashes: &[B256],
    ) -> Result<Option<Vec<BlobAndProofV2>>, BlobStoreError> {
        // Unlike the v1 variant, this is all-or-nothing: either every requested hash is
        // resolved or `None` is returned.
        let mut result = vec![None; versioned_hashes.len()];

        // Scan the in-memory cache for matching sidecars first.
        for (_tx_hash, blob_sidecar) in self.inner.blob_cache.lock().iter() {
            if let Some(blob_sidecar) = blob_sidecar.as_eip7594() {
                for (hash_idx, match_result) in
                    blob_sidecar.match_versioned_hashes(versioned_hashes)
                {
                    result[hash_idx] = Some(match_result);
                }
            }

            // Return early if all requested hashes have been resolved.
            if result.iter().all(|blob| blob.is_some()) {
                return Ok(Some(result.into_iter().map(Option::unwrap).collect()))
            }
        }

        // Look up the transactions that may contain the still-missing blobs.
        let mut missing_tx_hashes = Vec::new();

        {
            let mut versioned_to_txhashes = self.inner.versioned_hashes_to_txhash.lock();
            for (idx, _) in
                result.iter().enumerate().filter(|(_, blob_and_proof)| blob_and_proof.is_none())
            {
                let versioned_hash = versioned_hashes[idx];
                if let Some(tx_hash) = versioned_to_txhashes.get(&versioned_hash).copied() {
                    missing_tx_hashes.push(tx_hash);
                }
            }
        }

        // Read the candidate sidecars from disk and fill in the remaining entries.
        if !missing_tx_hashes.is_empty() {
            let blobs_from_disk = self.inner.read_many_decoded(missing_tx_hashes);
            for (_, blob_sidecar) in blobs_from_disk {
                if let Some(blob_sidecar) = blob_sidecar.as_eip7594() {
                    for (hash_idx, match_result) in
                        blob_sidecar.match_versioned_hashes(versioned_hashes)
                    {
                        if result[hash_idx].is_none() {
                            result[hash_idx] = Some(match_result);
                        }
                    }
                }
            }
        }

        if result.iter().all(|blob| blob.is_some()) {
            Ok(Some(result.into_iter().map(Option::unwrap).collect()))
        } else {
            Ok(None)
        }
    }

    fn data_size_hint(&self) -> Option<usize> {
        Some(self.inner.size_tracker.data_size())
    }

    fn blobs_len(&self) -> usize {
        self.inner.size_tracker.blobs_len()
    }
}

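/// Intermediary type that handles blob insertion, lookup and deletion on disk for
/// [`DiskFileBlobStore`].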
struct DiskFileBlobStoreInner {
    /// Directory in which blob files are stored, one file per transaction hash.
    blob_dir: PathBuf,
    /// In-memory LRU cache of decoded blob sidecars, keyed by transaction hash.
    blob_cache: Mutex<LruMap<TxHash, Arc<BlobTransactionSidecarVariant>, ByLength>>,
    /// Tracks the total size and number of blobs in the store.
    size_tracker: BlobStoreSize,
    /// Guards file system access.
    file_lock: RwLock<()>,
    /// Transaction hashes queued for deferred deletion.
    txs_to_delete: RwLock<HashSet<B256>>,
    /// Maps a blob's versioned hash to the hash of the transaction it belongs to.
    versioned_hashes_to_txhash: Mutex<LruMap<B256, B256>>,
}

impl DiskFileBlobStoreInner {
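    /// Creates a new instance for the given blob directory and blob cache capacity.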
    fn new(blob_dir: PathBuf, max_length: u32) -> Self {
        Self {
            blob_dir,
            blob_cache: Mutex::new(LruMap::new(ByLength::new(max_length))),
            size_tracker: Default::default(),
            file_lock: Default::default(),
            txs_to_delete: Default::default(),
            versioned_hashes_to_txhash: Mutex::new(LruMap::new(ByLength::new(
                VERSIONED_HASH_TO_TX_HASH_CACHE_SIZE as u32,
            ))),
        }
    }

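    /// Creates the directory where blobs are stored on disk.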
    fn create_blob_dir(&self) -> Result<(), DiskFileBlobStoreError> {
        debug!(target:"txpool::blob", blob_dir = ?self.blob_dir, "Creating blob store");
        fs::create_dir_all(&self.blob_dir)
            .map_err(|e| DiskFileBlobStoreError::Open(self.blob_dir.clone(), e))
    }

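    /// Deletes the entire blob store directory; a missing directory is not an error.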
    fn delete_all(&self) -> Result<(), DiskFileBlobStoreError> {
        match fs::remove_dir_all(&self.blob_dir) {
            Ok(_) => {
                debug!(target:"txpool::blob", blob_dir = ?self.blob_dir, "Removed blob store directory");
            }
            Err(err) if err.kind() == io::ErrorKind::NotFound => {}
            Err(err) => return Err(DiskFileBlobStoreError::Open(self.blob_dir.clone(), err)),
        }
        Ok(())
    }

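    /// Caches and persists the blob sidecar for the given transaction, and indexes its
    /// versioned hashes.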
    fn insert_one(
        &self,
        tx: B256,
        data: BlobTransactionSidecarVariant,
    ) -> Result<(), BlobStoreError> {
        // RLP-encode the sidecar fields for the on-disk representation.
        let mut buf = Vec::with_capacity(data.rlp_encoded_fields_length());
        data.rlp_encode_fields(&mut buf);

        {
            // Map every versioned hash of this sidecar to the transaction hash.
            let mut map = self.versioned_hashes_to_txhash.lock();
            data.versioned_hashes().for_each(|hash| {
                map.insert(hash, tx);
            });
        }

        self.blob_cache.lock().insert(tx, Arc::new(data));

        let size = self.write_one_encoded(tx, &buf)?;

        self.size_tracker.add_size(size);
        self.size_tracker.inc_len(1);
        Ok(())
    }

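    /// Caches and persists all given blob sidecars, and indexes their versioned hashes.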
    fn insert_many(
        &self,
        txs: Vec<(B256, BlobTransactionSidecarVariant)>,
    ) -> Result<(), BlobStoreError> {
        // Encode all sidecars and compute their target file paths up front.
        let raw = txs
            .iter()
            .map(|(tx, data)| {
                let mut buf = Vec::with_capacity(data.rlp_encoded_fields_length());
                data.rlp_encode_fields(&mut buf);
                (self.blob_disk_file(*tx), buf)
            })
            .collect::<Vec<_>>();

        {
            // Map every versioned hash of the sidecars to their transaction hash.
            let mut map = self.versioned_hashes_to_txhash.lock();
            for (tx, data) in &txs {
                data.versioned_hashes().for_each(|hash| {
                    map.insert(hash, *tx);
                });
            }
        }

        {
            let mut cache = self.blob_cache.lock();
            for (tx, data) in txs {
                cache.insert(tx, Arc::new(data));
            }
        }

        // Write all files under a single write lock, skipping blobs that already exist on
        // disk; failed writes are logged but do not abort the batch.
        let mut add = 0;
        let mut num = 0;
        {
            let _lock = self.file_lock.write();
            for (path, data) in raw {
                if path.exists() {
                    debug!(target:"txpool::blob", ?path, "Blob already exists");
                } else if let Err(err) = fs::write(&path, &data) {
                    debug!(target:"txpool::blob", %err, ?path, "Failed to write blob file");
                } else {
                    add += data.len();
                    num += 1;
                }
            }
        }
        self.size_tracker.add_size(add);
        self.size_tracker.inc_len(num);

        Ok(())
    }

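    /// Returns true if the blob for the given transaction is either cached or stored on disk.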
    fn contains(&self, tx: B256) -> Result<bool, BlobStoreError> {
        // Check the in-memory cache first, then fall back to the file on disk.
        if self.blob_cache.lock().get(&tx).is_some() {
            return Ok(true)
        }
        Ok(self.blob_disk_file(tx).is_file())
    }

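    /// Returns the subset of the given transaction hashes for which a blob exists, either in
    /// the cache or on disk.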
    fn retain_existing(&self, txs: Vec<B256>) -> Result<Vec<B256>, BlobStoreError> {
        let (in_cache, not_in_cache): (Vec<B256>, Vec<B256>) = {
            let mut cache = self.blob_cache.lock();
            txs.into_iter().partition(|tx| cache.get(tx).is_some())
        };

        let mut existing = in_cache;
        for tx in not_in_cache {
            if self.blob_disk_file(tx).is_file() {
                existing.push(tx);
            }
        }

        Ok(existing)
    }

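    /// Retrieves the blob for the given transaction hash, reading it from disk and caching it
    /// on a cache miss.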
    fn get_one(
        &self,
        tx: B256,
    ) -> Result<Option<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
        if let Some(blob) = self.blob_cache.lock().get(&tx) {
            return Ok(Some(blob.clone()))
        }

        if let Some(blob) = self.read_one(tx)? {
            let blob_arc = Arc::new(blob);
            self.blob_cache.lock().insert(tx, blob_arc.clone());
            return Ok(Some(blob_arc))
        }

        Ok(None)
    }

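    /// Returns the path to the blob file for the given transaction hash.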
    #[inline]
    fn blob_disk_file(&self, tx: B256) -> PathBuf {
        self.blob_dir.join(format!("{tx:x}"))
    }

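    /// Reads and decodes the blob file for the given transaction hash from disk, returning
    /// `None` if the file does not exist.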
    #[inline]
    fn read_one(&self, tx: B256) -> Result<Option<BlobTransactionSidecarVariant>, BlobStoreError> {
        let path = self.blob_disk_file(tx);
        let data = {
            let _lock = self.file_lock.read();
            match fs::read(&path) {
                Ok(data) => data,
                Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(None),
                Err(e) => {
                    return Err(BlobStoreError::Other(Box::new(DiskFileBlobStoreError::ReadFile(
                        tx, path, e,
                    ))))
                }
            }
        };
        BlobTransactionSidecarVariant::rlp_decode_fields(&mut data.as_slice())
            .map(Some)
            .map_err(BlobStoreError::DecodeError)
    }

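    /// Reads and decodes the blob files for the given transaction hashes, silently skipping
    /// entries that fail to decode.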
    fn read_many_decoded(&self, txs: Vec<TxHash>) -> Vec<(TxHash, BlobTransactionSidecarVariant)> {
        self.read_many_raw(txs)
            .into_iter()
            .filter_map(|(tx, data)| {
                BlobTransactionSidecarVariant::rlp_decode_fields(&mut data.as_slice())
                    .map(|sidecar| (tx, sidecar))
                    .ok()
            })
            .collect()
    }

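    /// Reads the raw blob files for the given transaction hashes while holding the file read
    /// lock; missing or unreadable files are logged and skipped.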
    #[inline]
    fn read_many_raw(&self, txs: Vec<TxHash>) -> Vec<(TxHash, Vec<u8>)> {
        let mut res = Vec::with_capacity(txs.len());
        let _lock = self.file_lock.read();
        for tx in txs {
            let path = self.blob_disk_file(tx);
            match fs::read(&path) {
                Ok(data) => {
                    res.push((tx, data));
                }
                Err(err) => {
                    debug!(target:"txpool::blob", %err, ?tx, "Failed to read blob file");
                }
            };
        }
        res
    }

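    /// Writes the encoded blob to disk unless a file for the transaction already exists, and
    /// returns the number of bytes written (zero if the file was already present).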
    #[inline]
    fn write_one_encoded(&self, tx: B256, data: &[u8]) -> Result<usize, DiskFileBlobStoreError> {
        trace!(target:"txpool::blob", "[{:?}] writing blob file", tx);
        let mut add = 0;
        let path = self.blob_disk_file(tx);
        {
            let _lock = self.file_lock.write();
            if !path.exists() {
                fs::write(&path, data)
                    .map_err(|e| DiskFileBlobStoreError::WriteFile(tx, path, e))?;
                add = data.len();
            }
        }
        Ok(add)
    }

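    /// Retrieves the blobs for the given transaction hashes, serving from the cache where
    /// possible and reading the rest from disk; disk hits are inserted back into the cache.
    /// Hashes without a stored blob are omitted from the result.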
    #[inline]
    fn get_all(
        &self,
        txs: Vec<B256>,
    ) -> Result<Vec<(B256, Arc<BlobTransactionSidecarVariant>)>, BlobStoreError> {
        let mut res = Vec::with_capacity(txs.len());
        let mut cache_miss = Vec::new();
        {
            let mut cache = self.blob_cache.lock();
            for tx in txs {
                if let Some(blob) = cache.get(&tx) {
                    res.push((tx, blob.clone()));
                } else {
                    cache_miss.push(tx)
                }
            }
        }
        if cache_miss.is_empty() {
            return Ok(res)
        }
        let from_disk = self.read_many_decoded(cache_miss);
        if from_disk.is_empty() {
            return Ok(res)
        }
        // Append the disk hits to the result and insert them back into the cache.
        let from_disk = from_disk
            .into_iter()
            .map(|(tx, data)| {
                let data = Arc::new(data);
                res.push((tx, data.clone()));
                (tx, data)
            })
            .collect::<Vec<_>>();

        let mut cache = self.blob_cache.lock();
        for (tx, data) in from_disk {
            cache.insert(tx, data);
        }

        Ok(res)
    }

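    /// Retrieves the blobs for all given transaction hashes, preserving the input order, and
    /// fails with [`BlobStoreError::MissingSidecar`] if any blob is missing.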
    #[inline]
    fn get_exact(
        &self,
        txs: Vec<B256>,
    ) -> Result<Vec<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
        txs.into_iter()
            .map(|tx| self.get_one(tx)?.ok_or(BlobStoreError::MissingSidecar(tx)))
            .collect()
    }
}

impl fmt::Debug for DiskFileBlobStoreInner {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("DiskFileBlobStoreInner")
            .field("blob_dir", &self.blob_dir)
            .field("cached_blobs", &self.blob_cache.try_lock().map(|lock| lock.len()))
            .field("txs_to_delete", &self.txs_to_delete.try_read())
            .finish()
    }
}

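/// Errors that can occur when interacting with a [`DiskFileBlobStore`].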
#[derive(Debug, thiserror::Error)]
pub enum DiskFileBlobStoreError {
    /// Failed to create or wipe the blob store directory.
    #[error("failed to open blobstore at {0}: {1}")]
    Open(PathBuf, io::Error),
    /// Failed to read the blob file for the given transaction.
    #[error("[{0}] failed to read blob file at {1}: {2}")]
    ReadFile(TxHash, PathBuf, io::Error),
    /// Failed to write the blob file for the given transaction.
    #[error("[{0}] failed to write blob file at {1}: {2}")]
    WriteFile(TxHash, PathBuf, io::Error),
    /// Failed to delete the blob file for the given transaction.
    #[error("[{0}] failed to delete blob file at {1}: {2}")]
    DeleteFile(TxHash, PathBuf, io::Error),
}

impl From<DiskFileBlobStoreError> for BlobStoreError {
    fn from(value: DiskFileBlobStoreError) -> Self {
        Self::Other(Box::new(value))
    }
}

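/// Configuration for a [`DiskFileBlobStore`].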
#[derive(Debug, Clone)]
pub struct DiskFileBlobStoreConfig {
    /// Maximum number of blob sidecars to keep in the in-memory cache.
    pub max_cached_entries: u32,
    /// How to open the blob store.
    pub open: OpenDiskFileBlobStore,
}

impl Default for DiskFileBlobStoreConfig {
    fn default() -> Self {
        Self { max_cached_entries: DEFAULT_MAX_CACHED_BLOBS, open: Default::default() }
    }
}

impl DiskFileBlobStoreConfig {
    /// Sets the maximum number of blob sidecars to keep in the in-memory cache.
    pub const fn with_max_cached_entries(mut self, max_cached_entries: u32) -> Self {
        self.max_cached_entries = max_cached_entries;
        self
    }
}

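/// How to open a [`DiskFileBlobStore`] on startup.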
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum OpenDiskFileBlobStore {
    /// Clear everything that is on disk (default).
    #[default]
    Clear,
    /// Keep the existing blob files and index them on startup.
    ReIndex,
}

#[cfg(test)]
mod tests {
    use super::*;
    use alloy_consensus::BlobTransactionSidecar;
    use alloy_eips::eip7594::BlobTransactionSidecarVariant;
    use std::sync::atomic::Ordering;

    /// Creates a fresh store in a temporary directory; the directory guard must be kept
    /// alive for the duration of the test.
    fn tmp_store() -> (DiskFileBlobStore, tempfile::TempDir) {
        let dir = tempfile::tempdir().unwrap();
        let store = DiskFileBlobStore::open(dir.path(), Default::default()).unwrap();
        (store, dir)
    }

    /// Generates `num` random transaction hashes paired with empty EIP-4844 sidecars.
    fn rng_blobs(num: usize) -> Vec<(TxHash, BlobTransactionSidecarVariant)> {
        let mut rng = rand::rng();
        (0..num)
            .map(|_| {
                let tx = TxHash::random_with(&mut rng);
                let blob = BlobTransactionSidecarVariant::Eip4844(BlobTransactionSidecar {
                    blobs: vec![],
                    commitments: vec![],
                    proofs: vec![],
                });
                (tx, blob)
            })
            .collect()
    }

    #[test]
    fn disk_insert_all_get_all() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(10);
        let all_hashes = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        // All inserted blobs are cached and retrievable.
        for (tx, blob) in &blobs {
            assert!(store.is_cached(tx));
            let b = store.get(*tx).unwrap().map(Arc::unwrap_or_clone).unwrap();
            assert_eq!(b, *blob);
        }

        let all = store.get_all(all_hashes.clone()).unwrap();
        for (tx, blob) in all {
            assert!(blobs.contains(&(tx, Arc::unwrap_or_clone(blob))), "missing blob {tx:?}");
        }

        assert!(store.contains(all_hashes[0]).unwrap());
        store.delete_all(all_hashes.clone()).unwrap();
        assert!(store.inner.txs_to_delete.read().contains(&all_hashes[0]));
        // Clear the cache before cleanup so lookups have to go to disk.
        store.clear_cache();
        store.cleanup();

        assert!(store.get(blobs[0].0).unwrap().is_none());

        let all = store.get_all(all_hashes.clone()).unwrap();
        assert!(all.is_empty());

        assert!(!store.contains(all_hashes[0]).unwrap());
        assert!(store.get_exact(all_hashes).is_err());

        assert_eq!(store.data_size_hint(), Some(0));
        assert_eq!(store.inner.size_tracker.num_blobs.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn disk_insert_and_retrieve() {
        let (store, _dir) = tmp_store();

        let (tx, blob) = rng_blobs(1).into_iter().next().unwrap();
        store.insert(tx, blob.clone()).unwrap();

        assert!(store.is_cached(&tx));
        let retrieved_blob = store.get(tx).unwrap().map(Arc::unwrap_or_clone).unwrap();
        assert_eq!(retrieved_blob, blob);
    }

    #[test]
    fn disk_delete_blob() {
        let (store, _dir) = tmp_store();

        let (tx, blob) = rng_blobs(1).into_iter().next().unwrap();
        store.insert(tx, blob).unwrap();
        assert!(store.is_cached(&tx));

        store.delete(tx).unwrap();
        assert!(store.inner.txs_to_delete.read().contains(&tx));
        store.cleanup();

        // `cleanup` removes the file from disk but leaves the cache entry intact, so `get`
        // still returns the cached sidecar.
        let result = store.get(tx).unwrap();
        assert_eq!(
            result,
            Some(Arc::new(BlobTransactionSidecarVariant::Eip4844(BlobTransactionSidecar {
                blobs: vec![],
                commitments: vec![],
                proofs: vec![]
            })))
        );
    }

    #[test]
    fn disk_insert_all_and_delete_all() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(5);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        for (tx, _) in &blobs {
            assert!(store.is_cached(tx));
        }

        store.delete_all(txs.clone()).unwrap();
        store.cleanup();

        // The files are gone, but the sidecars are still served from the cache.
        for tx in txs {
            let result = store.get(tx).unwrap();
            assert_eq!(
                result,
                Some(Arc::new(BlobTransactionSidecarVariant::Eip4844(BlobTransactionSidecar {
                    blobs: vec![],
                    commitments: vec![],
                    proofs: vec![]
                })))
            );
        }
    }

    #[test]
    fn disk_get_all_blobs() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(3);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        let retrieved_blobs = store.get_all(txs.clone()).unwrap();
        for (tx, blob) in retrieved_blobs {
            assert!(blobs.contains(&(tx, Arc::unwrap_or_clone(blob))));
        }

        store.delete_all(txs).unwrap();
        store.cleanup();
    }

    #[test]
    fn disk_get_exact_blobs_success() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(3);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        let retrieved_blobs = store.get_exact(txs).unwrap();
        for (retrieved_blob, (_, original_blob)) in retrieved_blobs.into_iter().zip(blobs) {
            assert_eq!(Arc::unwrap_or_clone(retrieved_blob), original_blob);
        }
    }

    #[test]
    fn disk_get_exact_blobs_failure() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(2);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs).unwrap();

        // A single unknown hash makes the exact lookup fail.
        let missing_tx = TxHash::random();
        let result = store.get_exact(vec![txs[0], missing_tx]);
        assert!(result.is_err());
    }

    #[test]
    fn disk_data_size_hint() {
        let (store, _dir) = tmp_store();
        assert_eq!(store.data_size_hint(), Some(0));

        let blobs = rng_blobs(2);
        store.insert_all(blobs).unwrap();
        assert!(store.data_size_hint().unwrap() > 0);
    }

    #[test]
    fn disk_cleanup_stat() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(3);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs).unwrap();

        store.delete_all(txs).unwrap();
        let stat = store.cleanup();
        assert_eq!(stat.delete_succeed, 3);
        assert_eq!(stat.delete_failed, 0);
    }
}