reth_transaction_pool/blobstore/disk.rs

//! A simple diskstore for blobs

use crate::blobstore::{BlobStore, BlobStoreCleanupStat, BlobStoreError, BlobStoreSize};
use alloy_eips::eip4844::{BlobAndProofV1, BlobTransactionSidecar};
use alloy_primitives::{TxHash, B256};
use parking_lot::{Mutex, RwLock};
use schnellru::{ByLength, LruMap};
use std::{collections::HashSet, fmt, fs, io, path::PathBuf, sync::Arc};
use tracing::{debug, trace};

/// How many [`BlobTransactionSidecar`] to cache in memory.
pub const DEFAULT_MAX_CACHED_BLOBS: u32 = 100;

/// A blob store that stores blob data on disk.
///
/// The type uses deferred deletion, meaning that blobs are not immediately deleted from disk, but
/// it's expected that the maintenance task will call [`BlobStore::cleanup`] to remove the deleted
/// blobs from disk.
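///
/// # Example
///
/// A minimal usage sketch (not part of the original source); `blob_dir`, `tx_hash` and
/// `sidecar` are placeholders:
///
/// ```ignore
/// let store = DiskFileBlobStore::open(blob_dir, DiskFileBlobStoreConfig::default())?;
/// store.insert(tx_hash, sidecar)?;
/// assert!(store.contains(tx_hash)?);
/// ```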
#[derive(Clone, Debug)]
pub struct DiskFileBlobStore {
    inner: Arc<DiskFileBlobStoreInner>,
}

impl DiskFileBlobStore {
    /// Opens and initializes a new disk file blob store according to the given options.
    pub fn open(
        blob_dir: impl Into<PathBuf>,
        opts: DiskFileBlobStoreConfig,
    ) -> Result<Self, DiskFileBlobStoreError> {
        let blob_dir = blob_dir.into();
        let DiskFileBlobStoreConfig { max_cached_entries, .. } = opts;
        let inner = DiskFileBlobStoreInner::new(blob_dir, max_cached_entries);

        // initialize the blob store
        inner.delete_all()?;
        inner.create_blob_dir()?;

        Ok(Self { inner: Arc::new(inner) })
    }

    #[cfg(test)]
    fn is_cached(&self, tx: &B256) -> bool {
        self.inner.blob_cache.lock().get(tx).is_some()
    }

    #[cfg(test)]
    fn clear_cache(&self) {
        self.inner.blob_cache.lock().clear()
    }
}

impl BlobStore for DiskFileBlobStore {
    fn insert(&self, tx: B256, data: BlobTransactionSidecar) -> Result<(), BlobStoreError> {
        self.inner.insert_one(tx, data)
    }

    fn insert_all(&self, txs: Vec<(B256, BlobTransactionSidecar)>) -> Result<(), BlobStoreError> {
        if txs.is_empty() {
            return Ok(())
        }
        self.inner.insert_many(txs)
    }

    fn delete(&self, tx: B256) -> Result<(), BlobStoreError> {
        if self.inner.contains(tx)? {
            self.inner.txs_to_delete.write().insert(tx);
        }
        Ok(())
    }

    fn delete_all(&self, txs: Vec<B256>) -> Result<(), BlobStoreError> {
        let txs = self.inner.retain_existing(txs)?;
        self.inner.txs_to_delete.write().extend(txs);
        Ok(())
    }

    fn cleanup(&self) -> BlobStoreCleanupStat {
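        // take the entire pending-deletion set at once, so deletes queued while this runs
        // are collected into a fresh set for the next cleanup pass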
        let txs_to_delete = std::mem::take(&mut *self.inner.txs_to_delete.write());
        let mut stat = BlobStoreCleanupStat::default();
        let mut subsize = 0;
        debug!(target:"txpool::blob", num_blobs=%txs_to_delete.len(), "Removing blobs from disk");
        for tx in txs_to_delete {
            let path = self.inner.blob_disk_file(tx);
            let filesize = fs::metadata(&path).map_or(0, |meta| meta.len());
            match fs::remove_file(&path) {
                Ok(_) => {
                    stat.delete_succeed += 1;
                    subsize += filesize;
                }
                Err(e) => {
                    stat.delete_failed += 1;
                    let err = DiskFileBlobStoreError::DeleteFile(tx, path, e);
                    debug!(target:"txpool::blob", %err);
                }
            };
        }
        self.inner.size_tracker.sub_size(subsize as usize);
        self.inner.size_tracker.sub_len(stat.delete_succeed);
        stat
    }

    fn get(&self, tx: B256) -> Result<Option<Arc<BlobTransactionSidecar>>, BlobStoreError> {
        self.inner.get_one(tx)
    }

    fn contains(&self, tx: B256) -> Result<bool, BlobStoreError> {
        self.inner.contains(tx)
    }

    fn get_all(
        &self,
        txs: Vec<B256>,
    ) -> Result<Vec<(B256, Arc<BlobTransactionSidecar>)>, BlobStoreError> {
        if txs.is_empty() {
            return Ok(Vec::new())
        }
        self.inner.get_all(txs)
    }

    fn get_exact(
        &self,
        txs: Vec<B256>,
    ) -> Result<Vec<Arc<BlobTransactionSidecar>>, BlobStoreError> {
        if txs.is_empty() {
            return Ok(Vec::new())
        }
        self.inner.get_exact(txs)
    }

    fn get_by_versioned_hashes(
        &self,
        versioned_hashes: &[B256],
    ) -> Result<Vec<Option<BlobAndProofV1>>, BlobStoreError> {
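        // lookup strategy: scan the in-memory sidecar cache first, then fall back to the
        // versioned-hash index and disk for anything still missing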
        // the response must always be the same len as the request, misses must be None
        let mut result = vec![None; versioned_hashes.len()];

        // first scan all cached full sidecars
        for (_tx_hash, blob_sidecar) in self.inner.blob_cache.lock().iter() {
            for (hash_idx, match_result) in blob_sidecar.match_versioned_hashes(versioned_hashes) {
                result[hash_idx] = Some(match_result);
            }

            // return early if all blobs are found.
            if result.iter().all(|blob| blob.is_some()) {
                return Ok(result);
            }
        }

        // not all versioned hashes could be found in the cache, try to look up a matching tx
        let mut missing_tx_hashes = Vec::new();

        {
            let mut versioned_to_txhashes = self.inner.versioned_hashes_to_txhash.lock();
            for (idx, _) in
                result.iter().enumerate().filter(|(_, blob_and_proof)| blob_and_proof.is_none())
            {
                // this is safe because the result vec has the same len
                let versioned_hash = versioned_hashes[idx];
                if let Some(tx_hash) = versioned_to_txhashes.get(&versioned_hash).copied() {
                    missing_tx_hashes.push(tx_hash);
                }
            }
        }

        // if we have missing blobs, try to read them from disk and try again
        if !missing_tx_hashes.is_empty() {
            let blobs_from_disk = self.inner.read_many_decoded(missing_tx_hashes);
            for (_, blob_sidecar) in blobs_from_disk {
                for (hash_idx, match_result) in
                    blob_sidecar.match_versioned_hashes(versioned_hashes)
                {
                    if result[hash_idx].is_none() {
                        result[hash_idx] = Some(match_result);
                    }
                }
            }
        }

        Ok(result)
    }

    fn data_size_hint(&self) -> Option<usize> {
        Some(self.inner.size_tracker.data_size())
    }

    fn blobs_len(&self) -> usize {
        self.inner.size_tracker.blobs_len()
    }
}

struct DiskFileBlobStoreInner {
    blob_dir: PathBuf,
    blob_cache: Mutex<LruMap<TxHash, Arc<BlobTransactionSidecar>, ByLength>>,
    size_tracker: BlobStoreSize,
    file_lock: RwLock<()>,
    txs_to_delete: RwLock<HashSet<B256>>,
    /// Tracks known versioned hashes and a transaction they exist in.
    ///
    /// Note: It is possible that one blob can appear in multiple transactions but this only tracks
    /// the most recent one.
    versioned_hashes_to_txhash: Mutex<LruMap<B256, B256>>,
}

impl DiskFileBlobStoreInner {
    /// Creates a new empty disk file blob store with the given maximum length of the blob cache.
    fn new(blob_dir: PathBuf, max_length: u32) -> Self {
        Self {
            blob_dir,
            blob_cache: Mutex::new(LruMap::new(ByLength::new(max_length))),
            size_tracker: Default::default(),
            file_lock: Default::default(),
            txs_to_delete: Default::default(),
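            // the versioned-hash index is sized larger than the sidecar cache, since one
            // transaction can carry multiple blobs (the 6x factor presumably reflects the
            // per-transaction blob maximum)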
            versioned_hashes_to_txhash: Mutex::new(LruMap::new(ByLength::new(max_length * 6))),
        }
    }

    /// Creates the directory where blobs will be stored on disk.
    fn create_blob_dir(&self) -> Result<(), DiskFileBlobStoreError> {
        debug!(target:"txpool::blob", blob_dir = ?self.blob_dir, "Creating blob store");
        fs::create_dir_all(&self.blob_dir)
            .map_err(|e| DiskFileBlobStoreError::Open(self.blob_dir.clone(), e))
    }

    /// Deletes the entire blob store.
    fn delete_all(&self) -> Result<(), DiskFileBlobStoreError> {
        match fs::remove_dir_all(&self.blob_dir) {
            Ok(_) => {
                debug!(target:"txpool::blob", blob_dir = ?self.blob_dir, "Removed blob store directory");
            }
            Err(err) if err.kind() == io::ErrorKind::NotFound => {}
            Err(err) => return Err(DiskFileBlobStoreError::Open(self.blob_dir.clone(), err)),
        }
        Ok(())
    }

    /// Ensures blob is in the blob cache and written to the disk.
    fn insert_one(&self, tx: B256, data: BlobTransactionSidecar) -> Result<(), BlobStoreError> {
        let mut buf = Vec::with_capacity(data.rlp_encoded_fields_length());
        data.rlp_encode_fields(&mut buf);

        {
            // cache the versioned hashes to tx hash
            let mut map = self.versioned_hashes_to_txhash.lock();
            data.versioned_hashes().for_each(|hash| {
                map.insert(hash, tx);
            })
        }

        self.blob_cache.lock().insert(tx, Arc::new(data));

        let size = self.write_one_encoded(tx, &buf)?;

        self.size_tracker.add_size(size);
        self.size_tracker.inc_len(1);
        Ok(())
    }

    /// Ensures blobs are in the blob cache and written to the disk.
    fn insert_many(&self, txs: Vec<(B256, BlobTransactionSidecar)>) -> Result<(), BlobStoreError> {
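        // encode all sidecars up front so the file lock below is only held while writing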
        let raw = txs
            .iter()
            .map(|(tx, data)| {
                let mut buf = Vec::with_capacity(data.rlp_encoded_fields_length());
                data.rlp_encode_fields(&mut buf);
                (self.blob_disk_file(*tx), buf)
            })
            .collect::<Vec<_>>();

        {
            // cache versioned hashes to tx hash
            let mut map = self.versioned_hashes_to_txhash.lock();
            for (tx, data) in &txs {
                data.versioned_hashes().for_each(|hash| {
                    map.insert(hash, *tx);
                })
            }
        }

        {
            // cache blobs
            let mut cache = self.blob_cache.lock();
            for (tx, data) in txs {
                cache.insert(tx, Arc::new(data));
            }
        }

        let mut add = 0;
        let mut num = 0;
        {
            let _lock = self.file_lock.write();
            for (path, data) in raw {
                if path.exists() {
                    debug!(target:"txpool::blob", ?path, "Blob already exists");
                } else if let Err(err) = fs::write(&path, &data) {
                    debug!(target:"txpool::blob", %err, ?path, "Failed to write blob file");
                } else {
                    add += data.len();
                    num += 1;
                }
            }
        }
        self.size_tracker.add_size(add);
        self.size_tracker.inc_len(num);

        Ok(())
    }

    /// Returns true if the blob for the given transaction hash is in the blob cache or on disk.
    fn contains(&self, tx: B256) -> Result<bool, BlobStoreError> {
        if self.blob_cache.lock().get(&tx).is_some() {
            return Ok(true)
        }
        // we only check if the file exists and assume it's valid
        Ok(self.blob_disk_file(tx).is_file())
    }

    /// Returns all the blob transactions which are in the cache or on the disk.
    fn retain_existing(&self, txs: Vec<B256>) -> Result<Vec<B256>, BlobStoreError> {
        let (in_cache, not_in_cache): (Vec<B256>, Vec<B256>) = {
            let mut cache = self.blob_cache.lock();
            txs.into_iter().partition(|tx| cache.get(tx).is_some())
        };

        let mut existing = in_cache;
        for tx in not_in_cache {
            if self.blob_disk_file(tx).is_file() {
                existing.push(tx);
            }
        }

        Ok(existing)
    }

    /// Retrieves the blob for the given transaction hash from the blob cache or disk.
    fn get_one(&self, tx: B256) -> Result<Option<Arc<BlobTransactionSidecar>>, BlobStoreError> {
        if let Some(blob) = self.blob_cache.lock().get(&tx) {
            return Ok(Some(blob.clone()))
        }
        let blob = self.read_one(tx)?;

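        // on a disk hit, populate the cache so subsequent reads are served from memory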
        if let Some(blob) = &blob {
            let blob_arc = Arc::new(blob.clone());
            self.blob_cache.lock().insert(tx, blob_arc.clone());
            return Ok(Some(blob_arc))
        }

        Ok(None)
    }

    /// Returns the path to the blob file for the given transaction hash.
    #[inline]
    fn blob_disk_file(&self, tx: B256) -> PathBuf {
        self.blob_dir.join(format!("{tx:x}"))
    }

    /// Retrieves the blob data for the given transaction hash.
    #[inline]
    fn read_one(&self, tx: B256) -> Result<Option<BlobTransactionSidecar>, BlobStoreError> {
        let path = self.blob_disk_file(tx);
        let data = {
            let _lock = self.file_lock.read();
            match fs::read(&path) {
                Ok(data) => data,
                Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(None),
                Err(e) => {
                    return Err(BlobStoreError::Other(Box::new(DiskFileBlobStoreError::ReadFile(
                        tx, path, e,
                    ))))
                }
            }
        };
        BlobTransactionSidecar::rlp_decode_fields(&mut data.as_slice())
            .map(Some)
            .map_err(BlobStoreError::DecodeError)
    }

    /// Returns decoded blobs read from disk.
    ///
    /// Only returns sidecars that were found and successfully decoded.
    fn read_many_decoded(&self, txs: Vec<TxHash>) -> Vec<(TxHash, BlobTransactionSidecar)> {
        self.read_many_raw(txs)
            .into_iter()
            .filter_map(|(tx, data)| {
                BlobTransactionSidecar::rlp_decode_fields(&mut data.as_slice())
                    .map(|sidecar| (tx, sidecar))
                    .ok()
            })
            .collect()
    }

    /// Retrieves the raw blob data for the given transaction hashes.
    ///
    /// Only returns the blobs that were found on file.
    #[inline]
    fn read_many_raw(&self, txs: Vec<TxHash>) -> Vec<(TxHash, Vec<u8>)> {
        let mut res = Vec::with_capacity(txs.len());
        let _lock = self.file_lock.read();
        for tx in txs {
            let path = self.blob_disk_file(tx);
            match fs::read(&path) {
                Ok(data) => {
                    res.push((tx, data));
                }
                Err(err) => {
                    debug!(target:"txpool::blob", %err, ?tx, "Failed to read blob file");
                }
            };
        }
        res
    }

    /// Writes the blob data for the given transaction hash to the disk.
    #[inline]
    fn write_one_encoded(&self, tx: B256, data: &[u8]) -> Result<usize, DiskFileBlobStoreError> {
        trace!(target:"txpool::blob", "[{:?}] writing blob file", tx);
        let mut add = 0;
        let path = self.blob_disk_file(tx);
        {
            let _lock = self.file_lock.write();
            if !path.exists() {
                fs::write(&path, data)
                    .map_err(|e| DiskFileBlobStoreError::WriteFile(tx, path, e))?;
                add = data.len();
            }
        }
        Ok(add)
    }

    /// Retrieves blobs for the given transaction hashes from the blob cache or disk.
    ///
    /// This will not return an error if there are missing blobs. Therefore, the result may be a
    /// subset of the request or an empty vector if none of the blobs were found.
    #[inline]
    fn get_all(
        &self,
        txs: Vec<B256>,
    ) -> Result<Vec<(B256, Arc<BlobTransactionSidecar>)>, BlobStoreError> {
        let mut res = Vec::with_capacity(txs.len());
        let mut cache_miss = Vec::new();
        {
            let mut cache = self.blob_cache.lock();
            for tx in txs {
                if let Some(blob) = cache.get(&tx) {
                    res.push((tx, blob.clone()));
                } else {
                    cache_miss.push(tx)
                }
            }
        }
        if cache_miss.is_empty() {
            return Ok(res)
        }
        let from_disk = self.read_many_decoded(cache_miss);
        if from_disk.is_empty() {
            return Ok(res)
        }
        let mut cache = self.blob_cache.lock();
        for (tx, data) in from_disk {
            let arc = Arc::new(data);
            cache.insert(tx, arc.clone());
            res.push((tx, arc));
        }

        Ok(res)
    }

    /// Retrieves blobs for the given transaction hashes from the blob cache or disk.
    ///
    /// Returns an error if there are any missing blobs.
    #[inline]
    fn get_exact(
        &self,
        txs: Vec<B256>,
    ) -> Result<Vec<Arc<BlobTransactionSidecar>>, BlobStoreError> {
        txs.into_iter()
            .map(|tx| self.get_one(tx)?.ok_or(BlobStoreError::MissingSidecar(tx)))
            .collect()
    }
}

impl fmt::Debug for DiskFileBlobStoreInner {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("DiskFileBlobStoreInner")
            .field("blob_dir", &self.blob_dir)
            .field("cached_blobs", &self.blob_cache.try_lock().map(|lock| lock.len()))
            .field("txs_to_delete", &self.txs_to_delete.try_read())
            .finish()
    }
}

/// Errors that can occur when interacting with a disk file blob store.
#[derive(Debug, thiserror::Error)]
pub enum DiskFileBlobStoreError {
    /// Thrown during [`DiskFileBlobStore::open`] if the blob store directory cannot be opened.
    #[error("failed to open blobstore at {0}: {1}")]
    Open(PathBuf, io::Error),
    /// Failure while reading a blob file.
    #[error("[{0}] failed to read blob file at {1}: {2}")]
    ReadFile(TxHash, PathBuf, io::Error),
    /// Failure while writing a blob file.
    #[error("[{0}] failed to write blob file at {1}: {2}")]
    WriteFile(TxHash, PathBuf, io::Error),
    /// Failure while deleting a blob file.
    #[error("[{0}] failed to delete blob file at {1}: {2}")]
    DeleteFile(TxHash, PathBuf, io::Error),
}

impl From<DiskFileBlobStoreError> for BlobStoreError {
    fn from(value: DiskFileBlobStoreError) -> Self {
        Self::Other(Box::new(value))
    }
}

/// Configuration for a disk file blob store.
#[derive(Debug, Clone)]
pub struct DiskFileBlobStoreConfig {
    /// The maximum number of blobs to keep in the in-memory blob cache.
    pub max_cached_entries: u32,
    /// How to open the blob store.
    pub open: OpenDiskFileBlobStore,
}

impl Default for DiskFileBlobStoreConfig {
    fn default() -> Self {
        Self { max_cached_entries: DEFAULT_MAX_CACHED_BLOBS, open: Default::default() }
    }
}

impl DiskFileBlobStoreConfig {
    /// Sets the maximum number of blobs to keep in the in-memory blob cache.
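    ///
    /// A brief usage sketch:
    ///
    /// ```ignore
    /// let config = DiskFileBlobStoreConfig::default().with_max_cached_entries(256);
    /// ```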
    pub const fn with_max_cached_entries(mut self, max_cached_entries: u32) -> Self {
        self.max_cached_entries = max_cached_entries;
        self
    }
}

/// How to open a disk file blob store.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum OpenDiskFileBlobStore {
    /// Clear everything in the blob store.
    #[default]
    Clear,
    /// Keep the existing blob store and index.
    ReIndex,
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::Ordering;

    fn tmp_store() -> (DiskFileBlobStore, tempfile::TempDir) {
        let dir = tempfile::tempdir().unwrap();
        let store = DiskFileBlobStore::open(dir.path(), Default::default()).unwrap();
        (store, dir)
    }

    fn rng_blobs(num: usize) -> Vec<(TxHash, BlobTransactionSidecar)> {
        let mut rng = rand::rng();
        (0..num)
            .map(|_| {
                let tx = TxHash::random_with(&mut rng);
                let blob =
                    BlobTransactionSidecar { blobs: vec![], commitments: vec![], proofs: vec![] };
                (tx, blob)
            })
            .collect()
    }

    #[test]
    fn disk_insert_all_get_all() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(10);
        let all_hashes = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        // all cached
        for (tx, blob) in &blobs {
            assert!(store.is_cached(tx));
            let b = store.get(*tx).unwrap().map(Arc::unwrap_or_clone).unwrap();
            assert_eq!(b, *blob);
        }

        let all = store.get_all(all_hashes.clone()).unwrap();
        for (tx, blob) in all {
            assert!(blobs.contains(&(tx, Arc::unwrap_or_clone(blob))), "missing blob {tx:?}");
        }

        assert!(store.contains(all_hashes[0]).unwrap());
        store.delete_all(all_hashes.clone()).unwrap();
        assert!(store.inner.txs_to_delete.read().contains(&all_hashes[0]));
        store.clear_cache();
        store.cleanup();

        assert!(store.get(blobs[0].0).unwrap().is_none());

        let all = store.get_all(all_hashes.clone()).unwrap();
        assert!(all.is_empty());

        assert!(!store.contains(all_hashes[0]).unwrap());
        assert!(store.get_exact(all_hashes).is_err());

        assert_eq!(store.data_size_hint(), Some(0));
        assert_eq!(store.inner.size_tracker.num_blobs.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn disk_insert_and_retrieve() {
        let (store, _dir) = tmp_store();

        let (tx, blob) = rng_blobs(1).into_iter().next().unwrap();
        store.insert(tx, blob.clone()).unwrap();

        assert!(store.is_cached(&tx));
        let retrieved_blob = store.get(tx).unwrap().map(Arc::unwrap_or_clone).unwrap();
        assert_eq!(retrieved_blob, blob);
    }

    #[test]
    fn disk_delete_blob() {
        let (store, _dir) = tmp_store();

        let (tx, blob) = rng_blobs(1).into_iter().next().unwrap();
        store.insert(tx, blob).unwrap();
        assert!(store.is_cached(&tx));

        store.delete(tx).unwrap();
        assert!(store.inner.txs_to_delete.read().contains(&tx));
        store.cleanup();

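        // the sidecar is still served from the in-memory cache; cleanup only removes the
        // file on disk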
        let result = store.get(tx).unwrap();
        assert_eq!(
            result,
            Some(Arc::new(BlobTransactionSidecar {
                blobs: vec![],
                commitments: vec![],
                proofs: vec![]
            }))
        );
    }

    #[test]
    fn disk_insert_all_and_delete_all() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(5);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        for (tx, _) in &blobs {
            assert!(store.is_cached(tx));
        }

        store.delete_all(txs.clone()).unwrap();
        store.cleanup();

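        // as above, the sidecars are still served from the in-memory cache after cleanup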
        for tx in txs {
            let result = store.get(tx).unwrap();
            assert_eq!(
                result,
                Some(Arc::new(BlobTransactionSidecar {
                    blobs: vec![],
                    commitments: vec![],
                    proofs: vec![]
                }))
            );
        }
    }

    #[test]
    fn disk_get_all_blobs() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(3);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        let retrieved_blobs = store.get_all(txs.clone()).unwrap();
        for (tx, blob) in retrieved_blobs {
            assert!(blobs.contains(&(tx, Arc::unwrap_or_clone(blob))));
        }

        store.delete_all(txs).unwrap();
        store.cleanup();
    }

    #[test]
    fn disk_get_exact_blobs_success() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(3);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        let retrieved_blobs = store.get_exact(txs).unwrap();
        for (retrieved_blob, (_, original_blob)) in retrieved_blobs.into_iter().zip(blobs) {
            assert_eq!(Arc::unwrap_or_clone(retrieved_blob), original_blob);
        }
    }

    #[test]
    fn disk_get_exact_blobs_failure() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(2);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs).unwrap();

        // Try to get a blob that was never inserted
        let missing_tx = TxHash::random();
        let result = store.get_exact(vec![txs[0], missing_tx]);
        assert!(result.is_err());
    }

    #[test]
    fn disk_data_size_hint() {
        let (store, _dir) = tmp_store();
        assert_eq!(store.data_size_hint(), Some(0));

        let blobs = rng_blobs(2);
        store.insert_all(blobs).unwrap();
        assert!(store.data_size_hint().unwrap() > 0);
    }

    #[test]
    fn disk_cleanup_stat() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(3);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs).unwrap();

        store.delete_all(txs).unwrap();
        let stat = store.cleanup();
        assert_eq!(stat.delete_succeed, 3);
        assert_eq!(stat.delete_failed, 0);
    }
}