// reth_transaction_pool/blobstore/disk.rs
1//! A simple diskstore for blobs
2
3use crate::blobstore::{BlobStore, BlobStoreCleanupStat, BlobStoreError, BlobStoreSize};
4use alloy_eips::{
5    eip4844::{BlobAndProofV1, BlobAndProofV2},
6    eip7594::BlobTransactionSidecarVariant,
7    eip7840::BlobParams,
8    merge::EPOCH_SLOTS,
9};
10use alloy_primitives::{map::B256Set, TxHash, B256};
11use parking_lot::{Mutex, RwLock};
12use schnellru::{ByLength, LruMap};
13use std::{fmt, fs, io, path::PathBuf, sync::Arc};
14use tracing::{debug, trace};
15
/// How many [`BlobTransactionSidecarVariant`] to cache in memory.
pub const DEFAULT_MAX_CACHED_BLOBS: u32 = 100;

/// A cache size heuristic for the `versioned_hash -> tx_hash` index, based on the highest
/// (BPO2) blob params.
///
/// This uses the max blobs per tx and max blobs per block over 16 epochs:
/// `max_blobs_per_tx * max_blob_count * (EPOCH_SLOTS * 16)` = `21 * 6 * 512 = 64512` entries.
/// With two 32-byte hashes per entry this should be ~4MB.
const VERSIONED_HASH_TO_TX_HASH_CACHE_SIZE: u64 =
    BlobParams::bpo2().max_blobs_per_tx * BlobParams::bpo2().max_blob_count * EPOCH_SLOTS * 16;
25
/// A blob store that stores blob data on disk.
///
/// The type uses deferred deletion, meaning that blobs are not immediately deleted from disk, but
/// it's expected that the maintenance task will call [`BlobStore::cleanup`] to remove the deleted
/// blobs from disk.
///
/// Cloning is cheap: all state is shared behind an [`Arc`].
#[derive(Clone, Debug)]
pub struct DiskFileBlobStore {
    /// Shared inner state: blob cache, size tracker, deletion queue, and file lock.
    inner: Arc<DiskFileBlobStoreInner>,
}
35
impl DiskFileBlobStore {
    /// Opens and initializes a new disk file blob store according to the given options.
    ///
    /// NOTE(review): the config's `open` field ([`OpenDiskFileBlobStore`]) is not consulted
    /// here — the store is always cleared via `delete_all` before being recreated. Confirm
    /// whether `ReIndex` is handled elsewhere or intentionally unimplemented.
    pub fn open(
        blob_dir: impl Into<PathBuf>,
        opts: DiskFileBlobStoreConfig,
    ) -> Result<Self, DiskFileBlobStoreError> {
        let blob_dir = blob_dir.into();
        let DiskFileBlobStoreConfig { max_cached_entries, .. } = opts;
        let inner = DiskFileBlobStoreInner::new(blob_dir, max_cached_entries);

        // initialize the blob store: wipe any leftover blob files, then recreate the directory
        inner.delete_all()?;
        inner.create_blob_dir()?;

        Ok(Self { inner: Arc::new(inner) })
    }

    /// Test helper: whether the sidecar for `tx` is currently in the in-memory cache.
    #[cfg(test)]
    fn is_cached(&self, tx: &B256) -> bool {
        self.inner.blob_cache.lock().get(tx).is_some()
    }

    /// Test helper: drops all entries from the in-memory cache.
    #[cfg(test)]
    fn clear_cache(&self) {
        self.inner.blob_cache.lock().clear()
    }

    /// Look up EIP-7594 blobs by their versioned hashes.
    ///
    /// This returns a result vector with the **same length and order** as the input
    /// `versioned_hashes`. Each element is `Some(BlobAndProofV2)` if the blob is available, or
    /// `None` if it is missing or an older sidecar version.
    ///
    /// The lookup first scans the in-memory cache and, if not all blobs are found, falls back to
    /// reading candidate sidecars from disk using the `versioned_hash -> tx_hash` index.
    fn get_by_versioned_hashes_eip7594(
        &self,
        versioned_hashes: &[B256],
    ) -> Result<Vec<Option<BlobAndProofV2>>, BlobStoreError> {
        // we must return the blobs in order but we don't necessarily find them in the requested
        // order
        let mut result = vec![None; versioned_hashes.len()];
        let mut missing_count = result.len();
        // first scan all cached full sidecars
        for (_tx_hash, blob_sidecar) in self.inner.blob_cache.lock().iter() {
            if let Some(blob_sidecar) = blob_sidecar.as_eip7594() {
                for (hash_idx, match_result) in
                    blob_sidecar.match_versioned_hashes(versioned_hashes)
                {
                    let slot = &mut result[hash_idx];
                    // only count a slot as newly filled once, even if multiple sidecars match it
                    if slot.is_none() {
                        missing_count -= 1;
                    }
                    *slot = Some(match_result);
                }
            }

            // return early if all blobs are found.
            if missing_count == 0 {
                // since versioned_hashes may have duplicates, we double check here
                if result.iter().all(|blob| blob.is_some()) {
                    return Ok(result);
                }
            }
        }

        // not all versioned hashes were found, try to look up a matching tx
        let mut missing_tx_hashes = Vec::new();

        {
            // scoped so the index lock is released before the disk reads below
            let mut versioned_to_txhashes = self.inner.versioned_hashes_to_txhash.lock();
            for (idx, _) in
                result.iter().enumerate().filter(|(_, blob_and_proof)| blob_and_proof.is_none())
            {
                // this is safe because the result vec has the same len
                let versioned_hash = versioned_hashes[idx];
                if let Some(tx_hash) = versioned_to_txhashes.get(&versioned_hash).copied() {
                    missing_tx_hashes.push(tx_hash);
                }
            }
        }

        // if we have missing blobs, try to read them from disk and try again
        if !missing_tx_hashes.is_empty() {
            let blobs_from_disk = self.inner.read_many_decoded(missing_tx_hashes);
            for (_, blob_sidecar) in blobs_from_disk {
                if let Some(blob_sidecar) = blob_sidecar.as_eip7594() {
                    for (hash_idx, match_result) in
                        blob_sidecar.match_versioned_hashes(versioned_hashes)
                    {
                        if result[hash_idx].is_none() {
                            result[hash_idx] = Some(match_result);
                        }
                    }
                }
            }
        }

        Ok(result)
    }
}
137
impl BlobStore for DiskFileBlobStore {
    fn insert(&self, tx: B256, data: BlobTransactionSidecarVariant) -> Result<(), BlobStoreError> {
        self.inner.insert_one(tx, data)
    }

    fn insert_all(
        &self,
        txs: Vec<(B256, BlobTransactionSidecarVariant)>,
    ) -> Result<(), BlobStoreError> {
        if txs.is_empty() {
            return Ok(())
        }
        self.inner.insert_many(txs)
    }

    fn delete(&self, tx: B256) -> Result<(), BlobStoreError> {
        // deferred deletion: only queue the hash; the file is removed later by `cleanup`
        if self.inner.contains(tx)? {
            self.inner.txs_to_delete.write().insert(tx);
        }
        Ok(())
    }

    fn delete_all(&self, txs: Vec<B256>) -> Result<(), BlobStoreError> {
        if txs.is_empty() {
            return Ok(())
        }
        // only queue hashes that actually exist in the cache or on disk
        let txs = self.inner.retain_existing(txs)?;
        self.inner.txs_to_delete.write().extend(txs);
        Ok(())
    }

    fn cleanup(&self) -> BlobStoreCleanupStat {
        // drain the pending-deletion queue and remove the corresponding files from disk
        let txs_to_delete = std::mem::take(&mut *self.inner.txs_to_delete.write());
        let mut stat = BlobStoreCleanupStat::default();
        let mut subsize = 0;
        debug!(target:"txpool::blob", num_blobs=%txs_to_delete.len(), "Removing blobs from disk");
        for tx in txs_to_delete {
            let path = self.inner.blob_disk_file(tx);
            // capture the file size before deletion so the size tracker can be adjusted below
            let filesize = fs::metadata(&path).map_or(0, |meta| meta.len());
            match fs::remove_file(&path) {
                Ok(_) => {
                    stat.delete_succeed += 1;
                    subsize += filesize;
                }
                Err(e) => {
                    // failed deletions are only logged; the hash is no longer queued
                    stat.delete_failed += 1;
                    let err = DiskFileBlobStoreError::DeleteFile(tx, path, e);
                    debug!(target:"txpool::blob", %err);
                }
            };
        }
        self.inner.size_tracker.sub_size(subsize as usize);
        self.inner.size_tracker.sub_len(stat.delete_succeed);
        stat
    }

    fn get(&self, tx: B256) -> Result<Option<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
        self.inner.get_one(tx)
    }

    fn contains(&self, tx: B256) -> Result<bool, BlobStoreError> {
        self.inner.contains(tx)
    }

    fn get_all(
        &self,
        txs: Vec<B256>,
    ) -> Result<Vec<(B256, Arc<BlobTransactionSidecarVariant>)>, BlobStoreError> {
        if txs.is_empty() {
            return Ok(Vec::new())
        }
        self.inner.get_all(txs)
    }

    fn get_exact(
        &self,
        txs: Vec<B256>,
    ) -> Result<Vec<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
        if txs.is_empty() {
            return Ok(Vec::new())
        }
        self.inner.get_exact(txs)
    }

    fn get_by_versioned_hashes_v1(
        &self,
        versioned_hashes: &[B256],
    ) -> Result<Vec<Option<BlobAndProofV1>>, BlobStoreError> {
        // the response must always be the same len as the request, misses must be None
        let mut result = vec![None; versioned_hashes.len()];

        // first scan all cached full sidecars
        for (_tx_hash, blob_sidecar) in self.inner.blob_cache.lock().iter() {
            if let Some(blob_sidecar) = blob_sidecar.as_eip4844() {
                for (hash_idx, match_result) in
                    blob_sidecar.match_versioned_hashes(versioned_hashes)
                {
                    result[hash_idx] = Some(match_result);
                }
            }

            // return early if all blobs are found.
            if result.iter().all(|blob| blob.is_some()) {
                return Ok(result);
            }
        }

        // not all versioned hashes were found, try to look up a matching tx

        let mut missing_tx_hashes = Vec::new();

        {
            // scoped so the index lock is released before the disk reads below
            let mut versioned_to_txhashes = self.inner.versioned_hashes_to_txhash.lock();
            for (idx, _) in
                result.iter().enumerate().filter(|(_, blob_and_proof)| blob_and_proof.is_none())
            {
                // this is safe because the result vec has the same len
                let versioned_hash = versioned_hashes[idx];
                if let Some(tx_hash) = versioned_to_txhashes.get(&versioned_hash).copied() {
                    missing_tx_hashes.push(tx_hash);
                }
            }
        }

        // if we have missing blobs, try to read them from disk and try again
        if !missing_tx_hashes.is_empty() {
            let blobs_from_disk = self.inner.read_many_decoded(missing_tx_hashes);
            for (_, blob_sidecar) in blobs_from_disk {
                if let Some(blob_sidecar) = blob_sidecar.as_eip4844() {
                    for (hash_idx, match_result) in
                        blob_sidecar.match_versioned_hashes(versioned_hashes)
                    {
                        if result[hash_idx].is_none() {
                            result[hash_idx] = Some(match_result);
                        }
                    }
                }
            }
        }

        Ok(result)
    }

    fn get_by_versioned_hashes_v2(
        &self,
        versioned_hashes: &[B256],
    ) -> Result<Option<Vec<BlobAndProofV2>>, BlobStoreError> {
        let result = self.get_by_versioned_hashes_eip7594(versioned_hashes)?;

        // only return the blobs if we found all requested versioned hashes
        if result.iter().all(|blob| blob.is_some()) {
            Ok(Some(result.into_iter().map(Option::unwrap).collect()))
        } else {
            Ok(None)
        }
    }

    fn get_by_versioned_hashes_v3(
        &self,
        versioned_hashes: &[B256],
    ) -> Result<Vec<Option<BlobAndProofV2>>, BlobStoreError> {
        // v3 semantics: partial results are allowed, misses stay `None`
        self.get_by_versioned_hashes_eip7594(versioned_hashes)
    }

    fn data_size_hint(&self) -> Option<usize> {
        Some(self.inner.size_tracker.data_size())
    }

    fn blobs_len(&self) -> usize {
        self.inner.size_tracker.blobs_len()
    }
}
310
/// The shared inner state of a [`DiskFileBlobStore`].
struct DiskFileBlobStoreInner {
    /// Directory where blob files live, one file per transaction hash.
    blob_dir: PathBuf,
    /// LRU cache of recently inserted or accessed sidecars.
    blob_cache: Mutex<LruMap<TxHash, Arc<BlobTransactionSidecarVariant>, ByLength>>,
    /// Tracks total on-disk data size and number of stored blobs.
    size_tracker: BlobStoreSize,
    /// Guards file-system reads/writes of blob files.
    file_lock: RwLock<()>,
    /// Hashes queued for deferred deletion; drained by [`BlobStore::cleanup`].
    txs_to_delete: RwLock<B256Set>,
    /// Tracks known versioned hashes and a transaction they exist in
    ///
    /// Note: It is possible that one blob can appear in multiple transactions but this only tracks
    /// the most recent one.
    versioned_hashes_to_txhash: Mutex<LruMap<B256, B256>>,
}
323
324impl DiskFileBlobStoreInner {
325    /// Creates a new empty disk file blob store with the given maximum length of the blob cache.
326    fn new(blob_dir: PathBuf, max_length: u32) -> Self {
327        Self {
328            blob_dir,
329            blob_cache: Mutex::new(LruMap::new(ByLength::new(max_length))),
330            size_tracker: Default::default(),
331            file_lock: Default::default(),
332            txs_to_delete: Default::default(),
333            versioned_hashes_to_txhash: Mutex::new(LruMap::new(ByLength::new(
334                VERSIONED_HASH_TO_TX_HASH_CACHE_SIZE as u32,
335            ))),
336        }
337    }
338
339    /// Creates the directory where blobs will be stored on disk.
340    fn create_blob_dir(&self) -> Result<(), DiskFileBlobStoreError> {
341        debug!(target:"txpool::blob", blob_dir = ?self.blob_dir, "Creating blob store");
342        fs::create_dir_all(&self.blob_dir)
343            .map_err(|e| DiskFileBlobStoreError::Open(self.blob_dir.clone(), e))
344    }
345
346    /// Deletes the entire blob store.
347    fn delete_all(&self) -> Result<(), DiskFileBlobStoreError> {
348        match fs::remove_dir_all(&self.blob_dir) {
349            Ok(_) => {
350                debug!(target:"txpool::blob", blob_dir = ?self.blob_dir, "Removed blob store directory");
351            }
352            Err(err) if err.kind() == io::ErrorKind::NotFound => {}
353            Err(err) => return Err(DiskFileBlobStoreError::Open(self.blob_dir.clone(), err)),
354        }
355        Ok(())
356    }
357
358    /// Ensures blob is in the blob cache and written to the disk.
359    fn insert_one(
360        &self,
361        tx: B256,
362        data: BlobTransactionSidecarVariant,
363    ) -> Result<(), BlobStoreError> {
364        let mut buf = Vec::with_capacity(data.rlp_encoded_fields_length());
365        data.rlp_encode_fields(&mut buf);
366
367        {
368            // cache the versioned hashes to tx hash
369            let mut map = self.versioned_hashes_to_txhash.lock();
370            data.versioned_hashes().for_each(|hash| {
371                map.insert(hash, tx);
372            });
373        }
374
375        self.blob_cache.lock().insert(tx, Arc::new(data));
376
377        let size = self.write_one_encoded(tx, &buf)?;
378
379        self.size_tracker.add_size(size);
380        self.size_tracker.inc_len(1);
381        Ok(())
382    }
383
384    /// Ensures blobs are in the blob cache and written to the disk.
385    fn insert_many(
386        &self,
387        txs: Vec<(B256, BlobTransactionSidecarVariant)>,
388    ) -> Result<(), BlobStoreError> {
389        let raw = txs
390            .iter()
391            .map(|(tx, data)| {
392                let mut buf = Vec::with_capacity(data.rlp_encoded_fields_length());
393                data.rlp_encode_fields(&mut buf);
394                (self.blob_disk_file(*tx), buf)
395            })
396            .collect::<Vec<_>>();
397
398        {
399            // cache versioned hashes to tx hash
400            let mut map = self.versioned_hashes_to_txhash.lock();
401            for (tx, data) in &txs {
402                data.versioned_hashes().for_each(|hash| {
403                    map.insert(hash, *tx);
404                });
405            }
406        }
407
408        {
409            // cache blobs
410            let mut cache = self.blob_cache.lock();
411            for (tx, data) in txs {
412                cache.insert(tx, Arc::new(data));
413            }
414        }
415
416        let mut add = 0;
417        let mut num = 0;
418        {
419            let _lock = self.file_lock.write();
420            for (path, data) in raw {
421                if path.exists() {
422                    debug!(target:"txpool::blob", ?path, "Blob already exists");
423                } else if let Err(err) = fs::write(&path, &data) {
424                    debug!(target:"txpool::blob", %err, ?path, "Failed to write blob file");
425                } else {
426                    add += data.len();
427                    num += 1;
428                }
429            }
430        }
431        self.size_tracker.add_size(add);
432        self.size_tracker.inc_len(num);
433
434        Ok(())
435    }
436
437    /// Returns true if the blob for the given transaction hash is in the blob cache or on disk.
438    fn contains(&self, tx: B256) -> Result<bool, BlobStoreError> {
439        if self.blob_cache.lock().get(&tx).is_some() {
440            return Ok(true)
441        }
442        // we only check if the file exists and assume it's valid
443        Ok(self.blob_disk_file(tx).is_file())
444    }
445
446    /// Returns all the blob transactions which are in the cache or on the disk.
447    fn retain_existing(&self, txs: Vec<B256>) -> Result<Vec<B256>, BlobStoreError> {
448        let (in_cache, not_in_cache): (Vec<B256>, Vec<B256>) = {
449            let mut cache = self.blob_cache.lock();
450            txs.into_iter().partition(|tx| cache.get(tx).is_some())
451        };
452
453        let mut existing = in_cache;
454        for tx in not_in_cache {
455            if self.blob_disk_file(tx).is_file() {
456                existing.push(tx);
457            }
458        }
459
460        Ok(existing)
461    }
462
463    /// Retrieves the blob for the given transaction hash from the blob cache or disk.
464    fn get_one(
465        &self,
466        tx: B256,
467    ) -> Result<Option<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
468        if let Some(blob) = self.blob_cache.lock().get(&tx) {
469            return Ok(Some(blob.clone()))
470        }
471
472        if let Some(blob) = self.read_one(tx)? {
473            let blob_arc = Arc::new(blob);
474            self.blob_cache.lock().insert(tx, blob_arc.clone());
475            return Ok(Some(blob_arc))
476        }
477
478        Ok(None)
479    }
480
481    /// Returns the path to the blob file for the given transaction hash.
482    #[inline]
483    fn blob_disk_file(&self, tx: B256) -> PathBuf {
484        self.blob_dir.join(format!("{tx:x}"))
485    }
486
487    /// Retrieves the blob data for the given transaction hash.
488    #[inline]
489    fn read_one(&self, tx: B256) -> Result<Option<BlobTransactionSidecarVariant>, BlobStoreError> {
490        let path = self.blob_disk_file(tx);
491        let data = {
492            let _lock = self.file_lock.read();
493            match fs::read(&path) {
494                Ok(data) => data,
495                Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(None),
496                Err(e) => {
497                    return Err(BlobStoreError::Other(Box::new(DiskFileBlobStoreError::ReadFile(
498                        tx, path, e,
499                    ))))
500                }
501            }
502        };
503        BlobTransactionSidecarVariant::rlp_decode_fields(&mut data.as_slice())
504            .map(Some)
505            .map_err(BlobStoreError::DecodeError)
506    }
507
508    /// Returns decoded blobs read from disk.
509    ///
510    /// Only returns sidecars that were found and successfully decoded.
511    fn read_many_decoded(&self, txs: Vec<TxHash>) -> Vec<(TxHash, BlobTransactionSidecarVariant)> {
512        self.read_many_raw(txs)
513            .into_iter()
514            .filter_map(|(tx, data)| {
515                BlobTransactionSidecarVariant::rlp_decode_fields(&mut data.as_slice())
516                    .map(|sidecar| (tx, sidecar))
517                    .ok()
518            })
519            .collect()
520    }
521
522    /// Retrieves the raw blob data for the given transaction hashes.
523    ///
524    /// Only returns the blobs that were found in file.
525    #[inline]
526    fn read_many_raw(&self, txs: Vec<TxHash>) -> Vec<(TxHash, Vec<u8>)> {
527        let mut res = Vec::with_capacity(txs.len());
528        let _lock = self.file_lock.read();
529        for tx in txs {
530            let path = self.blob_disk_file(tx);
531            match fs::read(&path) {
532                Ok(data) => {
533                    res.push((tx, data));
534                }
535                Err(err) => {
536                    debug!(target:"txpool::blob", %err, ?tx, "Failed to read blob file");
537                }
538            };
539        }
540        res
541    }
542
543    /// Writes the blob data for the given transaction hash to the disk.
544    #[inline]
545    fn write_one_encoded(&self, tx: B256, data: &[u8]) -> Result<usize, DiskFileBlobStoreError> {
546        trace!(target:"txpool::blob", "[{:?}] writing blob file", tx);
547        let mut add = 0;
548        let path = self.blob_disk_file(tx);
549        {
550            let _lock = self.file_lock.write();
551            if !path.exists() {
552                fs::write(&path, data)
553                    .map_err(|e| DiskFileBlobStoreError::WriteFile(tx, path, e))?;
554                add = data.len();
555            }
556        }
557        Ok(add)
558    }
559
560    /// Retrieves blobs for the given transaction hashes from the blob cache or disk.
561    ///
562    /// This will not return an error if there are missing blobs. Therefore, the result may be a
563    /// subset of the request or an empty vector if none of the blobs were found.
564    #[inline]
565    fn get_all(
566        &self,
567        txs: Vec<B256>,
568    ) -> Result<Vec<(B256, Arc<BlobTransactionSidecarVariant>)>, BlobStoreError> {
569        let mut res = Vec::with_capacity(txs.len());
570        let mut cache_miss = Vec::new();
571        {
572            let mut cache = self.blob_cache.lock();
573            for tx in txs {
574                if let Some(blob) = cache.get(&tx) {
575                    res.push((tx, blob.clone()));
576                } else {
577                    cache_miss.push(tx)
578                }
579            }
580        }
581        if cache_miss.is_empty() {
582            return Ok(res)
583        }
584        let from_disk = self.read_many_decoded(cache_miss);
585        if from_disk.is_empty() {
586            return Ok(res)
587        }
588        let from_disk = from_disk
589            .into_iter()
590            .map(|(tx, data)| {
591                let data = Arc::new(data);
592                res.push((tx, data.clone()));
593                (tx, data)
594            })
595            .collect::<Vec<_>>();
596
597        let mut cache = self.blob_cache.lock();
598        for (tx, data) in from_disk {
599            cache.insert(tx, data);
600        }
601
602        Ok(res)
603    }
604
605    /// Retrieves blobs for the given transaction hashes from the blob cache or disk.
606    ///
607    /// Returns an error if there are any missing blobs.
608    #[inline]
609    fn get_exact(
610        &self,
611        txs: Vec<B256>,
612    ) -> Result<Vec<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
613        txs.into_iter()
614            .map(|tx| self.get_one(tx)?.ok_or(BlobStoreError::MissingSidecar(tx)))
615            .collect()
616    }
617}
618
619impl fmt::Debug for DiskFileBlobStoreInner {
620    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
621        f.debug_struct("DiskFileBlobStoreInner")
622            .field("blob_dir", &self.blob_dir)
623            .field("cached_blobs", &self.blob_cache.try_lock().map(|lock| lock.len()))
624            .field("txs_to_delete", &self.txs_to_delete.try_read())
625            .finish()
626    }
627}
628
/// Errors that can occur when interacting with a disk file blob store.
#[derive(Debug, thiserror::Error)]
pub enum DiskFileBlobStoreError {
    /// Failure to open the blob store directory.
    ///
    /// Thrown during [`DiskFileBlobStore::open`] if the blob store directory cannot be opened.
    #[error("failed to open blobstore at {0}: {1}")]
    Open(PathBuf, io::Error),
    /// Failure while reading a blob file.
    #[error("[{0}] failed to read blob file at {1}: {2}")]
    ReadFile(TxHash, PathBuf, io::Error),
    /// Failure while writing a blob file.
    #[error("[{0}] failed to write blob file at {1}: {2}")]
    WriteFile(TxHash, PathBuf, io::Error),
    /// Failure while deleting a blob file.
    #[error("[{0}] failed to delete blob file at {1}: {2}")]
    DeleteFile(TxHash, PathBuf, io::Error),
}
649
650impl From<DiskFileBlobStoreError> for BlobStoreError {
651    fn from(value: DiskFileBlobStoreError) -> Self {
652        Self::Other(Box::new(value))
653    }
654}
655
/// Configuration for a disk file blob store.
#[derive(Debug, Clone)]
pub struct DiskFileBlobStoreConfig {
    /// The maximum number of blobs to keep in the in memory blob cache.
    pub max_cached_entries: u32,
    /// How to open the blob store.
    ///
    /// NOTE(review): this field is not currently consulted by [`DiskFileBlobStore::open`],
    /// which always clears the store — confirm intended behavior.
    pub open: OpenDiskFileBlobStore,
}
664
665impl Default for DiskFileBlobStoreConfig {
666    fn default() -> Self {
667        Self { max_cached_entries: DEFAULT_MAX_CACHED_BLOBS, open: Default::default() }
668    }
669}
670
impl DiskFileBlobStoreConfig {
    /// Set maximum number of blobs to keep in the in memory blob cache.
    ///
    /// Builder-style: consumes and returns `self`.
    pub const fn with_max_cached_entries(mut self, max_cached_entries: u32) -> Self {
        self.max_cached_entries = max_cached_entries;
        self
    }
}
678
/// How to open a disk file blob store.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum OpenDiskFileBlobStore {
    /// Clear everything in the blob store.
    #[default]
    Clear,
    /// Keep the existing blob store and index.
    ReIndex,
}
688
#[cfg(test)]
mod tests {
    use alloy_consensus::BlobTransactionSidecar;
    use alloy_eips::{
        eip4844::{kzg_to_versioned_hash, Blob, BlobAndProofV2, Bytes48},
        eip7594::{
            BlobTransactionSidecarEip7594, BlobTransactionSidecarVariant, CELLS_PER_EXT_BLOB,
        },
    };

    use super::*;
    use std::sync::atomic::Ordering;

    // Creates a store backed by a fresh temp dir; keep the `TempDir` guard alive for the test.
    fn tmp_store() -> (DiskFileBlobStore, tempfile::TempDir) {
        let dir = tempfile::tempdir().unwrap();
        let store = DiskFileBlobStore::open(dir.path(), Default::default()).unwrap();
        (store, dir)
    }

    // Generates `num` random tx hashes paired with empty EIP-4844 sidecars.
    fn rng_blobs(num: usize) -> Vec<(TxHash, BlobTransactionSidecarVariant)> {
        let mut rng = rand::rng();
        (0..num)
            .map(|_| {
                let tx = TxHash::random_with(&mut rng);
                let blob = BlobTransactionSidecarVariant::Eip4844(BlobTransactionSidecar {
                    blobs: vec![],
                    commitments: vec![],
                    proofs: vec![],
                });
                (tx, blob)
            })
            .collect()
    }

    // Builds an EIP-7594 sidecar holding a single default blob, returning the sidecar, its
    // versioned hash, and the `BlobAndProofV2` that lookups are expected to return for it.
    fn eip7594_single_blob_sidecar() -> (BlobTransactionSidecarVariant, B256, BlobAndProofV2) {
        let blob = Blob::default();
        let commitment = Bytes48::default();
        let cell_proofs = vec![Bytes48::default(); CELLS_PER_EXT_BLOB];

        let versioned_hash = kzg_to_versioned_hash(commitment.as_slice());

        let expected =
            BlobAndProofV2 { blob: Box::new(Blob::default()), proofs: cell_proofs.clone() };
        let sidecar = BlobTransactionSidecarEip7594::new(vec![blob], vec![commitment], cell_proofs);

        (BlobTransactionSidecarVariant::Eip7594(sidecar), versioned_hash, expected)
    }

    // End-to-end insert/get/delete/cleanup round-trip, including size-tracker accounting.
    #[test]
    fn disk_insert_all_get_all() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(10);
        let all_hashes = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        // all cached
        for (tx, blob) in &blobs {
            assert!(store.is_cached(tx));
            let b = store.get(*tx).unwrap().map(Arc::unwrap_or_clone).unwrap();
            assert_eq!(b, *blob);
        }

        let all = store.get_all(all_hashes.clone()).unwrap();
        for (tx, blob) in all {
            assert!(blobs.contains(&(tx, Arc::unwrap_or_clone(blob))), "missing blob {tx:?}");
        }

        assert!(store.contains(all_hashes[0]).unwrap());
        store.delete_all(all_hashes.clone()).unwrap();
        assert!(store.inner.txs_to_delete.read().contains(&all_hashes[0]));
        // clear the cache so the lookups below must hit the (now deleted) disk files
        store.clear_cache();
        store.cleanup();

        assert!(store.get(blobs[0].0).unwrap().is_none());

        let all = store.get_all(all_hashes.clone()).unwrap();
        assert!(all.is_empty());

        assert!(!store.contains(all_hashes[0]).unwrap());
        assert!(store.get_exact(all_hashes).is_err());

        assert_eq!(store.data_size_hint(), Some(0));
        assert_eq!(store.inner.size_tracker.num_blobs.load(Ordering::Relaxed), 0);
    }

    // Single insert is cached and retrievable.
    #[test]
    fn disk_insert_and_retrieve() {
        let (store, _dir) = tmp_store();

        let (tx, blob) = rng_blobs(1).into_iter().next().unwrap();
        store.insert(tx, blob.clone()).unwrap();

        assert!(store.is_cached(&tx));
        let retrieved_blob = store.get(tx).unwrap().map(Arc::unwrap_or_clone).unwrap();
        assert_eq!(retrieved_blob, blob);
    }

    // Deferred deletion removes the file, but the sidecar is still served from the
    // in-memory cache (the cache is not cleared here).
    #[test]
    fn disk_delete_blob() {
        let (store, _dir) = tmp_store();

        let (tx, blob) = rng_blobs(1).into_iter().next().unwrap();
        store.insert(tx, blob).unwrap();
        assert!(store.is_cached(&tx));

        store.delete(tx).unwrap();
        assert!(store.inner.txs_to_delete.read().contains(&tx));
        store.cleanup();

        // still a cache hit even though the on-disk file is gone
        let result = store.get(tx).unwrap();
        assert_eq!(
            result,
            Some(Arc::new(BlobTransactionSidecarVariant::Eip4844(BlobTransactionSidecar {
                blobs: vec![],
                commitments: vec![],
                proofs: vec![]
            })))
        );
    }

    // Batch insert + batch delete; cached entries survive cleanup (cache not cleared).
    #[test]
    fn disk_insert_all_and_delete_all() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(5);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        for (tx, _) in &blobs {
            assert!(store.is_cached(tx));
        }

        store.delete_all(txs.clone()).unwrap();
        store.cleanup();

        for tx in txs {
            // still served from cache after the files were removed
            let result = store.get(tx).unwrap();
            assert_eq!(
                result,
                Some(Arc::new(BlobTransactionSidecarVariant::Eip4844(BlobTransactionSidecar {
                    blobs: vec![],
                    commitments: vec![],
                    proofs: vec![]
                })))
            );
        }
    }

    // `get_all` returns every inserted blob.
    #[test]
    fn disk_get_all_blobs() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(3);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        let retrieved_blobs = store.get_all(txs.clone()).unwrap();
        for (tx, blob) in retrieved_blobs {
            assert!(blobs.contains(&(tx, Arc::unwrap_or_clone(blob))));
        }

        store.delete_all(txs).unwrap();
        store.cleanup();
    }

    // `get_exact` succeeds and preserves the request order.
    #[test]
    fn disk_get_exact_blobs_success() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(3);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        let retrieved_blobs = store.get_exact(txs).unwrap();
        for (retrieved_blob, (_, original_blob)) in retrieved_blobs.into_iter().zip(blobs) {
            assert_eq!(Arc::unwrap_or_clone(retrieved_blob), original_blob);
        }
    }

    // `get_exact` errors if any requested blob is missing.
    #[test]
    fn disk_get_exact_blobs_failure() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(2);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs).unwrap();

        // Try to get a blob that was never inserted
        let missing_tx = TxHash::random();
        let result = store.get_exact(vec![txs[0], missing_tx]);
        assert!(result.is_err());
    }

    // Size hint starts at zero and grows after inserts.
    #[test]
    fn disk_data_size_hint() {
        let (store, _dir) = tmp_store();
        assert_eq!(store.data_size_hint(), Some(0));

        let blobs = rng_blobs(2);
        store.insert_all(blobs).unwrap();
        assert!(store.data_size_hint().unwrap() > 0);
    }

    // `cleanup` reports one successful deletion per queued blob.
    #[test]
    fn disk_cleanup_stat() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(3);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs).unwrap();

        store.delete_all(txs).unwrap();
        let stat = store.cleanup();
        assert_eq!(stat.delete_succeed, 3);
        assert_eq!(stat.delete_failed, 0);
    }

    // v2 returns `None` on any miss; v3 returns partial results with `None` holes.
    #[test]
    fn disk_get_blobs_v3_returns_partial_results() {
        let (store, _dir) = tmp_store();

        let (sidecar, versioned_hash, expected) = eip7594_single_blob_sidecar();
        store.insert(TxHash::random(), sidecar).unwrap();

        assert_ne!(versioned_hash, B256::ZERO);

        let request = vec![versioned_hash, B256::ZERO];
        let v2 = store.get_by_versioned_hashes_v2(&request).unwrap();
        assert!(v2.is_none(), "v2 must return null if any requested blob is missing");

        let v3 = store.get_by_versioned_hashes_v3(&request).unwrap();
        assert_eq!(v3, vec![Some(expected), None]);
    }

    // After clearing the cache, v3 lookups fall back to the on-disk files via the
    // versioned-hash -> tx-hash index.
    #[test]
    fn disk_get_blobs_v3_can_fallback_to_disk() {
        let (store, _dir) = tmp_store();

        let (sidecar, versioned_hash, expected) = eip7594_single_blob_sidecar();
        store.insert(TxHash::random(), sidecar).unwrap();
        store.clear_cache();

        let v3 = store.get_by_versioned_hashes_v3(&[versioned_hash]).unwrap();
        assert_eq!(v3, vec![Some(expected)]);
    }
}
935}