// reth_transaction_pool/blobstore/disk.rs

1//! A simple diskstore for blobs
2
3use crate::blobstore::{BlobStore, BlobStoreCleanupStat, BlobStoreError, BlobStoreSize};
4use alloy_eips::{
5    eip4844::{BlobAndProofV1, BlobAndProofV2},
6    eip7594::BlobTransactionSidecarVariant,
7    eip7840::BlobParams,
8    merge::EPOCH_SLOTS,
9};
10use alloy_primitives::{map::B256Set, TxHash, B256};
11use parking_lot::{Mutex, RwLock};
12use schnellru::{ByLength, LruMap};
13use std::{fmt, fs, io, path::PathBuf, sync::Arc};
14use tracing::{debug, trace};
15
/// How many [`BlobTransactionSidecarVariant`] to cache in memory.
pub const DEFAULT_MAX_CACHED_BLOBS: u32 = 100;

/// A cache size heuristic based on the highest blob params.
///
/// This uses the max blobs per tx and max blobs per block over 16 epochs:
/// `21 * 6 * 512 = 64512`, which should be ~4MB.
///
/// Used as the capacity of the `versioned_hash -> tx_hash` LRU index; the value is
/// narrowed to `u32` at the construction site of that map.
const VERSIONED_HASH_TO_TX_HASH_CACHE_SIZE: u64 =
    BlobParams::bpo2().max_blobs_per_tx * BlobParams::bpo2().max_blob_count * EPOCH_SLOTS * 16;
25
/// A blob store that stores blob data on disk.
///
/// The type uses deferred deletion, meaning that blobs are not immediately deleted from disk, but
/// it's expected that the maintenance task will call [`BlobStore::cleanup`] to remove the deleted
/// blobs from disk.
#[derive(Clone, Debug)]
pub struct DiskFileBlobStore {
    // Shared inner state (cache, size tracking, on-disk layout); cloning the store is cheap.
    inner: Arc<DiskFileBlobStoreInner>,
}
35
impl DiskFileBlobStore {
    /// Opens and initializes a new disk file blob store according to the given options.
    ///
    /// Note: this wipes any existing blob files in `blob_dir` before recreating the
    /// directory, i.e. the store always starts out empty.
    pub fn open(
        blob_dir: impl Into<PathBuf>,
        opts: DiskFileBlobStoreConfig,
    ) -> Result<Self, DiskFileBlobStoreError> {
        let blob_dir = blob_dir.into();
        let DiskFileBlobStoreConfig { max_cached_entries, .. } = opts;
        let inner = DiskFileBlobStoreInner::new(blob_dir, max_cached_entries);

        // initialize the blob store: remove any leftover files from a previous run, then
        // recreate an empty blob directory
        inner.delete_all()?;
        inner.create_blob_dir()?;

        Ok(Self { inner: Arc::new(inner) })
    }

    /// Returns true if the blob for `tx` is currently held in the in-memory cache.
    #[cfg(test)]
    fn is_cached(&self, tx: &B256) -> bool {
        self.inner.blob_cache.lock().get(tx).is_some()
    }

    /// Drops all entries from the in-memory blob cache; files on disk are untouched.
    #[cfg(test)]
    fn clear_cache(&self) {
        self.inner.blob_cache.lock().clear()
    }

    /// Look up EIP-7594 blobs by their versioned hashes.
    ///
    /// This returns a result vector with the **same length and order** as the input
    /// `versioned_hashes`. Each element is `Some(BlobAndProofV2)` if the blob is available, or
    /// `None` if it is missing or an older sidecar version.
    ///
    /// The lookup first scans the in-memory cache and, if not all blobs are found, falls back to
    /// reading candidate sidecars from disk using the `versioned_hash -> tx_hash` index.
    fn get_by_versioned_hashes_eip7594(
        &self,
        versioned_hashes: &[B256],
    ) -> Result<Vec<Option<BlobAndProofV2>>, BlobStoreError> {
        // we must return the blobs in order but we don't necessarily find them in the requested
        // order
        let mut result = vec![None; versioned_hashes.len()];
        let mut missing_count = result.len();
        // first scan all cached full sidecars
        for (_tx_hash, blob_sidecar) in self.inner.blob_cache.lock().iter() {
            // only EIP-7594 sidecars can answer a V2 request; older variants are skipped
            if let Some(blob_sidecar) = blob_sidecar.as_eip7594() {
                for (hash_idx, match_result) in
                    blob_sidecar.match_versioned_hashes(versioned_hashes)
                {
                    let slot = &mut result[hash_idx];
                    // only decrement the counter the first time a slot is filled
                    if slot.is_none() {
                        missing_count -= 1;
                    }
                    *slot = Some(match_result);
                }
            }

            // return early if all blobs are found.
            if missing_count == 0 {
                // since versioned_hashes may have duplicates, we double check here
                if result.iter().all(|blob| blob.is_some()) {
                    return Ok(result);
                }
            }
        }

        // not all versioned hashes were found, try to look up a matching tx
        let mut missing_tx_hashes = Vec::new();

        {
            // the index lock is scoped so it is released before any disk I/O below
            let mut versioned_to_txhashes = self.inner.versioned_hashes_to_txhash.lock();
            for (idx, _) in
                result.iter().enumerate().filter(|(_, blob_and_proof)| blob_and_proof.is_none())
            {
                // this is safe because the result vec has the same len
                let versioned_hash = versioned_hashes[idx];
                if let Some(tx_hash) = versioned_to_txhashes.get(&versioned_hash).copied() {
                    missing_tx_hashes.push(tx_hash);
                }
            }
        }

        // if we have missing blobs, try to read them from disk and try again
        if !missing_tx_hashes.is_empty() {
            let blobs_from_disk = self.inner.read_many_decoded(missing_tx_hashes);
            for (_, blob_sidecar) in blobs_from_disk {
                if let Some(blob_sidecar) = blob_sidecar.as_eip7594() {
                    for (hash_idx, match_result) in
                        blob_sidecar.match_versioned_hashes(versioned_hashes)
                    {
                        // don't overwrite slots that were already resolved from the cache
                        if result[hash_idx].is_none() {
                            result[hash_idx] = Some(match_result);
                        }
                    }
                }
            }
        }

        Ok(result)
    }
}
137
impl BlobStore for DiskFileBlobStore {
    fn insert(&self, tx: B256, data: BlobTransactionSidecarVariant) -> Result<(), BlobStoreError> {
        self.inner.insert_one(tx, data)
    }

    fn insert_all(
        &self,
        txs: Vec<(B256, BlobTransactionSidecarVariant)>,
    ) -> Result<(), BlobStoreError> {
        if txs.is_empty() {
            return Ok(())
        }
        self.inner.insert_many(txs)
    }

    fn delete(&self, tx: B256) -> Result<(), BlobStoreError> {
        // deferred deletion: only mark the tx here; the actual file removal happens in `cleanup`
        if self.inner.contains(tx)? {
            self.inner.txs_to_delete.write().insert(tx);
        }
        Ok(())
    }

    fn delete_all(&self, txs: Vec<B256>) -> Result<(), BlobStoreError> {
        if txs.is_empty() {
            return Ok(())
        }
        // only mark hashes that actually exist in the cache or on disk
        let txs = self.inner.retain_existing(txs)?;
        self.inner.txs_to_delete.write().extend(txs);
        Ok(())
    }

    fn cleanup(&self) -> BlobStoreCleanupStat {
        // take the pending set so concurrent `delete` calls can start populating a fresh one
        let txs_to_delete = std::mem::take(&mut *self.inner.txs_to_delete.write());
        let mut stat = BlobStoreCleanupStat::default();
        let mut subsize = 0;
        debug!(target:"txpool::blob", num_blobs=%txs_to_delete.len(), "Removing blobs from disk");
        for tx in txs_to_delete {
            let path = self.inner.blob_disk_file(tx);
            // capture the file size before deletion so the size tracker can be adjusted below
            let filesize = fs::metadata(&path).map_or(0, |meta| meta.len());
            match fs::remove_file(&path) {
                Ok(_) => {
                    stat.delete_succeed += 1;
                    subsize += filesize;
                }
                Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
                    // Already deleted by a concurrent cleanup task
                    stat.delete_succeed += 1;
                }
                Err(e) => {
                    stat.delete_failed += 1;
                    let err = DiskFileBlobStoreError::DeleteFile(tx, path, e);
                    debug!(target:"txpool::blob", %err);
                }
            };
        }
        self.inner.size_tracker.sub_size(subsize as usize);
        self.inner.size_tracker.sub_len(stat.delete_succeed);
        stat
    }

    fn get(&self, tx: B256) -> Result<Option<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
        self.inner.get_one(tx)
    }

    fn contains(&self, tx: B256) -> Result<bool, BlobStoreError> {
        self.inner.contains(tx)
    }

    fn get_all(
        &self,
        txs: Vec<B256>,
    ) -> Result<Vec<(B256, Arc<BlobTransactionSidecarVariant>)>, BlobStoreError> {
        if txs.is_empty() {
            return Ok(Vec::new())
        }
        self.inner.get_all(txs)
    }

    fn get_exact(
        &self,
        txs: Vec<B256>,
    ) -> Result<Vec<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
        if txs.is_empty() {
            return Ok(Vec::new())
        }
        self.inner.get_exact(txs)
    }

    fn get_by_versioned_hashes_v1(
        &self,
        versioned_hashes: &[B256],
    ) -> Result<Vec<Option<BlobAndProofV1>>, BlobStoreError> {
        // the response must always be the same len as the request, misses must be None
        let mut result = vec![None; versioned_hashes.len()];

        // first scan all cached full sidecars
        for (_tx_hash, blob_sidecar) in self.inner.blob_cache.lock().iter() {
            // only EIP-4844 sidecars can answer a V1 request
            if let Some(blob_sidecar) = blob_sidecar.as_eip4844() {
                for (hash_idx, match_result) in
                    blob_sidecar.match_versioned_hashes(versioned_hashes)
                {
                    result[hash_idx] = Some(match_result);
                }
            }

            // return early if all blobs are found.
            if result.iter().all(|blob| blob.is_some()) {
                return Ok(result);
            }
        }

        // not all versioned hashes were found, try to look up a matching tx

        let mut missing_tx_hashes = Vec::new();

        {
            // the index lock is scoped so it is released before any disk I/O below
            let mut versioned_to_txhashes = self.inner.versioned_hashes_to_txhash.lock();
            for (idx, _) in
                result.iter().enumerate().filter(|(_, blob_and_proof)| blob_and_proof.is_none())
            {
                // this is safe because the result vec has the same len
                let versioned_hash = versioned_hashes[idx];
                if let Some(tx_hash) = versioned_to_txhashes.get(&versioned_hash).copied() {
                    missing_tx_hashes.push(tx_hash);
                }
            }
        }

        // if we have missing blobs, try to read them from disk and try again
        if !missing_tx_hashes.is_empty() {
            let blobs_from_disk = self.inner.read_many_decoded(missing_tx_hashes);
            for (_, blob_sidecar) in blobs_from_disk {
                if let Some(blob_sidecar) = blob_sidecar.as_eip4844() {
                    for (hash_idx, match_result) in
                        blob_sidecar.match_versioned_hashes(versioned_hashes)
                    {
                        // don't overwrite slots that were already resolved from the cache
                        if result[hash_idx].is_none() {
                            result[hash_idx] = Some(match_result);
                        }
                    }
                }
            }
        }

        Ok(result)
    }

    fn get_by_versioned_hashes_v2(
        &self,
        versioned_hashes: &[B256],
    ) -> Result<Option<Vec<BlobAndProofV2>>, BlobStoreError> {
        let result = self.get_by_versioned_hashes_eip7594(versioned_hashes)?;

        // only return the blobs if we found all requested versioned hashes
        if result.iter().all(|blob| blob.is_some()) {
            Ok(Some(result.into_iter().map(Option::unwrap).collect()))
        } else {
            Ok(None)
        }
    }

    fn get_by_versioned_hashes_v3(
        &self,
        versioned_hashes: &[B256],
    ) -> Result<Vec<Option<BlobAndProofV2>>, BlobStoreError> {
        self.get_by_versioned_hashes_eip7594(versioned_hashes)
    }

    fn data_size_hint(&self) -> Option<usize> {
        Some(self.inner.size_tracker.data_size())
    }

    fn blobs_len(&self) -> usize {
        self.inner.size_tracker.blobs_len()
    }
}
314
struct DiskFileBlobStoreInner {
    /// Directory that holds one file per blob transaction, named by the tx hash.
    blob_dir: PathBuf,
    /// In-memory LRU of decoded sidecars keyed by tx hash.
    blob_cache: Mutex<LruMap<TxHash, Arc<BlobTransactionSidecarVariant>, ByLength>>,
    /// Tracks total byte size and number of blobs written to disk.
    size_tracker: BlobStoreSize,
    /// Guards blob file access: file reads take the read lock, file writes the write lock.
    file_lock: RwLock<()>,
    /// Transactions marked for deferred deletion; drained by [`BlobStore::cleanup`].
    txs_to_delete: RwLock<B256Set>,
    /// Tracks of known versioned hashes and a transaction they exist in
    ///
    /// Note: It is possible that one blob can appear in multiple transactions but this only tracks
    /// the most recent one.
    versioned_hashes_to_txhash: Mutex<LruMap<B256, B256>>,
}
327
328impl DiskFileBlobStoreInner {
329    /// Creates a new empty disk file blob store with the given maximum length of the blob cache.
330    fn new(blob_dir: PathBuf, max_length: u32) -> Self {
331        Self {
332            blob_dir,
333            blob_cache: Mutex::new(LruMap::new(ByLength::new(max_length))),
334            size_tracker: Default::default(),
335            file_lock: Default::default(),
336            txs_to_delete: Default::default(),
337            versioned_hashes_to_txhash: Mutex::new(LruMap::new(ByLength::new(
338                VERSIONED_HASH_TO_TX_HASH_CACHE_SIZE as u32,
339            ))),
340        }
341    }
342
343    /// Creates the directory where blobs will be stored on disk.
344    fn create_blob_dir(&self) -> Result<(), DiskFileBlobStoreError> {
345        debug!(target:"txpool::blob", blob_dir = ?self.blob_dir, "Creating blob store");
346        fs::create_dir_all(&self.blob_dir)
347            .map_err(|e| DiskFileBlobStoreError::Open(self.blob_dir.clone(), e))
348    }
349
350    /// Deletes the entire blob store.
351    fn delete_all(&self) -> Result<(), DiskFileBlobStoreError> {
352        match fs::remove_dir_all(&self.blob_dir) {
353            Ok(_) => {
354                debug!(target:"txpool::blob", blob_dir = ?self.blob_dir, "Removed blob store directory");
355            }
356            Err(err) if err.kind() == io::ErrorKind::NotFound => {}
357            Err(err) => return Err(DiskFileBlobStoreError::Open(self.blob_dir.clone(), err)),
358        }
359        Ok(())
360    }
361
362    /// Ensures blob is in the blob cache and written to the disk.
363    fn insert_one(
364        &self,
365        tx: B256,
366        data: BlobTransactionSidecarVariant,
367    ) -> Result<(), BlobStoreError> {
368        let mut buf = Vec::with_capacity(data.rlp_encoded_fields_length());
369        data.rlp_encode_fields(&mut buf);
370
371        {
372            // cache the versioned hashes to tx hash
373            let mut map = self.versioned_hashes_to_txhash.lock();
374            data.versioned_hashes().for_each(|hash| {
375                map.insert(hash, tx);
376            });
377        }
378
379        self.blob_cache.lock().insert(tx, Arc::new(data));
380
381        let size = self.write_one_encoded(tx, &buf)?;
382
383        self.size_tracker.add_size(size);
384        self.size_tracker.inc_len(1);
385        Ok(())
386    }
387
388    /// Ensures blobs are in the blob cache and written to the disk.
389    fn insert_many(
390        &self,
391        txs: Vec<(B256, BlobTransactionSidecarVariant)>,
392    ) -> Result<(), BlobStoreError> {
393        let raw = txs
394            .iter()
395            .map(|(tx, data)| {
396                let mut buf = Vec::with_capacity(data.rlp_encoded_fields_length());
397                data.rlp_encode_fields(&mut buf);
398                (self.blob_disk_file(*tx), buf)
399            })
400            .collect::<Vec<_>>();
401
402        {
403            // cache versioned hashes to tx hash
404            let mut map = self.versioned_hashes_to_txhash.lock();
405            for (tx, data) in &txs {
406                data.versioned_hashes().for_each(|hash| {
407                    map.insert(hash, *tx);
408                });
409            }
410        }
411
412        {
413            // cache blobs
414            let mut cache = self.blob_cache.lock();
415            for (tx, data) in txs {
416                cache.insert(tx, Arc::new(data));
417            }
418        }
419
420        let mut add = 0;
421        let mut num = 0;
422        {
423            let _lock = self.file_lock.write();
424            for (path, data) in raw {
425                if path.exists() {
426                    debug!(target:"txpool::blob", ?path, "Blob already exists");
427                } else if let Err(err) = fs::write(&path, &data) {
428                    debug!(target:"txpool::blob", %err, ?path, "Failed to write blob file");
429                } else {
430                    add += data.len();
431                    num += 1;
432                }
433            }
434        }
435        self.size_tracker.add_size(add);
436        self.size_tracker.inc_len(num);
437
438        Ok(())
439    }
440
441    /// Returns true if the blob for the given transaction hash is in the blob cache or on disk.
442    fn contains(&self, tx: B256) -> Result<bool, BlobStoreError> {
443        if self.blob_cache.lock().get(&tx).is_some() {
444            return Ok(true)
445        }
446        // we only check if the file exists and assume it's valid
447        Ok(self.blob_disk_file(tx).is_file())
448    }
449
450    /// Returns all the blob transactions which are in the cache or on the disk.
451    fn retain_existing(&self, txs: Vec<B256>) -> Result<Vec<B256>, BlobStoreError> {
452        let (in_cache, not_in_cache): (Vec<B256>, Vec<B256>) = {
453            let mut cache = self.blob_cache.lock();
454            txs.into_iter().partition(|tx| cache.get(tx).is_some())
455        };
456
457        let mut existing = in_cache;
458        for tx in not_in_cache {
459            if self.blob_disk_file(tx).is_file() {
460                existing.push(tx);
461            }
462        }
463
464        Ok(existing)
465    }
466
467    /// Retrieves the blob for the given transaction hash from the blob cache or disk.
468    fn get_one(
469        &self,
470        tx: B256,
471    ) -> Result<Option<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
472        if let Some(blob) = self.blob_cache.lock().get(&tx) {
473            return Ok(Some(blob.clone()))
474        }
475
476        if let Some(blob) = self.read_one(tx)? {
477            let blob_arc = Arc::new(blob);
478            self.blob_cache.lock().insert(tx, blob_arc.clone());
479            return Ok(Some(blob_arc))
480        }
481
482        Ok(None)
483    }
484
485    /// Returns the path to the blob file for the given transaction hash.
486    #[inline]
487    fn blob_disk_file(&self, tx: B256) -> PathBuf {
488        self.blob_dir.join(format!("{tx:x}"))
489    }
490
491    /// Retrieves the blob data for the given transaction hash.
492    #[inline]
493    fn read_one(&self, tx: B256) -> Result<Option<BlobTransactionSidecarVariant>, BlobStoreError> {
494        let path = self.blob_disk_file(tx);
495        let data = {
496            let _lock = self.file_lock.read();
497            match fs::read(&path) {
498                Ok(data) => data,
499                Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(None),
500                Err(e) => {
501                    return Err(BlobStoreError::Other(Box::new(DiskFileBlobStoreError::ReadFile(
502                        tx, path, e,
503                    ))))
504                }
505            }
506        };
507        BlobTransactionSidecarVariant::rlp_decode_fields(&mut data.as_slice())
508            .map(Some)
509            .map_err(BlobStoreError::DecodeError)
510    }
511
512    /// Returns decoded blobs read from disk.
513    ///
514    /// Only returns sidecars that were found and successfully decoded.
515    fn read_many_decoded(&self, txs: Vec<TxHash>) -> Vec<(TxHash, BlobTransactionSidecarVariant)> {
516        self.read_many_raw(txs)
517            .into_iter()
518            .filter_map(|(tx, data)| {
519                BlobTransactionSidecarVariant::rlp_decode_fields(&mut data.as_slice())
520                    .map(|sidecar| (tx, sidecar))
521                    .ok()
522            })
523            .collect()
524    }
525
526    /// Retrieves the raw blob data for the given transaction hashes.
527    ///
528    /// Only returns the blobs that were found in file.
529    #[inline]
530    fn read_many_raw(&self, txs: Vec<TxHash>) -> Vec<(TxHash, Vec<u8>)> {
531        let mut res = Vec::with_capacity(txs.len());
532        let _lock = self.file_lock.read();
533        for tx in txs {
534            let path = self.blob_disk_file(tx);
535            match fs::read(&path) {
536                Ok(data) => {
537                    res.push((tx, data));
538                }
539                Err(err) => {
540                    debug!(target:"txpool::blob", %err, ?tx, "Failed to read blob file");
541                }
542            };
543        }
544        res
545    }
546
547    /// Writes the blob data for the given transaction hash to the disk.
548    #[inline]
549    fn write_one_encoded(&self, tx: B256, data: &[u8]) -> Result<usize, DiskFileBlobStoreError> {
550        trace!(target:"txpool::blob", "[{:?}] writing blob file", tx);
551        let mut add = 0;
552        let path = self.blob_disk_file(tx);
553        {
554            let _lock = self.file_lock.write();
555            if !path.exists() {
556                fs::write(&path, data)
557                    .map_err(|e| DiskFileBlobStoreError::WriteFile(tx, path, e))?;
558                add = data.len();
559            }
560        }
561        Ok(add)
562    }
563
564    /// Retrieves blobs for the given transaction hashes from the blob cache or disk.
565    ///
566    /// This will not return an error if there are missing blobs. Therefore, the result may be a
567    /// subset of the request or an empty vector if none of the blobs were found.
568    #[inline]
569    fn get_all(
570        &self,
571        txs: Vec<B256>,
572    ) -> Result<Vec<(B256, Arc<BlobTransactionSidecarVariant>)>, BlobStoreError> {
573        let mut res = Vec::with_capacity(txs.len());
574        let mut cache_miss = Vec::new();
575        {
576            let mut cache = self.blob_cache.lock();
577            for tx in txs {
578                if let Some(blob) = cache.get(&tx) {
579                    res.push((tx, blob.clone()));
580                } else {
581                    cache_miss.push(tx)
582                }
583            }
584        }
585        if cache_miss.is_empty() {
586            return Ok(res)
587        }
588        let from_disk = self.read_many_decoded(cache_miss);
589        if from_disk.is_empty() {
590            return Ok(res)
591        }
592        let from_disk = from_disk
593            .into_iter()
594            .map(|(tx, data)| {
595                let data = Arc::new(data);
596                res.push((tx, data.clone()));
597                (tx, data)
598            })
599            .collect::<Vec<_>>();
600
601        let mut cache = self.blob_cache.lock();
602        for (tx, data) in from_disk {
603            cache.insert(tx, data);
604        }
605
606        Ok(res)
607    }
608
609    /// Retrieves blobs for the given transaction hashes from the blob cache or disk.
610    ///
611    /// Returns an error if there are any missing blobs.
612    #[inline]
613    fn get_exact(
614        &self,
615        txs: Vec<B256>,
616    ) -> Result<Vec<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
617        txs.into_iter()
618            .map(|tx| self.get_one(tx)?.ok_or(BlobStoreError::MissingSidecar(tx)))
619            .collect()
620    }
621}
622
623impl fmt::Debug for DiskFileBlobStoreInner {
624    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
625        f.debug_struct("DiskFileBlobStoreInner")
626            .field("blob_dir", &self.blob_dir)
627            .field("cached_blobs", &self.blob_cache.try_lock().map(|lock| lock.len()))
628            .field("txs_to_delete", &self.txs_to_delete.try_read())
629            .finish()
630    }
631}
632
633/// Errors that can occur when interacting with a disk file blob store.
634#[derive(Debug, thiserror::Error)]
635pub enum DiskFileBlobStoreError {
636    /// Thrown during [`DiskFileBlobStore::open`] if the blob store directory cannot be opened.
637    #[error("failed to open blobstore at {0}: {1}")]
638    /// Indicates a failure to open the blob store directory.
639    Open(PathBuf, io::Error),
640    /// Failure while reading a blob file.
641    #[error("[{0}] failed to read blob file at {1}: {2}")]
642    /// Indicates a failure while reading a blob file.
643    ReadFile(TxHash, PathBuf, io::Error),
644    /// Failure while writing a blob file.
645    #[error("[{0}] failed to write blob file at {1}: {2}")]
646    /// Indicates a failure while writing a blob file.
647    WriteFile(TxHash, PathBuf, io::Error),
648    /// Failure while deleting a blob file.
649    #[error("[{0}] failed to delete blob file at {1}: {2}")]
650    /// Indicates a failure while deleting a blob file.
651    DeleteFile(TxHash, PathBuf, io::Error),
652}
653
654impl From<DiskFileBlobStoreError> for BlobStoreError {
655    fn from(value: DiskFileBlobStoreError) -> Self {
656        Self::Other(Box::new(value))
657    }
658}
659
/// Configuration for a disk file blob store.
#[derive(Debug, Clone)]
pub struct DiskFileBlobStoreConfig {
    /// The maximum number of blobs to keep in the in memory blob cache.
    pub max_cached_entries: u32,
    /// How to open the blob store.
    pub open: OpenDiskFileBlobStore,
}
668
669impl Default for DiskFileBlobStoreConfig {
670    fn default() -> Self {
671        Self { max_cached_entries: DEFAULT_MAX_CACHED_BLOBS, open: Default::default() }
672    }
673}
674
675impl DiskFileBlobStoreConfig {
676    /// Set maximum number of blobs to keep in the in memory blob cache.
677    pub const fn with_max_cached_entries(mut self, max_cached_entries: u32) -> Self {
678        self.max_cached_entries = max_cached_entries;
679        self
680    }
681}
682
/// How to open a disk file blob store.
///
/// NOTE(review): [`DiskFileBlobStore::open`] currently destructures only
/// `max_cached_entries` from the config and always clears the store — confirm where
/// `ReIndex` is honored.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum OpenDiskFileBlobStore {
    /// Clear everything in the blob store.
    #[default]
    Clear,
    /// Keep the existing blob store and index
    ReIndex,
}
692
693#[cfg(test)]
694mod tests {
695    use alloy_consensus::BlobTransactionSidecar;
696    use alloy_eips::{
697        eip4844::{kzg_to_versioned_hash, Blob, BlobAndProofV2, Bytes48},
698        eip7594::{
699            BlobTransactionSidecarEip7594, BlobTransactionSidecarVariant, CELLS_PER_EXT_BLOB,
700        },
701    };
702
703    use super::*;
704    use std::sync::atomic::Ordering;
705
    /// Creates a fresh blob store backed by a temp directory.
    ///
    /// The returned `TempDir` guard must be kept alive for the duration of the test,
    /// otherwise the backing directory is removed.
    fn tmp_store() -> (DiskFileBlobStore, tempfile::TempDir) {
        let dir = tempfile::tempdir().unwrap();
        let store = DiskFileBlobStore::open(dir.path(), Default::default()).unwrap();
        (store, dir)
    }
711
712    fn rng_blobs(num: usize) -> Vec<(TxHash, BlobTransactionSidecarVariant)> {
713        let mut rng = rand::rng();
714        (0..num)
715            .map(|_| {
716                let tx = TxHash::random_with(&mut rng);
717                let blob = BlobTransactionSidecarVariant::Eip4844(BlobTransactionSidecar {
718                    blobs: vec![],
719                    commitments: vec![],
720                    proofs: vec![],
721                });
722                (tx, blob)
723            })
724            .collect()
725    }
726
    /// Builds a single-blob EIP-7594 sidecar together with its versioned hash and the
    /// [`BlobAndProofV2`] payload a store lookup is expected to return for that hash.
    fn eip7594_single_blob_sidecar() -> (BlobTransactionSidecarVariant, B256, BlobAndProofV2) {
        let blob = Blob::default();
        let commitment = Bytes48::default();
        let cell_proofs = vec![Bytes48::default(); CELLS_PER_EXT_BLOB];

        // the versioned hash is derived from the KZG commitment
        let versioned_hash = kzg_to_versioned_hash(commitment.as_slice());

        let expected =
            BlobAndProofV2 { blob: Box::new(Blob::default()), proofs: cell_proofs.clone() };
        let sidecar = BlobTransactionSidecarEip7594::new(vec![blob], vec![commitment], cell_proofs);

        (BlobTransactionSidecarVariant::Eip7594(sidecar), versioned_hash, expected)
    }
740
    /// End-to-end round trip: batch insert, cached reads, batch delete, cleanup, and
    /// size-tracker bookkeeping.
    #[test]
    fn disk_insert_all_get_all() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(10);
        let all_hashes = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        // all cached
        for (tx, blob) in &blobs {
            assert!(store.is_cached(tx));
            let b = store.get(*tx).unwrap().map(Arc::unwrap_or_clone).unwrap();
            assert_eq!(b, *blob);
        }

        let all = store.get_all(all_hashes.clone()).unwrap();
        for (tx, blob) in all {
            assert!(blobs.contains(&(tx, Arc::unwrap_or_clone(blob))), "missing blob {tx:?}");
        }

        assert!(store.contains(all_hashes[0]).unwrap());
        store.delete_all(all_hashes.clone()).unwrap();
        assert!(store.inner.txs_to_delete.read().contains(&all_hashes[0]));
        // clear the cache before cleanup so the lookups below hit the (now deleted) disk files
        store.clear_cache();
        store.cleanup();

        assert!(store.get(blobs[0].0).unwrap().is_none());

        let all = store.get_all(all_hashes.clone()).unwrap();
        assert!(all.is_empty());

        assert!(!store.contains(all_hashes[0]).unwrap());
        assert!(store.get_exact(all_hashes).is_err());

        // after deleting everything the size tracker must be back at zero
        assert_eq!(store.data_size_hint(), Some(0));
        assert_eq!(store.inner.size_tracker.num_blobs.load(Ordering::Relaxed), 0);
    }
778
779    #[test]
780    fn disk_insert_and_retrieve() {
781        let (store, _dir) = tmp_store();
782
783        let (tx, blob) = rng_blobs(1).into_iter().next().unwrap();
784        store.insert(tx, blob.clone()).unwrap();
785
786        assert!(store.is_cached(&tx));
787        let retrieved_blob = store.get(tx).unwrap().map(Arc::unwrap_or_clone).unwrap();
788        assert_eq!(retrieved_blob, blob);
789    }
790
    /// Delete marks the tx for deferred deletion and cleanup removes the file,
    /// but the in-memory cache is left untouched.
    #[test]
    fn disk_delete_blob() {
        let (store, _dir) = tmp_store();

        let (tx, blob) = rng_blobs(1).into_iter().next().unwrap();
        store.insert(tx, blob).unwrap();
        assert!(store.is_cached(&tx));

        store.delete(tx).unwrap();
        assert!(store.inner.txs_to_delete.read().contains(&tx));
        store.cleanup();

        // `cleanup` only deletes disk files and does not purge the blob cache,
        // so the sidecar is still served from the cache here
        let result = store.get(tx).unwrap();
        assert_eq!(
            result,
            Some(Arc::new(BlobTransactionSidecarVariant::Eip4844(BlobTransactionSidecar {
                blobs: vec![],
                commitments: vec![],
                proofs: vec![]
            })))
        );
    }
813
    /// Batch delete + cleanup removes files from disk while cached sidecars remain readable.
    #[test]
    fn disk_insert_all_and_delete_all() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(5);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        for (tx, _) in &blobs {
            assert!(store.is_cached(tx));
        }

        store.delete_all(txs.clone()).unwrap();
        store.cleanup();

        // the cache was never cleared, so `get` still returns the cached sidecars
        // even though the backing files were removed by `cleanup`
        for tx in txs {
            let result = store.get(tx).unwrap();
            assert_eq!(
                result,
                Some(Arc::new(BlobTransactionSidecarVariant::Eip4844(BlobTransactionSidecar {
                    blobs: vec![],
                    commitments: vec![],
                    proofs: vec![]
                })))
            );
        }
    }
841
    /// `get_all` returns every inserted blob; order is not guaranteed, so only
    /// membership is asserted.
    #[test]
    fn disk_get_all_blobs() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(3);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        let retrieved_blobs = store.get_all(txs.clone()).unwrap();
        for (tx, blob) in retrieved_blobs {
            assert!(blobs.contains(&(tx, Arc::unwrap_or_clone(blob))));
        }

        store.delete_all(txs).unwrap();
        store.cleanup();
    }
858
859    #[test]
860    fn disk_get_exact_blobs_success() {
861        let (store, _dir) = tmp_store();
862
863        let blobs = rng_blobs(3);
864        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
865        store.insert_all(blobs.clone()).unwrap();
866
867        let retrieved_blobs = store.get_exact(txs).unwrap();
868        for (retrieved_blob, (_, original_blob)) in retrieved_blobs.into_iter().zip(blobs) {
869            assert_eq!(Arc::unwrap_or_clone(retrieved_blob), original_blob);
870        }
871    }
872
873    #[test]
874    fn disk_get_exact_blobs_failure() {
875        let (store, _dir) = tmp_store();
876
877        let blobs = rng_blobs(2);
878        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
879        store.insert_all(blobs).unwrap();
880
881        // Try to get a blob that was never inserted
882        let missing_tx = TxHash::random();
883        let result = store.get_exact(vec![txs[0], missing_tx]);
884        assert!(result.is_err());
885    }
886
887    #[test]
888    fn disk_data_size_hint() {
889        let (store, _dir) = tmp_store();
890        assert_eq!(store.data_size_hint(), Some(0));
891
892        let blobs = rng_blobs(2);
893        store.insert_all(blobs).unwrap();
894        assert!(store.data_size_hint().unwrap() > 0);
895    }
896
897    #[test]
898    fn disk_cleanup_stat() {
899        let (store, _dir) = tmp_store();
900
901        let blobs = rng_blobs(3);
902        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
903        store.insert_all(blobs).unwrap();
904
905        store.delete_all(txs).unwrap();
906        let stat = store.cleanup();
907        assert_eq!(stat.delete_succeed, 3);
908        assert_eq!(stat.delete_failed, 0);
909    }
910
911    #[test]
912    fn disk_get_blobs_v3_returns_partial_results() {
913        let (store, _dir) = tmp_store();
914
915        let (sidecar, versioned_hash, expected) = eip7594_single_blob_sidecar();
916        store.insert(TxHash::random(), sidecar).unwrap();
917
918        assert_ne!(versioned_hash, B256::ZERO);
919
920        let request = vec![versioned_hash, B256::ZERO];
921        let v2 = store.get_by_versioned_hashes_v2(&request).unwrap();
922        assert!(v2.is_none(), "v2 must return null if any requested blob is missing");
923
924        let v3 = store.get_by_versioned_hashes_v3(&request).unwrap();
925        assert_eq!(v3, vec![Some(expected), None]);
926    }
927
928    #[test]
929    fn disk_get_blobs_v3_can_fallback_to_disk() {
930        let (store, _dir) = tmp_store();
931
932        let (sidecar, versioned_hash, expected) = eip7594_single_blob_sidecar();
933        store.insert(TxHash::random(), sidecar).unwrap();
934        store.clear_cache();
935
936        let v3 = store.get_by_versioned_hashes_v3(&[versioned_hash]).unwrap();
937        assert_eq!(v3, vec![Some(expected)]);
938    }
939
940    #[test]
941    fn disk_double_cleanup_no_failure() {
942        let (store, _dir) = tmp_store();
943
944        let blobs = rng_blobs(5);
945        let all_hashes: Vec<_> = blobs.iter().map(|(tx, _)| *tx).collect();
946        store.insert_all(blobs).unwrap();
947        store.clear_cache();
948
949        // Schedule blobs for deletion
950        store.delete_all(all_hashes.clone()).unwrap();
951
952        // First cleanup: files exist, all should succeed
953        let stat1 = store.cleanup();
954        assert_eq!(stat1.delete_succeed, 5);
955        assert_eq!(stat1.delete_failed, 0);
956
957        // Manually re-enqueue the same hashes to simulate a concurrent cleanup race
958        store.inner.txs_to_delete.write().extend(all_hashes);
959
960        // Second cleanup: files already deleted, should still report success (NotFound)
961        let stat2 = store.cleanup();
962        assert_eq!(stat2.delete_succeed, 5);
963        assert_eq!(stat2.delete_failed, 0);
964    }
965}