reth_transaction_pool/blobstore/
disk.rs

1//! A simple diskstore for blobs
2
3use crate::blobstore::{BlobStore, BlobStoreCleanupStat, BlobStoreError, BlobStoreSize};
4use alloy_eips::eip4844::{BlobAndProofV1, BlobTransactionSidecar};
5use alloy_primitives::{TxHash, B256};
6use parking_lot::{Mutex, RwLock};
7use schnellru::{ByLength, LruMap};
8use std::{collections::HashSet, fmt, fs, io, path::PathBuf, sync::Arc};
9use tracing::{debug, trace};
10
/// Default number of [`BlobTransactionSidecar`]s kept in the in-memory cache.
///
/// This is the value [`DiskFileBlobStoreConfig::default`] uses for `max_cached_entries`.
pub const DEFAULT_MAX_CACHED_BLOBS: u32 = 100;
13
/// A blob store that stores blob data on disk.
///
/// The type uses deferred deletion, meaning that blobs are not immediately deleted from disk, but
/// it's expected that the maintenance task will call [`BlobStore::cleanup`] to remove the deleted
/// blobs from disk.
///
/// Clones share the same underlying state, since the state lives behind an [`Arc`].
#[derive(Clone, Debug)]
pub struct DiskFileBlobStore {
    /// Shared state: blob directory, in-memory cache, size tracker and pending deletions.
    inner: Arc<DiskFileBlobStoreInner>,
}
23
24impl DiskFileBlobStore {
25    /// Opens and initializes a new disk file blob store according to the given options.
26    pub fn open(
27        blob_dir: impl Into<PathBuf>,
28        opts: DiskFileBlobStoreConfig,
29    ) -> Result<Self, DiskFileBlobStoreError> {
30        let blob_dir = blob_dir.into();
31        let DiskFileBlobStoreConfig { max_cached_entries, .. } = opts;
32        let inner = DiskFileBlobStoreInner::new(blob_dir, max_cached_entries);
33
34        // initialize the blob store
35        inner.delete_all()?;
36        inner.create_blob_dir()?;
37
38        Ok(Self { inner: Arc::new(inner) })
39    }
40
41    #[cfg(test)]
42    fn is_cached(&self, tx: &B256) -> bool {
43        self.inner.blob_cache.lock().get(tx).is_some()
44    }
45
46    #[cfg(test)]
47    fn clear_cache(&self) {
48        self.inner.blob_cache.lock().clear()
49    }
50}
51
52impl BlobStore for DiskFileBlobStore {
53    fn insert(&self, tx: B256, data: BlobTransactionSidecar) -> Result<(), BlobStoreError> {
54        self.inner.insert_one(tx, data)
55    }
56
57    fn insert_all(&self, txs: Vec<(B256, BlobTransactionSidecar)>) -> Result<(), BlobStoreError> {
58        if txs.is_empty() {
59            return Ok(())
60        }
61        self.inner.insert_many(txs)
62    }
63
64    fn delete(&self, tx: B256) -> Result<(), BlobStoreError> {
65        if self.inner.contains(tx)? {
66            self.inner.txs_to_delete.write().insert(tx);
67        }
68        Ok(())
69    }
70
71    fn delete_all(&self, txs: Vec<B256>) -> Result<(), BlobStoreError> {
72        let txs = self.inner.retain_existing(txs)?;
73        self.inner.txs_to_delete.write().extend(txs);
74        Ok(())
75    }
76
77    fn cleanup(&self) -> BlobStoreCleanupStat {
78        let txs_to_delete = std::mem::take(&mut *self.inner.txs_to_delete.write());
79        let mut stat = BlobStoreCleanupStat::default();
80        let mut subsize = 0;
81        debug!(target:"txpool::blob", num_blobs=%txs_to_delete.len(), "Removing blobs from disk");
82        for tx in txs_to_delete {
83            let path = self.inner.blob_disk_file(tx);
84            let filesize = fs::metadata(&path).map_or(0, |meta| meta.len());
85            match fs::remove_file(&path) {
86                Ok(_) => {
87                    stat.delete_succeed += 1;
88                    subsize += filesize;
89                }
90                Err(e) => {
91                    stat.delete_failed += 1;
92                    let err = DiskFileBlobStoreError::DeleteFile(tx, path, e);
93                    debug!(target:"txpool::blob", %err);
94                }
95            };
96        }
97        self.inner.size_tracker.sub_size(subsize as usize);
98        self.inner.size_tracker.sub_len(stat.delete_succeed);
99        stat
100    }
101
102    fn get(&self, tx: B256) -> Result<Option<Arc<BlobTransactionSidecar>>, BlobStoreError> {
103        self.inner.get_one(tx)
104    }
105
106    fn contains(&self, tx: B256) -> Result<bool, BlobStoreError> {
107        self.inner.contains(tx)
108    }
109
110    fn get_all(
111        &self,
112        txs: Vec<B256>,
113    ) -> Result<Vec<(B256, Arc<BlobTransactionSidecar>)>, BlobStoreError> {
114        if txs.is_empty() {
115            return Ok(Vec::new())
116        }
117        self.inner.get_all(txs)
118    }
119
120    fn get_exact(
121        &self,
122        txs: Vec<B256>,
123    ) -> Result<Vec<Arc<BlobTransactionSidecar>>, BlobStoreError> {
124        if txs.is_empty() {
125            return Ok(Vec::new())
126        }
127        self.inner.get_exact(txs)
128    }
129
130    fn get_by_versioned_hashes(
131        &self,
132        versioned_hashes: &[B256],
133    ) -> Result<Vec<Option<BlobAndProofV1>>, BlobStoreError> {
134        let mut result = vec![None; versioned_hashes.len()];
135        for (_tx_hash, blob_sidecar) in self.inner.blob_cache.lock().iter() {
136            for (hash_idx, match_result) in blob_sidecar.match_versioned_hashes(versioned_hashes) {
137                result[hash_idx] = Some(match_result);
138            }
139            // Return early if all blobs are found.
140            if result.iter().all(|blob| blob.is_some()) {
141                break;
142            }
143        }
144        Ok(result)
145    }
146
147    fn data_size_hint(&self) -> Option<usize> {
148        Some(self.inner.size_tracker.data_size())
149    }
150
151    fn blobs_len(&self) -> usize {
152        self.inner.size_tracker.blobs_len()
153    }
154}
155
/// Shared state behind [`DiskFileBlobStore`].
struct DiskFileBlobStoreInner {
    /// Directory that holds one file per blob transaction.
    blob_dir: PathBuf,
    /// In-memory LRU cache of sidecars, keyed by transaction hash and bounded by entry count.
    blob_cache: Mutex<LruMap<TxHash, Arc<BlobTransactionSidecar>, ByLength>>,
    /// Running totals of the bytes written to disk and the number of stored blobs.
    size_tracker: BlobStoreSize,
    /// Taken in write mode while blob files are written and in read mode while they are read.
    file_lock: RwLock<()>,
    /// Transactions marked for deletion; drained by [`BlobStore::cleanup`].
    txs_to_delete: RwLock<HashSet<B256>>,
}
163
164impl DiskFileBlobStoreInner {
165    /// Creates a new empty disk file blob store with the given maximum length of the blob cache.
166    fn new(blob_dir: PathBuf, max_length: u32) -> Self {
167        Self {
168            blob_dir,
169            blob_cache: Mutex::new(LruMap::new(ByLength::new(max_length))),
170            size_tracker: Default::default(),
171            file_lock: Default::default(),
172            txs_to_delete: Default::default(),
173        }
174    }
175
176    /// Creates the directory where blobs will be stored on disk.
177    fn create_blob_dir(&self) -> Result<(), DiskFileBlobStoreError> {
178        debug!(target:"txpool::blob", blob_dir = ?self.blob_dir, "Creating blob store");
179        fs::create_dir_all(&self.blob_dir)
180            .map_err(|e| DiskFileBlobStoreError::Open(self.blob_dir.clone(), e))
181    }
182
183    /// Deletes the entire blob store.
184    fn delete_all(&self) -> Result<(), DiskFileBlobStoreError> {
185        match fs::remove_dir_all(&self.blob_dir) {
186            Ok(_) => {
187                debug!(target:"txpool::blob", blob_dir = ?self.blob_dir, "Removed blob store directory");
188            }
189            Err(err) if err.kind() == io::ErrorKind::NotFound => {}
190            Err(err) => return Err(DiskFileBlobStoreError::Open(self.blob_dir.clone(), err)),
191        }
192        Ok(())
193    }
194
195    /// Ensures blob is in the blob cache and written to the disk.
196    fn insert_one(&self, tx: B256, data: BlobTransactionSidecar) -> Result<(), BlobStoreError> {
197        let mut buf = Vec::with_capacity(data.rlp_encoded_fields_length());
198        data.rlp_encode_fields(&mut buf);
199        self.blob_cache.lock().insert(tx, Arc::new(data));
200        let size = self.write_one_encoded(tx, &buf)?;
201
202        self.size_tracker.add_size(size);
203        self.size_tracker.inc_len(1);
204        Ok(())
205    }
206
207    /// Ensures blobs are in the blob cache and written to the disk.
208    fn insert_many(&self, txs: Vec<(B256, BlobTransactionSidecar)>) -> Result<(), BlobStoreError> {
209        let raw = txs
210            .iter()
211            .map(|(tx, data)| {
212                let mut buf = Vec::with_capacity(data.rlp_encoded_fields_length());
213                data.rlp_encode_fields(&mut buf);
214                (self.blob_disk_file(*tx), buf)
215            })
216            .collect::<Vec<_>>();
217
218        {
219            let mut cache = self.blob_cache.lock();
220            for (tx, data) in txs {
221                cache.insert(tx, Arc::new(data));
222            }
223        }
224        let mut add = 0;
225        let mut num = 0;
226        {
227            let _lock = self.file_lock.write();
228            for (path, data) in raw {
229                if path.exists() {
230                    debug!(target:"txpool::blob", ?path, "Blob already exists");
231                } else if let Err(err) = fs::write(&path, &data) {
232                    debug!(target:"txpool::blob", %err, ?path, "Failed to write blob file");
233                } else {
234                    add += data.len();
235                    num += 1;
236                }
237            }
238        }
239        self.size_tracker.add_size(add);
240        self.size_tracker.inc_len(num);
241
242        Ok(())
243    }
244
245    /// Returns true if the blob for the given transaction hash is in the blob cache or on disk.
246    fn contains(&self, tx: B256) -> Result<bool, BlobStoreError> {
247        if self.blob_cache.lock().get(&tx).is_some() {
248            return Ok(true)
249        }
250        // we only check if the file exists and assume it's valid
251        Ok(self.blob_disk_file(tx).is_file())
252    }
253
254    /// Returns all the blob transactions which are in the cache or on the disk.
255    fn retain_existing(&self, txs: Vec<B256>) -> Result<Vec<B256>, BlobStoreError> {
256        let (in_cache, not_in_cache): (Vec<B256>, Vec<B256>) = {
257            let mut cache = self.blob_cache.lock();
258            txs.into_iter().partition(|tx| cache.get(tx).is_some())
259        };
260
261        let mut existing = in_cache;
262        for tx in not_in_cache {
263            if self.blob_disk_file(tx).is_file() {
264                existing.push(tx);
265            }
266        }
267
268        Ok(existing)
269    }
270
271    /// Retrieves the blob for the given transaction hash from the blob cache or disk.
272    fn get_one(&self, tx: B256) -> Result<Option<Arc<BlobTransactionSidecar>>, BlobStoreError> {
273        if let Some(blob) = self.blob_cache.lock().get(&tx) {
274            return Ok(Some(blob.clone()))
275        }
276        let blob = self.read_one(tx)?;
277
278        if let Some(blob) = &blob {
279            let blob_arc = Arc::new(blob.clone());
280            self.blob_cache.lock().insert(tx, blob_arc.clone());
281            return Ok(Some(blob_arc))
282        }
283
284        Ok(None)
285    }
286
287    /// Returns the path to the blob file for the given transaction hash.
288    #[inline]
289    fn blob_disk_file(&self, tx: B256) -> PathBuf {
290        self.blob_dir.join(format!("{tx:x}"))
291    }
292
293    /// Retrieves the blob data for the given transaction hash.
294    #[inline]
295    fn read_one(&self, tx: B256) -> Result<Option<BlobTransactionSidecar>, BlobStoreError> {
296        let path = self.blob_disk_file(tx);
297        let data = {
298            let _lock = self.file_lock.read();
299            match fs::read(&path) {
300                Ok(data) => data,
301                Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(None),
302                Err(e) => {
303                    return Err(BlobStoreError::Other(Box::new(DiskFileBlobStoreError::ReadFile(
304                        tx, path, e,
305                    ))))
306                }
307            }
308        };
309        BlobTransactionSidecar::rlp_decode_fields(&mut data.as_slice())
310            .map(Some)
311            .map_err(BlobStoreError::DecodeError)
312    }
313
314    /// Returns decoded blobs read from disk.
315    fn read_many_decoded(&self, txs: Vec<TxHash>) -> Vec<(TxHash, BlobTransactionSidecar)> {
316        self.read_many_raw(txs)
317            .into_iter()
318            .filter_map(|(tx, data)| {
319                BlobTransactionSidecar::rlp_decode_fields(&mut data.as_slice())
320                    .map(|sidecar| (tx, sidecar))
321                    .ok()
322            })
323            .collect()
324    }
325
326    /// Retrieves the raw blob data for the given transaction hashes.
327    ///
328    /// Only returns the blobs that were found on file.
329    #[inline]
330    fn read_many_raw(&self, txs: Vec<TxHash>) -> Vec<(TxHash, Vec<u8>)> {
331        let mut res = Vec::with_capacity(txs.len());
332        let _lock = self.file_lock.read();
333        for tx in txs {
334            let path = self.blob_disk_file(tx);
335            match fs::read(&path) {
336                Ok(data) => {
337                    res.push((tx, data));
338                }
339                Err(err) => {
340                    debug!(target:"txpool::blob", %err, ?tx, "Failed to read blob file");
341                }
342            };
343        }
344        res
345    }
346
347    /// Writes the blob data for the given transaction hash to the disk.
348    #[inline]
349    fn write_one_encoded(&self, tx: B256, data: &[u8]) -> Result<usize, DiskFileBlobStoreError> {
350        trace!(target:"txpool::blob", "[{:?}] writing blob file", tx);
351        let mut add = 0;
352        let path = self.blob_disk_file(tx);
353        {
354            let _lock = self.file_lock.write();
355            if !path.exists() {
356                fs::write(&path, data)
357                    .map_err(|e| DiskFileBlobStoreError::WriteFile(tx, path, e))?;
358                add = data.len();
359            }
360        }
361        Ok(add)
362    }
363
364    /// Retrieves blobs for the given transaction hashes from the blob cache or disk.
365    ///
366    /// This will not return an error if there are missing blobs. Therefore, the result may be a
367    /// subset of the request or an empty vector if none of the blobs were found.
368    #[inline]
369    fn get_all(
370        &self,
371        txs: Vec<B256>,
372    ) -> Result<Vec<(B256, Arc<BlobTransactionSidecar>)>, BlobStoreError> {
373        let mut res = Vec::with_capacity(txs.len());
374        let mut cache_miss = Vec::new();
375        {
376            let mut cache = self.blob_cache.lock();
377            for tx in txs {
378                if let Some(blob) = cache.get(&tx) {
379                    res.push((tx, blob.clone()));
380                } else {
381                    cache_miss.push(tx)
382                }
383            }
384        }
385        if cache_miss.is_empty() {
386            return Ok(res)
387        }
388        let from_disk = self.read_many_decoded(cache_miss);
389        if from_disk.is_empty() {
390            return Ok(res)
391        }
392        let mut cache = self.blob_cache.lock();
393        for (tx, data) in from_disk {
394            let arc = Arc::new(data.clone());
395            cache.insert(tx, arc.clone());
396            res.push((tx, arc.clone()));
397        }
398
399        Ok(res)
400    }
401
402    /// Retrieves blobs for the given transaction hashes from the blob cache or disk.
403    ///
404    /// Returns an error if there are any missing blobs.
405    #[inline]
406    fn get_exact(
407        &self,
408        txs: Vec<B256>,
409    ) -> Result<Vec<Arc<BlobTransactionSidecar>>, BlobStoreError> {
410        txs.into_iter()
411            .map(|tx| self.get_one(tx)?.ok_or(BlobStoreError::MissingSidecar(tx)))
412            .collect()
413    }
414}
415
416impl fmt::Debug for DiskFileBlobStoreInner {
417    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
418        f.debug_struct("DiskFileBlobStoreInner")
419            .field("blob_dir", &self.blob_dir)
420            .field("cached_blobs", &self.blob_cache.try_lock().map(|lock| lock.len()))
421            .field("txs_to_delete", &self.txs_to_delete.try_read())
422            .finish()
423    }
424}
425
/// Errors that can occur when interacting with a disk file blob store.
#[derive(Debug, thiserror::Error)]
pub enum DiskFileBlobStoreError {
    /// Thrown during [`DiskFileBlobStore::open`] if the blob store directory cannot be opened.
    ///
    /// Carries the directory path and the underlying I/O error.
    #[error("failed to open blobstore at {0}: {1}")]
    Open(PathBuf, io::Error),
    /// Failure while reading a blob file.
    ///
    /// Carries the transaction hash, the file path and the underlying I/O error.
    #[error("[{0}] failed to read blob file at {1}: {2}")]
    ReadFile(TxHash, PathBuf, io::Error),
    /// Failure while writing a blob file.
    ///
    /// Carries the transaction hash, the file path and the underlying I/O error.
    #[error("[{0}] failed to write blob file at {1}: {2}")]
    WriteFile(TxHash, PathBuf, io::Error),
    /// Failure while deleting a blob file.
    ///
    /// Carries the transaction hash, the file path and the underlying I/O error.
    #[error("[{0}] failed to delete blob file at {1}: {2}")]
    DeleteFile(TxHash, PathBuf, io::Error),
}
446
447impl From<DiskFileBlobStoreError> for BlobStoreError {
448    fn from(value: DiskFileBlobStoreError) -> Self {
449        Self::Other(Box::new(value))
450    }
451}
452
/// Configuration for a disk file blob store.
#[derive(Debug, Clone)]
pub struct DiskFileBlobStoreConfig {
    /// The maximum number of blobs to keep in the in memory blob cache.
    pub max_cached_entries: u32,
    /// How to open the blob store.
    ///
    /// NOTE(review): [`DiskFileBlobStore::open`] does not read this field and always clears the
    /// directory - confirm whether [`OpenDiskFileBlobStore::ReIndex`] is meant to be supported.
    pub open: OpenDiskFileBlobStore,
}
461
462impl Default for DiskFileBlobStoreConfig {
463    fn default() -> Self {
464        Self { max_cached_entries: DEFAULT_MAX_CACHED_BLOBS, open: Default::default() }
465    }
466}
467
468impl DiskFileBlobStoreConfig {
469    /// Set maximum number of blobs to keep in the in memory blob cache.
470    pub const fn with_max_cached_entries(mut self, max_cached_entries: u32) -> Self {
471        self.max_cached_entries = max_cached_entries;
472        self
473    }
474}
475
/// How to open a disk file blob store.
///
/// [`OpenDiskFileBlobStore::Clear`] is the default.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum OpenDiskFileBlobStore {
    /// Clear everything in the blob store.
    #[default]
    Clear,
    /// Keep the existing blob store and index
    ReIndex,
}
485
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::Ordering;

    /// Opens a store in a fresh temporary directory.
    ///
    /// The `TempDir` is returned alongside the store so it stays in scope for the whole test.
    fn tmp_store() -> (DiskFileBlobStore, tempfile::TempDir) {
        let dir = tempfile::tempdir().unwrap();
        let store = DiskFileBlobStore::open(dir.path(), Default::default()).unwrap();
        (store, dir)
    }

    /// Generates `num` random transaction hashes, each paired with an empty sidecar.
    fn rng_blobs(num: usize) -> Vec<(TxHash, BlobTransactionSidecar)> {
        let mut rng = rand::thread_rng();
        (0..num)
            .map(|_| {
                let tx = TxHash::random_with(&mut rng);
                let blob =
                    BlobTransactionSidecar { blobs: vec![], commitments: vec![], proofs: vec![] };
                (tx, blob)
            })
            .collect()
    }

    /// Full round trip: batch insert, lookups, deferred deletion and cleanup accounting.
    #[test]
    fn disk_insert_all_get_all() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(10);
        let all_hashes = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        // all cached
        for (tx, blob) in &blobs {
            assert!(store.is_cached(tx));
            let b = store.get(*tx).unwrap().map(Arc::unwrap_or_clone).unwrap();
            assert_eq!(b, *blob);
        }

        let all = store.get_all(all_hashes.clone()).unwrap();
        for (tx, blob) in all {
            assert!(blobs.contains(&(tx, Arc::unwrap_or_clone(blob))), "missing blob {tx:?}");
        }

        assert!(store.contains(all_hashes[0]).unwrap());
        store.delete_all(all_hashes.clone()).unwrap();
        assert!(store.inner.txs_to_delete.read().contains(&all_hashes[0]));
        // The cache is emptied explicitly: deletion and cleanup only remove files, so without
        // this the lookups below would still be served from memory.
        store.clear_cache();
        store.cleanup();

        assert!(store.get(blobs[0].0).unwrap().is_none());

        let all = store.get_all(all_hashes.clone()).unwrap();
        assert!(all.is_empty());

        assert!(!store.contains(all_hashes[0]).unwrap());
        assert!(store.get_exact(all_hashes).is_err());

        // Every written file was removed again, so both tracked totals are back at zero.
        assert_eq!(store.data_size_hint(), Some(0));
        assert_eq!(store.inner.size_tracker.num_blobs.load(Ordering::Relaxed), 0);
    }

    /// A single inserted blob is cached and returned unchanged.
    #[test]
    fn disk_insert_and_retrieve() {
        let (store, _dir) = tmp_store();

        let (tx, blob) = rng_blobs(1).into_iter().next().unwrap();
        store.insert(tx, blob.clone()).unwrap();

        assert!(store.is_cached(&tx));
        let retrieved_blob = store.get(tx).unwrap().map(Arc::unwrap_or_clone).unwrap();
        assert_eq!(retrieved_blob, blob);
    }

    /// Deleting marks the transaction; cleanup removes the file but not the cache entry.
    #[test]
    fn disk_delete_blob() {
        let (store, _dir) = tmp_store();

        let (tx, blob) = rng_blobs(1).into_iter().next().unwrap();
        store.insert(tx, blob).unwrap();
        assert!(store.is_cached(&tx));

        store.delete(tx).unwrap();
        assert!(store.inner.txs_to_delete.read().contains(&tx));
        store.cleanup();

        // The cache was not cleared, so the blob is still served from memory.
        let result = store.get(tx).unwrap();
        assert_eq!(
            result,
            Some(Arc::new(BlobTransactionSidecar {
                blobs: vec![],
                commitments: vec![],
                proofs: vec![]
            }))
        );
    }

    /// Batch variant of `disk_delete_blob`.
    #[test]
    fn disk_insert_all_and_delete_all() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(5);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        for (tx, _) in &blobs {
            assert!(store.is_cached(tx));
        }

        store.delete_all(txs.clone()).unwrap();
        store.cleanup();

        // The cache was not cleared, so every blob is still served from memory.
        for tx in txs {
            let result = store.get(tx).unwrap();
            assert_eq!(
                result,
                Some(Arc::new(BlobTransactionSidecar {
                    blobs: vec![],
                    commitments: vec![],
                    proofs: vec![]
                }))
            );
        }
    }

    /// `get_all` only returns blobs that were inserted.
    #[test]
    fn disk_get_all_blobs() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(3);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        let retrieved_blobs = store.get_all(txs.clone()).unwrap();
        for (tx, blob) in retrieved_blobs {
            assert!(blobs.contains(&(tx, Arc::unwrap_or_clone(blob))));
        }

        store.delete_all(txs).unwrap();
        store.cleanup();
    }

    /// `get_exact` returns the blobs in request order when all of them exist.
    #[test]
    fn disk_get_exact_blobs_success() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(3);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        let retrieved_blobs = store.get_exact(txs).unwrap();
        for (retrieved_blob, (_, original_blob)) in retrieved_blobs.into_iter().zip(blobs) {
            assert_eq!(Arc::unwrap_or_clone(retrieved_blob), original_blob);
        }
    }

    /// `get_exact` fails as soon as one requested blob is missing.
    #[test]
    fn disk_get_exact_blobs_failure() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(2);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs).unwrap();

        // Try to get a blob that was never inserted
        let missing_tx = TxHash::random();
        let result = store.get_exact(vec![txs[0], missing_tx]);
        assert!(result.is_err());
    }

    /// The size hint starts at zero and grows once blobs are written.
    #[test]
    fn disk_data_size_hint() {
        let (store, _dir) = tmp_store();
        assert_eq!(store.data_size_hint(), Some(0));

        let blobs = rng_blobs(2);
        store.insert_all(blobs).unwrap();
        assert!(store.data_size_hint().unwrap() > 0);
    }

    /// Cleanup reports one successful deletion per marked blob and no failures.
    #[test]
    fn disk_cleanup_stat() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(3);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs).unwrap();

        store.delete_all(txs).unwrap();
        let stat = store.cleanup();
        assert_eq!(stat.delete_succeed, 3);
        assert_eq!(stat.delete_failed, 0);
    }
}
678}