Skip to main content

reth_transaction_pool/blobstore/
disk.rs

1//! A simple diskstore for blobs
2
3use crate::blobstore::{BlobStore, BlobStoreCleanupStat, BlobStoreError, BlobStoreSize};
4use alloy_eips::{
5    eip4844::{BlobAndProofV1, BlobAndProofV2, BlobCellsAndProofsV1},
6    eip7594::{BlobCellMask, BlobTransactionSidecarVariant, Cell},
7    eip7840::BlobParams,
8    merge::EPOCH_SLOTS,
9};
10use alloy_primitives::{map::B256Set, TxHash, B128, B256};
11use parking_lot::{Mutex, RwLock};
12use schnellru::{ByLength, LruMap};
13use std::{fmt, fs, io, path::PathBuf, sync::Arc};
14use tracing::{debug, trace};
15
/// How many [`BlobTransactionSidecarVariant`]s to keep in the in-memory LRU blob cache by
/// default.
///
/// This is the default value of [`DiskFileBlobStoreConfig::max_cached_entries`].
pub const DEFAULT_MAX_CACHED_BLOBS: u32 = 100;
18
/// Capacity of the `versioned_hash -> tx_hash` LRU index, as a heuristic based on the highest
/// blob params (BPO2).
///
/// This multiplies the max blobs per tx by the max blobs per block, over 16 epochs worth of slots:
/// `21 * 6 * 512 = 64512` entries.
/// Each entry is a pair of 32-byte hashes, so this should be ~4MB (excluding map overhead).
const VERSIONED_HASH_TO_TX_HASH_CACHE_SIZE: u64 =
    BlobParams::bpo2().max_blobs_per_tx * BlobParams::bpo2().max_blob_count * EPOCH_SLOTS * 16;
25
/// A blob store that stores blob data on disk.
///
/// Every sidecar is RLP-encoded into its own file inside the blob directory, named after the
/// hex-encoded transaction hash. Recently used sidecars are additionally kept decoded in an
/// in-memory LRU cache.
///
/// The type uses deferred deletion, meaning that blobs are not immediately deleted from disk, but
/// it's expected that the maintenance task will call [`BlobStore::cleanup`] to remove the deleted
/// blobs from disk.
///
/// Cloning is cheap: all clones share the same inner state through an [`Arc`].
#[derive(Clone, Debug)]
pub struct DiskFileBlobStore {
    inner: Arc<DiskFileBlobStoreInner>,
}
35
36impl DiskFileBlobStore {
37    /// Opens and initializes a new disk file blob store according to the given options.
38    pub fn open(
39        blob_dir: impl Into<PathBuf>,
40        opts: DiskFileBlobStoreConfig,
41    ) -> Result<Self, DiskFileBlobStoreError> {
42        let blob_dir = blob_dir.into();
43        let DiskFileBlobStoreConfig { max_cached_entries, .. } = opts;
44        let inner = DiskFileBlobStoreInner::new(blob_dir, max_cached_entries);
45
46        // initialize the blob store
47        inner.delete_all()?;
48        inner.create_blob_dir()?;
49
50        Ok(Self { inner: Arc::new(inner) })
51    }
52
53    #[cfg(test)]
54    fn is_cached(&self, tx: &B256) -> bool {
55        self.inner.blob_cache.lock().get(tx).is_some()
56    }
57
58    #[cfg(test)]
59    fn clear_cache(&self) {
60        self.inner.blob_cache.lock().clear()
61    }
62
63    /// Look up EIP-7594 blobs by their versioned hashes.
64    ///
65    /// This returns a result vector with the **same length and order** as the input
66    /// `versioned_hashes`. Each element is `Some(BlobAndProofV2)` if the blob is available, or
67    /// `None` if it is missing or an older sidecar version.
68    ///
69    /// The lookup first scans the in-memory cache and, if not all blobs are found, falls back to
70    /// reading candidate sidecars from disk using the `versioned_hash -> tx_hash` index.
71    fn get_by_versioned_hashes_eip7594(
72        &self,
73        versioned_hashes: &[B256],
74    ) -> Result<Vec<Option<BlobAndProofV2>>, BlobStoreError> {
75        // we must return the blobs in order but we don't necessarily find them in the requested
76        // order
77        let mut result = vec![None; versioned_hashes.len()];
78        let mut missing_count = result.len();
79        // first scan all cached full sidecars
80        for (_tx_hash, blob_sidecar) in self.inner.blob_cache.lock().iter() {
81            if let Some(blob_sidecar) = blob_sidecar.as_eip7594() {
82                for (hash_idx, match_result) in
83                    blob_sidecar.match_versioned_hashes(versioned_hashes)
84                {
85                    let slot = &mut result[hash_idx];
86                    if slot.is_none() {
87                        missing_count -= 1;
88                    }
89                    *slot = Some(match_result);
90                }
91            }
92
93            // return early if all blobs are found.
94            if missing_count == 0 {
95                // since versioned_hashes may have duplicates, we double check here
96                if result.iter().all(|blob| blob.is_some()) {
97                    return Ok(result);
98                }
99            }
100        }
101
102        // not all versioned hashes were found, try to look up a matching tx
103        let mut missing_tx_hashes = Vec::new();
104
105        {
106            let mut versioned_to_txhashes = self.inner.versioned_hashes_to_txhash.lock();
107            for (idx, _) in
108                result.iter().enumerate().filter(|(_, blob_and_proof)| blob_and_proof.is_none())
109            {
110                // this is safe because the result vec has the same len
111                let versioned_hash = versioned_hashes[idx];
112                if let Some(tx_hash) = versioned_to_txhashes.get(&versioned_hash).copied() {
113                    missing_tx_hashes.push(tx_hash);
114                }
115            }
116        }
117
118        // if we have missing blobs, try to read them from disk and try again
119        if !missing_tx_hashes.is_empty() {
120            let blobs_from_disk = self.inner.read_many_decoded(missing_tx_hashes);
121            for (_, blob_sidecar) in blobs_from_disk {
122                if let Some(blob_sidecar) = blob_sidecar.as_eip7594() {
123                    for (hash_idx, match_result) in
124                        blob_sidecar.match_versioned_hashes(versioned_hashes)
125                    {
126                        if result[hash_idx].is_none() {
127                            result[hash_idx] = Some(match_result);
128                        }
129                    }
130                }
131            }
132        }
133
134        Ok(result)
135    }
136
137    /// Look up EIP-7594 blob cells by their versioned hashes.
138    fn get_by_versioned_hashes_cells_eip7594(
139        &self,
140        versioned_hashes: &[B256],
141        indices_bitarray: B128,
142    ) -> Result<Vec<Option<BlobCellsAndProofsV1>>, BlobStoreError> {
143        let cell_mask = BlobCellMask::new(indices_bitarray);
144        let mut result = vec![None; versioned_hashes.len()];
145        let mut missing_count = result.len();
146
147        let cached_blob_sidecars = self
148            .inner
149            .blob_cache
150            .lock()
151            .iter()
152            .map(|(_, blob_sidecar)| Arc::clone(blob_sidecar))
153            .collect::<Vec<_>>();
154        for blob_sidecar in cached_blob_sidecars {
155            if let Some(blob_sidecar) = blob_sidecar.as_eip7594() {
156                for (hash_idx, match_result) in blob_sidecar
157                    .match_versioned_hashes_cells(versioned_hashes, cell_mask)
158                    .map_err(|err| BlobStoreError::Other(Box::new(err)))?
159                {
160                    let slot = &mut result[hash_idx];
161                    if slot.is_none() {
162                        missing_count -= 1;
163                    }
164                    *slot = Some(match_result);
165                }
166            }
167
168            if missing_count == 0 && result.iter().all(Option::is_some) {
169                return Ok(result)
170            }
171        }
172
173        let mut missing_tx_hashes = Vec::new();
174        {
175            let mut versioned_to_txhashes = self.inner.versioned_hashes_to_txhash.lock();
176            for (idx, _) in
177                result.iter().enumerate().filter(|(_, cells_and_proofs)| cells_and_proofs.is_none())
178            {
179                let versioned_hash = versioned_hashes[idx];
180                if let Some(tx_hash) = versioned_to_txhashes.get(&versioned_hash).copied() {
181                    missing_tx_hashes.push(tx_hash);
182                }
183            }
184        }
185
186        if !missing_tx_hashes.is_empty() {
187            let blobs_from_disk = self.inner.read_many_decoded(missing_tx_hashes);
188            for (_, blob_sidecar) in blobs_from_disk {
189                if let Some(blob_sidecar) = blob_sidecar.as_eip7594() {
190                    for (hash_idx, match_result) in blob_sidecar
191                        .match_versioned_hashes_cells(versioned_hashes, cell_mask)
192                        .map_err(|err| BlobStoreError::Other(Box::new(err)))?
193                    {
194                        if result[hash_idx].is_none() {
195                            result[hash_idx] = Some(match_result);
196                        }
197                    }
198                }
199            }
200        }
201
202        Ok(result)
203    }
204}
205
206impl BlobStore for DiskFileBlobStore {
207    fn insert(&self, tx: B256, data: BlobTransactionSidecarVariant) -> Result<(), BlobStoreError> {
208        self.inner.insert_one(tx, data)
209    }
210
211    fn insert_all(
212        &self,
213        txs: Vec<(B256, BlobTransactionSidecarVariant)>,
214    ) -> Result<(), BlobStoreError> {
215        if txs.is_empty() {
216            return Ok(())
217        }
218        self.inner.insert_many(txs)
219    }
220
221    fn delete(&self, tx: B256) -> Result<(), BlobStoreError> {
222        if self.inner.contains(tx)? {
223            self.inner.txs_to_delete.write().insert(tx);
224        }
225        Ok(())
226    }
227
228    fn delete_all(&self, txs: Vec<B256>) -> Result<(), BlobStoreError> {
229        if txs.is_empty() {
230            return Ok(())
231        }
232        let txs = self.inner.retain_existing(txs)?;
233        self.inner.txs_to_delete.write().extend(txs);
234        Ok(())
235    }
236
237    fn cleanup(&self) -> BlobStoreCleanupStat {
238        let txs_to_delete = std::mem::take(&mut *self.inner.txs_to_delete.write());
239        let mut stat = BlobStoreCleanupStat::default();
240        let mut subsize = 0;
241        debug!(target:"txpool::blob", num_blobs=%txs_to_delete.len(), "Removing blobs from disk");
242        for tx in txs_to_delete {
243            let path = self.inner.blob_disk_file(tx);
244            let filesize = fs::metadata(&path).map_or(0, |meta| meta.len());
245            match fs::remove_file(&path) {
246                Ok(_) => {
247                    stat.delete_succeed += 1;
248                    subsize += filesize;
249                }
250                Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
251                    // Already deleted by a concurrent cleanup task
252                    stat.delete_succeed += 1;
253                }
254                Err(e) => {
255                    stat.delete_failed += 1;
256                    let err = DiskFileBlobStoreError::DeleteFile(tx, path, e);
257                    debug!(target:"txpool::blob", %err);
258                }
259            };
260        }
261        self.inner.size_tracker.sub_size(subsize as usize);
262        self.inner.size_tracker.sub_len(stat.delete_succeed);
263        stat
264    }
265
266    fn get(&self, tx: B256) -> Result<Option<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
267        self.inner.get_one(tx)
268    }
269
270    fn contains(&self, tx: B256) -> Result<bool, BlobStoreError> {
271        self.inner.contains(tx)
272    }
273
274    fn get_all(
275        &self,
276        txs: Vec<B256>,
277    ) -> Result<Vec<(B256, Arc<BlobTransactionSidecarVariant>)>, BlobStoreError> {
278        if txs.is_empty() {
279            return Ok(Vec::new())
280        }
281        self.inner.get_all(txs)
282    }
283
284    fn get_exact(
285        &self,
286        txs: Vec<B256>,
287    ) -> Result<Vec<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
288        if txs.is_empty() {
289            return Ok(Vec::new())
290        }
291        self.inner.get_exact(txs)
292    }
293
294    fn get_by_versioned_hashes_v1(
295        &self,
296        versioned_hashes: &[B256],
297    ) -> Result<Vec<Option<BlobAndProofV1>>, BlobStoreError> {
298        // the response must always be the same len as the request, misses must be None
299        let mut result = vec![None; versioned_hashes.len()];
300
301        // first scan all cached full sidecars
302        for (_tx_hash, blob_sidecar) in self.inner.blob_cache.lock().iter() {
303            if let Some(blob_sidecar) = blob_sidecar.as_eip4844() {
304                for (hash_idx, match_result) in
305                    blob_sidecar.match_versioned_hashes(versioned_hashes)
306                {
307                    result[hash_idx] = Some(match_result);
308                }
309            }
310
311            // return early if all blobs are found.
312            if result.iter().all(|blob| blob.is_some()) {
313                return Ok(result);
314            }
315        }
316
317        // not all versioned hashes were be found, try to look up a matching tx
318
319        let mut missing_tx_hashes = Vec::new();
320
321        {
322            let mut versioned_to_txhashes = self.inner.versioned_hashes_to_txhash.lock();
323            for (idx, _) in
324                result.iter().enumerate().filter(|(_, blob_and_proof)| blob_and_proof.is_none())
325            {
326                // this is safe because the result vec has the same len
327                let versioned_hash = versioned_hashes[idx];
328                if let Some(tx_hash) = versioned_to_txhashes.get(&versioned_hash).copied() {
329                    missing_tx_hashes.push(tx_hash);
330                }
331            }
332        }
333
334        // if we have missing blobs, try to read them from disk and try again
335        if !missing_tx_hashes.is_empty() {
336            let blobs_from_disk = self.inner.read_many_decoded(missing_tx_hashes);
337            for (_, blob_sidecar) in blobs_from_disk {
338                if let Some(blob_sidecar) = blob_sidecar.as_eip4844() {
339                    for (hash_idx, match_result) in
340                        blob_sidecar.match_versioned_hashes(versioned_hashes)
341                    {
342                        if result[hash_idx].is_none() {
343                            result[hash_idx] = Some(match_result);
344                        }
345                    }
346                }
347            }
348        }
349
350        Ok(result)
351    }
352
353    fn get_by_versioned_hashes_v2(
354        &self,
355        versioned_hashes: &[B256],
356    ) -> Result<Option<Vec<BlobAndProofV2>>, BlobStoreError> {
357        let result = self.get_by_versioned_hashes_eip7594(versioned_hashes)?;
358
359        // only return the blobs if we found all requested versioned hashes
360        if result.iter().all(|blob| blob.is_some()) {
361            Ok(Some(result.into_iter().map(Option::unwrap).collect()))
362        } else {
363            Ok(None)
364        }
365    }
366
367    fn get_by_versioned_hashes_v3(
368        &self,
369        versioned_hashes: &[B256],
370    ) -> Result<Vec<Option<BlobAndProofV2>>, BlobStoreError> {
371        self.get_by_versioned_hashes_eip7594(versioned_hashes)
372    }
373
374    fn get_by_versioned_hashes_v4(
375        &self,
376        versioned_hashes: &[B256],
377        indices_bitarray: B128,
378    ) -> Result<Vec<Option<BlobCellsAndProofsV1>>, BlobStoreError> {
379        self.get_by_versioned_hashes_cells_eip7594(versioned_hashes, indices_bitarray)
380    }
381
382    fn get_cells(
383        &self,
384        tx: B256,
385        indices_bitarray: B128,
386    ) -> Result<Option<Vec<Cell>>, BlobStoreError> {
387        let Some(sidecar) = self.get(tx)? else {
388            return Ok(None);
389        };
390
391        let Some(sidecar) = sidecar.as_eip7594() else {
392            return Ok(None);
393        };
394
395        sidecar
396            .compute_matching_cells(BlobCellMask::new(indices_bitarray))
397            .map(Some)
398            .map_err(|err| BlobStoreError::Other(Box::new(err)))
399    }
400
401    fn data_size_hint(&self) -> Option<usize> {
402        Some(self.inner.size_tracker.data_size())
403    }
404
405    fn blobs_len(&self) -> usize {
406        self.inner.size_tracker.blobs_len()
407    }
408}
409
/// Shared state behind a [`DiskFileBlobStore`].
struct DiskFileBlobStoreInner {
    /// Directory holding one file per transaction, named after the hex-encoded tx hash.
    blob_dir: PathBuf,
    /// In-memory LRU cache of decoded sidecars, keyed by transaction hash.
    blob_cache: Mutex<LruMap<TxHash, Arc<BlobTransactionSidecarVariant>, ByLength>>,
    /// Tracks the total encoded size and the number of sidecars written to disk.
    size_tracker: BlobStoreSize,
    /// Taken exclusively when blob files are written and shared when they are read.
    file_lock: RwLock<()>,
    /// Transactions marked for deletion whose files are removed by [`BlobStore::cleanup`].
    txs_to_delete: RwLock<B256Set>,
    /// Tracks known versioned hashes and a transaction they exist in.
    ///
    /// Note: It is possible that one blob can appear in multiple transactions but this only tracks
    /// the most recent one.
    versioned_hashes_to_txhash: Mutex<LruMap<B256, B256>>,
}
422
423impl DiskFileBlobStoreInner {
424    /// Creates a new empty disk file blob store with the given maximum length of the blob cache.
425    fn new(blob_dir: PathBuf, max_length: u32) -> Self {
426        Self {
427            blob_dir,
428            blob_cache: Mutex::new(LruMap::new(ByLength::new(max_length))),
429            size_tracker: Default::default(),
430            file_lock: Default::default(),
431            txs_to_delete: Default::default(),
432            versioned_hashes_to_txhash: Mutex::new(LruMap::new(ByLength::new(
433                VERSIONED_HASH_TO_TX_HASH_CACHE_SIZE as u32,
434            ))),
435        }
436    }
437
438    /// Creates the directory where blobs will be stored on disk.
439    fn create_blob_dir(&self) -> Result<(), DiskFileBlobStoreError> {
440        debug!(target:"txpool::blob", blob_dir = ?self.blob_dir, "Creating blob store");
441        fs::create_dir_all(&self.blob_dir)
442            .map_err(|e| DiskFileBlobStoreError::Open(self.blob_dir.clone(), e))
443    }
444
445    /// Deletes the entire blob store.
446    fn delete_all(&self) -> Result<(), DiskFileBlobStoreError> {
447        match fs::remove_dir_all(&self.blob_dir) {
448            Ok(_) => {
449                debug!(target:"txpool::blob", blob_dir = ?self.blob_dir, "Removed blob store directory");
450            }
451            Err(err) if err.kind() == io::ErrorKind::NotFound => {}
452            Err(err) => return Err(DiskFileBlobStoreError::Open(self.blob_dir.clone(), err)),
453        }
454        Ok(())
455    }
456
457    /// Ensures blob is in the blob cache and written to the disk.
458    fn insert_one(
459        &self,
460        tx: B256,
461        data: BlobTransactionSidecarVariant,
462    ) -> Result<(), BlobStoreError> {
463        let mut buf = Vec::with_capacity(data.rlp_encoded_fields_length());
464        data.rlp_encode_fields(&mut buf);
465
466        {
467            // cache the versioned hashes to tx hash
468            let mut map = self.versioned_hashes_to_txhash.lock();
469            data.versioned_hashes().for_each(|hash| {
470                map.insert(hash, tx);
471            });
472        }
473
474        self.blob_cache.lock().insert(tx, Arc::new(data));
475
476        let size = self.write_one_encoded(tx, &buf)?;
477
478        self.size_tracker.add_size(size);
479        self.size_tracker.inc_len(1);
480        Ok(())
481    }
482
483    /// Ensures blobs are in the blob cache and written to the disk.
484    fn insert_many(
485        &self,
486        txs: Vec<(B256, BlobTransactionSidecarVariant)>,
487    ) -> Result<(), BlobStoreError> {
488        let raw = txs
489            .iter()
490            .map(|(tx, data)| {
491                let mut buf = Vec::with_capacity(data.rlp_encoded_fields_length());
492                data.rlp_encode_fields(&mut buf);
493                (self.blob_disk_file(*tx), buf)
494            })
495            .collect::<Vec<_>>();
496
497        {
498            // cache versioned hashes to tx hash
499            let mut map = self.versioned_hashes_to_txhash.lock();
500            for (tx, data) in &txs {
501                data.versioned_hashes().for_each(|hash| {
502                    map.insert(hash, *tx);
503                });
504            }
505        }
506
507        {
508            // cache blobs
509            let mut cache = self.blob_cache.lock();
510            for (tx, data) in txs {
511                cache.insert(tx, Arc::new(data));
512            }
513        }
514
515        let mut add = 0;
516        let mut num = 0;
517        {
518            let _lock = self.file_lock.write();
519            for (path, data) in raw {
520                if path.exists() {
521                    debug!(target:"txpool::blob", ?path, "Blob already exists");
522                } else if let Err(err) = fs::write(&path, &data) {
523                    debug!(target:"txpool::blob", %err, ?path, "Failed to write blob file");
524                } else {
525                    add += data.len();
526                    num += 1;
527                }
528            }
529        }
530        self.size_tracker.add_size(add);
531        self.size_tracker.inc_len(num);
532
533        Ok(())
534    }
535
536    /// Returns true if the blob for the given transaction hash is in the blob cache or on disk.
537    fn contains(&self, tx: B256) -> Result<bool, BlobStoreError> {
538        if self.blob_cache.lock().get(&tx).is_some() {
539            return Ok(true)
540        }
541        // we only check if the file exists and assume it's valid
542        Ok(self.blob_disk_file(tx).is_file())
543    }
544
545    /// Returns all the blob transactions which are in the cache or on the disk.
546    fn retain_existing(&self, txs: Vec<B256>) -> Result<Vec<B256>, BlobStoreError> {
547        let (in_cache, not_in_cache): (Vec<B256>, Vec<B256>) = {
548            let mut cache = self.blob_cache.lock();
549            txs.into_iter().partition(|tx| cache.get(tx).is_some())
550        };
551
552        let mut existing = in_cache;
553        for tx in not_in_cache {
554            if self.blob_disk_file(tx).is_file() {
555                existing.push(tx);
556            }
557        }
558
559        Ok(existing)
560    }
561
562    /// Retrieves the blob for the given transaction hash from the blob cache or disk.
563    fn get_one(
564        &self,
565        tx: B256,
566    ) -> Result<Option<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
567        if let Some(blob) = self.blob_cache.lock().get(&tx) {
568            return Ok(Some(blob.clone()))
569        }
570
571        if let Some(blob) = self.read_one(tx)? {
572            let blob_arc = Arc::new(blob);
573            self.blob_cache.lock().insert(tx, blob_arc.clone());
574            return Ok(Some(blob_arc))
575        }
576
577        Ok(None)
578    }
579
580    /// Returns the path to the blob file for the given transaction hash.
581    #[inline]
582    fn blob_disk_file(&self, tx: B256) -> PathBuf {
583        self.blob_dir.join(format!("{tx:x}"))
584    }
585
586    /// Retrieves the blob data for the given transaction hash.
587    #[inline]
588    fn read_one(&self, tx: B256) -> Result<Option<BlobTransactionSidecarVariant>, BlobStoreError> {
589        let path = self.blob_disk_file(tx);
590        let data = {
591            let _lock = self.file_lock.read();
592            match fs::read(&path) {
593                Ok(data) => data,
594                Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(None),
595                Err(e) => {
596                    return Err(BlobStoreError::Other(Box::new(DiskFileBlobStoreError::ReadFile(
597                        tx, path, e,
598                    ))))
599                }
600            }
601        };
602        BlobTransactionSidecarVariant::rlp_decode_fields(&mut data.as_slice())
603            .map(Some)
604            .map_err(BlobStoreError::DecodeError)
605    }
606
607    /// Returns decoded blobs read from disk.
608    ///
609    /// Only returns sidecars that were found and successfully decoded.
610    fn read_many_decoded(&self, txs: Vec<TxHash>) -> Vec<(TxHash, BlobTransactionSidecarVariant)> {
611        self.read_many_raw(txs)
612            .into_iter()
613            .filter_map(|(tx, data)| {
614                BlobTransactionSidecarVariant::rlp_decode_fields(&mut data.as_slice())
615                    .map(|sidecar| (tx, sidecar))
616                    .ok()
617            })
618            .collect()
619    }
620
621    /// Retrieves the raw blob data for the given transaction hashes.
622    ///
623    /// Only returns the blobs that were found in file.
624    #[inline]
625    fn read_many_raw(&self, txs: Vec<TxHash>) -> Vec<(TxHash, Vec<u8>)> {
626        let mut res = Vec::with_capacity(txs.len());
627        let _lock = self.file_lock.read();
628        for tx in txs {
629            let path = self.blob_disk_file(tx);
630            match fs::read(&path) {
631                Ok(data) => {
632                    res.push((tx, data));
633                }
634                Err(err) => {
635                    debug!(target:"txpool::blob", %err, ?tx, "Failed to read blob file");
636                }
637            };
638        }
639        res
640    }
641
642    /// Writes the blob data for the given transaction hash to the disk.
643    #[inline]
644    fn write_one_encoded(&self, tx: B256, data: &[u8]) -> Result<usize, DiskFileBlobStoreError> {
645        trace!(target:"txpool::blob", "[{:?}] writing blob file", tx);
646        let mut add = 0;
647        let path = self.blob_disk_file(tx);
648        {
649            let _lock = self.file_lock.write();
650            if !path.exists() {
651                fs::write(&path, data)
652                    .map_err(|e| DiskFileBlobStoreError::WriteFile(tx, path, e))?;
653                add = data.len();
654            }
655        }
656        Ok(add)
657    }
658
659    /// Retrieves blobs for the given transaction hashes from the blob cache or disk.
660    ///
661    /// This will not return an error if there are missing blobs. Therefore, the result may be a
662    /// subset of the request or an empty vector if none of the blobs were found.
663    #[inline]
664    fn get_all(
665        &self,
666        txs: Vec<B256>,
667    ) -> Result<Vec<(B256, Arc<BlobTransactionSidecarVariant>)>, BlobStoreError> {
668        let mut res = Vec::with_capacity(txs.len());
669        let mut cache_miss = Vec::new();
670        {
671            let mut cache = self.blob_cache.lock();
672            for tx in txs {
673                if let Some(blob) = cache.get(&tx) {
674                    res.push((tx, blob.clone()));
675                } else {
676                    cache_miss.push(tx)
677                }
678            }
679        }
680        if cache_miss.is_empty() {
681            return Ok(res)
682        }
683        let from_disk = self.read_many_decoded(cache_miss);
684        if from_disk.is_empty() {
685            return Ok(res)
686        }
687        let from_disk = from_disk
688            .into_iter()
689            .map(|(tx, data)| {
690                let data = Arc::new(data);
691                res.push((tx, data.clone()));
692                (tx, data)
693            })
694            .collect::<Vec<_>>();
695
696        let mut cache = self.blob_cache.lock();
697        for (tx, data) in from_disk {
698            cache.insert(tx, data);
699        }
700
701        Ok(res)
702    }
703
704    /// Retrieves blobs for the given transaction hashes from the blob cache or disk.
705    ///
706    /// Returns an error if there are any missing blobs.
707    #[inline]
708    fn get_exact(
709        &self,
710        txs: Vec<B256>,
711    ) -> Result<Vec<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
712        txs.into_iter()
713            .map(|tx| self.get_one(tx)?.ok_or(BlobStoreError::MissingSidecar(tx)))
714            .collect()
715    }
716}
717
718impl fmt::Debug for DiskFileBlobStoreInner {
719    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
720        f.debug_struct("DiskFileBlobStoreInner")
721            .field("blob_dir", &self.blob_dir)
722            .field("cached_blobs", &self.blob_cache.try_lock().map(|lock| lock.len()))
723            .field("txs_to_delete", &self.txs_to_delete.try_read())
724            .finish()
725    }
726}
727
/// Errors that can occur when interacting with a disk file blob store.
#[derive(Debug, thiserror::Error)]
pub enum DiskFileBlobStoreError {
    /// Failure to remove or create the blob store directory during
    /// [`DiskFileBlobStore::open`].
    #[error("failed to open blobstore at {0}: {1}")]
    Open(PathBuf, io::Error),
    /// Failure while reading the blob file of a transaction.
    #[error("[{0}] failed to read blob file at {1}: {2}")]
    ReadFile(TxHash, PathBuf, io::Error),
    /// Failure while writing the blob file of a transaction.
    #[error("[{0}] failed to write blob file at {1}: {2}")]
    WriteFile(TxHash, PathBuf, io::Error),
    /// Failure while deleting the blob file of a transaction.
    #[error("[{0}] failed to delete blob file at {1}: {2}")]
    DeleteFile(TxHash, PathBuf, io::Error),
}
748
749impl From<DiskFileBlobStoreError> for BlobStoreError {
750    fn from(value: DiskFileBlobStoreError) -> Self {
751        Self::Other(Box::new(value))
752    }
753}
754
/// Configuration for a disk file blob store.
#[derive(Debug, Clone)]
pub struct DiskFileBlobStoreConfig {
    /// The maximum number of blobs to keep in the in memory blob cache.
    ///
    /// Defaults to [`DEFAULT_MAX_CACHED_BLOBS`].
    pub max_cached_entries: u32,
    /// How to open the blob store.
    ///
    /// NOTE(review): `DiskFileBlobStore::open` destructures the config with `..` and does not
    /// read this field; the store is always cleared. Confirm before relying on
    /// [`OpenDiskFileBlobStore::ReIndex`].
    pub open: OpenDiskFileBlobStore,
}
763
764impl Default for DiskFileBlobStoreConfig {
765    fn default() -> Self {
766        Self { max_cached_entries: DEFAULT_MAX_CACHED_BLOBS, open: Default::default() }
767    }
768}
769
770impl DiskFileBlobStoreConfig {
771    /// Set maximum number of blobs to keep in the in memory blob cache.
772    pub const fn with_max_cached_entries(mut self, max_cached_entries: u32) -> Self {
773        self.max_cached_entries = max_cached_entries;
774        self
775    }
776}
777
/// How to open a disk file blob store.
///
/// NOTE(review): `DiskFileBlobStore::open` does not currently read this setting and always clears
/// the store, so `ReIndex` behaves like `Clear` for now. Confirm before relying on it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum OpenDiskFileBlobStore {
    /// Clear everything in the blob store.
    #[default]
    Clear,
    /// Keep the existing blob store and index.
    ReIndex,
}
787
#[cfg(test)]
mod tests {
    use alloy_consensus::BlobTransactionSidecar;
    use alloy_eips::{
        eip4844::{kzg_to_versioned_hash, Blob, BlobAndProofV2, Bytes48},
        eip7594::{
            BlobTransactionSidecarEip7594, BlobTransactionSidecarVariant, CELLS_PER_EXT_BLOB,
        },
    };

    use super::*;
    use std::sync::atomic::Ordering;

    /// Opens a fresh store in a temporary directory.
    ///
    /// The returned `TempDir` must be kept alive for the duration of the test, otherwise the
    /// backing directory is removed.
    fn tmp_store() -> (DiskFileBlobStore, tempfile::TempDir) {
        let dir = tempfile::tempdir().unwrap();
        let store = DiskFileBlobStore::open(dir.path(), Default::default()).unwrap();
        (store, dir)
    }

    /// Generates `num` random transaction hashes, each paired with an empty EIP-4844 sidecar.
    fn rng_blobs(num: usize) -> Vec<(TxHash, BlobTransactionSidecarVariant)> {
        let mut rng = rand::rng();
        (0..num)
            .map(|_| {
                let tx = TxHash::random_with(&mut rng);
                let blob = BlobTransactionSidecarVariant::Eip4844(BlobTransactionSidecar {
                    blobs: vec![],
                    commitments: vec![],
                    proofs: vec![],
                });
                (tx, blob)
            })
            .collect()
    }

    /// Builds an EIP-7594 sidecar holding a single default blob.
    ///
    /// Returns the sidecar, the versioned hash of its commitment, and the `BlobAndProofV2` that
    /// lookups by that versioned hash are expected to yield.
    fn eip7594_single_blob_sidecar() -> (BlobTransactionSidecarVariant, B256, BlobAndProofV2) {
        let blob = Blob::default();
        let commitment = Bytes48::default();
        let cell_proofs = vec![Bytes48::default(); CELLS_PER_EXT_BLOB];

        let versioned_hash = kzg_to_versioned_hash(commitment.as_slice());

        let expected =
            BlobAndProofV2 { blob: Box::new(Blob::default()), proofs: cell_proofs.clone() };
        let sidecar = BlobTransactionSidecarEip7594::new(vec![blob], vec![commitment], cell_proofs);

        (BlobTransactionSidecarVariant::Eip7594(sidecar), versioned_hash, expected)
    }

    #[test]
    fn disk_insert_all_get_all() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(10);
        let all_hashes = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        // all cached
        for (tx, blob) in &blobs {
            assert!(store.is_cached(tx));
            let b = store.get(*tx).unwrap().map(Arc::unwrap_or_clone).unwrap();
            assert_eq!(b, *blob);
        }

        let all = store.get_all(all_hashes.clone()).unwrap();
        // Without this, an empty or truncated result would pass the membership loop below.
        assert_eq!(all.len(), blobs.len());
        for (tx, blob) in all {
            assert!(blobs.contains(&(tx, Arc::unwrap_or_clone(blob))), "missing blob {tx:?}");
        }

        assert!(store.contains(all_hashes[0]).unwrap());
        store.delete_all(all_hashes.clone()).unwrap();
        assert!(store.inner.txs_to_delete.read().contains(&all_hashes[0]));
        store.clear_cache();
        store.cleanup();

        assert!(store.get(blobs[0].0).unwrap().is_none());

        let all = store.get_all(all_hashes.clone()).unwrap();
        assert!(all.is_empty());

        assert!(!store.contains(all_hashes[0]).unwrap());
        assert!(store.get_exact(all_hashes).is_err());

        assert_eq!(store.data_size_hint(), Some(0));
        assert_eq!(store.inner.size_tracker.num_blobs.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn disk_insert_and_retrieve() {
        let (store, _dir) = tmp_store();

        let (tx, blob) = rng_blobs(1).into_iter().next().unwrap();
        store.insert(tx, blob.clone()).unwrap();

        assert!(store.is_cached(&tx));
        let retrieved_blob = store.get(tx).unwrap().map(Arc::unwrap_or_clone).unwrap();
        assert_eq!(retrieved_blob, blob);
    }

    #[test]
    fn disk_delete_blob() {
        let (store, _dir) = tmp_store();

        let (tx, blob) = rng_blobs(1).into_iter().next().unwrap();
        store.insert(tx, blob).unwrap();
        assert!(store.is_cached(&tx));

        store.delete(tx).unwrap();
        assert!(store.inner.txs_to_delete.read().contains(&tx));
        store.cleanup();

        // The in-memory cache is not cleared here, so the blob is still served from it even
        // after the on-disk file was cleaned up.
        let result = store.get(tx).unwrap();
        assert_eq!(
            result,
            Some(Arc::new(BlobTransactionSidecarVariant::Eip4844(BlobTransactionSidecar {
                blobs: vec![],
                commitments: vec![],
                proofs: vec![]
            })))
        );
    }

    #[test]
    fn disk_insert_all_and_delete_all() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(5);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        for (tx, _) in &blobs {
            assert!(store.is_cached(tx));
        }

        store.delete_all(txs.clone()).unwrap();
        store.cleanup();

        // The cache was not cleared, so every blob remains retrievable from memory.
        for tx in txs {
            let result = store.get(tx).unwrap();
            assert_eq!(
                result,
                Some(Arc::new(BlobTransactionSidecarVariant::Eip4844(BlobTransactionSidecar {
                    blobs: vec![],
                    commitments: vec![],
                    proofs: vec![]
                })))
            );
        }
    }

    #[test]
    fn disk_get_all_blobs() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(3);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        let retrieved_blobs = store.get_all(txs.clone()).unwrap();
        // Guard against a vacuous pass when fewer blobs than inserted are returned.
        assert_eq!(retrieved_blobs.len(), blobs.len());
        for (tx, blob) in retrieved_blobs {
            assert!(blobs.contains(&(tx, Arc::unwrap_or_clone(blob))));
        }

        store.delete_all(txs).unwrap();
        store.cleanup();
    }

    #[test]
    fn disk_get_exact_blobs_success() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(3);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        let retrieved_blobs = store.get_exact(txs).unwrap();
        // `zip` silently stops at the shorter side, so check the lengths explicitly first.
        assert_eq!(retrieved_blobs.len(), blobs.len());
        for (retrieved_blob, (_, original_blob)) in retrieved_blobs.into_iter().zip(blobs) {
            assert_eq!(Arc::unwrap_or_clone(retrieved_blob), original_blob);
        }
    }

    #[test]
    fn disk_get_exact_blobs_failure() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(2);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs).unwrap();

        // Try to get a blob that was never inserted
        let missing_tx = TxHash::random();
        let result = store.get_exact(vec![txs[0], missing_tx]);
        assert!(result.is_err());
    }

    #[test]
    fn disk_data_size_hint() {
        let (store, _dir) = tmp_store();
        assert_eq!(store.data_size_hint(), Some(0));

        let blobs = rng_blobs(2);
        store.insert_all(blobs).unwrap();
        assert!(store.data_size_hint().unwrap() > 0);
    }

    #[test]
    fn disk_cleanup_stat() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(3);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs).unwrap();

        store.delete_all(txs).unwrap();
        let stat = store.cleanup();
        assert_eq!(stat.delete_succeed, 3);
        assert_eq!(stat.delete_failed, 0);
    }

    #[test]
    fn disk_get_blobs_v3_returns_partial_results() {
        let (store, _dir) = tmp_store();

        let (sidecar, versioned_hash, expected) = eip7594_single_blob_sidecar();
        store.insert(TxHash::random(), sidecar).unwrap();

        assert_ne!(versioned_hash, B256::ZERO);

        let request = vec![versioned_hash, B256::ZERO];
        let v2 = store.get_by_versioned_hashes_v2(&request).unwrap();
        assert!(v2.is_none(), "v2 must return null if any requested blob is missing");

        let v3 = store.get_by_versioned_hashes_v3(&request).unwrap();
        assert_eq!(v3, vec![Some(expected), None]);
    }

    #[test]
    fn disk_get_blobs_v4_returns_requested_cells() {
        let (store, _dir) = tmp_store();

        let (sidecar, versioned_hash, _) = eip7594_single_blob_sidecar();
        store.insert(TxHash::random(), sidecar).unwrap();

        // Request cells 0 and 7.
        let indices_bitarray = B128::from((1u128 << 0) | (1u128 << 7));
        let request = vec![versioned_hash, B256::ZERO];

        let v4 = store.get_by_versioned_hashes_v4(&request, indices_bitarray).unwrap();
        assert_eq!(v4.len(), request.len());
        assert!(v4[1].is_none());

        let cells_and_proofs = v4[0].as_ref().unwrap();
        assert_eq!(cells_and_proofs.blob_cells.len(), 2);
        assert_eq!(cells_and_proofs.proofs.len(), 2);
        assert!(cells_and_proofs.blob_cells.iter().all(Option::is_some));
        assert_eq!(cells_and_proofs.proofs, vec![Some(Bytes48::default()); 2]);
    }

    #[test]
    fn disk_get_blobs_v3_can_fallback_to_disk() {
        let (store, _dir) = tmp_store();

        let (sidecar, versioned_hash, expected) = eip7594_single_blob_sidecar();
        store.insert(TxHash::random(), sidecar).unwrap();
        store.clear_cache();

        let v3 = store.get_by_versioned_hashes_v3(&[versioned_hash]).unwrap();
        assert_eq!(v3, vec![Some(expected)]);
    }

    #[test]
    fn disk_get_blobs_v4_can_fallback_to_disk() {
        let (store, _dir) = tmp_store();

        let (sidecar, versioned_hash, _) = eip7594_single_blob_sidecar();
        store.insert(TxHash::random(), sidecar).unwrap();
        store.clear_cache();

        let v4 = store.get_by_versioned_hashes_v4(&[versioned_hash], B128::from(1u128)).unwrap();
        assert_eq!(v4.len(), 1);
        let cells_and_proofs = v4[0].as_ref().unwrap();
        assert_eq!(cells_and_proofs.blob_cells.len(), 1);
        assert_eq!(cells_and_proofs.proofs, vec![Some(Bytes48::default())]);
    }

    #[test]
    fn disk_get_cells_can_fallback_to_disk() {
        let (store, _dir) = tmp_store();

        let tx_hash = TxHash::random();
        let (sidecar, versioned_hash, _) = eip7594_single_blob_sidecar();
        store.insert(tx_hash, sidecar).unwrap();

        // Capture the cells while they are still cached, then compare against the disk path.
        let indices_bitarray = B128::from((1u128 << 0) | (1u128 << 7));
        let expected = store
            .get_by_versioned_hashes_v4(&[versioned_hash], indices_bitarray)
            .unwrap()
            .pop()
            .unwrap()
            .unwrap()
            .blob_cells
            .into_iter()
            .collect::<Option<Vec<_>>>()
            .unwrap();

        store.clear_cache();

        assert_eq!(store.get_cells(tx_hash, indices_bitarray).unwrap(), Some(expected));
    }

    #[test]
    fn disk_double_cleanup_no_failure() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(5);
        let all_hashes: Vec<_> = blobs.iter().map(|(tx, _)| *tx).collect();
        store.insert_all(blobs).unwrap();
        store.clear_cache();

        // Schedule blobs for deletion
        store.delete_all(all_hashes.clone()).unwrap();

        // First cleanup: files exist, all should succeed
        let stat1 = store.cleanup();
        assert_eq!(stat1.delete_succeed, 5);
        assert_eq!(stat1.delete_failed, 0);

        // Manually re-enqueue the same hashes to simulate a concurrent cleanup race
        store.inner.txs_to_delete.write().extend(all_hashes);

        // Second cleanup: files already deleted, should still report success (NotFound)
        let stat2 = store.cleanup();
        assert_eq!(stat2.delete_succeed, 5);
        assert_eq!(stat2.delete_failed, 0);
    }
}