use crate::blobstore::{BlobStore, BlobStoreCleanupStat, BlobStoreError, BlobStoreSize};
use alloy_eips::{
    eip4844::{BlobAndProofV1, BlobAndProofV2},
    eip7594::BlobTransactionSidecarVariant,
};
use alloy_primitives::{TxHash, B256};
use parking_lot::{Mutex, RwLock};
use schnellru::{ByLength, LruMap};
use std::{collections::HashSet, fmt, fs, io, path::PathBuf, sync::Arc};
use tracing::{debug, trace};

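/// How many blob sidecars to keep in the in-memory LRU cache by default.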
pub const DEFAULT_MAX_CACHED_BLOBS: u32 = 100;

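/// A blob store that persists blob sidecars as files on disk, fronted by an in-memory LRU cache.
///
/// Deletion is deferred: [`BlobStore::delete`] and [`BlobStore::delete_all`] only mark entries
/// for removal, and the corresponding files are removed from disk on the next call to
/// [`BlobStore::cleanup`].
///
/// A minimal roundtrip sketch (the path and values are illustrative):
///
/// ```ignore
/// let store = DiskFileBlobStore::open("/tmp/blobs", Default::default())?;
/// store.insert(tx_hash, sidecar)?;
/// assert!(store.contains(tx_hash)?);
/// ```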
#[derive(Clone, Debug)]
pub struct DiskFileBlobStore {
    inner: Arc<DiskFileBlobStoreInner>,
}

impl DiskFileBlobStore {
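    /// Opens and initializes a new disk file blob store according to the given options.
    ///
    /// Note that this clears any blob files left over in `blob_dir` from a previous run.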
    pub fn open(
        blob_dir: impl Into<PathBuf>,
        opts: DiskFileBlobStoreConfig,
    ) -> Result<Self, DiskFileBlobStoreError> {
        let blob_dir = blob_dir.into();
        let DiskFileBlobStoreConfig { max_cached_entries, .. } = opts;
        let inner = DiskFileBlobStoreInner::new(blob_dir, max_cached_entries);

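        // Wipe any leftover blob files from a previous run, then recreate the empty directory.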
        inner.delete_all()?;
        inner.create_blob_dir()?;

        Ok(Self { inner: Arc::new(inner) })
    }

    #[cfg(test)]
    fn is_cached(&self, tx: &B256) -> bool {
        self.inner.blob_cache.lock().get(tx).is_some()
    }

    #[cfg(test)]
    fn clear_cache(&self) {
        self.inner.blob_cache.lock().clear()
    }
}

impl BlobStore for DiskFileBlobStore {
    fn insert(&self, tx: B256, data: BlobTransactionSidecarVariant) -> Result<(), BlobStoreError> {
        self.inner.insert_one(tx, data)
    }

    fn insert_all(
        &self,
        txs: Vec<(B256, BlobTransactionSidecarVariant)>,
    ) -> Result<(), BlobStoreError> {
        if txs.is_empty() {
            return Ok(())
        }
        self.inner.insert_many(txs)
    }

    fn delete(&self, tx: B256) -> Result<(), BlobStoreError> {
        if self.inner.contains(tx)? {
            self.inner.txs_to_delete.write().insert(tx);
        }
        Ok(())
    }

    fn delete_all(&self, txs: Vec<B256>) -> Result<(), BlobStoreError> {
        let txs = self.inner.retain_existing(txs)?;
        self.inner.txs_to_delete.write().extend(txs);
        Ok(())
    }

    fn cleanup(&self) -> BlobStoreCleanupStat {
        let txs_to_delete = std::mem::take(&mut *self.inner.txs_to_delete.write());
        let mut stat = BlobStoreCleanupStat::default();
        let mut subsize = 0;
        debug!(target:"txpool::blob", num_blobs=%txs_to_delete.len(), "Removing blobs from disk");
        for tx in txs_to_delete {
            let path = self.inner.blob_disk_file(tx);
            let filesize = fs::metadata(&path).map_or(0, |meta| meta.len());
            match fs::remove_file(&path) {
                Ok(_) => {
                    stat.delete_succeed += 1;
                    subsize += filesize;
                }
                Err(e) => {
                    stat.delete_failed += 1;
                    let err = DiskFileBlobStoreError::DeleteFile(tx, path, e);
                    debug!(target:"txpool::blob", %err);
                }
            };
        }
        self.inner.size_tracker.sub_size(subsize as usize);
        self.inner.size_tracker.sub_len(stat.delete_succeed);
        stat
    }

    fn get(&self, tx: B256) -> Result<Option<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
        self.inner.get_one(tx)
    }

    fn contains(&self, tx: B256) -> Result<bool, BlobStoreError> {
        self.inner.contains(tx)
    }

    fn get_all(
        &self,
        txs: Vec<B256>,
    ) -> Result<Vec<(B256, Arc<BlobTransactionSidecarVariant>)>, BlobStoreError> {
        if txs.is_empty() {
            return Ok(Vec::new())
        }
        self.inner.get_all(txs)
    }

    fn get_exact(
        &self,
        txs: Vec<B256>,
    ) -> Result<Vec<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
        if txs.is_empty() {
            return Ok(Vec::new())
        }
        self.inner.get_exact(txs)
    }

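    // Lookup by versioned hash: scan the cached sidecars first, then fall back to the blob
    // files on disk via the versioned-hash-to-tx-hash index.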
    fn get_by_versioned_hashes_v1(
        &self,
        versioned_hashes: &[B256],
    ) -> Result<Vec<Option<BlobAndProofV1>>, BlobStoreError> {
        let mut result = vec![None; versioned_hashes.len()];

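        // Check all cached EIP-4844 sidecars for the requested versioned hashes first.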
        for (_tx_hash, blob_sidecar) in self.inner.blob_cache.lock().iter() {
            if let Some(blob_sidecar) = blob_sidecar.as_eip4844() {
                for (hash_idx, match_result) in
                    blob_sidecar.match_versioned_hashes(versioned_hashes)
                {
                    result[hash_idx] = Some(match_result);
                }
            }

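            // Return early if every requested versioned hash has already been found.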
            if result.iter().all(|blob| blob.is_some()) {
                return Ok(result);
            }
        }

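        // Collect the tx hashes whose sidecars may contain the still-missing versioned hashes.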
        let mut missing_tx_hashes = Vec::new();

        {
            let mut versioned_to_txhashes = self.inner.versioned_hashes_to_txhash.lock();
            for (idx, _) in
                result.iter().enumerate().filter(|(_, blob_and_proof)| blob_and_proof.is_none())
            {
                let versioned_hash = versioned_hashes[idx];
                if let Some(tx_hash) = versioned_to_txhashes.get(&versioned_hash).copied() {
                    missing_tx_hashes.push(tx_hash);
                }
            }
        }

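        // Fall back to reading the remaining sidecars from disk.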
        if !missing_tx_hashes.is_empty() {
            let blobs_from_disk = self.inner.read_many_decoded(missing_tx_hashes);
            for (_, blob_sidecar) in blobs_from_disk {
                if let Some(blob_sidecar) = blob_sidecar.as_eip4844() {
                    for (hash_idx, match_result) in
                        blob_sidecar.match_versioned_hashes(versioned_hashes)
                    {
                        if result[hash_idx].is_none() {
                            result[hash_idx] = Some(match_result);
                        }
                    }
                }
            }
        }

        Ok(result)
    }

    fn get_by_versioned_hashes_v2(
        &self,
        versioned_hashes: &[B256],
    ) -> Result<Option<Vec<BlobAndProofV2>>, BlobStoreError> {
        let mut result = vec![None; versioned_hashes.len()];

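        // Check all cached EIP-7594 sidecars for the requested versioned hashes first,
        // returning early once every requested hash has been matched.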
        for (_tx_hash, blob_sidecar) in self.inner.blob_cache.lock().iter() {
            if let Some(blob_sidecar) = blob_sidecar.as_eip7594() {
                for (hash_idx, match_result) in
                    blob_sidecar.match_versioned_hashes(versioned_hashes)
                {
                    result[hash_idx] = Some(match_result);
                }
            }

            if result.iter().all(|blob| blob.is_some()) {
                return Ok(Some(result.into_iter().map(Option::unwrap).collect()))
            }
        }

        let mut missing_tx_hashes = Vec::new();

        {
            let mut versioned_to_txhashes = self.inner.versioned_hashes_to_txhash.lock();
            for (idx, _) in
                result.iter().enumerate().filter(|(_, blob_and_proof)| blob_and_proof.is_none())
            {
                let versioned_hash = versioned_hashes[idx];
                if let Some(tx_hash) = versioned_to_txhashes.get(&versioned_hash).copied() {
                    missing_tx_hashes.push(tx_hash);
                }
            }
        }

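        // Fall back to reading the remaining sidecars from disk.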
        if !missing_tx_hashes.is_empty() {
            let blobs_from_disk = self.inner.read_many_decoded(missing_tx_hashes);
            for (_, blob_sidecar) in blobs_from_disk {
                if let Some(blob_sidecar) = blob_sidecar.as_eip7594() {
                    for (hash_idx, match_result) in
                        blob_sidecar.match_versioned_hashes(versioned_hashes)
                    {
                        if result[hash_idx].is_none() {
                            result[hash_idx] = Some(match_result);
                        }
                    }
                }
            }
        }

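        // V2 semantics are all-or-nothing: only return blobs if every hash was resolved.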
        if result.iter().all(|blob| blob.is_some()) {
            Ok(Some(result.into_iter().map(Option::unwrap).collect()))
        } else {
            Ok(None)
        }
    }

    fn data_size_hint(&self) -> Option<usize> {
        Some(self.inner.size_tracker.data_size())
    }

    fn blobs_len(&self) -> usize {
        self.inner.size_tracker.blobs_len()
    }
}

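/// The inner shared state of a [`DiskFileBlobStore`].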
struct DiskFileBlobStoreInner {
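    /// Where the blob files are stored on disk.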
    blob_dir: PathBuf,
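    /// LRU cache of decoded blob sidecars.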
    blob_cache: Mutex<LruMap<TxHash, Arc<BlobTransactionSidecarVariant>, ByLength>>,
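    /// Tracks the total size and number of blobs in the store.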
    size_tracker: BlobStoreSize,
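    /// Lock guarding file system access to the blob files.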
    file_lock: RwLock<()>,
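    /// Transactions whose blob files are queued for deletion by the next cleanup.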
    txs_to_delete: RwLock<HashSet<B256>>,
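    /// Maps each versioned blob hash to the hash of the transaction that carries it, used to
    /// serve lookups by versioned hash.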
    versioned_hashes_to_txhash: Mutex<LruMap<B256, B256>>,
}

impl DiskFileBlobStoreInner {
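    /// Creates a new store that keeps blob files in `blob_dir` and caches up to `max_length`
    /// sidecars in memory.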
    fn new(blob_dir: PathBuf, max_length: u32) -> Self {
        Self {
            blob_dir,
            blob_cache: Mutex::new(LruMap::new(ByLength::new(max_length))),
            size_tracker: Default::default(),
            file_lock: Default::default(),
            txs_to_delete: Default::default(),
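            // A transaction can carry multiple blobs, so the versioned-hash index is sized
            // larger than the sidecar cache (six slots per cached entry as a rough bound).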
            versioned_hashes_to_txhash: Mutex::new(LruMap::new(ByLength::new(max_length * 6))),
        }
    }

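    /// Creates the blob directory (including any missing parents) on disk.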
    fn create_blob_dir(&self) -> Result<(), DiskFileBlobStoreError> {
        debug!(target:"txpool::blob", blob_dir = ?self.blob_dir, "Creating blob store");
        fs::create_dir_all(&self.blob_dir)
            .map_err(|e| DiskFileBlobStoreError::Open(self.blob_dir.clone(), e))
    }

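    /// Deletes the entire blob store directory, treating a missing directory as success.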
    fn delete_all(&self) -> Result<(), DiskFileBlobStoreError> {
        match fs::remove_dir_all(&self.blob_dir) {
            Ok(_) => {
                debug!(target:"txpool::blob", blob_dir = ?self.blob_dir, "Removed blob store directory");
            }
            Err(err) if err.kind() == io::ErrorKind::NotFound => {}
            Err(err) => return Err(DiskFileBlobStoreError::Open(self.blob_dir.clone(), err)),
        }
        Ok(())
    }

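    /// Inserts a single sidecar: indexes its versioned hashes, caches it, writes it to disk
    /// and updates the size tracker.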
    fn insert_one(
        &self,
        tx: B256,
        data: BlobTransactionSidecarVariant,
    ) -> Result<(), BlobStoreError> {
        let mut buf = Vec::with_capacity(data.rlp_encoded_fields_length());
        data.rlp_encode_fields(&mut buf);

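        // Index every versioned hash of this sidecar to the owning transaction hash.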
        {
            let mut map = self.versioned_hashes_to_txhash.lock();
            data.versioned_hashes().for_each(|hash| {
                map.insert(hash, tx);
            });
        }

        self.blob_cache.lock().insert(tx, Arc::new(data));

        let size = self.write_one_encoded(tx, &buf)?;

        self.size_tracker.add_size(size);
        self.size_tracker.inc_len(1);
        Ok(())
    }

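    /// Inserts multiple sidecars, pre-encoding all of them before touching the cache or disk.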
    fn insert_many(
        &self,
        txs: Vec<(B256, BlobTransactionSidecarVariant)>,
    ) -> Result<(), BlobStoreError> {
        let raw = txs
            .iter()
            .map(|(tx, data)| {
                let mut buf = Vec::with_capacity(data.rlp_encoded_fields_length());
                data.rlp_encode_fields(&mut buf);
                (self.blob_disk_file(*tx), buf)
            })
            .collect::<Vec<_>>();

        {
            let mut map = self.versioned_hashes_to_txhash.lock();
            for (tx, data) in &txs {
                data.versioned_hashes().for_each(|hash| {
                    map.insert(hash, *tx);
                });
            }
        }

        {
            let mut cache = self.blob_cache.lock();
            for (tx, data) in txs {
                cache.insert(tx, Arc::new(data));
            }
        }

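        // Write all encoded sidecars to disk under a single write-lock acquisition, skipping
        // files that already exist.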
        let mut add = 0;
        let mut num = 0;
        {
            let _lock = self.file_lock.write();
            for (path, data) in raw {
                if path.exists() {
                    debug!(target:"txpool::blob", ?path, "Blob already exists");
                } else if let Err(err) = fs::write(&path, &data) {
                    debug!(target:"txpool::blob", %err, ?path, "Failed to write blob file");
                } else {
                    add += data.len();
                    num += 1;
                }
            }
        }
        self.size_tracker.add_size(add);
        self.size_tracker.inc_len(num);

        Ok(())
    }

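    /// Returns true if a sidecar for `tx` exists in the cache or as a file on disk.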
    fn contains(&self, tx: B256) -> Result<bool, BlobStoreError> {
        if self.blob_cache.lock().get(&tx).is_some() {
            return Ok(true)
        }
        Ok(self.blob_disk_file(tx).is_file())
    }

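    /// Returns the subset of the given tx hashes for which a sidecar exists in the cache or
    /// on disk.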
    fn retain_existing(&self, txs: Vec<B256>) -> Result<Vec<B256>, BlobStoreError> {
        let (in_cache, not_in_cache): (Vec<B256>, Vec<B256>) = {
            let mut cache = self.blob_cache.lock();
            txs.into_iter().partition(|tx| cache.get(tx).is_some())
        };

        let mut existing = in_cache;
        for tx in not_in_cache {
            if self.blob_disk_file(tx).is_file() {
                existing.push(tx);
            }
        }

        Ok(existing)
    }

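    /// Retrieves a single sidecar, reading it from disk and caching it on a cache miss.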
    fn get_one(
        &self,
        tx: B256,
    ) -> Result<Option<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
        if let Some(blob) = self.blob_cache.lock().get(&tx) {
            return Ok(Some(blob.clone()))
        }

        if let Some(blob) = self.read_one(tx)? {
            let blob_arc = Arc::new(blob);
            self.blob_cache.lock().insert(tx, blob_arc.clone());
            return Ok(Some(blob_arc))
        }

        Ok(None)
    }

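    /// Returns the path to the blob file for the given transaction hash.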
    #[inline]
    fn blob_disk_file(&self, tx: B256) -> PathBuf {
        self.blob_dir.join(format!("{tx:x}"))
    }

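    /// Reads and decodes the blob file for `tx`, returning `None` if the file does not exist.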
    #[inline]
    fn read_one(&self, tx: B256) -> Result<Option<BlobTransactionSidecarVariant>, BlobStoreError> {
        let path = self.blob_disk_file(tx);
        let data = {
            let _lock = self.file_lock.read();
            match fs::read(&path) {
                Ok(data) => data,
                Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(None),
                Err(e) => {
                    return Err(BlobStoreError::Other(Box::new(DiskFileBlobStoreError::ReadFile(
                        tx, path, e,
                    ))))
                }
            }
        };
        BlobTransactionSidecarVariant::rlp_decode_fields(&mut data.as_slice())
            .map(Some)
            .map_err(BlobStoreError::DecodeError)
    }

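    /// Reads and decodes all requested blob files, silently skipping entries that are missing
    /// or fail to decode.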
    fn read_many_decoded(&self, txs: Vec<TxHash>) -> Vec<(TxHash, BlobTransactionSidecarVariant)> {
        self.read_many_raw(txs)
            .into_iter()
            .filter_map(|(tx, data)| {
                BlobTransactionSidecarVariant::rlp_decode_fields(&mut data.as_slice())
                    .map(|sidecar| (tx, sidecar))
                    .ok()
            })
            .collect()
    }

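    /// Reads the raw contents of all requested blob files under a single read lock, skipping
    /// files that cannot be read.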
    #[inline]
    fn read_many_raw(&self, txs: Vec<TxHash>) -> Vec<(TxHash, Vec<u8>)> {
        let mut res = Vec::with_capacity(txs.len());
        let _lock = self.file_lock.read();
        for tx in txs {
            let path = self.blob_disk_file(tx);
            match fs::read(&path) {
                Ok(data) => {
                    res.push((tx, data));
                }
                Err(err) => {
                    debug!(target:"txpool::blob", %err, ?tx, "Failed to read blob file");
                }
            };
        }
        res
    }

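    /// Writes the already-encoded sidecar to disk unless the file exists, returning the number
    /// of bytes written (zero if the file was already present).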
    #[inline]
    fn write_one_encoded(&self, tx: B256, data: &[u8]) -> Result<usize, DiskFileBlobStoreError> {
        trace!(target:"txpool::blob", "[{:?}] writing blob file", tx);
        let mut add = 0;
        let path = self.blob_disk_file(tx);
        {
            let _lock = self.file_lock.write();
            if !path.exists() {
                fs::write(&path, data)
                    .map_err(|e| DiskFileBlobStoreError::WriteFile(tx, path, e))?;
                add = data.len();
            }
        }
        Ok(add)
    }

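    /// Retrieves all requested sidecars, checking the cache first and falling back to disk for
    /// misses; sidecars loaded from disk are inserted into the cache.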
    #[inline]
    fn get_all(
        &self,
        txs: Vec<B256>,
    ) -> Result<Vec<(B256, Arc<BlobTransactionSidecarVariant>)>, BlobStoreError> {
        let mut res = Vec::with_capacity(txs.len());
        let mut cache_miss = Vec::new();
        {
            let mut cache = self.blob_cache.lock();
            for tx in txs {
                if let Some(blob) = cache.get(&tx) {
                    res.push((tx, blob.clone()));
                } else {
                    cache_miss.push(tx)
                }
            }
        }
        if cache_miss.is_empty() {
            return Ok(res)
        }
        let from_disk = self.read_many_decoded(cache_miss);
        if from_disk.is_empty() {
            return Ok(res)
        }
        let from_disk = from_disk
            .into_iter()
            .map(|(tx, data)| {
                let data = Arc::new(data);
                res.push((tx, data.clone()));
                (tx, data)
            })
            .collect::<Vec<_>>();

        let mut cache = self.blob_cache.lock();
        for (tx, data) in from_disk {
            cache.insert(tx, data);
        }

        Ok(res)
    }

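    /// Retrieves sidecars for exactly the given tx hashes, erroring with
    /// [`BlobStoreError::MissingSidecar`] if any of them is absent.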
    #[inline]
    fn get_exact(
        &self,
        txs: Vec<B256>,
    ) -> Result<Vec<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
        txs.into_iter()
            .map(|tx| self.get_one(tx)?.ok_or(BlobStoreError::MissingSidecar(tx)))
            .collect()
    }
}

impl fmt::Debug for DiskFileBlobStoreInner {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("DiskFileBlobStoreInner")
            .field("blob_dir", &self.blob_dir)
            .field("cached_blobs", &self.blob_cache.try_lock().map(|lock| lock.len()))
            .field("txs_to_delete", &self.txs_to_delete.try_read())
            .finish()
    }
}

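/// Errors that can occur when interacting with a [`DiskFileBlobStore`].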
#[derive(Debug, thiserror::Error)]
pub enum DiskFileBlobStoreError {
    #[error("failed to open blobstore at {0}: {1}")]
    Open(PathBuf, io::Error),
    #[error("[{0}] failed to read blob file at {1}: {2}")]
    ReadFile(TxHash, PathBuf, io::Error),
    #[error("[{0}] failed to write blob file at {1}: {2}")]
    WriteFile(TxHash, PathBuf, io::Error),
    #[error("[{0}] failed to delete blob file at {1}: {2}")]
    DeleteFile(TxHash, PathBuf, io::Error),
}

impl From<DiskFileBlobStoreError> for BlobStoreError {
    fn from(value: DiskFileBlobStoreError) -> Self {
        Self::Other(Box::new(value))
    }
}

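/// Configuration for a [`DiskFileBlobStore`].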
#[derive(Debug, Clone)]
pub struct DiskFileBlobStoreConfig {
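    /// The maximum number of blob sidecars to keep in the in-memory LRU cache.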
    pub max_cached_entries: u32,
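    /// How to handle existing blob files on open.
    ///
    /// Note that [`DiskFileBlobStore::open`] currently ignores this setting and always clears
    /// the store.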
    pub open: OpenDiskFileBlobStore,
}

impl Default for DiskFileBlobStoreConfig {
    fn default() -> Self {
        Self { max_cached_entries: DEFAULT_MAX_CACHED_BLOBS, open: Default::default() }
    }
}

impl DiskFileBlobStoreConfig {
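    /// Sets the maximum number of blob sidecars to keep in the in-memory cache.
    ///
    /// A small usage sketch (the value is illustrative):
    ///
    /// ```ignore
    /// let config = DiskFileBlobStoreConfig::default().with_max_cached_entries(256);
    /// ```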
    pub const fn with_max_cached_entries(mut self, max_cached_entries: u32) -> Self {
        self.max_cached_entries = max_cached_entries;
        self
    }
}

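/// Determines how a [`DiskFileBlobStore`] treats existing blob files in `blob_dir` when it is
/// opened: either clear them out or keep and re-index them.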
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum OpenDiskFileBlobStore {
    #[default]
    Clear,
    ReIndex,
}

#[cfg(test)]
mod tests {
    use alloy_consensus::BlobTransactionSidecar;
    use alloy_eips::eip7594::BlobTransactionSidecarVariant;

    use super::*;
    use std::sync::atomic::Ordering;

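    /// Creates a fresh store in a temporary directory; the returned guard keeps the dir alive.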
    fn tmp_store() -> (DiskFileBlobStore, tempfile::TempDir) {
        let dir = tempfile::tempdir().unwrap();
        let store = DiskFileBlobStore::open(dir.path(), Default::default()).unwrap();
        (store, dir)
    }

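    /// Generates `num` random transaction hashes paired with empty EIP-4844 sidecars.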
    fn rng_blobs(num: usize) -> Vec<(TxHash, BlobTransactionSidecarVariant)> {
        let mut rng = rand::rng();
        (0..num)
            .map(|_| {
                let tx = TxHash::random_with(&mut rng);
                let blob = BlobTransactionSidecarVariant::Eip4844(BlobTransactionSidecar {
                    blobs: vec![],
                    commitments: vec![],
                    proofs: vec![],
                });
                (tx, blob)
            })
            .collect()
    }

    #[test]
    fn disk_insert_all_get_all() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(10);
        let all_hashes = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        for (tx, blob) in &blobs {
            assert!(store.is_cached(tx));
            let b = store.get(*tx).unwrap().map(Arc::unwrap_or_clone).unwrap();
            assert_eq!(b, *blob);
        }

        let all = store.get_all(all_hashes.clone()).unwrap();
        for (tx, blob) in all {
            assert!(blobs.contains(&(tx, Arc::unwrap_or_clone(blob))), "missing blob {tx:?}");
        }

        assert!(store.contains(all_hashes[0]).unwrap());
        store.delete_all(all_hashes.clone()).unwrap();
        assert!(store.inner.txs_to_delete.read().contains(&all_hashes[0]));
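        // Clear the cache so subsequent lookups must go through the files on disk, which the
        // cleanup below removes.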
        store.clear_cache();
        store.cleanup();

        assert!(store.get(blobs[0].0).unwrap().is_none());

        let all = store.get_all(all_hashes.clone()).unwrap();
        assert!(all.is_empty());

        assert!(!store.contains(all_hashes[0]).unwrap());
        assert!(store.get_exact(all_hashes).is_err());

        assert_eq!(store.data_size_hint(), Some(0));
        assert_eq!(store.inner.size_tracker.num_blobs.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn disk_insert_and_retrieve() {
        let (store, _dir) = tmp_store();

        let (tx, blob) = rng_blobs(1).into_iter().next().unwrap();
        store.insert(tx, blob.clone()).unwrap();

        assert!(store.is_cached(&tx));
        let retrieved_blob = store.get(tx).unwrap().map(Arc::unwrap_or_clone).unwrap();
        assert_eq!(retrieved_blob, blob);
    }

    #[test]
    fn disk_delete_blob() {
        let (store, _dir) = tmp_store();

        let (tx, blob) = rng_blobs(1).into_iter().next().unwrap();
        store.insert(tx, blob).unwrap();
        assert!(store.is_cached(&tx));

        store.delete(tx).unwrap();
        assert!(store.inner.txs_to_delete.read().contains(&tx));
        store.cleanup();

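        // cleanup removed the file from disk, but the sidecar is still served from the
        // in-memory cache, which cleanup does not evict.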
        let result = store.get(tx).unwrap();
        assert_eq!(
            result,
            Some(Arc::new(BlobTransactionSidecarVariant::Eip4844(BlobTransactionSidecar {
                blobs: vec![],
                commitments: vec![],
                proofs: vec![]
            })))
        );
    }

    #[test]
    fn disk_insert_all_and_delete_all() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(5);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        for (tx, _) in &blobs {
            assert!(store.is_cached(tx));
        }

        store.delete_all(txs.clone()).unwrap();
        store.cleanup();

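        // As in disk_delete_blob: the files are gone from disk, but the sidecars are still
        // returned from the in-memory cache.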
        for tx in txs {
            let result = store.get(tx).unwrap();
            assert_eq!(
                result,
                Some(Arc::new(BlobTransactionSidecarVariant::Eip4844(BlobTransactionSidecar {
                    blobs: vec![],
                    commitments: vec![],
                    proofs: vec![]
                })))
            );
        }
    }

    #[test]
    fn disk_get_all_blobs() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(3);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        let retrieved_blobs = store.get_all(txs.clone()).unwrap();
        for (tx, blob) in retrieved_blobs {
            assert!(blobs.contains(&(tx, Arc::unwrap_or_clone(blob))));
        }

        store.delete_all(txs).unwrap();
        store.cleanup();
    }

    #[test]
    fn disk_get_exact_blobs_success() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(3);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        let retrieved_blobs = store.get_exact(txs).unwrap();
        for (retrieved_blob, (_, original_blob)) in retrieved_blobs.into_iter().zip(blobs) {
            assert_eq!(Arc::unwrap_or_clone(retrieved_blob), original_blob);
        }
    }

    #[test]
    fn disk_get_exact_blobs_failure() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(2);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs).unwrap();

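        // Request one known and one unknown hash; get_exact must fail on the unknown one.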
        let missing_tx = TxHash::random();
        let result = store.get_exact(vec![txs[0], missing_tx]);
        assert!(result.is_err());
    }

    #[test]
    fn disk_data_size_hint() {
        let (store, _dir) = tmp_store();
        assert_eq!(store.data_size_hint(), Some(0));

        let blobs = rng_blobs(2);
        store.insert_all(blobs).unwrap();
        assert!(store.data_size_hint().unwrap() > 0);
    }

    #[test]
    fn disk_cleanup_stat() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(3);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs).unwrap();

        store.delete_all(txs).unwrap();
        let stat = store.cleanup();
        assert_eq!(stat.delete_succeed, 3);
        assert_eq!(stat.delete_failed, 0);
    }
}