reth_transaction_pool/blobstore/disk.rs

//! A simple disk store for blobs

use crate::blobstore::{BlobStore, BlobStoreCleanupStat, BlobStoreError, BlobStoreSize};
use alloy_eips::{
    eip4844::{BlobAndProofV1, BlobAndProofV2},
    eip7594::BlobTransactionSidecarVariant,
};
use alloy_primitives::{TxHash, B256};
use parking_lot::{Mutex, RwLock};
use schnellru::{ByLength, LruMap};
use std::{collections::HashSet, fmt, fs, io, path::PathBuf, sync::Arc};
use tracing::{debug, trace};

/// How many [`BlobTransactionSidecarVariant`] to cache in memory.
pub const DEFAULT_MAX_CACHED_BLOBS: u32 = 100;

/// A blob store that stores blob data on disk.
///
/// The type uses deferred deletion, meaning that blobs are not immediately deleted from disk, but
/// it's expected that the maintenance task will call [`BlobStore::cleanup`] to remove the deleted
/// blobs from disk.
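///
/// # Example
///
/// A minimal usage sketch; the directory path and the exact import path are illustrative
/// assumptions, and `tx_hash` stands in for a real transaction hash:
///
/// ```ignore
/// use reth_transaction_pool::blobstore::{BlobStore, DiskFileBlobStore, DiskFileBlobStoreConfig};
///
/// // opening clears any blobs left over from a previous run
/// let store = DiskFileBlobStore::open("/tmp/blobstore", DiskFileBlobStoreConfig::default())?;
/// // `delete` only marks the blob for deletion
/// store.delete(tx_hash)?;
/// // the maintenance task is expected to actually remove the files
/// let _stat = store.cleanup();
/// ```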
#[derive(Clone, Debug)]
pub struct DiskFileBlobStore {
    inner: Arc<DiskFileBlobStoreInner>,
}

impl DiskFileBlobStore {
    /// Opens and initializes a new disk file blob store according to the given options.
    pub fn open(
        blob_dir: impl Into<PathBuf>,
        opts: DiskFileBlobStoreConfig,
    ) -> Result<Self, DiskFileBlobStoreError> {
        let blob_dir = blob_dir.into();
        let DiskFileBlobStoreConfig { max_cached_entries, .. } = opts;
        let inner = DiskFileBlobStoreInner::new(blob_dir, max_cached_entries);

        // initialize the blob store
        inner.delete_all()?;
        inner.create_blob_dir()?;

        Ok(Self { inner: Arc::new(inner) })
    }

    #[cfg(test)]
    fn is_cached(&self, tx: &B256) -> bool {
        self.inner.blob_cache.lock().get(tx).is_some()
    }

    #[cfg(test)]
    fn clear_cache(&self) {
        self.inner.blob_cache.lock().clear()
    }
}

impl BlobStore for DiskFileBlobStore {
    fn insert(&self, tx: B256, data: BlobTransactionSidecarVariant) -> Result<(), BlobStoreError> {
        self.inner.insert_one(tx, data)
    }

    fn insert_all(
        &self,
        txs: Vec<(B256, BlobTransactionSidecarVariant)>,
    ) -> Result<(), BlobStoreError> {
        if txs.is_empty() {
            return Ok(())
        }
        self.inner.insert_many(txs)
    }

    fn delete(&self, tx: B256) -> Result<(), BlobStoreError> {
        if self.inner.contains(tx)? {
            self.inner.txs_to_delete.write().insert(tx);
        }
        Ok(())
    }

    fn delete_all(&self, txs: Vec<B256>) -> Result<(), BlobStoreError> {
        let txs = self.inner.retain_existing(txs)?;
        self.inner.txs_to_delete.write().extend(txs);
        Ok(())
    }

    fn cleanup(&self) -> BlobStoreCleanupStat {
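        // take the scheduled deletions in one swap so the write lock is released immediately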
        let txs_to_delete = std::mem::take(&mut *self.inner.txs_to_delete.write());
        let mut stat = BlobStoreCleanupStat::default();
        let mut subsize = 0;
        debug!(target:"txpool::blob", num_blobs=%txs_to_delete.len(), "Removing blobs from disk");
        for tx in txs_to_delete {
            let path = self.inner.blob_disk_file(tx);
            let filesize = fs::metadata(&path).map_or(0, |meta| meta.len());
            match fs::remove_file(&path) {
                Ok(_) => {
                    stat.delete_succeed += 1;
                    subsize += filesize;
                }
                Err(e) => {
                    stat.delete_failed += 1;
                    let err = DiskFileBlobStoreError::DeleteFile(tx, path, e);
                    debug!(target:"txpool::blob", %err);
                }
            };
        }
        self.inner.size_tracker.sub_size(subsize as usize);
        self.inner.size_tracker.sub_len(stat.delete_succeed);
        stat
    }

    fn get(&self, tx: B256) -> Result<Option<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
        self.inner.get_one(tx)
    }

    fn contains(&self, tx: B256) -> Result<bool, BlobStoreError> {
        self.inner.contains(tx)
    }

    fn get_all(
        &self,
        txs: Vec<B256>,
    ) -> Result<Vec<(B256, Arc<BlobTransactionSidecarVariant>)>, BlobStoreError> {
        if txs.is_empty() {
            return Ok(Vec::new())
        }
        self.inner.get_all(txs)
    }

    fn get_exact(
        &self,
        txs: Vec<B256>,
    ) -> Result<Vec<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
        if txs.is_empty() {
            return Ok(Vec::new())
        }
        self.inner.get_exact(txs)
    }

    fn get_by_versioned_hashes_v1(
        &self,
        versioned_hashes: &[B256],
    ) -> Result<Vec<Option<BlobAndProofV1>>, BlobStoreError> {
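        // Lookup strategy: scan the in-memory sidecar cache first, then map any still missing
        // versioned hashes to transaction hashes via the index, and finally read those
        // transactions' sidecars from disk.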
        // the response must always be the same len as the request, misses must be None
        let mut result = vec![None; versioned_hashes.len()];

        // first scan all cached full sidecars
        for (_tx_hash, blob_sidecar) in self.inner.blob_cache.lock().iter() {
            if let Some(blob_sidecar) = blob_sidecar.as_eip4844() {
                for (hash_idx, match_result) in
                    blob_sidecar.match_versioned_hashes(versioned_hashes)
                {
                    result[hash_idx] = Some(match_result);
                }
            }

            // return early if all blobs are found.
            if result.iter().all(|blob| blob.is_some()) {
                return Ok(result);
            }
        }

        // not all versioned hashes were found, try to look up a matching tx

        let mut missing_tx_hashes = Vec::new();

        {
            let mut versioned_to_txhashes = self.inner.versioned_hashes_to_txhash.lock();
            for (idx, _) in
                result.iter().enumerate().filter(|(_, blob_and_proof)| blob_and_proof.is_none())
            {
                // this is safe because the result vec has the same len
                let versioned_hash = versioned_hashes[idx];
                if let Some(tx_hash) = versioned_to_txhashes.get(&versioned_hash).copied() {
                    missing_tx_hashes.push(tx_hash);
                }
            }
        }

        // if we have missing blobs, try to read them from disk and try again
        if !missing_tx_hashes.is_empty() {
            let blobs_from_disk = self.inner.read_many_decoded(missing_tx_hashes);
            for (_, blob_sidecar) in blobs_from_disk {
                if let Some(blob_sidecar) = blob_sidecar.as_eip4844() {
                    for (hash_idx, match_result) in
                        blob_sidecar.match_versioned_hashes(versioned_hashes)
                    {
                        if result[hash_idx].is_none() {
                            result[hash_idx] = Some(match_result);
                        }
                    }
                }
            }
        }

        Ok(result)
    }

    fn get_by_versioned_hashes_v2(
        &self,
        versioned_hashes: &[B256],
    ) -> Result<Option<Vec<BlobAndProofV2>>, BlobStoreError> {
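        // Same lookup strategy as v1, but v2 semantics are all-or-nothing: if any requested
        // versioned hash is still missing at the end, `None` is returned instead of a partial
        // response.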
        // we must return the blobs in order but we don't necessarily find them in the requested
        // order
        let mut result = vec![None; versioned_hashes.len()];

        // first scan all cached full sidecars
        for (_tx_hash, blob_sidecar) in self.inner.blob_cache.lock().iter() {
            if let Some(blob_sidecar) = blob_sidecar.as_eip7594() {
                for (hash_idx, match_result) in
                    blob_sidecar.match_versioned_hashes(versioned_hashes)
                {
                    result[hash_idx] = Some(match_result);
                }
            }

            // return early if all blobs are found.
            if result.iter().all(|blob| blob.is_some()) {
                return Ok(Some(result.into_iter().map(Option::unwrap).collect()))
            }
        }

        // not all versioned hashes were found, try to look up a matching tx
        let mut missing_tx_hashes = Vec::new();

        {
            let mut versioned_to_txhashes = self.inner.versioned_hashes_to_txhash.lock();
            for (idx, _) in
                result.iter().enumerate().filter(|(_, blob_and_proof)| blob_and_proof.is_none())
            {
                // this is safe because the result vec has the same len
                let versioned_hash = versioned_hashes[idx];
                if let Some(tx_hash) = versioned_to_txhashes.get(&versioned_hash).copied() {
                    missing_tx_hashes.push(tx_hash);
                }
            }
        }

        // if we have missing blobs, try to read them from disk and try again
        if !missing_tx_hashes.is_empty() {
            let blobs_from_disk = self.inner.read_many_decoded(missing_tx_hashes);
            for (_, blob_sidecar) in blobs_from_disk {
                if let Some(blob_sidecar) = blob_sidecar.as_eip7594() {
                    for (hash_idx, match_result) in
                        blob_sidecar.match_versioned_hashes(versioned_hashes)
                    {
                        if result[hash_idx].is_none() {
                            result[hash_idx] = Some(match_result);
                        }
                    }
                }
            }
        }

        // only return the blobs if we found all requested versioned hashes
        if result.iter().all(|blob| blob.is_some()) {
            Ok(Some(result.into_iter().map(Option::unwrap).collect()))
        } else {
            Ok(None)
        }
    }

    fn data_size_hint(&self) -> Option<usize> {
        Some(self.inner.size_tracker.data_size())
    }

    fn blobs_len(&self) -> usize {
        self.inner.size_tracker.blobs_len()
    }
}

struct DiskFileBlobStoreInner {
    blob_dir: PathBuf,
    blob_cache: Mutex<LruMap<TxHash, Arc<BlobTransactionSidecarVariant>, ByLength>>,
    size_tracker: BlobStoreSize,
    file_lock: RwLock<()>,
    txs_to_delete: RwLock<HashSet<B256>>,
    /// Tracks known versioned hashes and the transaction they exist in.
    ///
    /// Note: It is possible that one blob can appear in multiple transactions but this only tracks
    /// the most recent one.
    versioned_hashes_to_txhash: Mutex<LruMap<B256, B256>>,
}

impl DiskFileBlobStoreInner {
    /// Creates a new empty disk file blob store with the given maximum length of the blob cache.
    fn new(blob_dir: PathBuf, max_length: u32) -> Self {
        Self {
            blob_dir,
            blob_cache: Mutex::new(LruMap::new(ByLength::new(max_length))),
            size_tracker: Default::default(),
            file_lock: Default::default(),
            txs_to_delete: Default::default(),
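            // a transaction can reference multiple versioned hashes, so this index is sized
            // larger than the sidecar cache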
            versioned_hashes_to_txhash: Mutex::new(LruMap::new(ByLength::new(max_length * 6))),
        }
    }

    /// Creates the directory where blobs will be stored on disk.
    fn create_blob_dir(&self) -> Result<(), DiskFileBlobStoreError> {
        debug!(target:"txpool::blob", blob_dir = ?self.blob_dir, "Creating blob store");
        fs::create_dir_all(&self.blob_dir)
            .map_err(|e| DiskFileBlobStoreError::Open(self.blob_dir.clone(), e))
    }

    /// Deletes the entire blob store.
    fn delete_all(&self) -> Result<(), DiskFileBlobStoreError> {
        match fs::remove_dir_all(&self.blob_dir) {
            Ok(_) => {
                debug!(target:"txpool::blob", blob_dir = ?self.blob_dir, "Removed blob store directory");
            }
            Err(err) if err.kind() == io::ErrorKind::NotFound => {}
            Err(err) => return Err(DiskFileBlobStoreError::Open(self.blob_dir.clone(), err)),
        }
        Ok(())
    }

    /// Ensures the blob is in the blob cache and written to the disk.
    fn insert_one(
        &self,
        tx: B256,
        data: BlobTransactionSidecarVariant,
    ) -> Result<(), BlobStoreError> {
        let mut buf = Vec::with_capacity(data.rlp_encoded_fields_length());
        data.rlp_encode_fields(&mut buf);

        {
            // cache the versioned hashes to tx hash
            let mut map = self.versioned_hashes_to_txhash.lock();
            data.versioned_hashes().for_each(|hash| {
                map.insert(hash, tx);
            });
        }

        self.blob_cache.lock().insert(tx, Arc::new(data));

        let size = self.write_one_encoded(tx, &buf)?;

        self.size_tracker.add_size(size);
        self.size_tracker.inc_len(1);
        Ok(())
    }

    /// Ensures blobs are in the blob cache and written to the disk.
    fn insert_many(
        &self,
        txs: Vec<(B256, BlobTransactionSidecarVariant)>,
    ) -> Result<(), BlobStoreError> {
        let raw = txs
            .iter()
            .map(|(tx, data)| {
                let mut buf = Vec::with_capacity(data.rlp_encoded_fields_length());
                data.rlp_encode_fields(&mut buf);
                (self.blob_disk_file(*tx), buf)
            })
            .collect::<Vec<_>>();

        {
            // cache versioned hashes to tx hash
            let mut map = self.versioned_hashes_to_txhash.lock();
            for (tx, data) in &txs {
                data.versioned_hashes().for_each(|hash| {
                    map.insert(hash, *tx);
                });
            }
        }

        {
            // cache blobs
            let mut cache = self.blob_cache.lock();
            for (tx, data) in txs {
                cache.insert(tx, Arc::new(data));
            }
        }

        let mut add = 0;
        let mut num = 0;
        {
            let _lock = self.file_lock.write();
            for (path, data) in raw {
                if path.exists() {
                    debug!(target:"txpool::blob", ?path, "Blob already exists");
                } else if let Err(err) = fs::write(&path, &data) {
                    debug!(target:"txpool::blob", %err, ?path, "Failed to write blob file");
                } else {
                    add += data.len();
                    num += 1;
                }
            }
        }
        self.size_tracker.add_size(add);
        self.size_tracker.inc_len(num);

        Ok(())
    }

    /// Returns true if the blob for the given transaction hash is in the blob cache or on disk.
    fn contains(&self, tx: B256) -> Result<bool, BlobStoreError> {
        if self.blob_cache.lock().get(&tx).is_some() {
            return Ok(true)
        }
        // we only check if the file exists and assume it's valid
        Ok(self.blob_disk_file(tx).is_file())
    }

    /// Returns the given transaction hashes for which blobs exist in the cache or on disk.
    fn retain_existing(&self, txs: Vec<B256>) -> Result<Vec<B256>, BlobStoreError> {
        let (in_cache, not_in_cache): (Vec<B256>, Vec<B256>) = {
            let mut cache = self.blob_cache.lock();
            txs.into_iter().partition(|tx| cache.get(tx).is_some())
        };

        let mut existing = in_cache;
        for tx in not_in_cache {
            if self.blob_disk_file(tx).is_file() {
                existing.push(tx);
            }
        }

        Ok(existing)
    }

    /// Retrieves the blob for the given transaction hash from the blob cache or disk.
    fn get_one(
        &self,
        tx: B256,
    ) -> Result<Option<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
        if let Some(blob) = self.blob_cache.lock().get(&tx) {
            return Ok(Some(blob.clone()))
        }

        if let Some(blob) = self.read_one(tx)? {
            let blob_arc = Arc::new(blob);
            self.blob_cache.lock().insert(tx, blob_arc.clone());
            return Ok(Some(blob_arc))
        }

        Ok(None)
    }

    /// Returns the path to the blob file for the given transaction hash.
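    ///
    /// The file is named after the hex-encoded transaction hash.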
    #[inline]
    fn blob_disk_file(&self, tx: B256) -> PathBuf {
        self.blob_dir.join(format!("{tx:x}"))
    }

    /// Retrieves the blob data for the given transaction hash.
    #[inline]
    fn read_one(&self, tx: B256) -> Result<Option<BlobTransactionSidecarVariant>, BlobStoreError> {
        let path = self.blob_disk_file(tx);
        let data = {
            let _lock = self.file_lock.read();
            match fs::read(&path) {
                Ok(data) => data,
                Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(None),
                Err(e) => {
                    return Err(BlobStoreError::Other(Box::new(DiskFileBlobStoreError::ReadFile(
                        tx, path, e,
                    ))))
                }
            }
        };
        BlobTransactionSidecarVariant::rlp_decode_fields(&mut data.as_slice())
            .map(Some)
            .map_err(BlobStoreError::DecodeError)
    }

    /// Returns decoded blobs read from disk.
    ///
    /// Only returns sidecars that were found and successfully decoded.
    fn read_many_decoded(&self, txs: Vec<TxHash>) -> Vec<(TxHash, BlobTransactionSidecarVariant)> {
        self.read_many_raw(txs)
            .into_iter()
            .filter_map(|(tx, data)| {
                BlobTransactionSidecarVariant::rlp_decode_fields(&mut data.as_slice())
                    .map(|sidecar| (tx, sidecar))
                    .ok()
            })
            .collect()
    }

    /// Retrieves the raw blob data for the given transaction hashes.
    ///
    /// Only returns the blobs that were found on file.
    #[inline]
    fn read_many_raw(&self, txs: Vec<TxHash>) -> Vec<(TxHash, Vec<u8>)> {
        let mut res = Vec::with_capacity(txs.len());
        let _lock = self.file_lock.read();
        for tx in txs {
            let path = self.blob_disk_file(tx);
            match fs::read(&path) {
                Ok(data) => {
                    res.push((tx, data));
                }
                Err(err) => {
                    debug!(target:"txpool::blob", %err, ?tx, "Failed to read blob file");
                }
            };
        }
        res
    }

    /// Writes the blob data for the given transaction hash to the disk.
    #[inline]
    fn write_one_encoded(&self, tx: B256, data: &[u8]) -> Result<usize, DiskFileBlobStoreError> {
        trace!(target:"txpool::blob", "[{:?}] writing blob file", tx);
        let mut add = 0;
        let path = self.blob_disk_file(tx);
        {
            let _lock = self.file_lock.write();
            if !path.exists() {
                fs::write(&path, data)
                    .map_err(|e| DiskFileBlobStoreError::WriteFile(tx, path, e))?;
                add = data.len();
            }
        }
        Ok(add)
    }

    /// Retrieves blobs for the given transaction hashes from the blob cache or disk.
    ///
    /// This will not return an error if there are missing blobs. Therefore, the result may be a
    /// subset of the request or an empty vector if none of the blobs were found.
    #[inline]
    fn get_all(
        &self,
        txs: Vec<B256>,
    ) -> Result<Vec<(B256, Arc<BlobTransactionSidecarVariant>)>, BlobStoreError> {
        let mut res = Vec::with_capacity(txs.len());
        let mut cache_miss = Vec::new();
        {
            let mut cache = self.blob_cache.lock();
            for tx in txs {
                if let Some(blob) = cache.get(&tx) {
                    res.push((tx, blob.clone()));
                } else {
                    cache_miss.push(tx)
                }
            }
        }
        if cache_miss.is_empty() {
            return Ok(res)
        }
        let from_disk = self.read_many_decoded(cache_miss);
        if from_disk.is_empty() {
            return Ok(res)
        }
        let from_disk = from_disk
            .into_iter()
            .map(|(tx, data)| {
                let data = Arc::new(data);
                res.push((tx, data.clone()));
                (tx, data)
            })
            .collect::<Vec<_>>();

        let mut cache = self.blob_cache.lock();
        for (tx, data) in from_disk {
            cache.insert(tx, data);
        }

        Ok(res)
    }

    /// Retrieves blobs for the given transaction hashes from the blob cache or disk.
    ///
    /// Returns an error if there are any missing blobs.
    #[inline]
    fn get_exact(
        &self,
        txs: Vec<B256>,
    ) -> Result<Vec<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
        txs.into_iter()
            .map(|tx| self.get_one(tx)?.ok_or(BlobStoreError::MissingSidecar(tx)))
            .collect()
    }
}

impl fmt::Debug for DiskFileBlobStoreInner {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("DiskFileBlobStoreInner")
            .field("blob_dir", &self.blob_dir)
            .field("cached_blobs", &self.blob_cache.try_lock().map(|lock| lock.len()))
            .field("txs_to_delete", &self.txs_to_delete.try_read())
            .finish()
    }
}

/// Errors that can occur when interacting with a disk file blob store.
#[derive(Debug, thiserror::Error)]
pub enum DiskFileBlobStoreError {
    /// Thrown during [`DiskFileBlobStore::open`] if the blob store directory cannot be opened.
    #[error("failed to open blobstore at {0}: {1}")]
    Open(PathBuf, io::Error),
    /// Failure while reading a blob file.
    #[error("[{0}] failed to read blob file at {1}: {2}")]
    ReadFile(TxHash, PathBuf, io::Error),
    /// Failure while writing a blob file.
    #[error("[{0}] failed to write blob file at {1}: {2}")]
    WriteFile(TxHash, PathBuf, io::Error),
    /// Failure while deleting a blob file.
    #[error("[{0}] failed to delete blob file at {1}: {2}")]
    DeleteFile(TxHash, PathBuf, io::Error),
}

impl From<DiskFileBlobStoreError> for BlobStoreError {
    fn from(value: DiskFileBlobStoreError) -> Self {
        Self::Other(Box::new(value))
    }
}

/// Configuration for a disk file blob store.
#[derive(Debug, Clone)]
pub struct DiskFileBlobStoreConfig {
    /// The maximum number of blobs to keep in the in-memory blob cache.
    pub max_cached_entries: u32,
    /// How to open the blob store.
    pub open: OpenDiskFileBlobStore,
}

impl Default for DiskFileBlobStoreConfig {
    fn default() -> Self {
        Self { max_cached_entries: DEFAULT_MAX_CACHED_BLOBS, open: Default::default() }
    }
}

impl DiskFileBlobStoreConfig {
    /// Sets the maximum number of blobs to keep in the in-memory blob cache.
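    ///
    /// # Example
    ///
    /// A minimal sketch:
    ///
    /// ```ignore
    /// let config = DiskFileBlobStoreConfig::default().with_max_cached_entries(256);
    /// ```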
    pub const fn with_max_cached_entries(mut self, max_cached_entries: u32) -> Self {
        self.max_cached_entries = max_cached_entries;
        self
    }
}

/// How to open a disk file blob store.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum OpenDiskFileBlobStore {
    /// Clear everything in the blob store.
    #[default]
    Clear,
    /// Keep the existing blob store and index.
    ReIndex,
}

#[cfg(test)]
mod tests {
    use alloy_consensus::BlobTransactionSidecar;
    use alloy_eips::eip7594::BlobTransactionSidecarVariant;

    use super::*;
    use std::sync::atomic::Ordering;

    fn tmp_store() -> (DiskFileBlobStore, tempfile::TempDir) {
        let dir = tempfile::tempdir().unwrap();
        let store = DiskFileBlobStore::open(dir.path(), Default::default()).unwrap();
        (store, dir)
    }

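    /// Generates `num` random transaction hashes paired with empty EIP-4844 sidecars; the store
    /// treats sidecar contents opaquely, so empty ones are enough for these tests.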
    fn rng_blobs(num: usize) -> Vec<(TxHash, BlobTransactionSidecarVariant)> {
        let mut rng = rand::rng();
        (0..num)
            .map(|_| {
                let tx = TxHash::random_with(&mut rng);
                let blob = BlobTransactionSidecarVariant::Eip4844(BlobTransactionSidecar {
                    blobs: vec![],
                    commitments: vec![],
                    proofs: vec![],
                });
                (tx, blob)
            })
            .collect()
    }

    #[test]
    fn disk_insert_all_get_all() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(10);
        let all_hashes = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        // all cached
        for (tx, blob) in &blobs {
            assert!(store.is_cached(tx));
            let b = store.get(*tx).unwrap().map(Arc::unwrap_or_clone).unwrap();
            assert_eq!(b, *blob);
        }

        let all = store.get_all(all_hashes.clone()).unwrap();
        for (tx, blob) in all {
            assert!(blobs.contains(&(tx, Arc::unwrap_or_clone(blob))), "missing blob {tx:?}");
        }

        assert!(store.contains(all_hashes[0]).unwrap());
        store.delete_all(all_hashes.clone()).unwrap();
        assert!(store.inner.txs_to_delete.read().contains(&all_hashes[0]));
        store.clear_cache();
        store.cleanup();

        assert!(store.get(blobs[0].0).unwrap().is_none());

        let all = store.get_all(all_hashes.clone()).unwrap();
        assert!(all.is_empty());

        assert!(!store.contains(all_hashes[0]).unwrap());
        assert!(store.get_exact(all_hashes).is_err());

        assert_eq!(store.data_size_hint(), Some(0));
        assert_eq!(store.inner.size_tracker.num_blobs.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn disk_insert_and_retrieve() {
        let (store, _dir) = tmp_store();

        let (tx, blob) = rng_blobs(1).into_iter().next().unwrap();
        store.insert(tx, blob.clone()).unwrap();

        assert!(store.is_cached(&tx));
        let retrieved_blob = store.get(tx).unwrap().map(Arc::unwrap_or_clone).unwrap();
        assert_eq!(retrieved_blob, blob);
    }

    #[test]
    fn disk_delete_blob() {
        let (store, _dir) = tmp_store();

        let (tx, blob) = rng_blobs(1).into_iter().next().unwrap();
        store.insert(tx, blob).unwrap();
        assert!(store.is_cached(&tx));

        store.delete(tx).unwrap();
        assert!(store.inner.txs_to_delete.read().contains(&tx));
        store.cleanup();

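        // the deleted blob is still served from the in-memory cache; cleanup only removed
        // the file on disk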
        let result = store.get(tx).unwrap();
        assert_eq!(
            result,
            Some(Arc::new(BlobTransactionSidecarVariant::Eip4844(BlobTransactionSidecar {
                blobs: vec![],
                commitments: vec![],
                proofs: vec![]
            })))
        );
    }

    #[test]
    fn disk_insert_all_and_delete_all() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(5);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        for (tx, _) in &blobs {
            assert!(store.is_cached(tx));
        }

        store.delete_all(txs.clone()).unwrap();
        store.cleanup();

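        // as above, the blobs are still served from the in-memory cache even though the
        // files were removed from disk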
        for tx in txs {
            let result = store.get(tx).unwrap();
            assert_eq!(
                result,
                Some(Arc::new(BlobTransactionSidecarVariant::Eip4844(BlobTransactionSidecar {
                    blobs: vec![],
                    commitments: vec![],
                    proofs: vec![]
                })))
            );
        }
    }

    #[test]
    fn disk_get_all_blobs() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(3);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        let retrieved_blobs = store.get_all(txs.clone()).unwrap();
        for (tx, blob) in retrieved_blobs {
            assert!(blobs.contains(&(tx, Arc::unwrap_or_clone(blob))));
        }

        store.delete_all(txs).unwrap();
        store.cleanup();
    }

    #[test]
    fn disk_get_exact_blobs_success() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(3);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        let retrieved_blobs = store.get_exact(txs).unwrap();
        for (retrieved_blob, (_, original_blob)) in retrieved_blobs.into_iter().zip(blobs) {
            assert_eq!(Arc::unwrap_or_clone(retrieved_blob), original_blob);
        }
    }

    #[test]
    fn disk_get_exact_blobs_failure() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(2);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs).unwrap();

        // Try to get a blob that was never inserted
        let missing_tx = TxHash::random();
        let result = store.get_exact(vec![txs[0], missing_tx]);
        assert!(result.is_err());
    }

    #[test]
    fn disk_data_size_hint() {
        let (store, _dir) = tmp_store();
        assert_eq!(store.data_size_hint(), Some(0));

        let blobs = rng_blobs(2);
        store.insert_all(blobs).unwrap();
        assert!(store.data_size_hint().unwrap() > 0);
    }

    #[test]
    fn disk_cleanup_stat() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(3);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs).unwrap();

        store.delete_all(txs).unwrap();
        let stat = store.cleanup();
        assert_eq!(stat.delete_succeed, 3);
        assert_eq!(stat.delete_failed, 0);
    }
}