reth_transaction_pool/blobstore/
disk.rs

//! A simple disk store for blobs

use crate::blobstore::{BlobStore, BlobStoreCleanupStat, BlobStoreError, BlobStoreSize};
use alloy_eips::{
    eip4844::{BlobAndProofV1, BlobAndProofV2},
    eip7594::BlobTransactionSidecarVariant,
    eip7840::BlobParams,
    merge::EPOCH_SLOTS,
};
use alloy_primitives::{TxHash, B256};
use parking_lot::{Mutex, RwLock};
use schnellru::{ByLength, LruMap};
use std::{collections::HashSet, fmt, fs, io, path::PathBuf, sync::Arc};
use tracing::{debug, trace};

/// How many [`BlobTransactionSidecarVariant`] to cache in memory.
pub const DEFAULT_MAX_CACHED_BLOBS: u32 = 100;

/// A cache size heuristic based on the highest blob params (`bpo2`).
///
/// This uses the max blobs per tx and max blobs per block over 16 epochs
/// (`32 * 16 = 512` slots): `21 * 6 * 512 = 64512` entries.
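///
/// Each entry maps a 32-byte versioned hash to a 32-byte tx hash, so this should be
/// ~4MB (`64512 * 64` bytes), ignoring the map's bookkeeping overhead.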
const VERSIONED_HASH_TO_TX_HASH_CACHE_SIZE: u64 =
    BlobParams::bpo2().max_blobs_per_tx * BlobParams::bpo2().max_blob_count * EPOCH_SLOTS * 16;

/// A blob store that stores blob data on disk.
///
/// The type uses deferred deletion, meaning that blobs are not immediately deleted from disk, but
/// it's expected that the maintenance task will call [`BlobStore::cleanup`] to remove the deleted
/// blobs from disk.
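///
/// # Example
///
/// A minimal lifecycle sketch (marked `ignore` because it touches the filesystem;
/// `tx_hash` and `sidecar` are placeholders for a real transaction hash and its
/// [`BlobTransactionSidecarVariant`]):
///
/// ```ignore
/// let store = DiskFileBlobStore::open("/tmp/blobs", Default::default())?;
/// store.insert(tx_hash, sidecar)?; // cached in memory and written to disk
/// store.delete(tx_hash)?; // only queues the blob file for deletion
/// let stat = store.cleanup(); // the maintenance task removes queued files
/// assert_eq!(stat.delete_succeed, 1);
/// ```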
#[derive(Clone, Debug)]
pub struct DiskFileBlobStore {
    inner: Arc<DiskFileBlobStoreInner>,
}

impl DiskFileBlobStore {
    /// Opens and initializes a new disk file blob store according to the given options.
    pub fn open(
        blob_dir: impl Into<PathBuf>,
        opts: DiskFileBlobStoreConfig,
    ) -> Result<Self, DiskFileBlobStoreError> {
        let blob_dir = blob_dir.into();
        let DiskFileBlobStoreConfig { max_cached_entries, .. } = opts;
        let inner = DiskFileBlobStoreInner::new(blob_dir, max_cached_entries);

        // initialize the blob store
        inner.delete_all()?;
        inner.create_blob_dir()?;

        Ok(Self { inner: Arc::new(inner) })
    }

    #[cfg(test)]
    fn is_cached(&self, tx: &B256) -> bool {
        self.inner.blob_cache.lock().get(tx).is_some()
    }

    #[cfg(test)]
    fn clear_cache(&self) {
        self.inner.blob_cache.lock().clear()
    }
}

impl BlobStore for DiskFileBlobStore {
    fn insert(&self, tx: B256, data: BlobTransactionSidecarVariant) -> Result<(), BlobStoreError> {
        self.inner.insert_one(tx, data)
    }

    fn insert_all(
        &self,
        txs: Vec<(B256, BlobTransactionSidecarVariant)>,
    ) -> Result<(), BlobStoreError> {
        if txs.is_empty() {
            return Ok(())
        }
        self.inner.insert_many(txs)
    }

    fn delete(&self, tx: B256) -> Result<(), BlobStoreError> {
        if self.inner.contains(tx)? {
            self.inner.txs_to_delete.write().insert(tx);
        }
        Ok(())
    }

    fn delete_all(&self, txs: Vec<B256>) -> Result<(), BlobStoreError> {
        let txs = self.inner.retain_existing(txs)?;
        self.inner.txs_to_delete.write().extend(txs);
        Ok(())
    }

    fn cleanup(&self) -> BlobStoreCleanupStat {
        let txs_to_delete = std::mem::take(&mut *self.inner.txs_to_delete.write());
        let mut stat = BlobStoreCleanupStat::default();
        let mut subsize = 0;
        debug!(target:"txpool::blob", num_blobs=%txs_to_delete.len(), "Removing blobs from disk");
        for tx in txs_to_delete {
            let path = self.inner.blob_disk_file(tx);
            let filesize = fs::metadata(&path).map_or(0, |meta| meta.len());
            match fs::remove_file(&path) {
                Ok(_) => {
                    stat.delete_succeed += 1;
                    subsize += filesize;
                }
                Err(e) => {
                    stat.delete_failed += 1;
                    let err = DiskFileBlobStoreError::DeleteFile(tx, path, e);
                    debug!(target:"txpool::blob", %err);
                }
            };
        }
        self.inner.size_tracker.sub_size(subsize as usize);
        self.inner.size_tracker.sub_len(stat.delete_succeed);
        stat
    }

    fn get(&self, tx: B256) -> Result<Option<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
        self.inner.get_one(tx)
    }

    fn contains(&self, tx: B256) -> Result<bool, BlobStoreError> {
        self.inner.contains(tx)
    }

    fn get_all(
        &self,
        txs: Vec<B256>,
    ) -> Result<Vec<(B256, Arc<BlobTransactionSidecarVariant>)>, BlobStoreError> {
        if txs.is_empty() {
            return Ok(Vec::new())
        }
        self.inner.get_all(txs)
    }

    fn get_exact(
        &self,
        txs: Vec<B256>,
    ) -> Result<Vec<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
        if txs.is_empty() {
            return Ok(Vec::new())
        }
        self.inner.get_exact(txs)
    }

    fn get_by_versioned_hashes_v1(
        &self,
        versioned_hashes: &[B256],
    ) -> Result<Vec<Option<BlobAndProofV1>>, BlobStoreError> {
        // the response must always be the same len as the request, misses must be None
        let mut result = vec![None; versioned_hashes.len()];

        // first scan all cached full sidecars
        for (_tx_hash, blob_sidecar) in self.inner.blob_cache.lock().iter() {
            if let Some(blob_sidecar) = blob_sidecar.as_eip4844() {
                for (hash_idx, match_result) in
                    blob_sidecar.match_versioned_hashes(versioned_hashes)
                {
                    result[hash_idx] = Some(match_result);
                }
            }

            // return early if all blobs are found.
            if result.iter().all(|blob| blob.is_some()) {
                return Ok(result);
            }
        }

        // not all versioned hashes were found, try to look up a matching tx
        let mut missing_tx_hashes = Vec::new();

        {
            let mut versioned_to_txhashes = self.inner.versioned_hashes_to_txhash.lock();
            for (idx, _) in
                result.iter().enumerate().filter(|(_, blob_and_proof)| blob_and_proof.is_none())
            {
                // this is safe because the result vec has the same len
                let versioned_hash = versioned_hashes[idx];
                if let Some(tx_hash) = versioned_to_txhashes.get(&versioned_hash).copied() {
                    missing_tx_hashes.push(tx_hash);
                }
            }
        }

        // if we have missing blobs, try to read them from disk and try again
        if !missing_tx_hashes.is_empty() {
            let blobs_from_disk = self.inner.read_many_decoded(missing_tx_hashes);
            for (_, blob_sidecar) in blobs_from_disk {
                if let Some(blob_sidecar) = blob_sidecar.as_eip4844() {
                    for (hash_idx, match_result) in
                        blob_sidecar.match_versioned_hashes(versioned_hashes)
                    {
                        if result[hash_idx].is_none() {
                            result[hash_idx] = Some(match_result);
                        }
                    }
                }
            }
        }

        Ok(result)
    }

    fn get_by_versioned_hashes_v2(
        &self,
        versioned_hashes: &[B256],
    ) -> Result<Option<Vec<BlobAndProofV2>>, BlobStoreError> {
        // we must return the blobs in order but we don't necessarily find them in the requested
        // order
        let mut result = vec![None; versioned_hashes.len()];

        // first scan all cached full sidecars
        for (_tx_hash, blob_sidecar) in self.inner.blob_cache.lock().iter() {
            if let Some(blob_sidecar) = blob_sidecar.as_eip7594() {
                for (hash_idx, match_result) in
                    blob_sidecar.match_versioned_hashes(versioned_hashes)
                {
                    result[hash_idx] = Some(match_result);
                }
            }

            // return early if all blobs are found.
            if result.iter().all(|blob| blob.is_some()) {
                return Ok(Some(result.into_iter().map(Option::unwrap).collect()))
            }
        }

        // not all versioned hashes were found, try to look up a matching tx
        let mut missing_tx_hashes = Vec::new();

        {
            let mut versioned_to_txhashes = self.inner.versioned_hashes_to_txhash.lock();
            for (idx, _) in
                result.iter().enumerate().filter(|(_, blob_and_proof)| blob_and_proof.is_none())
            {
                // this is safe because the result vec has the same len
                let versioned_hash = versioned_hashes[idx];
                if let Some(tx_hash) = versioned_to_txhashes.get(&versioned_hash).copied() {
                    missing_tx_hashes.push(tx_hash);
                }
            }
        }

        // if we have missing blobs, try to read them from disk and try again
        if !missing_tx_hashes.is_empty() {
            let blobs_from_disk = self.inner.read_many_decoded(missing_tx_hashes);
            for (_, blob_sidecar) in blobs_from_disk {
                if let Some(blob_sidecar) = blob_sidecar.as_eip7594() {
                    for (hash_idx, match_result) in
                        blob_sidecar.match_versioned_hashes(versioned_hashes)
                    {
                        if result[hash_idx].is_none() {
                            result[hash_idx] = Some(match_result);
                        }
                    }
                }
            }
        }

        // only return the blobs if we found all requested versioned hashes
        if result.iter().all(|blob| blob.is_some()) {
            Ok(Some(result.into_iter().map(Option::unwrap).collect()))
        } else {
            Ok(None)
        }
    }

    fn data_size_hint(&self) -> Option<usize> {
        Some(self.inner.size_tracker.data_size())
    }

    fn blobs_len(&self) -> usize {
        self.inner.size_tracker.blobs_len()
    }
}

struct DiskFileBlobStoreInner {
    /// The directory where blob files are stored.
    blob_dir: PathBuf,
    /// In-memory LRU cache of the most recently used decoded sidecars.
    blob_cache: Mutex<LruMap<TxHash, Arc<BlobTransactionSidecarVariant>, ByLength>>,
    /// Tracks the total size and number of blobs stored on disk.
    size_tracker: BlobStoreSize,
    /// Synchronizes access to the blob files on disk.
    file_lock: RwLock<()>,
    /// Blobs queued for deferred deletion by [`BlobStore::cleanup`].
    txs_to_delete: RwLock<HashSet<B256>>,
    /// Tracks known versioned hashes and a transaction they exist in
    ///
    /// Note: It is possible that one blob can appear in multiple transactions but this only tracks
    /// the most recent one.
    versioned_hashes_to_txhash: Mutex<LruMap<B256, B256>>,
}

impl DiskFileBlobStoreInner {
    /// Creates a new empty disk file blob store with the given maximum length of the blob cache.
    fn new(blob_dir: PathBuf, max_length: u32) -> Self {
        Self {
            blob_dir,
            blob_cache: Mutex::new(LruMap::new(ByLength::new(max_length))),
            size_tracker: Default::default(),
            file_lock: Default::default(),
            txs_to_delete: Default::default(),
            versioned_hashes_to_txhash: Mutex::new(LruMap::new(ByLength::new(
                VERSIONED_HASH_TO_TX_HASH_CACHE_SIZE as u32,
            ))),
        }
    }

    /// Creates the directory where blobs will be stored on disk.
    fn create_blob_dir(&self) -> Result<(), DiskFileBlobStoreError> {
        debug!(target:"txpool::blob", blob_dir = ?self.blob_dir, "Creating blob store");
        fs::create_dir_all(&self.blob_dir)
            .map_err(|e| DiskFileBlobStoreError::Open(self.blob_dir.clone(), e))
    }

    /// Deletes the entire blob store.
    fn delete_all(&self) -> Result<(), DiskFileBlobStoreError> {
        match fs::remove_dir_all(&self.blob_dir) {
            Ok(_) => {
                debug!(target:"txpool::blob", blob_dir = ?self.blob_dir, "Removed blob store directory");
            }
            Err(err) if err.kind() == io::ErrorKind::NotFound => {}
            Err(err) => return Err(DiskFileBlobStoreError::Open(self.blob_dir.clone(), err)),
        }
        Ok(())
    }

    /// Ensures blob is in the blob cache and written to the disk.
    fn insert_one(
        &self,
        tx: B256,
        data: BlobTransactionSidecarVariant,
    ) -> Result<(), BlobStoreError> {
        let mut buf = Vec::with_capacity(data.rlp_encoded_fields_length());
        data.rlp_encode_fields(&mut buf);

        {
            // cache the versioned hashes to tx hash
            let mut map = self.versioned_hashes_to_txhash.lock();
            data.versioned_hashes().for_each(|hash| {
                map.insert(hash, tx);
            });
        }

        self.blob_cache.lock().insert(tx, Arc::new(data));

        let size = self.write_one_encoded(tx, &buf)?;

        self.size_tracker.add_size(size);
        self.size_tracker.inc_len(1);
        Ok(())
    }

    /// Ensures blobs are in the blob cache and written to the disk.
    fn insert_many(
        &self,
        txs: Vec<(B256, BlobTransactionSidecarVariant)>,
    ) -> Result<(), BlobStoreError> {
        let raw = txs
            .iter()
            .map(|(tx, data)| {
                let mut buf = Vec::with_capacity(data.rlp_encoded_fields_length());
                data.rlp_encode_fields(&mut buf);
                (self.blob_disk_file(*tx), buf)
            })
            .collect::<Vec<_>>();

        {
            // cache versioned hashes to tx hash
            let mut map = self.versioned_hashes_to_txhash.lock();
            for (tx, data) in &txs {
                data.versioned_hashes().for_each(|hash| {
                    map.insert(hash, *tx);
                });
            }
        }

        {
            // cache blobs
            let mut cache = self.blob_cache.lock();
            for (tx, data) in txs {
                cache.insert(tx, Arc::new(data));
            }
        }

        let mut add = 0;
        let mut num = 0;
        {
            let _lock = self.file_lock.write();
            for (path, data) in raw {
                if path.exists() {
                    debug!(target:"txpool::blob", ?path, "Blob already exists");
                } else if let Err(err) = fs::write(&path, &data) {
                    debug!(target:"txpool::blob", %err, ?path, "Failed to write blob file");
                } else {
                    add += data.len();
                    num += 1;
                }
            }
        }
        self.size_tracker.add_size(add);
        self.size_tracker.inc_len(num);

        Ok(())
    }

    /// Returns true if the blob for the given transaction hash is in the blob cache or on disk.
    fn contains(&self, tx: B256) -> Result<bool, BlobStoreError> {
        if self.blob_cache.lock().get(&tx).is_some() {
            return Ok(true)
        }
        // we only check if the file exists and assume it's valid
        Ok(self.blob_disk_file(tx).is_file())
    }

    /// Returns the subset of the given transaction hashes which are in the cache or on disk.
    fn retain_existing(&self, txs: Vec<B256>) -> Result<Vec<B256>, BlobStoreError> {
        let (in_cache, not_in_cache): (Vec<B256>, Vec<B256>) = {
            let mut cache = self.blob_cache.lock();
            txs.into_iter().partition(|tx| cache.get(tx).is_some())
        };

        let mut existing = in_cache;
        for tx in not_in_cache {
            if self.blob_disk_file(tx).is_file() {
                existing.push(tx);
            }
        }

        Ok(existing)
    }

    /// Retrieves the blob for the given transaction hash from the blob cache or disk.
    fn get_one(
        &self,
        tx: B256,
    ) -> Result<Option<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
        if let Some(blob) = self.blob_cache.lock().get(&tx) {
            return Ok(Some(blob.clone()))
        }

        if let Some(blob) = self.read_one(tx)? {
            let blob_arc = Arc::new(blob);
            self.blob_cache.lock().insert(tx, blob_arc.clone());
            return Ok(Some(blob_arc))
        }

        Ok(None)
    }

    /// Returns the path to the blob file for the given transaction hash.
    ///
    /// The file name is the hex encoded transaction hash.
    #[inline]
    fn blob_disk_file(&self, tx: B256) -> PathBuf {
        self.blob_dir.join(format!("{tx:x}"))
    }

    /// Retrieves the blob data for the given transaction hash.
    #[inline]
    fn read_one(&self, tx: B256) -> Result<Option<BlobTransactionSidecarVariant>, BlobStoreError> {
        let path = self.blob_disk_file(tx);
        let data = {
            let _lock = self.file_lock.read();
            match fs::read(&path) {
                Ok(data) => data,
                Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(None),
                Err(e) => {
                    return Err(BlobStoreError::Other(Box::new(DiskFileBlobStoreError::ReadFile(
                        tx, path, e,
                    ))))
                }
            }
        };
        BlobTransactionSidecarVariant::rlp_decode_fields(&mut data.as_slice())
            .map(Some)
            .map_err(BlobStoreError::DecodeError)
    }

    /// Returns decoded blobs read from disk.
    ///
    /// Only returns sidecars that were found and successfully decoded.
    fn read_many_decoded(&self, txs: Vec<TxHash>) -> Vec<(TxHash, BlobTransactionSidecarVariant)> {
        self.read_many_raw(txs)
            .into_iter()
            .filter_map(|(tx, data)| {
                BlobTransactionSidecarVariant::rlp_decode_fields(&mut data.as_slice())
                    .map(|sidecar| (tx, sidecar))
                    .ok()
            })
            .collect()
    }

    /// Retrieves the raw blob data for the given transaction hashes.
    ///
    /// Only returns the blobs that were found on disk.
    #[inline]
    fn read_many_raw(&self, txs: Vec<TxHash>) -> Vec<(TxHash, Vec<u8>)> {
        let mut res = Vec::with_capacity(txs.len());
        let _lock = self.file_lock.read();
        for tx in txs {
            let path = self.blob_disk_file(tx);
            match fs::read(&path) {
                Ok(data) => {
                    res.push((tx, data));
                }
                Err(err) => {
                    debug!(target:"txpool::blob", %err, ?tx, "Failed to read blob file");
                }
            };
        }
        res
    }

    /// Writes the blob data for the given transaction hash to the disk.
    #[inline]
    fn write_one_encoded(&self, tx: B256, data: &[u8]) -> Result<usize, DiskFileBlobStoreError> {
        trace!(target:"txpool::blob", "[{:?}] writing blob file", tx);
        let mut add = 0;
        let path = self.blob_disk_file(tx);
        {
            let _lock = self.file_lock.write();
            if !path.exists() {
                fs::write(&path, data)
                    .map_err(|e| DiskFileBlobStoreError::WriteFile(tx, path, e))?;
                add = data.len();
            }
        }
        Ok(add)
    }

    /// Retrieves blobs for the given transaction hashes from the blob cache or disk.
    ///
    /// This will not return an error if there are missing blobs. Therefore, the result may be a
    /// subset of the request or an empty vector if none of the blobs were found.
    #[inline]
    fn get_all(
        &self,
        txs: Vec<B256>,
    ) -> Result<Vec<(B256, Arc<BlobTransactionSidecarVariant>)>, BlobStoreError> {
        let mut res = Vec::with_capacity(txs.len());
        let mut cache_miss = Vec::new();
        {
            let mut cache = self.blob_cache.lock();
            for tx in txs {
                if let Some(blob) = cache.get(&tx) {
                    res.push((tx, blob.clone()));
                } else {
                    cache_miss.push(tx)
                }
            }
        }
        if cache_miss.is_empty() {
            return Ok(res)
        }
        let from_disk = self.read_many_decoded(cache_miss);
        if from_disk.is_empty() {
            return Ok(res)
        }
        let from_disk = from_disk
            .into_iter()
            .map(|(tx, data)| {
                let data = Arc::new(data);
                res.push((tx, data.clone()));
                (tx, data)
            })
            .collect::<Vec<_>>();

        let mut cache = self.blob_cache.lock();
        for (tx, data) in from_disk {
            cache.insert(tx, data);
        }

        Ok(res)
    }

    /// Retrieves blobs for the given transaction hashes from the blob cache or disk.
    ///
    /// Returns an error if there are any missing blobs.
    #[inline]
    fn get_exact(
        &self,
        txs: Vec<B256>,
    ) -> Result<Vec<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
        txs.into_iter()
            .map(|tx| self.get_one(tx)?.ok_or(BlobStoreError::MissingSidecar(tx)))
            .collect()
    }
}

impl fmt::Debug for DiskFileBlobStoreInner {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("DiskFileBlobStoreInner")
            .field("blob_dir", &self.blob_dir)
            .field("cached_blobs", &self.blob_cache.try_lock().map(|lock| lock.len()))
            .field("txs_to_delete", &self.txs_to_delete.try_read())
            .finish()
    }
}

/// Errors that can occur when interacting with a disk file blob store.
#[derive(Debug, thiserror::Error)]
pub enum DiskFileBlobStoreError {
    /// Thrown during [`DiskFileBlobStore::open`] if the blob store directory cannot be opened.
    #[error("failed to open blobstore at {0}: {1}")]
    Open(PathBuf, io::Error),
    /// Failure while reading a blob file.
    #[error("[{0}] failed to read blob file at {1}: {2}")]
    ReadFile(TxHash, PathBuf, io::Error),
    /// Failure while writing a blob file.
    #[error("[{0}] failed to write blob file at {1}: {2}")]
    WriteFile(TxHash, PathBuf, io::Error),
    /// Failure while deleting a blob file.
    #[error("[{0}] failed to delete blob file at {1}: {2}")]
    DeleteFile(TxHash, PathBuf, io::Error),
}

impl From<DiskFileBlobStoreError> for BlobStoreError {
    fn from(value: DiskFileBlobStoreError) -> Self {
        Self::Other(Box::new(value))
    }
}

/// Configuration for a disk file blob store.
#[derive(Debug, Clone)]
pub struct DiskFileBlobStoreConfig {
    /// The maximum number of blobs to keep in the in memory blob cache.
    pub max_cached_entries: u32,
    /// How to open the blob store.
    pub open: OpenDiskFileBlobStore,
}

impl Default for DiskFileBlobStoreConfig {
    fn default() -> Self {
        Self { max_cached_entries: DEFAULT_MAX_CACHED_BLOBS, open: Default::default() }
    }
}

impl DiskFileBlobStoreConfig {
    /// Sets the maximum number of blobs to keep in the in-memory blob cache.
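    ///
    /// # Example
    ///
    /// A small usage sketch; the `reth_transaction_pool::blobstore` import path is assumed
    /// from this module's location, so the doctest is marked `ignore`:
    ///
    /// ```ignore
    /// use reth_transaction_pool::blobstore::DiskFileBlobStoreConfig;
    ///
    /// let config = DiskFileBlobStoreConfig::default().with_max_cached_entries(256);
    /// assert_eq!(config.max_cached_entries, 256);
    /// ```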
    pub const fn with_max_cached_entries(mut self, max_cached_entries: u32) -> Self {
        self.max_cached_entries = max_cached_entries;
        self
    }
}

/// How to open a disk file blob store.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum OpenDiskFileBlobStore {
    /// Clear everything in the blob store.
    #[default]
    Clear,
    /// Keep the existing blob store and index.
    ReIndex,
}

#[cfg(test)]
mod tests {
    use super::*;
    use alloy_consensus::BlobTransactionSidecar;
    use alloy_eips::eip7594::BlobTransactionSidecarVariant;
    use std::sync::atomic::Ordering;

    fn tmp_store() -> (DiskFileBlobStore, tempfile::TempDir) {
        let dir = tempfile::tempdir().unwrap();
        let store = DiskFileBlobStore::open(dir.path(), Default::default()).unwrap();
        (store, dir)
    }

    fn rng_blobs(num: usize) -> Vec<(TxHash, BlobTransactionSidecarVariant)> {
        let mut rng = rand::rng();
        (0..num)
            .map(|_| {
                let tx = TxHash::random_with(&mut rng);
                let blob = BlobTransactionSidecarVariant::Eip4844(BlobTransactionSidecar {
                    blobs: vec![],
                    commitments: vec![],
                    proofs: vec![],
                });
                (tx, blob)
            })
            .collect()
    }

    #[test]
    fn disk_insert_all_get_all() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(10);
        let all_hashes = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        // all cached
        for (tx, blob) in &blobs {
            assert!(store.is_cached(tx));
            let b = store.get(*tx).unwrap().map(Arc::unwrap_or_clone).unwrap();
            assert_eq!(b, *blob);
        }

        let all = store.get_all(all_hashes.clone()).unwrap();
        for (tx, blob) in all {
            assert!(blobs.contains(&(tx, Arc::unwrap_or_clone(blob))), "missing blob {tx:?}");
        }

        assert!(store.contains(all_hashes[0]).unwrap());
        store.delete_all(all_hashes.clone()).unwrap();
        assert!(store.inner.txs_to_delete.read().contains(&all_hashes[0]));
        store.clear_cache();
        store.cleanup();

        assert!(store.get(blobs[0].0).unwrap().is_none());

        let all = store.get_all(all_hashes.clone()).unwrap();
        assert!(all.is_empty());

        assert!(!store.contains(all_hashes[0]).unwrap());
        assert!(store.get_exact(all_hashes).is_err());

        assert_eq!(store.data_size_hint(), Some(0));
        assert_eq!(store.inner.size_tracker.num_blobs.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn disk_insert_and_retrieve() {
        let (store, _dir) = tmp_store();

        let (tx, blob) = rng_blobs(1).into_iter().next().unwrap();
        store.insert(tx, blob.clone()).unwrap();

        assert!(store.is_cached(&tx));
        let retrieved_blob = store.get(tx).unwrap().map(Arc::unwrap_or_clone).unwrap();
        assert_eq!(retrieved_blob, blob);
    }

    #[test]
    fn disk_delete_blob() {
        let (store, _dir) = tmp_store();

        let (tx, blob) = rng_blobs(1).into_iter().next().unwrap();
        store.insert(tx, blob).unwrap();
        assert!(store.is_cached(&tx));

        store.delete(tx).unwrap();
        assert!(store.inner.txs_to_delete.read().contains(&tx));
        store.cleanup();

        // cleanup only removes the file on disk; the blob is still served from the
        // in-memory cache
        let result = store.get(tx).unwrap();
        assert_eq!(
            result,
            Some(Arc::new(BlobTransactionSidecarVariant::Eip4844(BlobTransactionSidecar {
                blobs: vec![],
                commitments: vec![],
                proofs: vec![]
            })))
        );
    }

    #[test]
    fn disk_insert_all_and_delete_all() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(5);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        for (tx, _) in &blobs {
            assert!(store.is_cached(tx));
        }

        store.delete_all(txs.clone()).unwrap();
        store.cleanup();

        // the blobs remain readable from the in-memory cache after the files are deleted
        for tx in txs {
            let result = store.get(tx).unwrap();
            assert_eq!(
                result,
                Some(Arc::new(BlobTransactionSidecarVariant::Eip4844(BlobTransactionSidecar {
                    blobs: vec![],
                    commitments: vec![],
                    proofs: vec![]
                })))
            );
        }
    }

    #[test]
    fn disk_get_all_blobs() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(3);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        let retrieved_blobs = store.get_all(txs.clone()).unwrap();
        for (tx, blob) in retrieved_blobs {
            assert!(blobs.contains(&(tx, Arc::unwrap_or_clone(blob))));
        }

        store.delete_all(txs).unwrap();
        store.cleanup();
    }

    #[test]
    fn disk_get_exact_blobs_success() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(3);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        let retrieved_blobs = store.get_exact(txs).unwrap();
        for (retrieved_blob, (_, original_blob)) in retrieved_blobs.into_iter().zip(blobs) {
            assert_eq!(Arc::unwrap_or_clone(retrieved_blob), original_blob);
        }
    }

    #[test]
    fn disk_get_exact_blobs_failure() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(2);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs).unwrap();

        // Try to get a blob that was never inserted
        let missing_tx = TxHash::random();
        let result = store.get_exact(vec![txs[0], missing_tx]);
        assert!(result.is_err());
    }

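    // An added sketch: with no matching sidecars in the store, versioned-hash lookups
    // must report misses, i.e. v1 returns a same-length vec of `None`s and v2 returns `None`.
    #[test]
    fn disk_get_by_versioned_hashes_misses() {
        let (store, _dir) = tmp_store();

        let hashes = vec![B256::random(), B256::random()];

        // v1 keeps the response aligned with the request, one `None` per miss
        let v1 = store.get_by_versioned_hashes_v1(&hashes).unwrap();
        assert_eq!(v1.len(), hashes.len());
        assert!(v1.iter().all(|blob| blob.is_none()));

        // v2 only returns blobs if *all* requested hashes are found
        assert!(store.get_by_versioned_hashes_v2(&hashes).unwrap().is_none());
    }
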
    #[test]
    fn disk_data_size_hint() {
        let (store, _dir) = tmp_store();
        assert_eq!(store.data_size_hint(), Some(0));

        let blobs = rng_blobs(2);
        store.insert_all(blobs).unwrap();
        assert!(store.data_size_hint().unwrap() > 0);
    }

    #[test]
    fn disk_cleanup_stat() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(3);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs).unwrap();

        store.delete_all(txs).unwrap();
        let stat = store.cleanup();
        assert_eq!(stat.delete_succeed, 3);
        assert_eq!(stat.delete_failed, 0);
    }
}