1use crate::blobstore::{BlobStore, BlobStoreCleanupStat, BlobStoreError, BlobStoreSize};
4use alloy_eips::{
5 eip4844::{BlobAndProofV1, BlobAndProofV2, BlobCellsAndProofsV1},
6 eip7594::{BlobCellMask, BlobTransactionSidecarVariant, Cell},
7 eip7840::BlobParams,
8 merge::EPOCH_SLOTS,
9};
10use alloy_primitives::{map::B256Set, TxHash, B128, B256};
11use parking_lot::{Mutex, RwLock};
12use schnellru::{ByLength, LruMap};
13use std::{fmt, fs, io, path::PathBuf, sync::Arc};
14use tracing::{debug, trace};
15
/// Default number of blob sidecars kept in the in-memory LRU cache.
///
/// Used as the default for [`DiskFileBlobStoreConfig::max_cached_entries`].
pub const DEFAULT_MAX_CACHED_BLOBS: u32 = 100;
18
/// Capacity (in entries) of the versioned-hash -> tx-hash LRU index.
///
/// Computed from the `bpo2` blob parameters as
/// `max_blobs_per_tx * max_blob_count * EPOCH_SLOTS * 16`.
// NOTE(review): the trailing factor of 16 is presumably a number of epochs to retain — confirm.
const VERSIONED_HASH_TO_TX_HASH_CACHE_SIZE: u64 =
    BlobParams::bpo2().max_blobs_per_tx * BlobParams::bpo2().max_blob_count * EPOCH_SLOTS * 16;
25
/// A blob store that persists blob sidecars as individual files on disk and keeps an in-memory
/// LRU cache in front of them.
///
/// Cloning is cheap: every clone shares the same inner state through an [`Arc`].
#[derive(Clone, Debug)]
pub struct DiskFileBlobStore {
    /// Shared state: blob directory, caches, size tracker and pending deletions.
    inner: Arc<DiskFileBlobStoreInner>,
}
35
36impl DiskFileBlobStore {
37 pub fn open(
39 blob_dir: impl Into<PathBuf>,
40 opts: DiskFileBlobStoreConfig,
41 ) -> Result<Self, DiskFileBlobStoreError> {
42 let blob_dir = blob_dir.into();
43 let DiskFileBlobStoreConfig { max_cached_entries, .. } = opts;
44 let inner = DiskFileBlobStoreInner::new(blob_dir, max_cached_entries);
45
46 inner.delete_all()?;
48 inner.create_blob_dir()?;
49
50 Ok(Self { inner: Arc::new(inner) })
51 }
52
53 #[cfg(test)]
54 fn is_cached(&self, tx: &B256) -> bool {
55 self.inner.blob_cache.lock().get(tx).is_some()
56 }
57
58 #[cfg(test)]
59 fn clear_cache(&self) {
60 self.inner.blob_cache.lock().clear()
61 }
62
63 fn get_by_versioned_hashes_eip7594(
72 &self,
73 versioned_hashes: &[B256],
74 ) -> Result<Vec<Option<BlobAndProofV2>>, BlobStoreError> {
75 let mut result = vec![None; versioned_hashes.len()];
78 let mut missing_count = result.len();
79 for (_tx_hash, blob_sidecar) in self.inner.blob_cache.lock().iter() {
81 if let Some(blob_sidecar) = blob_sidecar.as_eip7594() {
82 for (hash_idx, match_result) in
83 blob_sidecar.match_versioned_hashes(versioned_hashes)
84 {
85 let slot = &mut result[hash_idx];
86 if slot.is_none() {
87 missing_count -= 1;
88 }
89 *slot = Some(match_result);
90 }
91 }
92
93 if missing_count == 0 {
95 if result.iter().all(|blob| blob.is_some()) {
97 return Ok(result);
98 }
99 }
100 }
101
102 let mut missing_tx_hashes = Vec::new();
104
105 {
106 let mut versioned_to_txhashes = self.inner.versioned_hashes_to_txhash.lock();
107 for (idx, _) in
108 result.iter().enumerate().filter(|(_, blob_and_proof)| blob_and_proof.is_none())
109 {
110 let versioned_hash = versioned_hashes[idx];
112 if let Some(tx_hash) = versioned_to_txhashes.get(&versioned_hash).copied() {
113 missing_tx_hashes.push(tx_hash);
114 }
115 }
116 }
117
118 if !missing_tx_hashes.is_empty() {
120 let blobs_from_disk = self.inner.read_many_decoded(missing_tx_hashes);
121 for (_, blob_sidecar) in blobs_from_disk {
122 if let Some(blob_sidecar) = blob_sidecar.as_eip7594() {
123 for (hash_idx, match_result) in
124 blob_sidecar.match_versioned_hashes(versioned_hashes)
125 {
126 if result[hash_idx].is_none() {
127 result[hash_idx] = Some(match_result);
128 }
129 }
130 }
131 }
132 }
133
134 Ok(result)
135 }
136
137 fn get_by_versioned_hashes_cells_eip7594(
139 &self,
140 versioned_hashes: &[B256],
141 indices_bitarray: B128,
142 ) -> Result<Vec<Option<BlobCellsAndProofsV1>>, BlobStoreError> {
143 let cell_mask = BlobCellMask::new(indices_bitarray);
144 let mut result = vec![None; versioned_hashes.len()];
145 let mut missing_count = result.len();
146
147 let cached_blob_sidecars = self
148 .inner
149 .blob_cache
150 .lock()
151 .iter()
152 .map(|(_, blob_sidecar)| Arc::clone(blob_sidecar))
153 .collect::<Vec<_>>();
154 for blob_sidecar in cached_blob_sidecars {
155 if let Some(blob_sidecar) = blob_sidecar.as_eip7594() {
156 for (hash_idx, match_result) in blob_sidecar
157 .match_versioned_hashes_cells(versioned_hashes, cell_mask)
158 .map_err(|err| BlobStoreError::Other(Box::new(err)))?
159 {
160 let slot = &mut result[hash_idx];
161 if slot.is_none() {
162 missing_count -= 1;
163 }
164 *slot = Some(match_result);
165 }
166 }
167
168 if missing_count == 0 && result.iter().all(Option::is_some) {
169 return Ok(result)
170 }
171 }
172
173 let mut missing_tx_hashes = Vec::new();
174 {
175 let mut versioned_to_txhashes = self.inner.versioned_hashes_to_txhash.lock();
176 for (idx, _) in
177 result.iter().enumerate().filter(|(_, cells_and_proofs)| cells_and_proofs.is_none())
178 {
179 let versioned_hash = versioned_hashes[idx];
180 if let Some(tx_hash) = versioned_to_txhashes.get(&versioned_hash).copied() {
181 missing_tx_hashes.push(tx_hash);
182 }
183 }
184 }
185
186 if !missing_tx_hashes.is_empty() {
187 let blobs_from_disk = self.inner.read_many_decoded(missing_tx_hashes);
188 for (_, blob_sidecar) in blobs_from_disk {
189 if let Some(blob_sidecar) = blob_sidecar.as_eip7594() {
190 for (hash_idx, match_result) in blob_sidecar
191 .match_versioned_hashes_cells(versioned_hashes, cell_mask)
192 .map_err(|err| BlobStoreError::Other(Box::new(err)))?
193 {
194 if result[hash_idx].is_none() {
195 result[hash_idx] = Some(match_result);
196 }
197 }
198 }
199 }
200 }
201
202 Ok(result)
203 }
204}
205
206impl BlobStore for DiskFileBlobStore {
    /// Stores the sidecar `data` for transaction `tx`; delegates to the inner store's
    /// single-blob insert.
    fn insert(&self, tx: B256, data: BlobTransactionSidecarVariant) -> Result<(), BlobStoreError> {
        self.inner.insert_one(tx, data)
    }
210
211 fn insert_all(
212 &self,
213 txs: Vec<(B256, BlobTransactionSidecarVariant)>,
214 ) -> Result<(), BlobStoreError> {
215 if txs.is_empty() {
216 return Ok(())
217 }
218 self.inner.insert_many(txs)
219 }
220
221 fn delete(&self, tx: B256) -> Result<(), BlobStoreError> {
222 if self.inner.contains(tx)? {
223 self.inner.txs_to_delete.write().insert(tx);
224 }
225 Ok(())
226 }
227
228 fn delete_all(&self, txs: Vec<B256>) -> Result<(), BlobStoreError> {
229 if txs.is_empty() {
230 return Ok(())
231 }
232 let txs = self.inner.retain_existing(txs)?;
233 self.inner.txs_to_delete.write().extend(txs);
234 Ok(())
235 }
236
237 fn cleanup(&self) -> BlobStoreCleanupStat {
238 let txs_to_delete = std::mem::take(&mut *self.inner.txs_to_delete.write());
239 let mut stat = BlobStoreCleanupStat::default();
240 let mut subsize = 0;
241 debug!(target:"txpool::blob", num_blobs=%txs_to_delete.len(), "Removing blobs from disk");
242 for tx in txs_to_delete {
243 let path = self.inner.blob_disk_file(tx);
244 let filesize = fs::metadata(&path).map_or(0, |meta| meta.len());
245 match fs::remove_file(&path) {
246 Ok(_) => {
247 stat.delete_succeed += 1;
248 subsize += filesize;
249 }
250 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
251 stat.delete_succeed += 1;
253 }
254 Err(e) => {
255 stat.delete_failed += 1;
256 let err = DiskFileBlobStoreError::DeleteFile(tx, path, e);
257 debug!(target:"txpool::blob", %err);
258 }
259 };
260 }
261 self.inner.size_tracker.sub_size(subsize as usize);
262 self.inner.size_tracker.sub_len(stat.delete_succeed);
263 stat
264 }
265
    /// Returns the sidecar stored for `tx`, or `None` if it is neither cached nor on disk;
    /// delegates to the inner store's single-blob lookup.
    fn get(&self, tx: B256) -> Result<Option<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
        self.inner.get_one(tx)
    }
269
    /// Returns whether a blob for `tx` is known to the store (in the cache or as a file on
    /// disk); delegates to the inner store.
    fn contains(&self, tx: B256) -> Result<bool, BlobStoreError> {
        self.inner.contains(tx)
    }
273
274 fn get_all(
275 &self,
276 txs: Vec<B256>,
277 ) -> Result<Vec<(B256, Arc<BlobTransactionSidecarVariant>)>, BlobStoreError> {
278 if txs.is_empty() {
279 return Ok(Vec::new())
280 }
281 self.inner.get_all(txs)
282 }
283
284 fn get_exact(
285 &self,
286 txs: Vec<B256>,
287 ) -> Result<Vec<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
288 if txs.is_empty() {
289 return Ok(Vec::new())
290 }
291 self.inner.get_exact(txs)
292 }
293
294 fn get_by_versioned_hashes_v1(
295 &self,
296 versioned_hashes: &[B256],
297 ) -> Result<Vec<Option<BlobAndProofV1>>, BlobStoreError> {
298 let mut result = vec![None; versioned_hashes.len()];
300
301 for (_tx_hash, blob_sidecar) in self.inner.blob_cache.lock().iter() {
303 if let Some(blob_sidecar) = blob_sidecar.as_eip4844() {
304 for (hash_idx, match_result) in
305 blob_sidecar.match_versioned_hashes(versioned_hashes)
306 {
307 result[hash_idx] = Some(match_result);
308 }
309 }
310
311 if result.iter().all(|blob| blob.is_some()) {
313 return Ok(result);
314 }
315 }
316
317 let mut missing_tx_hashes = Vec::new();
320
321 {
322 let mut versioned_to_txhashes = self.inner.versioned_hashes_to_txhash.lock();
323 for (idx, _) in
324 result.iter().enumerate().filter(|(_, blob_and_proof)| blob_and_proof.is_none())
325 {
326 let versioned_hash = versioned_hashes[idx];
328 if let Some(tx_hash) = versioned_to_txhashes.get(&versioned_hash).copied() {
329 missing_tx_hashes.push(tx_hash);
330 }
331 }
332 }
333
334 if !missing_tx_hashes.is_empty() {
336 let blobs_from_disk = self.inner.read_many_decoded(missing_tx_hashes);
337 for (_, blob_sidecar) in blobs_from_disk {
338 if let Some(blob_sidecar) = blob_sidecar.as_eip4844() {
339 for (hash_idx, match_result) in
340 blob_sidecar.match_versioned_hashes(versioned_hashes)
341 {
342 if result[hash_idx].is_none() {
343 result[hash_idx] = Some(match_result);
344 }
345 }
346 }
347 }
348 }
349
350 Ok(result)
351 }
352
353 fn get_by_versioned_hashes_v2(
354 &self,
355 versioned_hashes: &[B256],
356 ) -> Result<Option<Vec<BlobAndProofV2>>, BlobStoreError> {
357 let result = self.get_by_versioned_hashes_eip7594(versioned_hashes)?;
358
359 if result.iter().all(|blob| blob.is_some()) {
361 Ok(Some(result.into_iter().map(Option::unwrap).collect()))
362 } else {
363 Ok(None)
364 }
365 }
366
    /// Per-hash lookup of EIP-7594 blobs and proofs by versioned hash.
    ///
    /// Unlike `get_by_versioned_hashes_v2`, partial results are returned: each slot is `None`
    /// for a hash that could not be resolved.
    fn get_by_versioned_hashes_v3(
        &self,
        versioned_hashes: &[B256],
    ) -> Result<Vec<Option<BlobAndProofV2>>, BlobStoreError> {
        self.get_by_versioned_hashes_eip7594(versioned_hashes)
    }
373
    /// Per-hash lookup of the cells (and proofs) selected by `indices_bitarray`.
    ///
    /// Each slot is `None` for a hash that could not be resolved.
    fn get_by_versioned_hashes_v4(
        &self,
        versioned_hashes: &[B256],
        indices_bitarray: B128,
    ) -> Result<Vec<Option<BlobCellsAndProofsV1>>, BlobStoreError> {
        self.get_by_versioned_hashes_cells_eip7594(versioned_hashes, indices_bitarray)
    }
381
382 fn get_cells(
383 &self,
384 tx: B256,
385 indices_bitarray: B128,
386 ) -> Result<Option<Vec<Cell>>, BlobStoreError> {
387 let Some(sidecar) = self.get(tx)? else {
388 return Ok(None);
389 };
390
391 let Some(sidecar) = sidecar.as_eip7594() else {
392 return Ok(None);
393 };
394
395 let versioned_hashes = sidecar.versioned_hashes().collect::<Vec<_>>();
396
397 let matches =
398 self.get_by_versioned_hashes_cells_eip7594(&versioned_hashes, indices_bitarray)?;
399
400 let mut cells = Vec::new();
401
402 for matched in matches {
403 let Some(matched) = matched else {
404 return Ok(None);
405 };
406
407 for cell in matched.blob_cells {
408 let Some(cell) = cell else {
409 return Ok(None);
410 };
411
412 cells.push(cell);
413 }
414 }
415
416 Ok(Some(cells))
417 }
418
    /// Returns the data size currently recorded by the size tracker; always `Some`.
    fn data_size_hint(&self) -> Option<usize> {
        Some(self.inner.size_tracker.data_size())
    }
422
    /// Returns the number of blobs currently recorded by the size tracker.
    fn blobs_len(&self) -> usize {
        self.inner.size_tracker.blobs_len()
    }
426}
427
/// Shared state behind a [`DiskFileBlobStore`].
struct DiskFileBlobStoreInner {
    /// Directory that holds one file per stored blob sidecar.
    blob_dir: PathBuf,
    /// In-memory LRU cache of sidecars, keyed by transaction hash and bounded by entry count.
    blob_cache: Mutex<LruMap<TxHash, Arc<BlobTransactionSidecarVariant>, ByLength>>,
    /// Tracks the total size and number of blobs written to disk.
    size_tracker: BlobStoreSize,
    /// Taken in read mode while reading blob files and in write mode while writing them.
    file_lock: RwLock<()>,
    /// Transactions whose blob files are queued for removal by the next cleanup.
    txs_to_delete: RwLock<B256Set>,
    /// LRU index from a blob's versioned hash to the hash of the transaction that carried it;
    /// filled on insert and used to locate sidecars on disk by versioned hash.
    versioned_hashes_to_txhash: Mutex<LruMap<B256, B256>>,
}
440
441impl DiskFileBlobStoreInner {
442 fn new(blob_dir: PathBuf, max_length: u32) -> Self {
444 Self {
445 blob_dir,
446 blob_cache: Mutex::new(LruMap::new(ByLength::new(max_length))),
447 size_tracker: Default::default(),
448 file_lock: Default::default(),
449 txs_to_delete: Default::default(),
450 versioned_hashes_to_txhash: Mutex::new(LruMap::new(ByLength::new(
451 VERSIONED_HASH_TO_TX_HASH_CACHE_SIZE as u32,
452 ))),
453 }
454 }
455
456 fn create_blob_dir(&self) -> Result<(), DiskFileBlobStoreError> {
458 debug!(target:"txpool::blob", blob_dir = ?self.blob_dir, "Creating blob store");
459 fs::create_dir_all(&self.blob_dir)
460 .map_err(|e| DiskFileBlobStoreError::Open(self.blob_dir.clone(), e))
461 }
462
463 fn delete_all(&self) -> Result<(), DiskFileBlobStoreError> {
465 match fs::remove_dir_all(&self.blob_dir) {
466 Ok(_) => {
467 debug!(target:"txpool::blob", blob_dir = ?self.blob_dir, "Removed blob store directory");
468 }
469 Err(err) if err.kind() == io::ErrorKind::NotFound => {}
470 Err(err) => return Err(DiskFileBlobStoreError::Open(self.blob_dir.clone(), err)),
471 }
472 Ok(())
473 }
474
475 fn insert_one(
477 &self,
478 tx: B256,
479 data: BlobTransactionSidecarVariant,
480 ) -> Result<(), BlobStoreError> {
481 let mut buf = Vec::with_capacity(data.rlp_encoded_fields_length());
482 data.rlp_encode_fields(&mut buf);
483
484 {
485 let mut map = self.versioned_hashes_to_txhash.lock();
487 data.versioned_hashes().for_each(|hash| {
488 map.insert(hash, tx);
489 });
490 }
491
492 self.blob_cache.lock().insert(tx, Arc::new(data));
493
494 let size = self.write_one_encoded(tx, &buf)?;
495
496 self.size_tracker.add_size(size);
497 self.size_tracker.inc_len(1);
498 Ok(())
499 }
500
501 fn insert_many(
503 &self,
504 txs: Vec<(B256, BlobTransactionSidecarVariant)>,
505 ) -> Result<(), BlobStoreError> {
506 let raw = txs
507 .iter()
508 .map(|(tx, data)| {
509 let mut buf = Vec::with_capacity(data.rlp_encoded_fields_length());
510 data.rlp_encode_fields(&mut buf);
511 (self.blob_disk_file(*tx), buf)
512 })
513 .collect::<Vec<_>>();
514
515 {
516 let mut map = self.versioned_hashes_to_txhash.lock();
518 for (tx, data) in &txs {
519 data.versioned_hashes().for_each(|hash| {
520 map.insert(hash, *tx);
521 });
522 }
523 }
524
525 {
526 let mut cache = self.blob_cache.lock();
528 for (tx, data) in txs {
529 cache.insert(tx, Arc::new(data));
530 }
531 }
532
533 let mut add = 0;
534 let mut num = 0;
535 {
536 let _lock = self.file_lock.write();
537 for (path, data) in raw {
538 if path.exists() {
539 debug!(target:"txpool::blob", ?path, "Blob already exists");
540 } else if let Err(err) = fs::write(&path, &data) {
541 debug!(target:"txpool::blob", %err, ?path, "Failed to write blob file");
542 } else {
543 add += data.len();
544 num += 1;
545 }
546 }
547 }
548 self.size_tracker.add_size(add);
549 self.size_tracker.inc_len(num);
550
551 Ok(())
552 }
553
554 fn contains(&self, tx: B256) -> Result<bool, BlobStoreError> {
556 if self.blob_cache.lock().get(&tx).is_some() {
557 return Ok(true)
558 }
559 Ok(self.blob_disk_file(tx).is_file())
561 }
562
563 fn retain_existing(&self, txs: Vec<B256>) -> Result<Vec<B256>, BlobStoreError> {
565 let (in_cache, not_in_cache): (Vec<B256>, Vec<B256>) = {
566 let mut cache = self.blob_cache.lock();
567 txs.into_iter().partition(|tx| cache.get(tx).is_some())
568 };
569
570 let mut existing = in_cache;
571 for tx in not_in_cache {
572 if self.blob_disk_file(tx).is_file() {
573 existing.push(tx);
574 }
575 }
576
577 Ok(existing)
578 }
579
580 fn get_one(
582 &self,
583 tx: B256,
584 ) -> Result<Option<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
585 if let Some(blob) = self.blob_cache.lock().get(&tx) {
586 return Ok(Some(blob.clone()))
587 }
588
589 if let Some(blob) = self.read_one(tx)? {
590 let blob_arc = Arc::new(blob);
591 self.blob_cache.lock().insert(tx, blob_arc.clone());
592 return Ok(Some(blob_arc))
593 }
594
595 Ok(None)
596 }
597
598 #[inline]
600 fn blob_disk_file(&self, tx: B256) -> PathBuf {
601 self.blob_dir.join(format!("{tx:x}"))
602 }
603
604 #[inline]
606 fn read_one(&self, tx: B256) -> Result<Option<BlobTransactionSidecarVariant>, BlobStoreError> {
607 let path = self.blob_disk_file(tx);
608 let data = {
609 let _lock = self.file_lock.read();
610 match fs::read(&path) {
611 Ok(data) => data,
612 Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(None),
613 Err(e) => {
614 return Err(BlobStoreError::Other(Box::new(DiskFileBlobStoreError::ReadFile(
615 tx, path, e,
616 ))))
617 }
618 }
619 };
620 BlobTransactionSidecarVariant::rlp_decode_fields(&mut data.as_slice())
621 .map(Some)
622 .map_err(BlobStoreError::DecodeError)
623 }
624
625 fn read_many_decoded(&self, txs: Vec<TxHash>) -> Vec<(TxHash, BlobTransactionSidecarVariant)> {
629 self.read_many_raw(txs)
630 .into_iter()
631 .filter_map(|(tx, data)| {
632 BlobTransactionSidecarVariant::rlp_decode_fields(&mut data.as_slice())
633 .map(|sidecar| (tx, sidecar))
634 .ok()
635 })
636 .collect()
637 }
638
639 #[inline]
643 fn read_many_raw(&self, txs: Vec<TxHash>) -> Vec<(TxHash, Vec<u8>)> {
644 let mut res = Vec::with_capacity(txs.len());
645 let _lock = self.file_lock.read();
646 for tx in txs {
647 let path = self.blob_disk_file(tx);
648 match fs::read(&path) {
649 Ok(data) => {
650 res.push((tx, data));
651 }
652 Err(err) => {
653 debug!(target:"txpool::blob", %err, ?tx, "Failed to read blob file");
654 }
655 };
656 }
657 res
658 }
659
660 #[inline]
662 fn write_one_encoded(&self, tx: B256, data: &[u8]) -> Result<usize, DiskFileBlobStoreError> {
663 trace!(target:"txpool::blob", "[{:?}] writing blob file", tx);
664 let mut add = 0;
665 let path = self.blob_disk_file(tx);
666 {
667 let _lock = self.file_lock.write();
668 if !path.exists() {
669 fs::write(&path, data)
670 .map_err(|e| DiskFileBlobStoreError::WriteFile(tx, path, e))?;
671 add = data.len();
672 }
673 }
674 Ok(add)
675 }
676
677 #[inline]
682 fn get_all(
683 &self,
684 txs: Vec<B256>,
685 ) -> Result<Vec<(B256, Arc<BlobTransactionSidecarVariant>)>, BlobStoreError> {
686 let mut res = Vec::with_capacity(txs.len());
687 let mut cache_miss = Vec::new();
688 {
689 let mut cache = self.blob_cache.lock();
690 for tx in txs {
691 if let Some(blob) = cache.get(&tx) {
692 res.push((tx, blob.clone()));
693 } else {
694 cache_miss.push(tx)
695 }
696 }
697 }
698 if cache_miss.is_empty() {
699 return Ok(res)
700 }
701 let from_disk = self.read_many_decoded(cache_miss);
702 if from_disk.is_empty() {
703 return Ok(res)
704 }
705 let from_disk = from_disk
706 .into_iter()
707 .map(|(tx, data)| {
708 let data = Arc::new(data);
709 res.push((tx, data.clone()));
710 (tx, data)
711 })
712 .collect::<Vec<_>>();
713
714 let mut cache = self.blob_cache.lock();
715 for (tx, data) in from_disk {
716 cache.insert(tx, data);
717 }
718
719 Ok(res)
720 }
721
722 #[inline]
726 fn get_exact(
727 &self,
728 txs: Vec<B256>,
729 ) -> Result<Vec<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
730 txs.into_iter()
731 .map(|tx| self.get_one(tx)?.ok_or(BlobStoreError::MissingSidecar(tx)))
732 .collect()
733 }
734}
735
736impl fmt::Debug for DiskFileBlobStoreInner {
737 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
738 f.debug_struct("DiskFileBlobStoreInner")
739 .field("blob_dir", &self.blob_dir)
740 .field("cached_blobs", &self.blob_cache.try_lock().map(|lock| lock.len()))
741 .field("txs_to_delete", &self.txs_to_delete.try_read())
742 .finish()
743 }
744}
745
/// Errors that can occur when interacting with a disk file blob store.
#[derive(Debug, thiserror::Error)]
pub enum DiskFileBlobStoreError {
    /// The blob store directory at the given path could not be created or removed.
    #[error("failed to open blobstore at {0}: {1}")]
    Open(PathBuf, io::Error),
    /// The blob file of the given transaction could not be read from the given path.
    #[error("[{0}] failed to read blob file at {1}: {2}")]
    ReadFile(TxHash, PathBuf, io::Error),
    /// The blob file of the given transaction could not be written to the given path.
    #[error("[{0}] failed to write blob file at {1}: {2}")]
    WriteFile(TxHash, PathBuf, io::Error),
    /// The blob file of the given transaction could not be deleted at the given path.
    #[error("[{0}] failed to delete blob file at {1}: {2}")]
    DeleteFile(TxHash, PathBuf, io::Error),
}
766
impl From<DiskFileBlobStoreError> for BlobStoreError {
    /// Wraps the disk-specific error in the generic [`BlobStoreError::Other`] variant, which
    /// lets `?` convert it automatically.
    fn from(value: DiskFileBlobStoreError) -> Self {
        Self::Other(Box::new(value))
    }
}
772
/// Configuration for a disk file blob store.
#[derive(Debug, Clone)]
pub struct DiskFileBlobStoreConfig {
    /// Maximum number of blob sidecars to keep in the in-memory cache.
    pub max_cached_entries: u32,
    /// How the blob store should be opened.
    pub open: OpenDiskFileBlobStore,
}
781
782impl Default for DiskFileBlobStoreConfig {
783 fn default() -> Self {
784 Self { max_cached_entries: DEFAULT_MAX_CACHED_BLOBS, open: Default::default() }
785 }
786}
787
impl DiskFileBlobStoreConfig {
    /// Sets the maximum number of blob sidecars kept in the in-memory cache and returns the
    /// updated configuration (builder style).
    pub const fn with_max_cached_entries(mut self, max_cached_entries: u32) -> Self {
        self.max_cached_entries = max_cached_entries;
        self
    }
}
795
/// How to open a disk file blob store.
// NOTE(review): `DiskFileBlobStore::open` currently ignores this setting and always clears the
// directory — confirm whether `ReIndex` is meant to be implemented.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum OpenDiskFileBlobStore {
    /// Clear everything in the blob store directory when opening. This is the default.
    #[default]
    Clear,
    /// Presumably: keep the existing blob files and index them again — TODO confirm.
    ReIndex,
}
805
#[cfg(test)]
mod tests {
    use alloy_consensus::BlobTransactionSidecar;
    use alloy_eips::{
        eip4844::{kzg_to_versioned_hash, Blob, BlobAndProofV2, Bytes48},
        eip7594::{
            BlobTransactionSidecarEip7594, BlobTransactionSidecarVariant, CELLS_PER_EXT_BLOB,
        },
    };

    use super::*;
    use std::sync::atomic::Ordering;

    /// Opens a store in a fresh temporary directory.
    ///
    /// The `TempDir` guard is returned as well so the directory outlives the store in the test.
    fn tmp_store() -> (DiskFileBlobStore, tempfile::TempDir) {
        let dir = tempfile::tempdir().unwrap();
        let store = DiskFileBlobStore::open(dir.path(), Default::default()).unwrap();
        (store, dir)
    }

    /// Generates `num` random tx hashes, each paired with an empty EIP-4844 sidecar.
    fn rng_blobs(num: usize) -> Vec<(TxHash, BlobTransactionSidecarVariant)> {
        let mut rng = rand::rng();
        (0..num)
            .map(|_| {
                let tx = TxHash::random_with(&mut rng);
                let blob = BlobTransactionSidecarVariant::Eip4844(BlobTransactionSidecar {
                    blobs: vec![],
                    commitments: vec![],
                    proofs: vec![],
                });
                (tx, blob)
            })
            .collect()
    }

    /// Builds an EIP-7594 sidecar with one default blob, together with its versioned hash and
    /// the `BlobAndProofV2` a lookup for that hash is expected to return.
    fn eip7594_single_blob_sidecar() -> (BlobTransactionSidecarVariant, B256, BlobAndProofV2) {
        let blob = Blob::default();
        let commitment = Bytes48::default();
        let cell_proofs = vec![Bytes48::default(); CELLS_PER_EXT_BLOB];

        let versioned_hash = kzg_to_versioned_hash(commitment.as_slice());

        let expected =
            BlobAndProofV2 { blob: Box::new(Blob::default()), proofs: cell_proofs.clone() };
        let sidecar = BlobTransactionSidecarEip7594::new(vec![blob], vec![commitment], cell_proofs);

        (BlobTransactionSidecarVariant::Eip7594(sidecar), versioned_hash, expected)
    }

    /// Full round trip: insert, read back, delete, clean up, and verify that nothing is left
    /// and the size tracker is back at zero.
    #[test]
    fn disk_insert_all_get_all() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(10);
        let all_hashes = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        // All inserted blobs are cached and retrievable.
        for (tx, blob) in &blobs {
            assert!(store.is_cached(tx));
            let b = store.get(*tx).unwrap().map(Arc::unwrap_or_clone).unwrap();
            assert_eq!(b, *blob);
        }

        let all = store.get_all(all_hashes.clone()).unwrap();
        for (tx, blob) in all {
            assert!(blobs.contains(&(tx, Arc::unwrap_or_clone(blob))), "missing blob {tx:?}");
        }

        assert!(store.contains(all_hashes[0]).unwrap());
        store.delete_all(all_hashes.clone()).unwrap();
        assert!(store.inner.txs_to_delete.read().contains(&all_hashes[0]));
        // Drop the cached copies too, otherwise they would still be served after cleanup.
        store.clear_cache();
        store.cleanup();

        assert!(store.get(blobs[0].0).unwrap().is_none());

        let all = store.get_all(all_hashes.clone()).unwrap();
        assert!(all.is_empty());

        assert!(!store.contains(all_hashes[0]).unwrap());
        assert!(store.get_exact(all_hashes).is_err());

        assert_eq!(store.data_size_hint(), Some(0));
        assert_eq!(store.inner.size_tracker.num_blobs.load(Ordering::Relaxed), 0);
    }

    /// A single inserted blob is cached and can be read back unchanged.
    #[test]
    fn disk_insert_and_retrieve() {
        let (store, _dir) = tmp_store();

        let (tx, blob) = rng_blobs(1).into_iter().next().unwrap();
        store.insert(tx, blob.clone()).unwrap();

        assert!(store.is_cached(&tx));
        let retrieved_blob = store.get(tx).unwrap().map(Arc::unwrap_or_clone).unwrap();
        assert_eq!(retrieved_blob, blob);
    }

    /// Deleting queues the tx and cleanup removes the file, but the cached copy is not evicted,
    /// so `get` still returns the sidecar.
    #[test]
    fn disk_delete_blob() {
        let (store, _dir) = tmp_store();

        let (tx, blob) = rng_blobs(1).into_iter().next().unwrap();
        store.insert(tx, blob).unwrap();
        assert!(store.is_cached(&tx));

        store.delete(tx).unwrap();
        assert!(store.inner.txs_to_delete.read().contains(&tx));
        store.cleanup();

        let result = store.get(tx).unwrap();
        assert_eq!(
            result,
            Some(Arc::new(BlobTransactionSidecarVariant::Eip4844(BlobTransactionSidecar {
                blobs: vec![],
                commitments: vec![],
                proofs: vec![]
            })))
        );
    }

    /// Same as `disk_delete_blob`, but for a batch: after cleanup the cached copies are still
    /// served by `get`.
    #[test]
    fn disk_insert_all_and_delete_all() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(5);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        for (tx, _) in &blobs {
            assert!(store.is_cached(tx));
        }

        store.delete_all(txs.clone()).unwrap();
        store.cleanup();

        for tx in txs {
            let result = store.get(tx).unwrap();
            assert_eq!(
                result,
                Some(Arc::new(BlobTransactionSidecarVariant::Eip4844(BlobTransactionSidecar {
                    blobs: vec![],
                    commitments: vec![],
                    proofs: vec![]
                })))
            );
        }
    }

    /// `get_all` only returns blobs that were inserted.
    #[test]
    fn disk_get_all_blobs() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(3);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        let retrieved_blobs = store.get_all(txs.clone()).unwrap();
        for (tx, blob) in retrieved_blobs {
            assert!(blobs.contains(&(tx, Arc::unwrap_or_clone(blob))));
        }

        store.delete_all(txs).unwrap();
        store.cleanup();
    }

    /// `get_exact` returns the sidecars in request order when all of them exist.
    #[test]
    fn disk_get_exact_blobs_success() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(3);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        let retrieved_blobs = store.get_exact(txs).unwrap();
        for (retrieved_blob, (_, original_blob)) in retrieved_blobs.into_iter().zip(blobs) {
            assert_eq!(Arc::unwrap_or_clone(retrieved_blob), original_blob);
        }
    }

    /// `get_exact` fails as soon as one requested transaction has no stored sidecar.
    #[test]
    fn disk_get_exact_blobs_failure() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(2);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs).unwrap();

        // One existing hash plus one that was never inserted.
        let missing_tx = TxHash::random();
        let result = store.get_exact(vec![txs[0], missing_tx]);
        assert!(result.is_err());
    }

    /// The size hint starts at zero and grows once blobs are written.
    #[test]
    fn disk_data_size_hint() {
        let (store, _dir) = tmp_store();
        assert_eq!(store.data_size_hint(), Some(0));

        let blobs = rng_blobs(2);
        store.insert_all(blobs).unwrap();
        assert!(store.data_size_hint().unwrap() > 0);
    }

    /// Cleanup reports one successful deletion per queued blob and no failures.
    #[test]
    fn disk_cleanup_stat() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(3);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs).unwrap();

        store.delete_all(txs).unwrap();
        let stat = store.cleanup();
        assert_eq!(stat.delete_succeed, 3);
        assert_eq!(stat.delete_failed, 0);
    }

    /// With one known and one unknown hash, v2 returns nothing while v3 returns the partial
    /// result.
    #[test]
    fn disk_get_blobs_v3_returns_partial_results() {
        let (store, _dir) = tmp_store();

        let (sidecar, versioned_hash, expected) = eip7594_single_blob_sidecar();
        store.insert(TxHash::random(), sidecar).unwrap();

        // `B256::ZERO` is used below as the hash that must not match anything.
        assert_ne!(versioned_hash, B256::ZERO);

        let request = vec![versioned_hash, B256::ZERO];
        let v2 = store.get_by_versioned_hashes_v2(&request).unwrap();
        assert!(v2.is_none(), "v2 must return null if any requested blob is missing");

        let v3 = store.get_by_versioned_hashes_v3(&request).unwrap();
        assert_eq!(v3, vec![Some(expected), None]);
    }

    /// v4 returns exactly the cells selected by the bit array for a known hash and `None` for
    /// an unknown one.
    #[test]
    fn disk_get_blobs_v4_returns_requested_cells() {
        let (store, _dir) = tmp_store();

        let (sidecar, versioned_hash, _) = eip7594_single_blob_sidecar();
        store.insert(TxHash::random(), sidecar).unwrap();

        // Select cells 0 and 7.
        let indices_bitarray = B128::from((1u128 << 0) | (1u128 << 7));
        let request = vec![versioned_hash, B256::ZERO];

        let v4 = store.get_by_versioned_hashes_v4(&request, indices_bitarray).unwrap();
        assert_eq!(v4.len(), request.len());
        assert!(v4[1].is_none());

        let cells_and_proofs = v4[0].as_ref().unwrap();
        assert_eq!(cells_and_proofs.blob_cells.len(), 2);
        assert_eq!(cells_and_proofs.proofs.len(), 2);
        assert!(cells_and_proofs.blob_cells.iter().all(Option::is_some));
        assert_eq!(cells_and_proofs.proofs, vec![Some(Bytes48::default()); 2]);
    }

    /// With an empty cache, v3 still resolves the hash by reading the sidecar from disk.
    #[test]
    fn disk_get_blobs_v3_can_fallback_to_disk() {
        let (store, _dir) = tmp_store();

        let (sidecar, versioned_hash, expected) = eip7594_single_blob_sidecar();
        store.insert(TxHash::random(), sidecar).unwrap();
        store.clear_cache();

        let v3 = store.get_by_versioned_hashes_v3(&[versioned_hash]).unwrap();
        assert_eq!(v3, vec![Some(expected)]);
    }

    /// With an empty cache, v4 still resolves the hash by reading the sidecar from disk.
    #[test]
    fn disk_get_blobs_v4_can_fallback_to_disk() {
        let (store, _dir) = tmp_store();

        let (sidecar, versioned_hash, _) = eip7594_single_blob_sidecar();
        store.insert(TxHash::random(), sidecar).unwrap();
        store.clear_cache();

        let v4 = store.get_by_versioned_hashes_v4(&[versioned_hash], B128::from(1u128)).unwrap();
        let cells_and_proofs = v4[0].as_ref().unwrap();
        assert_eq!(cells_and_proofs.blob_cells.len(), 1);
        assert_eq!(cells_and_proofs.proofs, vec![Some(Bytes48::default())]);
    }

    /// Cleaning up files that are already gone is reported as success, not as failure.
    #[test]
    fn disk_double_cleanup_no_failure() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(5);
        let all_hashes: Vec<_> = blobs.iter().map(|(tx, _)| *tx).collect();
        store.insert_all(blobs).unwrap();
        store.clear_cache();

        store.delete_all(all_hashes.clone()).unwrap();

        // First cleanup removes the files.
        let stat1 = store.cleanup();
        assert_eq!(stat1.delete_succeed, 5);
        assert_eq!(stat1.delete_failed, 0);

        // Queue the same hashes again, bypassing the existence check in `delete_all`.
        store.inner.txs_to_delete.write().extend(all_hashes);

        // Second cleanup finds no files and still counts every entry as succeeded.
        let stat2 = store.cleanup();
        assert_eq!(stat2.delete_succeed, 5);
        assert_eq!(stat2.delete_failed, 0);
    }
}