reth_transaction_pool/blobstore/disk.rs

//! A simple disk store for blobs.

use crate::blobstore::{BlobStore, BlobStoreCleanupStat, BlobStoreError, BlobStoreSize};
use alloy_eips::{
    eip4844::{BlobAndProofV1, BlobAndProofV2},
    eip7594::BlobTransactionSidecarVariant,
    eip7840::BlobParams,
    merge::EPOCH_SLOTS,
};
use alloy_primitives::{TxHash, B256};
use parking_lot::{Mutex, RwLock};
use schnellru::{ByLength, LruMap};
use std::{collections::HashSet, fmt, fs, io, path::PathBuf, sync::Arc};
use tracing::{debug, trace};

/// How many [`BlobTransactionSidecarVariant`] to cache in memory.
pub const DEFAULT_MAX_CACHED_BLOBS: u32 = 100;

/// A cache size heuristic based on the highest blob params.
///
/// This uses the max blobs per tx and max blobs per block over 16 epochs: `21 * 6 * 512 = 64512`
/// This should be ~4MB.
const VERSIONED_HASH_TO_TX_HASH_CACHE_SIZE: u64 =
    BlobParams::bpo2().max_blobs_per_tx * BlobParams::bpo2().max_blob_count * EPOCH_SLOTS * 16;
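// Expanding the arithmetic above: 16 epochs * `EPOCH_SLOTS` (32) = 512 slots, and the two blob
// params multiply to 126 tracked hashes per slot, giving 64512 entries. Each entry maps a
// versioned hash (`B256`) to a tx hash (`B256`), i.e. 64 bytes of payload, so the index holds
// roughly 64512 * 64 bytes ≈ 4 MB, ignoring per-entry map overhead.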

/// A blob store that stores blob data on disk.
///
/// The type uses deferred deletion, meaning that blobs are not immediately deleted from disk, but
/// it's expected that the maintenance task will call [`BlobStore::cleanup`] to remove the deleted
/// blobs from disk.
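///
/// # Example
///
/// A minimal usage sketch of the deferred-deletion flow (`tx_hash` and `sidecar` are
/// placeholders):
///
/// ```ignore
/// let store = DiskFileBlobStore::open("/tmp/blobs", Default::default())?;
/// store.insert(tx_hash, sidecar)?;
/// // `delete` only marks the blob for removal ...
/// store.delete(tx_hash)?;
/// // ... and `cleanup` actually removes the marked blobs from disk
/// let stat = store.cleanup();
/// ```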
#[derive(Clone, Debug)]
pub struct DiskFileBlobStore {
    inner: Arc<DiskFileBlobStoreInner>,
}

impl DiskFileBlobStore {
    /// Opens and initializes a new disk file blob store according to the given options.
    pub fn open(
        blob_dir: impl Into<PathBuf>,
        opts: DiskFileBlobStoreConfig,
    ) -> Result<Self, DiskFileBlobStoreError> {
        let blob_dir = blob_dir.into();
        let DiskFileBlobStoreConfig { max_cached_entries, .. } = opts;
        let inner = DiskFileBlobStoreInner::new(blob_dir, max_cached_entries);

        // initialize the blob store
        inner.delete_all()?;
        inner.create_blob_dir()?;

        Ok(Self { inner: Arc::new(inner) })
    }

    #[cfg(test)]
    fn is_cached(&self, tx: &B256) -> bool {
        self.inner.blob_cache.lock().get(tx).is_some()
    }

    #[cfg(test)]
    fn clear_cache(&self) {
        self.inner.blob_cache.lock().clear()
    }

    /// Look up EIP-7594 blobs by their versioned hashes.
    ///
    /// This returns a result vector with the **same length and order** as the input
    /// `versioned_hashes`. Each element is `Some(BlobAndProofV2)` if the blob is available, or
    /// `None` if it is missing or stored in an older sidecar format.
    ///
    /// The lookup first scans the in-memory cache and, if not all blobs are found, falls back to
    /// reading candidate sidecars from disk using the `versioned_hash -> tx_hash` index.
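    ///
    /// A sketch of the ordering contract (`h_known` and `h_unknown` are placeholders):
    ///
    /// ```ignore
    /// let res = store.get_by_versioned_hashes_eip7594(&[h_known, h_unknown])?;
    /// // same length and order as the request; the miss stays `None`
    /// assert_eq!(res.len(), 2);
    /// assert!(res[0].is_some() && res[1].is_none());
    /// ```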
    fn get_by_versioned_hashes_eip7594(
        &self,
        versioned_hashes: &[B256],
    ) -> Result<Vec<Option<BlobAndProofV2>>, BlobStoreError> {
        // we must return the blobs in order but we don't necessarily find them in the requested
        // order
        let mut result = vec![None; versioned_hashes.len()];
        let mut missing_count = result.len();
        // first scan all cached full sidecars
        for (_tx_hash, blob_sidecar) in self.inner.blob_cache.lock().iter() {
            if let Some(blob_sidecar) = blob_sidecar.as_eip7594() {
                for (hash_idx, match_result) in
                    blob_sidecar.match_versioned_hashes(versioned_hashes)
                {
                    // only count hashes that weren't matched before, so `missing_count` stays
                    // exact even if `versioned_hashes` contains duplicates or the same blob
                    // appears in multiple cached sidecars
                    if result[hash_idx].is_none() {
                        result[hash_idx] = Some(match_result);
                        missing_count -= 1;
                    }
                }
            }

            // return early once every requested hash has been matched; the `is_none` guard above
            // guarantees that `missing_count == 0` implies all entries are `Some`
            if missing_count == 0 {
                return Ok(result);
            }
        }

        // not all versioned hashes were found, try to look up a matching tx
        let mut missing_tx_hashes = Vec::new();

        {
            let mut versioned_to_txhashes = self.inner.versioned_hashes_to_txhash.lock();
            for (idx, _) in
                result.iter().enumerate().filter(|(_, blob_and_proof)| blob_and_proof.is_none())
            {
                // this is safe because the result vec has the same len
                let versioned_hash = versioned_hashes[idx];
                if let Some(tx_hash) = versioned_to_txhashes.get(&versioned_hash).copied() {
                    missing_tx_hashes.push(tx_hash);
                }
            }
        }

        // if we have missing blobs, try to read them from disk and try again
        if !missing_tx_hashes.is_empty() {
            let blobs_from_disk = self.inner.read_many_decoded(missing_tx_hashes);
            for (_, blob_sidecar) in blobs_from_disk {
                if let Some(blob_sidecar) = blob_sidecar.as_eip7594() {
                    for (hash_idx, match_result) in
                        blob_sidecar.match_versioned_hashes(versioned_hashes)
                    {
                        if result[hash_idx].is_none() {
                            result[hash_idx] = Some(match_result);
                        }
                    }
                }
            }
        }

        Ok(result)
    }
}

impl BlobStore for DiskFileBlobStore {
    fn insert(&self, tx: B256, data: BlobTransactionSidecarVariant) -> Result<(), BlobStoreError> {
        self.inner.insert_one(tx, data)
    }

    fn insert_all(
        &self,
        txs: Vec<(B256, BlobTransactionSidecarVariant)>,
    ) -> Result<(), BlobStoreError> {
        if txs.is_empty() {
            return Ok(())
        }
        self.inner.insert_many(txs)
    }

    fn delete(&self, tx: B256) -> Result<(), BlobStoreError> {
        if self.inner.contains(tx)? {
            self.inner.txs_to_delete.write().insert(tx);
        }
        Ok(())
    }

    fn delete_all(&self, txs: Vec<B256>) -> Result<(), BlobStoreError> {
        if txs.is_empty() {
            return Ok(())
        }
        let txs = self.inner.retain_existing(txs)?;
        self.inner.txs_to_delete.write().extend(txs);
        Ok(())
    }

    fn cleanup(&self) -> BlobStoreCleanupStat {
        let txs_to_delete = std::mem::take(&mut *self.inner.txs_to_delete.write());
        let mut stat = BlobStoreCleanupStat::default();
        let mut subsize = 0;
        debug!(target:"txpool::blob", num_blobs=%txs_to_delete.len(), "Removing blobs from disk");
        for tx in txs_to_delete {
            let path = self.inner.blob_disk_file(tx);
            let filesize = fs::metadata(&path).map_or(0, |meta| meta.len());
            match fs::remove_file(&path) {
                Ok(_) => {
                    stat.delete_succeed += 1;
                    subsize += filesize;
                }
                Err(e) => {
                    stat.delete_failed += 1;
                    let err = DiskFileBlobStoreError::DeleteFile(tx, path, e);
                    debug!(target:"txpool::blob", %err);
                }
            };
        }
        self.inner.size_tracker.sub_size(subsize as usize);
        self.inner.size_tracker.sub_len(stat.delete_succeed);
        stat
    }

    fn get(&self, tx: B256) -> Result<Option<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
        self.inner.get_one(tx)
    }

    fn contains(&self, tx: B256) -> Result<bool, BlobStoreError> {
        self.inner.contains(tx)
    }

    fn get_all(
        &self,
        txs: Vec<B256>,
    ) -> Result<Vec<(B256, Arc<BlobTransactionSidecarVariant>)>, BlobStoreError> {
        if txs.is_empty() {
            return Ok(Vec::new())
        }
        self.inner.get_all(txs)
    }

    fn get_exact(
        &self,
        txs: Vec<B256>,
    ) -> Result<Vec<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
        if txs.is_empty() {
            return Ok(Vec::new())
        }
        self.inner.get_exact(txs)
    }

    fn get_by_versioned_hashes_v1(
        &self,
        versioned_hashes: &[B256],
    ) -> Result<Vec<Option<BlobAndProofV1>>, BlobStoreError> {
        // the response must always be the same len as the request, misses must be None
        let mut result = vec![None; versioned_hashes.len()];

        // first scan all cached full sidecars
        for (_tx_hash, blob_sidecar) in self.inner.blob_cache.lock().iter() {
            if let Some(blob_sidecar) = blob_sidecar.as_eip4844() {
                for (hash_idx, match_result) in
                    blob_sidecar.match_versioned_hashes(versioned_hashes)
                {
                    result[hash_idx] = Some(match_result);
                }
            }

            // return early if all blobs are found.
            if result.iter().all(|blob| blob.is_some()) {
                return Ok(result);
            }
        }

        // not all versioned hashes were found, try to look up a matching tx

        let mut missing_tx_hashes = Vec::new();

        {
            let mut versioned_to_txhashes = self.inner.versioned_hashes_to_txhash.lock();
            for (idx, _) in
                result.iter().enumerate().filter(|(_, blob_and_proof)| blob_and_proof.is_none())
            {
                // this is safe because the result vec has the same len
                let versioned_hash = versioned_hashes[idx];
                if let Some(tx_hash) = versioned_to_txhashes.get(&versioned_hash).copied() {
                    missing_tx_hashes.push(tx_hash);
                }
            }
        }

        // if we have missing blobs, try to read them from disk and try again
        if !missing_tx_hashes.is_empty() {
            let blobs_from_disk = self.inner.read_many_decoded(missing_tx_hashes);
            for (_, blob_sidecar) in blobs_from_disk {
                if let Some(blob_sidecar) = blob_sidecar.as_eip4844() {
                    for (hash_idx, match_result) in
                        blob_sidecar.match_versioned_hashes(versioned_hashes)
                    {
                        if result[hash_idx].is_none() {
                            result[hash_idx] = Some(match_result);
                        }
                    }
                }
            }
        }

        Ok(result)
    }

    fn get_by_versioned_hashes_v2(
        &self,
        versioned_hashes: &[B256],
    ) -> Result<Option<Vec<BlobAndProofV2>>, BlobStoreError> {
        let result = self.get_by_versioned_hashes_eip7594(versioned_hashes)?;

        // only return the blobs if we found all requested versioned hashes
        if result.iter().all(|blob| blob.is_some()) {
            Ok(Some(result.into_iter().map(Option::unwrap).collect()))
        } else {
            Ok(None)
        }
    }

    fn get_by_versioned_hashes_v3(
        &self,
        versioned_hashes: &[B256],
    ) -> Result<Vec<Option<BlobAndProofV2>>, BlobStoreError> {
        self.get_by_versioned_hashes_eip7594(versioned_hashes)
    }

    fn data_size_hint(&self) -> Option<usize> {
        Some(self.inner.size_tracker.data_size())
    }

    fn blobs_len(&self) -> usize {
        self.inner.size_tracker.blobs_len()
    }
}

struct DiskFileBlobStoreInner {
    blob_dir: PathBuf,
    blob_cache: Mutex<LruMap<TxHash, Arc<BlobTransactionSidecarVariant>, ByLength>>,
    size_tracker: BlobStoreSize,
    file_lock: RwLock<()>,
    txs_to_delete: RwLock<HashSet<B256>>,
    /// Tracks known versioned hashes and a transaction they exist in.
    ///
    /// Note: It is possible that one blob can appear in multiple transactions but this only tracks
    /// the most recent one.
    versioned_hashes_to_txhash: Mutex<LruMap<B256, B256>>,
}

impl DiskFileBlobStoreInner {
    /// Creates a new empty disk file blob store with the given maximum length of the blob cache.
    fn new(blob_dir: PathBuf, max_length: u32) -> Self {
        Self {
            blob_dir,
            blob_cache: Mutex::new(LruMap::new(ByLength::new(max_length))),
            size_tracker: Default::default(),
            file_lock: Default::default(),
            txs_to_delete: Default::default(),
            versioned_hashes_to_txhash: Mutex::new(LruMap::new(ByLength::new(
                VERSIONED_HASH_TO_TX_HASH_CACHE_SIZE as u32,
            ))),
        }
    }

    /// Creates the directory where blobs will be stored on disk.
    fn create_blob_dir(&self) -> Result<(), DiskFileBlobStoreError> {
        debug!(target:"txpool::blob", blob_dir = ?self.blob_dir, "Creating blob store");
        fs::create_dir_all(&self.blob_dir)
            .map_err(|e| DiskFileBlobStoreError::Open(self.blob_dir.clone(), e))
    }

    /// Deletes the entire blob store.
    fn delete_all(&self) -> Result<(), DiskFileBlobStoreError> {
        match fs::remove_dir_all(&self.blob_dir) {
            Ok(_) => {
                debug!(target:"txpool::blob", blob_dir = ?self.blob_dir, "Removed blob store directory");
            }
            Err(err) if err.kind() == io::ErrorKind::NotFound => {}
            Err(err) => return Err(DiskFileBlobStoreError::Open(self.blob_dir.clone(), err)),
        }
        Ok(())
    }

    /// Ensures the blob is in the blob cache and written to the disk.
    fn insert_one(
        &self,
        tx: B256,
        data: BlobTransactionSidecarVariant,
    ) -> Result<(), BlobStoreError> {
        let mut buf = Vec::with_capacity(data.rlp_encoded_fields_length());
        data.rlp_encode_fields(&mut buf);
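        // the buffer now holds the sidecar's raw RLP-encoded fields; `read_one` decodes the same
        // layout when reading the file back from disk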

        {
            // cache the versioned hashes to tx hash
            let mut map = self.versioned_hashes_to_txhash.lock();
            data.versioned_hashes().for_each(|hash| {
                map.insert(hash, tx);
            });
        }

        self.blob_cache.lock().insert(tx, Arc::new(data));

        let size = self.write_one_encoded(tx, &buf)?;

        self.size_tracker.add_size(size);
        self.size_tracker.inc_len(1);
        Ok(())
    }

    /// Ensures the blobs are in the blob cache and written to the disk.
    fn insert_many(
        &self,
        txs: Vec<(B256, BlobTransactionSidecarVariant)>,
    ) -> Result<(), BlobStoreError> {
        let raw = txs
            .iter()
            .map(|(tx, data)| {
                let mut buf = Vec::with_capacity(data.rlp_encoded_fields_length());
                data.rlp_encode_fields(&mut buf);
                (self.blob_disk_file(*tx), buf)
            })
            .collect::<Vec<_>>();

        {
            // cache versioned hashes to tx hash
            let mut map = self.versioned_hashes_to_txhash.lock();
            for (tx, data) in &txs {
                data.versioned_hashes().for_each(|hash| {
                    map.insert(hash, *tx);
                });
            }
        }

        {
            // cache blobs
            let mut cache = self.blob_cache.lock();
            for (tx, data) in txs {
                cache.insert(tx, Arc::new(data));
            }
        }

        let mut add = 0;
        let mut num = 0;
        {
            let _lock = self.file_lock.write();
            for (path, data) in raw {
                if path.exists() {
                    debug!(target:"txpool::blob", ?path, "Blob already exists");
                } else if let Err(err) = fs::write(&path, &data) {
                    debug!(target:"txpool::blob", %err, ?path, "Failed to write blob file");
                } else {
                    add += data.len();
                    num += 1;
                }
            }
        }
        self.size_tracker.add_size(add);
        self.size_tracker.inc_len(num);

        Ok(())
    }

    /// Returns true if the blob for the given transaction hash is in the blob cache or on disk.
    fn contains(&self, tx: B256) -> Result<bool, BlobStoreError> {
        if self.blob_cache.lock().get(&tx).is_some() {
            return Ok(true)
        }
        // we only check if the file exists and assume it's valid
        Ok(self.blob_disk_file(tx).is_file())
    }

    /// Returns all the given transaction hashes whose blobs are in the cache or on disk.
    fn retain_existing(&self, txs: Vec<B256>) -> Result<Vec<B256>, BlobStoreError> {
        let (in_cache, not_in_cache): (Vec<B256>, Vec<B256>) = {
            let mut cache = self.blob_cache.lock();
            txs.into_iter().partition(|tx| cache.get(tx).is_some())
        };

        let mut existing = in_cache;
        for tx in not_in_cache {
            if self.blob_disk_file(tx).is_file() {
                existing.push(tx);
            }
        }

        Ok(existing)
    }

    /// Retrieves the blob for the given transaction hash from the blob cache or disk.
    fn get_one(
        &self,
        tx: B256,
    ) -> Result<Option<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
        if let Some(blob) = self.blob_cache.lock().get(&tx) {
            return Ok(Some(blob.clone()))
        }

        if let Some(blob) = self.read_one(tx)? {
            let blob_arc = Arc::new(blob);
            self.blob_cache.lock().insert(tx, blob_arc.clone());
            return Ok(Some(blob_arc))
        }

        Ok(None)
    }

    /// Returns the path to the blob file for the given transaction hash.
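    ///
    /// The file name is the transaction hash rendered as lowercase hex.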
    #[inline]
    fn blob_disk_file(&self, tx: B256) -> PathBuf {
        self.blob_dir.join(format!("{tx:x}"))
    }

    /// Retrieves the blob data for the given transaction hash.
    #[inline]
    fn read_one(&self, tx: B256) -> Result<Option<BlobTransactionSidecarVariant>, BlobStoreError> {
        let path = self.blob_disk_file(tx);
        let data = {
            let _lock = self.file_lock.read();
            match fs::read(&path) {
                Ok(data) => data,
                Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(None),
                Err(e) => {
                    return Err(BlobStoreError::Other(Box::new(DiskFileBlobStoreError::ReadFile(
                        tx, path, e,
                    ))))
                }
            }
        };
        BlobTransactionSidecarVariant::rlp_decode_fields(&mut data.as_slice())
            .map(Some)
            .map_err(BlobStoreError::DecodeError)
    }

    /// Returns decoded blobs read from disk.
    ///
    /// Only returns sidecars that were found and successfully decoded.
    fn read_many_decoded(&self, txs: Vec<TxHash>) -> Vec<(TxHash, BlobTransactionSidecarVariant)> {
        self.read_many_raw(txs)
            .into_iter()
            .filter_map(|(tx, data)| {
                BlobTransactionSidecarVariant::rlp_decode_fields(&mut data.as_slice())
                    .map(|sidecar| (tx, sidecar))
                    .ok()
            })
            .collect()
    }

    /// Retrieves the raw blob data for the given transaction hashes.
    ///
    /// Only returns the blobs that were found on disk.
    #[inline]
    fn read_many_raw(&self, txs: Vec<TxHash>) -> Vec<(TxHash, Vec<u8>)> {
        let mut res = Vec::with_capacity(txs.len());
        let _lock = self.file_lock.read();
        for tx in txs {
            let path = self.blob_disk_file(tx);
            match fs::read(&path) {
                Ok(data) => {
                    res.push((tx, data));
                }
                Err(err) => {
                    debug!(target:"txpool::blob", %err, ?tx, "Failed to read blob file");
                }
            };
        }
        res
    }

    /// Writes the blob data for the given transaction hash to the disk.
    #[inline]
    fn write_one_encoded(&self, tx: B256, data: &[u8]) -> Result<usize, DiskFileBlobStoreError> {
        trace!(target:"txpool::blob", "[{:?}] writing blob file", tx);
        let mut add = 0;
        let path = self.blob_disk_file(tx);
        {
            let _lock = self.file_lock.write();
            if !path.exists() {
                fs::write(&path, data)
                    .map_err(|e| DiskFileBlobStoreError::WriteFile(tx, path, e))?;
                add = data.len();
            }
        }
        Ok(add)
    }

    /// Retrieves blobs for the given transaction hashes from the blob cache or disk.
    ///
    /// This will not return an error if there are missing blobs. Therefore, the result may be a
    /// subset of the request or an empty vector if none of the blobs were found.
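    ///
    /// A sketch of the subset behavior (the hashes are placeholders):
    ///
    /// ```ignore
    /// // two known hashes and one unknown: the unknown one is simply absent from the result
    /// let found = inner.get_all(vec![known_a, known_b, unknown])?;
    /// assert_eq!(found.len(), 2);
    /// ```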
    #[inline]
    fn get_all(
        &self,
        txs: Vec<B256>,
    ) -> Result<Vec<(B256, Arc<BlobTransactionSidecarVariant>)>, BlobStoreError> {
        let mut res = Vec::with_capacity(txs.len());
        let mut cache_miss = Vec::new();
        {
            let mut cache = self.blob_cache.lock();
            for tx in txs {
                if let Some(blob) = cache.get(&tx) {
                    res.push((tx, blob.clone()));
                } else {
                    cache_miss.push(tx)
                }
            }
        }
        if cache_miss.is_empty() {
            return Ok(res)
        }
        let from_disk = self.read_many_decoded(cache_miss);
        if from_disk.is_empty() {
            return Ok(res)
        }
        let from_disk = from_disk
            .into_iter()
            .map(|(tx, data)| {
                let data = Arc::new(data);
                res.push((tx, data.clone()));
                (tx, data)
            })
            .collect::<Vec<_>>();

        let mut cache = self.blob_cache.lock();
        for (tx, data) in from_disk {
            cache.insert(tx, data);
        }

        Ok(res)
    }

    /// Retrieves blobs for the given transaction hashes from the blob cache or disk.
    ///
    /// Returns an error if there are any missing blobs.
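    ///
    /// A sketch of the all-or-error behavior (the hashes are placeholders):
    ///
    /// ```ignore
    /// // fails with `BlobStoreError::MissingSidecar` if any requested hash is unknown
    /// let err = inner.get_exact(vec![known, unknown]).unwrap_err();
    /// ```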
    #[inline]
    fn get_exact(
        &self,
        txs: Vec<B256>,
    ) -> Result<Vec<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
        txs.into_iter()
            .map(|tx| self.get_one(tx)?.ok_or(BlobStoreError::MissingSidecar(tx)))
            .collect()
    }
}

impl fmt::Debug for DiskFileBlobStoreInner {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("DiskFileBlobStoreInner")
            .field("blob_dir", &self.blob_dir)
            .field("cached_blobs", &self.blob_cache.try_lock().map(|lock| lock.len()))
            .field("txs_to_delete", &self.txs_to_delete.try_read())
            .finish()
    }
}

/// Errors that can occur when interacting with a disk file blob store.
#[derive(Debug, thiserror::Error)]
pub enum DiskFileBlobStoreError {
    /// Thrown during [`DiskFileBlobStore::open`] if the blob store directory cannot be opened.
    #[error("failed to open blobstore at {0}: {1}")]
    Open(PathBuf, io::Error),
    /// Failure while reading a blob file.
    #[error("[{0}] failed to read blob file at {1}: {2}")]
    ReadFile(TxHash, PathBuf, io::Error),
    /// Failure while writing a blob file.
    #[error("[{0}] failed to write blob file at {1}: {2}")]
    WriteFile(TxHash, PathBuf, io::Error),
    /// Failure while deleting a blob file.
    #[error("[{0}] failed to delete blob file at {1}: {2}")]
    DeleteFile(TxHash, PathBuf, io::Error),
}

impl From<DiskFileBlobStoreError> for BlobStoreError {
    fn from(value: DiskFileBlobStoreError) -> Self {
        Self::Other(Box::new(value))
    }
}

/// Configuration for a disk file blob store.
#[derive(Debug, Clone)]
pub struct DiskFileBlobStoreConfig {
    /// The maximum number of blobs to keep in the in-memory blob cache.
    pub max_cached_entries: u32,
    /// How to open the blob store.
    pub open: OpenDiskFileBlobStore,
}

impl Default for DiskFileBlobStoreConfig {
    fn default() -> Self {
        Self { max_cached_entries: DEFAULT_MAX_CACHED_BLOBS, open: Default::default() }
    }
}

impl DiskFileBlobStoreConfig {
    /// Sets the maximum number of blobs to keep in the in-memory blob cache.
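    ///
    /// A minimal sketch (the value `256` is illustrative):
    ///
    /// ```ignore
    /// let cfg = DiskFileBlobStoreConfig::default().with_max_cached_entries(256);
    /// ```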
    pub const fn with_max_cached_entries(mut self, max_cached_entries: u32) -> Self {
        self.max_cached_entries = max_cached_entries;
        self
    }
}

/// How to open a disk file blob store.
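///
/// Note: [`DiskFileBlobStore::open`] currently clears the existing store on startup regardless of
/// this setting.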
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum OpenDiskFileBlobStore {
    /// Clear everything in the blob store.
    #[default]
    Clear,
    /// Keep the existing blob store and index.
    ReIndex,
}

#[cfg(test)]
mod tests {
    use alloy_consensus::BlobTransactionSidecar;
    use alloy_eips::{
        eip4844::{kzg_to_versioned_hash, Blob, BlobAndProofV2, Bytes48},
        eip7594::{
            BlobTransactionSidecarEip7594, BlobTransactionSidecarVariant, CELLS_PER_EXT_BLOB,
        },
    };

    use super::*;
    use std::sync::atomic::Ordering;

    fn tmp_store() -> (DiskFileBlobStore, tempfile::TempDir) {
        let dir = tempfile::tempdir().unwrap();
        let store = DiskFileBlobStore::open(dir.path(), Default::default()).unwrap();
        (store, dir)
    }

    fn rng_blobs(num: usize) -> Vec<(TxHash, BlobTransactionSidecarVariant)> {
        let mut rng = rand::rng();
        (0..num)
            .map(|_| {
                let tx = TxHash::random_with(&mut rng);
                let blob = BlobTransactionSidecarVariant::Eip4844(BlobTransactionSidecar {
                    blobs: vec![],
                    commitments: vec![],
                    proofs: vec![],
                });
                (tx, blob)
            })
            .collect()
    }

    fn eip7594_single_blob_sidecar() -> (BlobTransactionSidecarVariant, B256, BlobAndProofV2) {
        let blob = Blob::default();
        let commitment = Bytes48::default();
        let cell_proofs = vec![Bytes48::default(); CELLS_PER_EXT_BLOB];

        let versioned_hash = kzg_to_versioned_hash(commitment.as_slice());

        let expected =
            BlobAndProofV2 { blob: Box::new(Blob::default()), proofs: cell_proofs.clone() };
        let sidecar = BlobTransactionSidecarEip7594::new(vec![blob], vec![commitment], cell_proofs);

        (BlobTransactionSidecarVariant::Eip7594(sidecar), versioned_hash, expected)
    }

    #[test]
    fn disk_insert_all_get_all() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(10);
        let all_hashes = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        // all cached
        for (tx, blob) in &blobs {
            assert!(store.is_cached(tx));
            let b = store.get(*tx).unwrap().map(Arc::unwrap_or_clone).unwrap();
            assert_eq!(b, *blob);
        }

        let all = store.get_all(all_hashes.clone()).unwrap();
        for (tx, blob) in all {
            assert!(blobs.contains(&(tx, Arc::unwrap_or_clone(blob))), "missing blob {tx:?}");
        }

        assert!(store.contains(all_hashes[0]).unwrap());
        store.delete_all(all_hashes.clone()).unwrap();
        assert!(store.inner.txs_to_delete.read().contains(&all_hashes[0]));
        store.clear_cache();
        store.cleanup();

        assert!(store.get(blobs[0].0).unwrap().is_none());

        let all = store.get_all(all_hashes.clone()).unwrap();
        assert!(all.is_empty());

        assert!(!store.contains(all_hashes[0]).unwrap());
        assert!(store.get_exact(all_hashes).is_err());

        assert_eq!(store.data_size_hint(), Some(0));
        assert_eq!(store.inner.size_tracker.num_blobs.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn disk_insert_and_retrieve() {
        let (store, _dir) = tmp_store();

        let (tx, blob) = rng_blobs(1).into_iter().next().unwrap();
        store.insert(tx, blob.clone()).unwrap();

        assert!(store.is_cached(&tx));
        let retrieved_blob = store.get(tx).unwrap().map(Arc::unwrap_or_clone).unwrap();
        assert_eq!(retrieved_blob, blob);
    }

    #[test]
    fn disk_delete_blob() {
        let (store, _dir) = tmp_store();

        let (tx, blob) = rng_blobs(1).into_iter().next().unwrap();
        store.insert(tx, blob).unwrap();
        assert!(store.is_cached(&tx));

        store.delete(tx).unwrap();
        assert!(store.inner.txs_to_delete.read().contains(&tx));
        store.cleanup();

        let result = store.get(tx).unwrap();
        assert_eq!(
            result,
            Some(Arc::new(BlobTransactionSidecarVariant::Eip4844(BlobTransactionSidecar {
                blobs: vec![],
                commitments: vec![],
                proofs: vec![]
            })))
        );
    }

    #[test]
    fn disk_insert_all_and_delete_all() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(5);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        for (tx, _) in &blobs {
            assert!(store.is_cached(tx));
        }

        store.delete_all(txs.clone()).unwrap();
        store.cleanup();

        for tx in txs {
            let result = store.get(tx).unwrap();
            assert_eq!(
                result,
                Some(Arc::new(BlobTransactionSidecarVariant::Eip4844(BlobTransactionSidecar {
                    blobs: vec![],
                    commitments: vec![],
                    proofs: vec![]
                })))
            );
        }
    }

    #[test]
    fn disk_get_all_blobs() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(3);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        let retrieved_blobs = store.get_all(txs.clone()).unwrap();
        for (tx, blob) in retrieved_blobs {
            assert!(blobs.contains(&(tx, Arc::unwrap_or_clone(blob))));
        }

        store.delete_all(txs).unwrap();
        store.cleanup();
    }

    #[test]
    fn disk_get_exact_blobs_success() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(3);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        let retrieved_blobs = store.get_exact(txs).unwrap();
        for (retrieved_blob, (_, original_blob)) in retrieved_blobs.into_iter().zip(blobs) {
            assert_eq!(Arc::unwrap_or_clone(retrieved_blob), original_blob);
        }
    }

    #[test]
    fn disk_get_exact_blobs_failure() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(2);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs).unwrap();

        // Try to get a blob that was never inserted
        let missing_tx = TxHash::random();
        let result = store.get_exact(vec![txs[0], missing_tx]);
        assert!(result.is_err());
    }

    #[test]
    fn disk_data_size_hint() {
        let (store, _dir) = tmp_store();
        assert_eq!(store.data_size_hint(), Some(0));

        let blobs = rng_blobs(2);
        store.insert_all(blobs).unwrap();
        assert!(store.data_size_hint().unwrap() > 0);
    }

    #[test]
    fn disk_cleanup_stat() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(3);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs).unwrap();

        store.delete_all(txs).unwrap();
        let stat = store.cleanup();
        assert_eq!(stat.delete_succeed, 3);
        assert_eq!(stat.delete_failed, 0);
    }

    #[test]
    fn disk_get_blobs_v3_returns_partial_results() {
        let (store, _dir) = tmp_store();

        let (sidecar, versioned_hash, expected) = eip7594_single_blob_sidecar();
        store.insert(TxHash::random(), sidecar).unwrap();

        assert_ne!(versioned_hash, B256::ZERO);

        let request = vec![versioned_hash, B256::ZERO];
        let v2 = store.get_by_versioned_hashes_v2(&request).unwrap();
        assert!(v2.is_none(), "v2 must return null if any requested blob is missing");

        let v3 = store.get_by_versioned_hashes_v3(&request).unwrap();
        assert_eq!(v3, vec![Some(expected), None]);
    }

    #[test]
    fn disk_get_blobs_v3_can_fallback_to_disk() {
        let (store, _dir) = tmp_store();

        let (sidecar, versioned_hash, expected) = eip7594_single_blob_sidecar();
        store.insert(TxHash::random(), sidecar).unwrap();
        store.clear_cache();

        let v3 = store.get_by_versioned_hashes_v3(&[versioned_hash]).unwrap();
        assert_eq!(v3, vec![Some(expected)]);
    }
}