
reth_transaction_pool/blobstore/disk.rs

1//! A simple disk store for blobs
2
3use crate::blobstore::{BlobStore, BlobStoreCleanupStat, BlobStoreError, BlobStoreSize};
4use alloy_eips::{
5    eip4844::{BlobAndProofV1, BlobAndProofV2, BlobCellsAndProofsV1},
6    eip7594::{BlobCellMask, BlobTransactionSidecarVariant, Cell},
7    eip7840::BlobParams,
8    merge::EPOCH_SLOTS,
9};
10use alloy_primitives::{map::B256Set, TxHash, B128, B256};
11use parking_lot::{Mutex, RwLock};
12use schnellru::{ByLength, LruMap};
13use std::{fmt, fs, io, path::PathBuf, sync::Arc};
14use tracing::{debug, trace};
15
16/// How many [`BlobTransactionSidecarVariant`] to cache in memory.
17pub const DEFAULT_MAX_CACHED_BLOBS: u32 = 100;
18
19/// A cache size heuristic based on the highest blob params
20///
21/// This uses the max blobs per tx (6) and max blobs per block (21) over 16 epochs of 32 slots: `6 * 21 * 512 = 64512` entries
22/// At two 32-byte hashes per entry, this should be ~4MB
23const VERSIONED_HASH_TO_TX_HASH_CACHE_SIZE: u64 =
24    BlobParams::bpo2().max_blobs_per_tx * BlobParams::bpo2().max_blob_count * EPOCH_SLOTS * 16;
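
// Illustrative compile-time sanity check (not part of the original source): with 6 blobs per tx,
// 21 blobs per block and 16 epochs of 32 slots, the heuristic above works out to 64512 entries.
const _: () = assert!(VERSIONED_HASH_TO_TX_HASH_CACHE_SIZE == 64512);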
25
26/// A blob store that stores blob data on disk.
27///
28/// The type uses deferred deletion: blobs are not deleted from disk immediately. Instead, the
29/// maintenance task is expected to call [`BlobStore::cleanup`] periodically to remove the blobs
30/// that have been marked for deletion.
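///
/// # Example
///
/// A minimal usage sketch (module paths, the blob directory and the sidecar value are
/// illustrative; error handling is elided):
///
/// ```ignore
/// // open a store that clears any previously persisted blobs
/// let store = DiskFileBlobStore::open("/tmp/reth-blobs", DiskFileBlobStoreConfig::default())?;
///
/// // given a blob transaction hash `tx_hash` and its `sidecar`;
/// // `insert`, `contains`, `delete` and `cleanup` come from the `BlobStore` trait
/// store.insert(tx_hash, sidecar)?;
/// assert!(store.contains(tx_hash)?);
///
/// // deletion is deferred: `delete` only marks the blob, `cleanup` removes it from disk
/// store.delete(tx_hash)?;
/// store.cleanup();
/// ```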
31#[derive(Clone, Debug)]
32pub struct DiskFileBlobStore {
33    inner: Arc<DiskFileBlobStoreInner>,
34}
35
36impl DiskFileBlobStore {
37    /// Opens and initializes a new disk file blob store according to the given options.
38    pub fn open(
39        blob_dir: impl Into<PathBuf>,
40        opts: DiskFileBlobStoreConfig,
41    ) -> Result<Self, DiskFileBlobStoreError> {
42        let blob_dir = blob_dir.into();
43        let DiskFileBlobStoreConfig { max_cached_entries, .. } = opts;
44        let inner = DiskFileBlobStoreInner::new(blob_dir, max_cached_entries);
45
46        // initialize the blob store: wipe any existing blobs and (re)create the blob directory
47        inner.delete_all()?;
48        inner.create_blob_dir()?;
49
50        Ok(Self { inner: Arc::new(inner) })
51    }
52
53    #[cfg(test)]
54    fn is_cached(&self, tx: &B256) -> bool {
55        self.inner.blob_cache.lock().get(tx).is_some()
56    }
57
58    #[cfg(test)]
59    fn clear_cache(&self) {
60        self.inner.blob_cache.lock().clear()
61    }
62
63    /// Look up EIP-7594 blobs by their versioned hashes.
64    ///
65    /// This returns a result vector with the **same length and order** as the input
66    /// `versioned_hashes`. Each element is `Some(BlobAndProofV2)` if the blob is available, or
67/// `None` if it is missing or belongs to an older sidecar version.
68    ///
69    /// The lookup first scans the in-memory cache and, if not all blobs are found, falls back to
70    /// reading candidate sidecars from disk using the `versioned_hash -> tx_hash` index.
71    fn get_by_versioned_hashes_eip7594(
72        &self,
73        versioned_hashes: &[B256],
74    ) -> Result<Vec<Option<BlobAndProofV2>>, BlobStoreError> {
75        // we must return the blobs in order but we don't necessarily find them in the requested
76        // order
77        let mut result = vec![None; versioned_hashes.len()];
78        let mut missing_count = result.len();
79        // first scan all cached full sidecars
80        for (_tx_hash, blob_sidecar) in self.inner.blob_cache.lock().iter() {
81            if let Some(blob_sidecar) = blob_sidecar.as_eip7594() {
82                for (hash_idx, match_result) in
83                    blob_sidecar.match_versioned_hashes(versioned_hashes)
84                {
85                    let slot = &mut result[hash_idx];
86                    if slot.is_none() {
87                        missing_count -= 1;
88                    }
89                    *slot = Some(match_result);
90                }
91            }
92
93            // return early if all blobs are found.
94            if missing_count == 0 {
95                // since versioned_hashes may have duplicates, we double check here
96                if result.iter().all(|blob| blob.is_some()) {
97                    return Ok(result);
98                }
99            }
100        }
101
102        // not all versioned hashes were found, try to look up a matching tx
103        let mut missing_tx_hashes = Vec::new();
104
105        {
106            let mut versioned_to_txhashes = self.inner.versioned_hashes_to_txhash.lock();
107            for (idx, _) in
108                result.iter().enumerate().filter(|(_, blob_and_proof)| blob_and_proof.is_none())
109            {
110                // this is safe because the result vec has the same len
111                let versioned_hash = versioned_hashes[idx];
112                if let Some(tx_hash) = versioned_to_txhashes.get(&versioned_hash).copied() {
113                    missing_tx_hashes.push(tx_hash);
114                }
115            }
116        }
117
118        // if we have missing blobs, try to read them from disk and try again
119        if !missing_tx_hashes.is_empty() {
120            let blobs_from_disk = self.inner.read_many_decoded(missing_tx_hashes);
121            for (_, blob_sidecar) in blobs_from_disk {
122                if let Some(blob_sidecar) = blob_sidecar.as_eip7594() {
123                    for (hash_idx, match_result) in
124                        blob_sidecar.match_versioned_hashes(versioned_hashes)
125                    {
126                        if result[hash_idx].is_none() {
127                            result[hash_idx] = Some(match_result);
128                        }
129                    }
130                }
131            }
132        }
133
134        Ok(result)
135    }
136
137    /// Look up EIP-7594 blob cells by their versioned hashes.
138    fn get_by_versioned_hashes_cells_eip7594(
139        &self,
140        versioned_hashes: &[B256],
141        indices_bitarray: B128,
142    ) -> Result<Vec<Option<BlobCellsAndProofsV1>>, BlobStoreError> {
143        let cell_mask = BlobCellMask::new(indices_bitarray);
144        let mut result = vec![None; versioned_hashes.len()];
145        let mut missing_count = result.len();
146
147        let cached_blob_sidecars = self
148            .inner
149            .blob_cache
150            .lock()
151            .iter()
152            .map(|(_, blob_sidecar)| Arc::clone(blob_sidecar))
153            .collect::<Vec<_>>();
154        for blob_sidecar in cached_blob_sidecars {
155            if let Some(blob_sidecar) = blob_sidecar.as_eip7594() {
156                for (hash_idx, match_result) in blob_sidecar
157                    .match_versioned_hashes_cells(versioned_hashes, cell_mask)
158                    .map_err(|err| BlobStoreError::Other(Box::new(err)))?
159                {
160                    let slot = &mut result[hash_idx];
161                    if slot.is_none() {
162                        missing_count -= 1;
163                    }
164                    *slot = Some(match_result);
165                }
166            }
167
168            if missing_count == 0 && result.iter().all(Option::is_some) {
169                return Ok(result)
170            }
171        }
172
173        let mut missing_tx_hashes = Vec::new();
174        {
175            let mut versioned_to_txhashes = self.inner.versioned_hashes_to_txhash.lock();
176            for (idx, _) in
177                result.iter().enumerate().filter(|(_, cells_and_proofs)| cells_and_proofs.is_none())
178            {
179                let versioned_hash = versioned_hashes[idx];
180                if let Some(tx_hash) = versioned_to_txhashes.get(&versioned_hash).copied() {
181                    missing_tx_hashes.push(tx_hash);
182                }
183            }
184        }
185
186        if !missing_tx_hashes.is_empty() {
187            let blobs_from_disk = self.inner.read_many_decoded(missing_tx_hashes);
188            for (_, blob_sidecar) in blobs_from_disk {
189                if let Some(blob_sidecar) = blob_sidecar.as_eip7594() {
190                    for (hash_idx, match_result) in blob_sidecar
191                        .match_versioned_hashes_cells(versioned_hashes, cell_mask)
192                        .map_err(|err| BlobStoreError::Other(Box::new(err)))?
193                    {
194                        if result[hash_idx].is_none() {
195                            result[hash_idx] = Some(match_result);
196                        }
197                    }
198                }
199            }
200        }
201
202        Ok(result)
203    }
204}
205
206impl BlobStore for DiskFileBlobStore {
207    fn insert(&self, tx: B256, data: BlobTransactionSidecarVariant) -> Result<(), BlobStoreError> {
208        self.inner.insert_one(tx, data)
209    }
210
211    fn insert_all(
212        &self,
213        txs: Vec<(B256, BlobTransactionSidecarVariant)>,
214    ) -> Result<(), BlobStoreError> {
215        if txs.is_empty() {
216            return Ok(())
217        }
218        self.inner.insert_many(txs)
219    }
220
221    fn delete(&self, tx: B256) -> Result<(), BlobStoreError> {
222        if self.inner.contains(tx)? {
223            self.inner.txs_to_delete.write().insert(tx);
224        }
225        Ok(())
226    }
227
228    fn delete_all(&self, txs: Vec<B256>) -> Result<(), BlobStoreError> {
229        if txs.is_empty() {
230            return Ok(())
231        }
232        let txs = self.inner.retain_existing(txs)?;
233        self.inner.txs_to_delete.write().extend(txs);
234        Ok(())
235    }
236
237    fn cleanup(&self) -> BlobStoreCleanupStat {
238        let txs_to_delete = std::mem::take(&mut *self.inner.txs_to_delete.write());
239        let mut stat = BlobStoreCleanupStat::default();
240        let mut subsize = 0;
241        debug!(target:"txpool::blob", num_blobs=%txs_to_delete.len(), "Removing blobs from disk");
242        for tx in txs_to_delete {
243            let path = self.inner.blob_disk_file(tx);
244            let filesize = fs::metadata(&path).map_or(0, |meta| meta.len());
245            match fs::remove_file(&path) {
246                Ok(_) => {
247                    stat.delete_succeed += 1;
248                    subsize += filesize;
249                }
250                Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
251                    // Already deleted by a concurrent cleanup task
252                    stat.delete_succeed += 1;
253                }
254                Err(e) => {
255                    stat.delete_failed += 1;
256                    let err = DiskFileBlobStoreError::DeleteFile(tx, path, e);
257                    debug!(target:"txpool::blob", %err);
258                }
259            };
260        }
261        self.inner.size_tracker.sub_size(subsize as usize);
262        self.inner.size_tracker.sub_len(stat.delete_succeed);
263        stat
264    }
265
266    fn get(&self, tx: B256) -> Result<Option<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
267        self.inner.get_one(tx)
268    }
269
270    fn contains(&self, tx: B256) -> Result<bool, BlobStoreError> {
271        self.inner.contains(tx)
272    }
273
274    fn get_all(
275        &self,
276        txs: Vec<B256>,
277    ) -> Result<Vec<(B256, Arc<BlobTransactionSidecarVariant>)>, BlobStoreError> {
278        if txs.is_empty() {
279            return Ok(Vec::new())
280        }
281        self.inner.get_all(txs)
282    }
283
284    fn get_exact(
285        &self,
286        txs: Vec<B256>,
287    ) -> Result<Vec<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
288        if txs.is_empty() {
289            return Ok(Vec::new())
290        }
291        self.inner.get_exact(txs)
292    }
293
294    fn get_by_versioned_hashes_v1(
295        &self,
296        versioned_hashes: &[B256],
297    ) -> Result<Vec<Option<BlobAndProofV1>>, BlobStoreError> {
298        // the response must always be the same len as the request, misses must be None
299        let mut result = vec![None; versioned_hashes.len()];
300
301        // first scan all cached full sidecars
302        for (_tx_hash, blob_sidecar) in self.inner.blob_cache.lock().iter() {
303            if let Some(blob_sidecar) = blob_sidecar.as_eip4844() {
304                for (hash_idx, match_result) in
305                    blob_sidecar.match_versioned_hashes(versioned_hashes)
306                {
307                    result[hash_idx] = Some(match_result);
308                }
309            }
310
311            // return early if all blobs are found.
312            if result.iter().all(|blob| blob.is_some()) {
313                return Ok(result);
314            }
315        }
316
317        // not all versioned hashes were found, try to look up a matching tx
318
319        let mut missing_tx_hashes = Vec::new();
320
321        {
322            let mut versioned_to_txhashes = self.inner.versioned_hashes_to_txhash.lock();
323            for (idx, _) in
324                result.iter().enumerate().filter(|(_, blob_and_proof)| blob_and_proof.is_none())
325            {
326                // this is safe because the result vec has the same len
327                let versioned_hash = versioned_hashes[idx];
328                if let Some(tx_hash) = versioned_to_txhashes.get(&versioned_hash).copied() {
329                    missing_tx_hashes.push(tx_hash);
330                }
331            }
332        }
333
334        // if we have missing blobs, try to read them from disk and try again
335        if !missing_tx_hashes.is_empty() {
336            let blobs_from_disk = self.inner.read_many_decoded(missing_tx_hashes);
337            for (_, blob_sidecar) in blobs_from_disk {
338                if let Some(blob_sidecar) = blob_sidecar.as_eip4844() {
339                    for (hash_idx, match_result) in
340                        blob_sidecar.match_versioned_hashes(versioned_hashes)
341                    {
342                        if result[hash_idx].is_none() {
343                            result[hash_idx] = Some(match_result);
344                        }
345                    }
346                }
347            }
348        }
349
350        Ok(result)
351    }
352
353    fn get_by_versioned_hashes_v2(
354        &self,
355        versioned_hashes: &[B256],
356    ) -> Result<Option<Vec<BlobAndProofV2>>, BlobStoreError> {
357        let result = self.get_by_versioned_hashes_eip7594(versioned_hashes)?;
358
359        // only return the blobs if we found all requested versioned hashes
360        if result.iter().all(|blob| blob.is_some()) {
361            Ok(Some(result.into_iter().map(Option::unwrap).collect()))
362        } else {
363            Ok(None)
364        }
365    }
366
367    fn get_by_versioned_hashes_v3(
368        &self,
369        versioned_hashes: &[B256],
370    ) -> Result<Vec<Option<BlobAndProofV2>>, BlobStoreError> {
371        self.get_by_versioned_hashes_eip7594(versioned_hashes)
372    }
373
374    fn get_by_versioned_hashes_v4(
375        &self,
376        versioned_hashes: &[B256],
377        indices_bitarray: B128,
378    ) -> Result<Vec<Option<BlobCellsAndProofsV1>>, BlobStoreError> {
379        self.get_by_versioned_hashes_cells_eip7594(versioned_hashes, indices_bitarray)
380    }
381
382    fn get_cells(
383        &self,
384        tx: B256,
385        indices_bitarray: B128,
386    ) -> Result<Option<Vec<Cell>>, BlobStoreError> {
387        let Some(sidecar) = self.get(tx)? else {
388            return Ok(None);
389        };
390
391        let Some(sidecar) = sidecar.as_eip7594() else {
392            return Ok(None);
393        };
394
395        let versioned_hashes = sidecar.versioned_hashes().collect::<Vec<_>>();
396
397        let matches =
398            self.get_by_versioned_hashes_cells_eip7594(&versioned_hashes, indices_bitarray)?;
399
400        let mut cells = Vec::new();
401
402        for matched in matches {
403            let Some(matched) = matched else {
404                return Ok(None);
405            };
406
407            for cell in matched.blob_cells {
408                let Some(cell) = cell else {
409                    return Ok(None);
410                };
411
412                cells.push(cell);
413            }
414        }
415
416        Ok(Some(cells))
417    }
418
419    fn data_size_hint(&self) -> Option<usize> {
420        Some(self.inner.size_tracker.data_size())
421    }
422
423    fn blobs_len(&self) -> usize {
424        self.inner.size_tracker.blobs_len()
425    }
426}
427
428struct DiskFileBlobStoreInner {
429    blob_dir: PathBuf,
430    blob_cache: Mutex<LruMap<TxHash, Arc<BlobTransactionSidecarVariant>, ByLength>>,
431    size_tracker: BlobStoreSize,
432    file_lock: RwLock<()>,
433    txs_to_delete: RwLock<B256Set>,
434    /// Tracks known versioned hashes and the transaction they exist in
435    ///
436    /// Note: a blob can appear in multiple transactions, but this only tracks the most recently
437    /// inserted one.
438    versioned_hashes_to_txhash: Mutex<LruMap<B256, B256>>,
439}
440
441impl DiskFileBlobStoreInner {
442    /// Creates a new empty disk file blob store with the given maximum length of the blob cache.
443    fn new(blob_dir: PathBuf, max_length: u32) -> Self {
444        Self {
445            blob_dir,
446            blob_cache: Mutex::new(LruMap::new(ByLength::new(max_length))),
447            size_tracker: Default::default(),
448            file_lock: Default::default(),
449            txs_to_delete: Default::default(),
450            versioned_hashes_to_txhash: Mutex::new(LruMap::new(ByLength::new(
451                VERSIONED_HASH_TO_TX_HASH_CACHE_SIZE as u32,
452            ))),
453        }
454    }
455
456    /// Creates the directory where blobs will be stored on disk.
457    fn create_blob_dir(&self) -> Result<(), DiskFileBlobStoreError> {
458        debug!(target:"txpool::blob", blob_dir = ?self.blob_dir, "Creating blob store");
459        fs::create_dir_all(&self.blob_dir)
460            .map_err(|e| DiskFileBlobStoreError::Open(self.blob_dir.clone(), e))
461    }
462
463    /// Deletes the entire blob store.
464    fn delete_all(&self) -> Result<(), DiskFileBlobStoreError> {
465        match fs::remove_dir_all(&self.blob_dir) {
466            Ok(_) => {
467                debug!(target:"txpool::blob", blob_dir = ?self.blob_dir, "Removed blob store directory");
468            }
469            Err(err) if err.kind() == io::ErrorKind::NotFound => {}
470            Err(err) => return Err(DiskFileBlobStoreError::Open(self.blob_dir.clone(), err)),
471        }
472        Ok(())
473    }
474
475    /// Ensures the blob is in the blob cache and written to the disk.
476    fn insert_one(
477        &self,
478        tx: B256,
479        data: BlobTransactionSidecarVariant,
480    ) -> Result<(), BlobStoreError> {
481        let mut buf = Vec::with_capacity(data.rlp_encoded_fields_length());
482        data.rlp_encode_fields(&mut buf);
483
484        {
485            // cache the versioned hashes to tx hash
486            let mut map = self.versioned_hashes_to_txhash.lock();
487            data.versioned_hashes().for_each(|hash| {
488                map.insert(hash, tx);
489            });
490        }
491
492        self.blob_cache.lock().insert(tx, Arc::new(data));
493
494        let size = self.write_one_encoded(tx, &buf)?;
495
496        self.size_tracker.add_size(size);
497        self.size_tracker.inc_len(1);
498        Ok(())
499    }
500
501    /// Ensures blobs are in the blob cache and written to the disk.
502    fn insert_many(
503        &self,
504        txs: Vec<(B256, BlobTransactionSidecarVariant)>,
505    ) -> Result<(), BlobStoreError> {
506        let raw = txs
507            .iter()
508            .map(|(tx, data)| {
509                let mut buf = Vec::with_capacity(data.rlp_encoded_fields_length());
510                data.rlp_encode_fields(&mut buf);
511                (self.blob_disk_file(*tx), buf)
512            })
513            .collect::<Vec<_>>();
514
515        {
516            // cache versioned hashes to tx hash
517            let mut map = self.versioned_hashes_to_txhash.lock();
518            for (tx, data) in &txs {
519                data.versioned_hashes().for_each(|hash| {
520                    map.insert(hash, *tx);
521                });
522            }
523        }
524
525        {
526            // cache blobs
527            let mut cache = self.blob_cache.lock();
528            for (tx, data) in txs {
529                cache.insert(tx, Arc::new(data));
530            }
531        }
532
533        let mut add = 0;
534        let mut num = 0;
535        {
536            let _lock = self.file_lock.write();
537            for (path, data) in raw {
538                if path.exists() {
539                    debug!(target:"txpool::blob", ?path, "Blob already exists");
540                } else if let Err(err) = fs::write(&path, &data) {
541                    debug!(target:"txpool::blob", %err, ?path, "Failed to write blob file");
542                } else {
543                    add += data.len();
544                    num += 1;
545                }
546            }
547        }
548        self.size_tracker.add_size(add);
549        self.size_tracker.inc_len(num);
550
551        Ok(())
552    }
553
554    /// Returns true if the blob for the given transaction hash is in the blob cache or on disk.
555    fn contains(&self, tx: B256) -> Result<bool, BlobStoreError> {
556        if self.blob_cache.lock().get(&tx).is_some() {
557            return Ok(true)
558        }
559        // we only check if the file exists and assume it's valid
560        Ok(self.blob_disk_file(tx).is_file())
561    }
562
563    /// Returns the subset of the given transaction hashes for which a blob exists in the cache or on disk.
564    fn retain_existing(&self, txs: Vec<B256>) -> Result<Vec<B256>, BlobStoreError> {
565        let (in_cache, not_in_cache): (Vec<B256>, Vec<B256>) = {
566            let mut cache = self.blob_cache.lock();
567            txs.into_iter().partition(|tx| cache.get(tx).is_some())
568        };
569
570        let mut existing = in_cache;
571        for tx in not_in_cache {
572            if self.blob_disk_file(tx).is_file() {
573                existing.push(tx);
574            }
575        }
576
577        Ok(existing)
578    }
579
580    /// Retrieves the blob for the given transaction hash from the blob cache or disk.
581    fn get_one(
582        &self,
583        tx: B256,
584    ) -> Result<Option<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
585        if let Some(blob) = self.blob_cache.lock().get(&tx) {
586            return Ok(Some(blob.clone()))
587        }
588
589        if let Some(blob) = self.read_one(tx)? {
590            let blob_arc = Arc::new(blob);
591            self.blob_cache.lock().insert(tx, blob_arc.clone());
592            return Ok(Some(blob_arc))
593        }
594
595        Ok(None)
596    }
597
598    /// Returns the path to the blob file for the given transaction hash.
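    /// The file is named after the lowercase hex encoding of the transaction hash (no `0x` prefix).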
599    #[inline]
600    fn blob_disk_file(&self, tx: B256) -> PathBuf {
601        self.blob_dir.join(format!("{tx:x}"))
602    }
603
604    /// Retrieves the blob data for the given transaction hash.
605    #[inline]
606    fn read_one(&self, tx: B256) -> Result<Option<BlobTransactionSidecarVariant>, BlobStoreError> {
607        let path = self.blob_disk_file(tx);
608        let data = {
609            let _lock = self.file_lock.read();
610            match fs::read(&path) {
611                Ok(data) => data,
612                Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(None),
613                Err(e) => {
614                    return Err(BlobStoreError::Other(Box::new(DiskFileBlobStoreError::ReadFile(
615                        tx, path, e,
616                    ))))
617                }
618            }
619        };
620        BlobTransactionSidecarVariant::rlp_decode_fields(&mut data.as_slice())
621            .map(Some)
622            .map_err(BlobStoreError::DecodeError)
623    }
624
625    /// Returns decoded blobs read from disk.
626    ///
627    /// Only returns sidecars that were found and successfully decoded.
628    fn read_many_decoded(&self, txs: Vec<TxHash>) -> Vec<(TxHash, BlobTransactionSidecarVariant)> {
629        self.read_many_raw(txs)
630            .into_iter()
631            .filter_map(|(tx, data)| {
632                BlobTransactionSidecarVariant::rlp_decode_fields(&mut data.as_slice())
633                    .map(|sidecar| (tx, sidecar))
634                    .ok()
635            })
636            .collect()
637    }
638
639    /// Retrieves the raw blob data for the given transaction hashes.
640    ///
641    /// Only returns the blobs whose files could be read from disk.
642    #[inline]
643    fn read_many_raw(&self, txs: Vec<TxHash>) -> Vec<(TxHash, Vec<u8>)> {
644        let mut res = Vec::with_capacity(txs.len());
645        let _lock = self.file_lock.read();
646        for tx in txs {
647            let path = self.blob_disk_file(tx);
648            match fs::read(&path) {
649                Ok(data) => {
650                    res.push((tx, data));
651                }
652                Err(err) => {
653                    debug!(target:"txpool::blob", %err, ?tx, "Failed to read blob file");
654                }
655            };
656        }
657        res
658    }
659
660    /// Writes the blob data for the given transaction hash to the disk.
661    #[inline]
662    fn write_one_encoded(&self, tx: B256, data: &[u8]) -> Result<usize, DiskFileBlobStoreError> {
663        trace!(target:"txpool::blob", "[{:?}] writing blob file", tx);
664        let mut add = 0;
665        let path = self.blob_disk_file(tx);
666        {
667            let _lock = self.file_lock.write();
668            if !path.exists() {
669                fs::write(&path, data)
670                    .map_err(|e| DiskFileBlobStoreError::WriteFile(tx, path, e))?;
671                add = data.len();
672            }
673        }
674        Ok(add)
675    }
676
677    /// Retrieves blobs for the given transaction hashes from the blob cache or disk.
678    ///
679    /// This will not return an error if there are missing blobs. Therefore, the result may be a
680    /// subset of the request or an empty vector if none of the blobs were found.
681    #[inline]
682    fn get_all(
683        &self,
684        txs: Vec<B256>,
685    ) -> Result<Vec<(B256, Arc<BlobTransactionSidecarVariant>)>, BlobStoreError> {
686        let mut res = Vec::with_capacity(txs.len());
687        let mut cache_miss = Vec::new();
688        {
689            let mut cache = self.blob_cache.lock();
690            for tx in txs {
691                if let Some(blob) = cache.get(&tx) {
692                    res.push((tx, blob.clone()));
693                } else {
694                    cache_miss.push(tx)
695                }
696            }
697        }
698        if cache_miss.is_empty() {
699            return Ok(res)
700        }
701        let from_disk = self.read_many_decoded(cache_miss);
702        if from_disk.is_empty() {
703            return Ok(res)
704        }
705        let from_disk = from_disk
706            .into_iter()
707            .map(|(tx, data)| {
708                let data = Arc::new(data);
709                res.push((tx, data.clone()));
710                (tx, data)
711            })
712            .collect::<Vec<_>>();
713
714        let mut cache = self.blob_cache.lock();
715        for (tx, data) in from_disk {
716            cache.insert(tx, data);
717        }
718
719        Ok(res)
720    }
721
722    /// Retrieves blobs for the given transaction hashes from the blob cache or disk.
723    ///
724    /// Returns an error if there are any missing blobs.
725    #[inline]
726    fn get_exact(
727        &self,
728        txs: Vec<B256>,
729    ) -> Result<Vec<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
730        txs.into_iter()
731            .map(|tx| self.get_one(tx)?.ok_or(BlobStoreError::MissingSidecar(tx)))
732            .collect()
733    }
734}
735
736impl fmt::Debug for DiskFileBlobStoreInner {
737    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
738        f.debug_struct("DiskFileBlobStoreInner")
739            .field("blob_dir", &self.blob_dir)
740            .field("cached_blobs", &self.blob_cache.try_lock().map(|lock| lock.len()))
741            .field("txs_to_delete", &self.txs_to_delete.try_read())
742            .finish()
743    }
744}
745
746/// Errors that can occur when interacting with a disk file blob store.
747#[derive(Debug, thiserror::Error)]
748pub enum DiskFileBlobStoreError {
749    /// Thrown during [`DiskFileBlobStore::open`] if the blob store directory cannot be opened.
750    #[error("failed to open blobstore at {0}: {1}")]
752    Open(PathBuf, io::Error),
753    /// Failure while reading a blob file.
754    #[error("[{0}] failed to read blob file at {1}: {2}")]
756    ReadFile(TxHash, PathBuf, io::Error),
757    /// Failure while writing a blob file.
758    #[error("[{0}] failed to write blob file at {1}: {2}")]
760    WriteFile(TxHash, PathBuf, io::Error),
761    /// Failure while deleting a blob file.
762    #[error("[{0}] failed to delete blob file at {1}: {2}")]
764    DeleteFile(TxHash, PathBuf, io::Error),
765}
766
767impl From<DiskFileBlobStoreError> for BlobStoreError {
768    fn from(value: DiskFileBlobStoreError) -> Self {
769        Self::Other(Box::new(value))
770    }
771}
772
773/// Configuration for a disk file blob store.
774#[derive(Debug, Clone)]
775pub struct DiskFileBlobStoreConfig {
776    /// The maximum number of blobs to keep in the in memory blob cache.
777    pub max_cached_entries: u32,
778    /// How to open the blob store.
779    pub open: OpenDiskFileBlobStore,
780}
781
782impl Default for DiskFileBlobStoreConfig {
783    fn default() -> Self {
784        Self { max_cached_entries: DEFAULT_MAX_CACHED_BLOBS, open: Default::default() }
785    }
786}
787
788impl DiskFileBlobStoreConfig {
789    /// Set maximum number of blobs to keep in the in memory blob cache.
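    ///
    /// A one-line sketch (the value is illustrative):
    ///
    /// ```ignore
    /// let config = DiskFileBlobStoreConfig::default().with_max_cached_entries(256);
    /// ```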
790    pub const fn with_max_cached_entries(mut self, max_cached_entries: u32) -> Self {
791        self.max_cached_entries = max_cached_entries;
792        self
793    }
794}
795
796/// How to open a disk file blob store.
797#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
798pub enum OpenDiskFileBlobStore {
799    /// Clear everything in the blob store.
800    #[default]
801    Clear,
802    /// Keep the existing blob store and index
803    ReIndex,
804}
805
806#[cfg(test)]
807mod tests {
808    use alloy_consensus::BlobTransactionSidecar;
809    use alloy_eips::{
810        eip4844::{kzg_to_versioned_hash, Blob, BlobAndProofV2, Bytes48},
811        eip7594::{
812            BlobTransactionSidecarEip7594, BlobTransactionSidecarVariant, CELLS_PER_EXT_BLOB,
813        },
814    };
815
816    use super::*;
817    use std::sync::atomic::Ordering;
818
819    fn tmp_store() -> (DiskFileBlobStore, tempfile::TempDir) {
820        let dir = tempfile::tempdir().unwrap();
821        let store = DiskFileBlobStore::open(dir.path(), Default::default()).unwrap();
822        (store, dir)
823    }
824
825    fn rng_blobs(num: usize) -> Vec<(TxHash, BlobTransactionSidecarVariant)> {
826        let mut rng = rand::rng();
827        (0..num)
828            .map(|_| {
829                let tx = TxHash::random_with(&mut rng);
830                let blob = BlobTransactionSidecarVariant::Eip4844(BlobTransactionSidecar {
831                    blobs: vec![],
832                    commitments: vec![],
833                    proofs: vec![],
834                });
835                (tx, blob)
836            })
837            .collect()
838    }
839
840    fn eip7594_single_blob_sidecar() -> (BlobTransactionSidecarVariant, B256, BlobAndProofV2) {
841        let blob = Blob::default();
842        let commitment = Bytes48::default();
843        let cell_proofs = vec![Bytes48::default(); CELLS_PER_EXT_BLOB];
844
845        let versioned_hash = kzg_to_versioned_hash(commitment.as_slice());
846
847        let expected =
848            BlobAndProofV2 { blob: Box::new(Blob::default()), proofs: cell_proofs.clone() };
849        let sidecar = BlobTransactionSidecarEip7594::new(vec![blob], vec![commitment], cell_proofs);
850
851        (BlobTransactionSidecarVariant::Eip7594(sidecar), versioned_hash, expected)
852    }
853
854    #[test]
855    fn disk_insert_all_get_all() {
856        let (store, _dir) = tmp_store();
857
858        let blobs = rng_blobs(10);
859        let all_hashes = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
860        store.insert_all(blobs.clone()).unwrap();
861
862        // all cached
863        for (tx, blob) in &blobs {
864            assert!(store.is_cached(tx));
865            let b = store.get(*tx).unwrap().map(Arc::unwrap_or_clone).unwrap();
866            assert_eq!(b, *blob);
867        }
868
869        let all = store.get_all(all_hashes.clone()).unwrap();
870        for (tx, blob) in all {
871            assert!(blobs.contains(&(tx, Arc::unwrap_or_clone(blob))), "missing blob {tx:?}");
872        }
873
874        assert!(store.contains(all_hashes[0]).unwrap());
875        store.delete_all(all_hashes.clone()).unwrap();
876        assert!(store.inner.txs_to_delete.read().contains(&all_hashes[0]));
877        store.clear_cache();
878        store.cleanup();
879
880        assert!(store.get(blobs[0].0).unwrap().is_none());
881
882        let all = store.get_all(all_hashes.clone()).unwrap();
883        assert!(all.is_empty());
884
885        assert!(!store.contains(all_hashes[0]).unwrap());
886        assert!(store.get_exact(all_hashes).is_err());
887
888        assert_eq!(store.data_size_hint(), Some(0));
889        assert_eq!(store.inner.size_tracker.num_blobs.load(Ordering::Relaxed), 0);
890    }
891
892    #[test]
893    fn disk_insert_and_retrieve() {
894        let (store, _dir) = tmp_store();
895
896        let (tx, blob) = rng_blobs(1).into_iter().next().unwrap();
897        store.insert(tx, blob.clone()).unwrap();
898
899        assert!(store.is_cached(&tx));
900        let retrieved_blob = store.get(tx).unwrap().map(Arc::unwrap_or_clone).unwrap();
901        assert_eq!(retrieved_blob, blob);
902    }
903
904    #[test]
905    fn disk_delete_blob() {
906        let (store, _dir) = tmp_store();
907
908        let (tx, blob) = rng_blobs(1).into_iter().next().unwrap();
909        store.insert(tx, blob).unwrap();
910        assert!(store.is_cached(&tx));
911
912        store.delete(tx).unwrap();
913        assert!(store.inner.txs_to_delete.read().contains(&tx));
914        store.cleanup();
915
916        let result = store.get(tx).unwrap();
917        assert_eq!(
918            result,
919            Some(Arc::new(BlobTransactionSidecarVariant::Eip4844(BlobTransactionSidecar {
920                blobs: vec![],
921                commitments: vec![],
922                proofs: vec![]
923            })))
924        );
925    }
926
927    #[test]
928    fn disk_insert_all_and_delete_all() {
929        let (store, _dir) = tmp_store();
930
931        let blobs = rng_blobs(5);
932        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
933        store.insert_all(blobs.clone()).unwrap();
934
935        for (tx, _) in &blobs {
936            assert!(store.is_cached(tx));
937        }
938
939        store.delete_all(txs.clone()).unwrap();
940        store.cleanup();
941
942        for tx in txs {
943            let result = store.get(tx).unwrap();
944            assert_eq!(
945                result,
946                Some(Arc::new(BlobTransactionSidecarVariant::Eip4844(BlobTransactionSidecar {
947                    blobs: vec![],
948                    commitments: vec![],
949                    proofs: vec![]
950                })))
951            );
952        }
953    }
954
955    #[test]
956    fn disk_get_all_blobs() {
957        let (store, _dir) = tmp_store();
958
959        let blobs = rng_blobs(3);
960        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
961        store.insert_all(blobs.clone()).unwrap();
962
963        let retrieved_blobs = store.get_all(txs.clone()).unwrap();
964        for (tx, blob) in retrieved_blobs {
965            assert!(blobs.contains(&(tx, Arc::unwrap_or_clone(blob))));
966        }
967
968        store.delete_all(txs).unwrap();
969        store.cleanup();
970    }
971
972    #[test]
973    fn disk_get_exact_blobs_success() {
974        let (store, _dir) = tmp_store();
975
976        let blobs = rng_blobs(3);
977        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
978        store.insert_all(blobs.clone()).unwrap();
979
980        let retrieved_blobs = store.get_exact(txs).unwrap();
981        for (retrieved_blob, (_, original_blob)) in retrieved_blobs.into_iter().zip(blobs) {
982            assert_eq!(Arc::unwrap_or_clone(retrieved_blob), original_blob);
983        }
984    }
985
986    #[test]
987    fn disk_get_exact_blobs_failure() {
988        let (store, _dir) = tmp_store();
989
990        let blobs = rng_blobs(2);
991        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
992        store.insert_all(blobs).unwrap();
993
994        // Try to get a blob that was never inserted
995        let missing_tx = TxHash::random();
996        let result = store.get_exact(vec![txs[0], missing_tx]);
997        assert!(result.is_err());
998    }
999
1000    #[test]
1001    fn disk_data_size_hint() {
1002        let (store, _dir) = tmp_store();
1003        assert_eq!(store.data_size_hint(), Some(0));
1004
1005        let blobs = rng_blobs(2);
1006        store.insert_all(blobs).unwrap();
1007        assert!(store.data_size_hint().unwrap() > 0);
1008    }
1009
1010    #[test]
1011    fn disk_cleanup_stat() {
1012        let (store, _dir) = tmp_store();
1013
1014        let blobs = rng_blobs(3);
1015        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
1016        store.insert_all(blobs).unwrap();
1017
1018        store.delete_all(txs).unwrap();
1019        let stat = store.cleanup();
1020        assert_eq!(stat.delete_succeed, 3);
1021        assert_eq!(stat.delete_failed, 0);
1022    }
1023
1024    #[test]
1025    fn disk_get_blobs_v3_returns_partial_results() {
1026        let (store, _dir) = tmp_store();
1027
1028        let (sidecar, versioned_hash, expected) = eip7594_single_blob_sidecar();
1029        store.insert(TxHash::random(), sidecar).unwrap();
1030
1031        assert_ne!(versioned_hash, B256::ZERO);
1032
1033        let request = vec![versioned_hash, B256::ZERO];
1034        let v2 = store.get_by_versioned_hashes_v2(&request).unwrap();
1035        assert!(v2.is_none(), "v2 must return null if any requested blob is missing");
1036
1037        let v3 = store.get_by_versioned_hashes_v3(&request).unwrap();
1038        assert_eq!(v3, vec![Some(expected), None]);
1039    }
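
    // Illustrative companion to the test above (not from the original source): when every
    // requested versioned hash is known, v2 returns the full set instead of null.
    #[test]
    fn disk_get_blobs_v2_returns_all_when_present() {
        let (store, _dir) = tmp_store();

        let (sidecar, versioned_hash, expected) = eip7594_single_blob_sidecar();
        store.insert(TxHash::random(), sidecar).unwrap();

        let v2 = store.get_by_versioned_hashes_v2(&[versioned_hash]).unwrap();
        assert_eq!(v2, Some(vec![expected]));
    }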
1040
1041    #[test]
1042    fn disk_get_blobs_v4_returns_requested_cells() {
1043        let (store, _dir) = tmp_store();
1044
1045        let (sidecar, versioned_hash, _) = eip7594_single_blob_sidecar();
1046        store.insert(TxHash::random(), sidecar).unwrap();
1047
1048        let indices_bitarray = B128::from((1u128 << 0) | (1u128 << 7));
1049        let request = vec![versioned_hash, B256::ZERO];
1050
1051        let v4 = store.get_by_versioned_hashes_v4(&request, indices_bitarray).unwrap();
1052        assert_eq!(v4.len(), request.len());
1053        assert!(v4[1].is_none());
1054
1055        let cells_and_proofs = v4[0].as_ref().unwrap();
1056        assert_eq!(cells_and_proofs.blob_cells.len(), 2);
1057        assert_eq!(cells_and_proofs.proofs.len(), 2);
1058        assert!(cells_and_proofs.blob_cells.iter().all(Option::is_some));
1059        assert_eq!(cells_and_proofs.proofs, vec![Some(Bytes48::default()); 2]);
1060    }
1061
1062    #[test]
1063    fn disk_get_blobs_v3_can_fallback_to_disk() {
1064        let (store, _dir) = tmp_store();
1065
1066        let (sidecar, versioned_hash, expected) = eip7594_single_blob_sidecar();
1067        store.insert(TxHash::random(), sidecar).unwrap();
1068        store.clear_cache();
1069
1070        let v3 = store.get_by_versioned_hashes_v3(&[versioned_hash]).unwrap();
1071        assert_eq!(v3, vec![Some(expected)]);
1072    }
1073
1074    #[test]
1075    fn disk_get_blobs_v4_can_fallback_to_disk() {
1076        let (store, _dir) = tmp_store();
1077
1078        let (sidecar, versioned_hash, _) = eip7594_single_blob_sidecar();
1079        store.insert(TxHash::random(), sidecar).unwrap();
1080        store.clear_cache();
1081
1082        let v4 = store.get_by_versioned_hashes_v4(&[versioned_hash], B128::from(1u128)).unwrap();
1083        let cells_and_proofs = v4[0].as_ref().unwrap();
1084        assert_eq!(cells_and_proofs.blob_cells.len(), 1);
1085        assert_eq!(cells_and_proofs.proofs, vec![Some(Bytes48::default())]);
1086    }
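
    // Illustrative sketch (not from the original source): `get_cells` assembles the requested
    // cell indices for every blob of the given transaction, so a single-blob sidecar queried
    // with a one-bit mask yields exactly one cell.
    #[test]
    fn disk_get_cells_single_blob() {
        let (store, _dir) = tmp_store();

        let (sidecar, _versioned_hash, _) = eip7594_single_blob_sidecar();
        let tx = TxHash::random();
        store.insert(tx, sidecar).unwrap();

        // request only cell index 0
        let cells = store.get_cells(tx, B128::from(1u128)).unwrap().unwrap();
        assert_eq!(cells.len(), 1);
    }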
1087
1088    #[test]
1089    fn disk_double_cleanup_no_failure() {
1090        let (store, _dir) = tmp_store();
1091
1092        let blobs = rng_blobs(5);
1093        let all_hashes: Vec<_> = blobs.iter().map(|(tx, _)| *tx).collect();
1094        store.insert_all(blobs).unwrap();
1095        store.clear_cache();
1096
1097        // Schedule blobs for deletion
1098        store.delete_all(all_hashes.clone()).unwrap();
1099
1100        // First cleanup: files exist, all should succeed
1101        let stat1 = store.cleanup();
1102        assert_eq!(stat1.delete_succeed, 5);
1103        assert_eq!(stat1.delete_failed, 0);
1104
1105        // Manually re-enqueue the same hashes to simulate a concurrent cleanup race
1106        store.inner.txs_to_delete.write().extend(all_hashes);
1107
1108        // Second cleanup: files already deleted, should still report success (NotFound)
1109        let stat2 = store.cleanup();
1110        assert_eq!(stat2.delete_succeed, 5);
1111        assert_eq!(stat2.delete_failed, 0);
1112    }
1113}