1use crate::blobstore::{BlobStore, BlobStoreCleanupStat, BlobStoreError, BlobStoreSize};
4use alloy_eips::{
5 eip4844::{BlobAndProofV1, BlobAndProofV2, BlobCellsAndProofsV1},
6 eip7594::{BlobCellMask, BlobTransactionSidecarVariant, Cell},
7 eip7840::BlobParams,
8 merge::EPOCH_SLOTS,
9};
10use alloy_primitives::{map::B256Set, TxHash, B128, B256};
11use parking_lot::{Mutex, RwLock};
12use schnellru::{ByLength, LruMap};
13use std::{fmt, fs, io, path::PathBuf, sync::Arc};
14use tracing::{debug, trace};
15
/// Default number of blob sidecars kept in the in-memory LRU cache.
///
/// Used as the default for [`DiskFileBlobStoreConfig::max_cached_entries`].
pub const DEFAULT_MAX_CACHED_BLOBS: u32 = 100;

/// Capacity of the versioned-hash -> transaction-hash LRU map.
///
/// Derived from the BPO2 blob parameters as
/// `max_blobs_per_tx * max_blob_count * EPOCH_SLOTS * 16`.
// NOTE(review): the factor 16 presumably means "16 epochs worth of blobs" — confirm.
const VERSIONED_HASH_TO_TX_HASH_CACHE_SIZE: u64 =
    BlobParams::bpo2().max_blobs_per_tx * BlobParams::bpo2().max_blob_count * EPOCH_SLOTS * 16;
25
/// A blob store that keeps every blob sidecar as a file inside a directory and fronts those files
/// with an in-memory LRU cache.
///
/// Cloning only clones the [`Arc`], so all clones operate on the same underlying state.
#[derive(Clone, Debug)]
pub struct DiskFileBlobStore {
    /// Shared state: blob directory, caches, size tracker and the set of pending deletions.
    inner: Arc<DiskFileBlobStoreInner>,
}
35
36impl DiskFileBlobStore {
37 pub fn open(
39 blob_dir: impl Into<PathBuf>,
40 opts: DiskFileBlobStoreConfig,
41 ) -> Result<Self, DiskFileBlobStoreError> {
42 let blob_dir = blob_dir.into();
43 let DiskFileBlobStoreConfig { max_cached_entries, .. } = opts;
44 let inner = DiskFileBlobStoreInner::new(blob_dir, max_cached_entries);
45
46 inner.delete_all()?;
48 inner.create_blob_dir()?;
49
50 Ok(Self { inner: Arc::new(inner) })
51 }
52
53 #[cfg(test)]
54 fn is_cached(&self, tx: &B256) -> bool {
55 self.inner.blob_cache.lock().get(tx).is_some()
56 }
57
58 #[cfg(test)]
59 fn clear_cache(&self) {
60 self.inner.blob_cache.lock().clear()
61 }
62
63 fn get_by_versioned_hashes_eip7594(
72 &self,
73 versioned_hashes: &[B256],
74 ) -> Result<Vec<Option<BlobAndProofV2>>, BlobStoreError> {
75 let mut result = vec![None; versioned_hashes.len()];
78 let mut missing_count = result.len();
79 for (_tx_hash, blob_sidecar) in self.inner.blob_cache.lock().iter() {
81 if let Some(blob_sidecar) = blob_sidecar.as_eip7594() {
82 for (hash_idx, match_result) in
83 blob_sidecar.match_versioned_hashes(versioned_hashes)
84 {
85 let slot = &mut result[hash_idx];
86 if slot.is_none() {
87 missing_count -= 1;
88 }
89 *slot = Some(match_result);
90 }
91 }
92
93 if missing_count == 0 {
95 if result.iter().all(|blob| blob.is_some()) {
97 return Ok(result);
98 }
99 }
100 }
101
102 let mut missing_tx_hashes = Vec::new();
104
105 {
106 let mut versioned_to_txhashes = self.inner.versioned_hashes_to_txhash.lock();
107 for (idx, _) in
108 result.iter().enumerate().filter(|(_, blob_and_proof)| blob_and_proof.is_none())
109 {
110 let versioned_hash = versioned_hashes[idx];
112 if let Some(tx_hash) = versioned_to_txhashes.get(&versioned_hash).copied() {
113 missing_tx_hashes.push(tx_hash);
114 }
115 }
116 }
117
118 if !missing_tx_hashes.is_empty() {
120 let blobs_from_disk = self.inner.read_many_decoded(missing_tx_hashes);
121 for (_, blob_sidecar) in blobs_from_disk {
122 if let Some(blob_sidecar) = blob_sidecar.as_eip7594() {
123 for (hash_idx, match_result) in
124 blob_sidecar.match_versioned_hashes(versioned_hashes)
125 {
126 if result[hash_idx].is_none() {
127 result[hash_idx] = Some(match_result);
128 }
129 }
130 }
131 }
132 }
133
134 Ok(result)
135 }
136
137 fn get_by_versioned_hashes_cells_eip7594(
139 &self,
140 versioned_hashes: &[B256],
141 indices_bitarray: B128,
142 ) -> Result<Vec<Option<BlobCellsAndProofsV1>>, BlobStoreError> {
143 let cell_mask = BlobCellMask::new(indices_bitarray);
144 let mut result = vec![None; versioned_hashes.len()];
145 let mut missing_count = result.len();
146
147 let cached_blob_sidecars = self
148 .inner
149 .blob_cache
150 .lock()
151 .iter()
152 .map(|(_, blob_sidecar)| Arc::clone(blob_sidecar))
153 .collect::<Vec<_>>();
154 for blob_sidecar in cached_blob_sidecars {
155 if let Some(blob_sidecar) = blob_sidecar.as_eip7594() {
156 for (hash_idx, match_result) in blob_sidecar
157 .match_versioned_hashes_cells(versioned_hashes, cell_mask)
158 .map_err(|err| BlobStoreError::Other(Box::new(err)))?
159 {
160 let slot = &mut result[hash_idx];
161 if slot.is_none() {
162 missing_count -= 1;
163 }
164 *slot = Some(match_result);
165 }
166 }
167
168 if missing_count == 0 && result.iter().all(Option::is_some) {
169 return Ok(result)
170 }
171 }
172
173 let mut missing_tx_hashes = Vec::new();
174 {
175 let mut versioned_to_txhashes = self.inner.versioned_hashes_to_txhash.lock();
176 for (idx, _) in
177 result.iter().enumerate().filter(|(_, cells_and_proofs)| cells_and_proofs.is_none())
178 {
179 let versioned_hash = versioned_hashes[idx];
180 if let Some(tx_hash) = versioned_to_txhashes.get(&versioned_hash).copied() {
181 missing_tx_hashes.push(tx_hash);
182 }
183 }
184 }
185
186 if !missing_tx_hashes.is_empty() {
187 let blobs_from_disk = self.inner.read_many_decoded(missing_tx_hashes);
188 for (_, blob_sidecar) in blobs_from_disk {
189 if let Some(blob_sidecar) = blob_sidecar.as_eip7594() {
190 for (hash_idx, match_result) in blob_sidecar
191 .match_versioned_hashes_cells(versioned_hashes, cell_mask)
192 .map_err(|err| BlobStoreError::Other(Box::new(err)))?
193 {
194 if result[hash_idx].is_none() {
195 result[hash_idx] = Some(match_result);
196 }
197 }
198 }
199 }
200 }
201
202 Ok(result)
203 }
204}
205
206impl BlobStore for DiskFileBlobStore {
207 fn insert(&self, tx: B256, data: BlobTransactionSidecarVariant) -> Result<(), BlobStoreError> {
208 self.inner.insert_one(tx, data)
209 }
210
211 fn insert_all(
212 &self,
213 txs: Vec<(B256, BlobTransactionSidecarVariant)>,
214 ) -> Result<(), BlobStoreError> {
215 if txs.is_empty() {
216 return Ok(())
217 }
218 self.inner.insert_many(txs)
219 }
220
221 fn delete(&self, tx: B256) -> Result<(), BlobStoreError> {
222 if self.inner.contains(tx)? {
223 self.inner.txs_to_delete.write().insert(tx);
224 }
225 Ok(())
226 }
227
228 fn delete_all(&self, txs: Vec<B256>) -> Result<(), BlobStoreError> {
229 if txs.is_empty() {
230 return Ok(())
231 }
232 let txs = self.inner.retain_existing(txs)?;
233 self.inner.txs_to_delete.write().extend(txs);
234 Ok(())
235 }
236
237 fn cleanup(&self) -> BlobStoreCleanupStat {
238 let txs_to_delete = std::mem::take(&mut *self.inner.txs_to_delete.write());
239 let mut stat = BlobStoreCleanupStat::default();
240 let mut subsize = 0;
241 debug!(target:"txpool::blob", num_blobs=%txs_to_delete.len(), "Removing blobs from disk");
242 for tx in txs_to_delete {
243 let path = self.inner.blob_disk_file(tx);
244 let filesize = fs::metadata(&path).map_or(0, |meta| meta.len());
245 match fs::remove_file(&path) {
246 Ok(_) => {
247 stat.delete_succeed += 1;
248 subsize += filesize;
249 }
250 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
251 stat.delete_succeed += 1;
253 }
254 Err(e) => {
255 stat.delete_failed += 1;
256 let err = DiskFileBlobStoreError::DeleteFile(tx, path, e);
257 debug!(target:"txpool::blob", %err);
258 }
259 };
260 }
261 self.inner.size_tracker.sub_size(subsize as usize);
262 self.inner.size_tracker.sub_len(stat.delete_succeed);
263 stat
264 }
265
266 fn get(&self, tx: B256) -> Result<Option<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
267 self.inner.get_one(tx)
268 }
269
270 fn contains(&self, tx: B256) -> Result<bool, BlobStoreError> {
271 self.inner.contains(tx)
272 }
273
274 fn get_all(
275 &self,
276 txs: Vec<B256>,
277 ) -> Result<Vec<(B256, Arc<BlobTransactionSidecarVariant>)>, BlobStoreError> {
278 if txs.is_empty() {
279 return Ok(Vec::new())
280 }
281 self.inner.get_all(txs)
282 }
283
284 fn get_exact(
285 &self,
286 txs: Vec<B256>,
287 ) -> Result<Vec<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
288 if txs.is_empty() {
289 return Ok(Vec::new())
290 }
291 self.inner.get_exact(txs)
292 }
293
294 fn get_by_versioned_hashes_v1(
295 &self,
296 versioned_hashes: &[B256],
297 ) -> Result<Vec<Option<BlobAndProofV1>>, BlobStoreError> {
298 let mut result = vec![None; versioned_hashes.len()];
300
301 for (_tx_hash, blob_sidecar) in self.inner.blob_cache.lock().iter() {
303 if let Some(blob_sidecar) = blob_sidecar.as_eip4844() {
304 for (hash_idx, match_result) in
305 blob_sidecar.match_versioned_hashes(versioned_hashes)
306 {
307 result[hash_idx] = Some(match_result);
308 }
309 }
310
311 if result.iter().all(|blob| blob.is_some()) {
313 return Ok(result);
314 }
315 }
316
317 let mut missing_tx_hashes = Vec::new();
320
321 {
322 let mut versioned_to_txhashes = self.inner.versioned_hashes_to_txhash.lock();
323 for (idx, _) in
324 result.iter().enumerate().filter(|(_, blob_and_proof)| blob_and_proof.is_none())
325 {
326 let versioned_hash = versioned_hashes[idx];
328 if let Some(tx_hash) = versioned_to_txhashes.get(&versioned_hash).copied() {
329 missing_tx_hashes.push(tx_hash);
330 }
331 }
332 }
333
334 if !missing_tx_hashes.is_empty() {
336 let blobs_from_disk = self.inner.read_many_decoded(missing_tx_hashes);
337 for (_, blob_sidecar) in blobs_from_disk {
338 if let Some(blob_sidecar) = blob_sidecar.as_eip4844() {
339 for (hash_idx, match_result) in
340 blob_sidecar.match_versioned_hashes(versioned_hashes)
341 {
342 if result[hash_idx].is_none() {
343 result[hash_idx] = Some(match_result);
344 }
345 }
346 }
347 }
348 }
349
350 Ok(result)
351 }
352
353 fn get_by_versioned_hashes_v2(
354 &self,
355 versioned_hashes: &[B256],
356 ) -> Result<Option<Vec<BlobAndProofV2>>, BlobStoreError> {
357 let result = self.get_by_versioned_hashes_eip7594(versioned_hashes)?;
358
359 if result.iter().all(|blob| blob.is_some()) {
361 Ok(Some(result.into_iter().map(Option::unwrap).collect()))
362 } else {
363 Ok(None)
364 }
365 }
366
367 fn get_by_versioned_hashes_v3(
368 &self,
369 versioned_hashes: &[B256],
370 ) -> Result<Vec<Option<BlobAndProofV2>>, BlobStoreError> {
371 self.get_by_versioned_hashes_eip7594(versioned_hashes)
372 }
373
374 fn get_by_versioned_hashes_v4(
375 &self,
376 versioned_hashes: &[B256],
377 indices_bitarray: B128,
378 ) -> Result<Vec<Option<BlobCellsAndProofsV1>>, BlobStoreError> {
379 self.get_by_versioned_hashes_cells_eip7594(versioned_hashes, indices_bitarray)
380 }
381
382 fn get_cells(
383 &self,
384 tx: B256,
385 indices_bitarray: B128,
386 ) -> Result<Option<Vec<Cell>>, BlobStoreError> {
387 let Some(sidecar) = self.get(tx)? else {
388 return Ok(None);
389 };
390
391 let Some(sidecar) = sidecar.as_eip7594() else {
392 return Ok(None);
393 };
394
395 sidecar
396 .compute_matching_cells(BlobCellMask::new(indices_bitarray))
397 .map(Some)
398 .map_err(|err| BlobStoreError::Other(Box::new(err)))
399 }
400
401 fn data_size_hint(&self) -> Option<usize> {
402 Some(self.inner.size_tracker.data_size())
403 }
404
405 fn blobs_len(&self) -> usize {
406 self.inner.size_tracker.blobs_len()
407 }
408}
409
/// State shared by all clones of [`DiskFileBlobStore`].
struct DiskFileBlobStoreInner {
    /// Directory holding one file per blob sidecar, named after the hex-formatted tx hash.
    blob_dir: PathBuf,
    /// In-memory LRU cache of decoded sidecars, keyed by transaction hash.
    blob_cache: Mutex<LruMap<TxHash, Arc<BlobTransactionSidecarVariant>, ByLength>>,
    /// Tracks the number of stored blobs and their total size.
    size_tracker: BlobStoreSize,
    /// Taken shared while reading blob files and exclusively while writing them.
    file_lock: RwLock<()>,
    /// Transactions whose blob files are to be removed by the next cleanup.
    txs_to_delete: RwLock<B256Set>,
    /// LRU map from a blob's versioned hash to the hash of the transaction carrying it.
    versioned_hashes_to_txhash: Mutex<LruMap<B256, B256>>,
}
422
423impl DiskFileBlobStoreInner {
424 fn new(blob_dir: PathBuf, max_length: u32) -> Self {
426 Self {
427 blob_dir,
428 blob_cache: Mutex::new(LruMap::new(ByLength::new(max_length))),
429 size_tracker: Default::default(),
430 file_lock: Default::default(),
431 txs_to_delete: Default::default(),
432 versioned_hashes_to_txhash: Mutex::new(LruMap::new(ByLength::new(
433 VERSIONED_HASH_TO_TX_HASH_CACHE_SIZE as u32,
434 ))),
435 }
436 }
437
438 fn create_blob_dir(&self) -> Result<(), DiskFileBlobStoreError> {
440 debug!(target:"txpool::blob", blob_dir = ?self.blob_dir, "Creating blob store");
441 fs::create_dir_all(&self.blob_dir)
442 .map_err(|e| DiskFileBlobStoreError::Open(self.blob_dir.clone(), e))
443 }
444
445 fn delete_all(&self) -> Result<(), DiskFileBlobStoreError> {
447 match fs::remove_dir_all(&self.blob_dir) {
448 Ok(_) => {
449 debug!(target:"txpool::blob", blob_dir = ?self.blob_dir, "Removed blob store directory");
450 }
451 Err(err) if err.kind() == io::ErrorKind::NotFound => {}
452 Err(err) => return Err(DiskFileBlobStoreError::Open(self.blob_dir.clone(), err)),
453 }
454 Ok(())
455 }
456
457 fn insert_one(
459 &self,
460 tx: B256,
461 data: BlobTransactionSidecarVariant,
462 ) -> Result<(), BlobStoreError> {
463 let mut buf = Vec::with_capacity(data.rlp_encoded_fields_length());
464 data.rlp_encode_fields(&mut buf);
465
466 {
467 let mut map = self.versioned_hashes_to_txhash.lock();
469 data.versioned_hashes().for_each(|hash| {
470 map.insert(hash, tx);
471 });
472 }
473
474 self.blob_cache.lock().insert(tx, Arc::new(data));
475
476 let size = self.write_one_encoded(tx, &buf)?;
477
478 self.size_tracker.add_size(size);
479 self.size_tracker.inc_len(1);
480 Ok(())
481 }
482
483 fn insert_many(
485 &self,
486 txs: Vec<(B256, BlobTransactionSidecarVariant)>,
487 ) -> Result<(), BlobStoreError> {
488 let raw = txs
489 .iter()
490 .map(|(tx, data)| {
491 let mut buf = Vec::with_capacity(data.rlp_encoded_fields_length());
492 data.rlp_encode_fields(&mut buf);
493 (self.blob_disk_file(*tx), buf)
494 })
495 .collect::<Vec<_>>();
496
497 {
498 let mut map = self.versioned_hashes_to_txhash.lock();
500 for (tx, data) in &txs {
501 data.versioned_hashes().for_each(|hash| {
502 map.insert(hash, *tx);
503 });
504 }
505 }
506
507 {
508 let mut cache = self.blob_cache.lock();
510 for (tx, data) in txs {
511 cache.insert(tx, Arc::new(data));
512 }
513 }
514
515 let mut add = 0;
516 let mut num = 0;
517 {
518 let _lock = self.file_lock.write();
519 for (path, data) in raw {
520 if path.exists() {
521 debug!(target:"txpool::blob", ?path, "Blob already exists");
522 } else if let Err(err) = fs::write(&path, &data) {
523 debug!(target:"txpool::blob", %err, ?path, "Failed to write blob file");
524 } else {
525 add += data.len();
526 num += 1;
527 }
528 }
529 }
530 self.size_tracker.add_size(add);
531 self.size_tracker.inc_len(num);
532
533 Ok(())
534 }
535
536 fn contains(&self, tx: B256) -> Result<bool, BlobStoreError> {
538 if self.blob_cache.lock().get(&tx).is_some() {
539 return Ok(true)
540 }
541 Ok(self.blob_disk_file(tx).is_file())
543 }
544
545 fn retain_existing(&self, txs: Vec<B256>) -> Result<Vec<B256>, BlobStoreError> {
547 let (in_cache, not_in_cache): (Vec<B256>, Vec<B256>) = {
548 let mut cache = self.blob_cache.lock();
549 txs.into_iter().partition(|tx| cache.get(tx).is_some())
550 };
551
552 let mut existing = in_cache;
553 for tx in not_in_cache {
554 if self.blob_disk_file(tx).is_file() {
555 existing.push(tx);
556 }
557 }
558
559 Ok(existing)
560 }
561
562 fn get_one(
564 &self,
565 tx: B256,
566 ) -> Result<Option<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
567 if let Some(blob) = self.blob_cache.lock().get(&tx) {
568 return Ok(Some(blob.clone()))
569 }
570
571 if let Some(blob) = self.read_one(tx)? {
572 let blob_arc = Arc::new(blob);
573 self.blob_cache.lock().insert(tx, blob_arc.clone());
574 return Ok(Some(blob_arc))
575 }
576
577 Ok(None)
578 }
579
580 #[inline]
582 fn blob_disk_file(&self, tx: B256) -> PathBuf {
583 self.blob_dir.join(format!("{tx:x}"))
584 }
585
586 #[inline]
588 fn read_one(&self, tx: B256) -> Result<Option<BlobTransactionSidecarVariant>, BlobStoreError> {
589 let path = self.blob_disk_file(tx);
590 let data = {
591 let _lock = self.file_lock.read();
592 match fs::read(&path) {
593 Ok(data) => data,
594 Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(None),
595 Err(e) => {
596 return Err(BlobStoreError::Other(Box::new(DiskFileBlobStoreError::ReadFile(
597 tx, path, e,
598 ))))
599 }
600 }
601 };
602 BlobTransactionSidecarVariant::rlp_decode_fields(&mut data.as_slice())
603 .map(Some)
604 .map_err(BlobStoreError::DecodeError)
605 }
606
607 fn read_many_decoded(&self, txs: Vec<TxHash>) -> Vec<(TxHash, BlobTransactionSidecarVariant)> {
611 self.read_many_raw(txs)
612 .into_iter()
613 .filter_map(|(tx, data)| {
614 BlobTransactionSidecarVariant::rlp_decode_fields(&mut data.as_slice())
615 .map(|sidecar| (tx, sidecar))
616 .ok()
617 })
618 .collect()
619 }
620
621 #[inline]
625 fn read_many_raw(&self, txs: Vec<TxHash>) -> Vec<(TxHash, Vec<u8>)> {
626 let mut res = Vec::with_capacity(txs.len());
627 let _lock = self.file_lock.read();
628 for tx in txs {
629 let path = self.blob_disk_file(tx);
630 match fs::read(&path) {
631 Ok(data) => {
632 res.push((tx, data));
633 }
634 Err(err) => {
635 debug!(target:"txpool::blob", %err, ?tx, "Failed to read blob file");
636 }
637 };
638 }
639 res
640 }
641
642 #[inline]
644 fn write_one_encoded(&self, tx: B256, data: &[u8]) -> Result<usize, DiskFileBlobStoreError> {
645 trace!(target:"txpool::blob", "[{:?}] writing blob file", tx);
646 let mut add = 0;
647 let path = self.blob_disk_file(tx);
648 {
649 let _lock = self.file_lock.write();
650 if !path.exists() {
651 fs::write(&path, data)
652 .map_err(|e| DiskFileBlobStoreError::WriteFile(tx, path, e))?;
653 add = data.len();
654 }
655 }
656 Ok(add)
657 }
658
659 #[inline]
664 fn get_all(
665 &self,
666 txs: Vec<B256>,
667 ) -> Result<Vec<(B256, Arc<BlobTransactionSidecarVariant>)>, BlobStoreError> {
668 let mut res = Vec::with_capacity(txs.len());
669 let mut cache_miss = Vec::new();
670 {
671 let mut cache = self.blob_cache.lock();
672 for tx in txs {
673 if let Some(blob) = cache.get(&tx) {
674 res.push((tx, blob.clone()));
675 } else {
676 cache_miss.push(tx)
677 }
678 }
679 }
680 if cache_miss.is_empty() {
681 return Ok(res)
682 }
683 let from_disk = self.read_many_decoded(cache_miss);
684 if from_disk.is_empty() {
685 return Ok(res)
686 }
687 let from_disk = from_disk
688 .into_iter()
689 .map(|(tx, data)| {
690 let data = Arc::new(data);
691 res.push((tx, data.clone()));
692 (tx, data)
693 })
694 .collect::<Vec<_>>();
695
696 let mut cache = self.blob_cache.lock();
697 for (tx, data) in from_disk {
698 cache.insert(tx, data);
699 }
700
701 Ok(res)
702 }
703
704 #[inline]
708 fn get_exact(
709 &self,
710 txs: Vec<B256>,
711 ) -> Result<Vec<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
712 txs.into_iter()
713 .map(|tx| self.get_one(tx)?.ok_or(BlobStoreError::MissingSidecar(tx)))
714 .collect()
715 }
716}
717
718impl fmt::Debug for DiskFileBlobStoreInner {
719 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
720 f.debug_struct("DiskFileBlobStoreInner")
721 .field("blob_dir", &self.blob_dir)
722 .field("cached_blobs", &self.blob_cache.try_lock().map(|lock| lock.len()))
723 .field("txs_to_delete", &self.txs_to_delete.try_read())
724 .finish()
725 }
726}
727
/// Errors that can occur when interacting with a disk file blob store.
#[derive(Debug, thiserror::Error)]
pub enum DiskFileBlobStoreError {
    /// The blob store directory at the given path could not be removed or created.
    #[error("failed to open blobstore at {0}: {1}")]
    Open(PathBuf, io::Error),
    /// The blob file of the given transaction could not be read from the given path.
    #[error("[{0}] failed to read blob file at {1}: {2}")]
    ReadFile(TxHash, PathBuf, io::Error),
    /// The blob file of the given transaction could not be written to the given path.
    #[error("[{0}] failed to write blob file at {1}: {2}")]
    WriteFile(TxHash, PathBuf, io::Error),
    /// The blob file of the given transaction could not be deleted at the given path.
    #[error("[{0}] failed to delete blob file at {1}: {2}")]
    DeleteFile(TxHash, PathBuf, io::Error),
}
748
749impl From<DiskFileBlobStoreError> for BlobStoreError {
750 fn from(value: DiskFileBlobStoreError) -> Self {
751 Self::Other(Box::new(value))
752 }
753}
754
/// Configuration for a [`DiskFileBlobStore`].
#[derive(Debug, Clone)]
pub struct DiskFileBlobStoreConfig {
    /// Maximum number of blob sidecars kept in the in-memory LRU cache.
    pub max_cached_entries: u32,
    /// How to treat the blob directory when opening the store.
    // NOTE(review): `DiskFileBlobStore::open` does not read this field and always clears the
    // directory — confirm whether that is intended.
    pub open: OpenDiskFileBlobStore,
}
763
764impl Default for DiskFileBlobStoreConfig {
765 fn default() -> Self {
766 Self { max_cached_entries: DEFAULT_MAX_CACHED_BLOBS, open: Default::default() }
767 }
768}
769
770impl DiskFileBlobStoreConfig {
771 pub const fn with_max_cached_entries(mut self, max_cached_entries: u32) -> Self {
773 self.max_cached_entries = max_cached_entries;
774 self
775 }
776}
777
/// How to treat the blob directory when opening a [`DiskFileBlobStore`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum OpenDiskFileBlobStore {
    /// Start with an empty blob directory. This is the default.
    #[default]
    Clear,
    /// NOTE(review): presumably meant to keep existing blob files and index them again; nothing
    /// in this file reads this variant — confirm.
    ReIndex,
}
787
#[cfg(test)]
mod tests {
    use alloy_consensus::BlobTransactionSidecar;
    use alloy_eips::{
        eip4844::{kzg_to_versioned_hash, Blob, BlobAndProofV2, Bytes48},
        eip7594::{
            BlobTransactionSidecarEip7594, BlobTransactionSidecarVariant, CELLS_PER_EXT_BLOB,
        },
    };

    use super::*;
    use std::sync::atomic::Ordering;

    /// Opens a store in a fresh temporary directory. The directory guard is returned as well so
    /// that the directory outlives the store.
    fn tmp_store() -> (DiskFileBlobStore, tempfile::TempDir) {
        let dir = tempfile::tempdir().unwrap();
        let store = DiskFileBlobStore::open(dir.path(), Default::default()).unwrap();
        (store, dir)
    }

    /// An EIP-4844 sidecar without any blobs, commitments or proofs.
    fn empty_sidecar() -> BlobTransactionSidecarVariant {
        BlobTransactionSidecarVariant::Eip4844(BlobTransactionSidecar {
            blobs: vec![],
            commitments: vec![],
            proofs: vec![],
        })
    }

    /// Generates `num` empty sidecars under random transaction hashes.
    fn rng_blobs(num: usize) -> Vec<(TxHash, BlobTransactionSidecarVariant)> {
        let mut rng = rand::rng();
        let mut blobs = Vec::with_capacity(num);
        for _ in 0..num {
            blobs.push((TxHash::random_with(&mut rng), empty_sidecar()));
        }
        blobs
    }

    /// Collects the transaction hashes of the given entries, in order.
    fn tx_hashes(blobs: &[(TxHash, BlobTransactionSidecarVariant)]) -> Vec<TxHash> {
        blobs.iter().map(|(tx, _)| *tx).collect()
    }

    /// Builds an EIP-7594 sidecar with a single default blob, together with its versioned hash
    /// and the blob-and-proof expected from a lookup of that hash.
    fn eip7594_single_blob_sidecar() -> (BlobTransactionSidecarVariant, B256, BlobAndProofV2) {
        let commitment = Bytes48::default();
        let cell_proofs = vec![Bytes48::default(); CELLS_PER_EXT_BLOB];
        let versioned_hash = kzg_to_versioned_hash(commitment.as_slice());

        let expected =
            BlobAndProofV2 { blob: Box::new(Blob::default()), proofs: cell_proofs.clone() };
        let sidecar =
            BlobTransactionSidecarEip7594::new(vec![Blob::default()], vec![commitment], cell_proofs);

        (BlobTransactionSidecarVariant::Eip7594(sidecar), versioned_hash, expected)
    }

    #[test]
    fn disk_insert_all_get_all() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(10);
        let all_hashes = tx_hashes(&blobs);
        store.insert_all(blobs.clone()).unwrap();

        // Every inserted blob is cached and comes back unchanged.
        for (tx, blob) in &blobs {
            assert!(store.is_cached(tx));
            let fetched = store.get(*tx).unwrap().unwrap();
            assert_eq!(*fetched, *blob);
        }

        for (tx, blob) in store.get_all(all_hashes.clone()).unwrap() {
            let known = blobs.iter().any(|(hash, sidecar)| *hash == tx && *sidecar == *blob);
            assert!(known, "missing blob {tx:?}");
        }

        assert!(store.contains(all_hashes[0]).unwrap());
        store.delete_all(all_hashes.clone()).unwrap();
        assert!(store.inner.txs_to_delete.read().contains(&all_hashes[0]));
        store.clear_cache();
        store.cleanup();

        // With cache and files gone, nothing can be found any more.
        assert!(store.get(blobs[0].0).unwrap().is_none());
        assert!(store.get_all(all_hashes.clone()).unwrap().is_empty());
        assert!(!store.contains(all_hashes[0]).unwrap());
        assert!(store.get_exact(all_hashes).is_err());

        assert_eq!(store.data_size_hint(), Some(0));
        assert_eq!(store.inner.size_tracker.num_blobs.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn disk_insert_and_retrieve() {
        let (store, _dir) = tmp_store();

        let (tx, blob) = rng_blobs(1).remove(0);
        store.insert(tx, blob.clone()).unwrap();

        assert!(store.is_cached(&tx));
        let retrieved = store.get(tx).unwrap().unwrap();
        assert_eq!(*retrieved, blob);
    }

    #[test]
    fn disk_delete_blob() {
        let (store, _dir) = tmp_store();

        let (tx, blob) = rng_blobs(1).remove(0);
        store.insert(tx, blob).unwrap();
        assert!(store.is_cached(&tx));

        store.delete(tx).unwrap();
        assert!(store.inner.txs_to_delete.read().contains(&tx));
        store.cleanup();

        // The file is gone, but the cache was not cleared and still serves the blob.
        assert_eq!(store.get(tx).unwrap(), Some(Arc::new(empty_sidecar())));
    }

    #[test]
    fn disk_insert_all_and_delete_all() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(5);
        let txs = tx_hashes(&blobs);
        store.insert_all(blobs.clone()).unwrap();

        assert!(blobs.iter().all(|(tx, _)| store.is_cached(tx)));

        store.delete_all(txs.clone()).unwrap();
        store.cleanup();

        // The files are gone, but the cache was not cleared and still serves the blobs.
        for tx in txs {
            assert_eq!(store.get(tx).unwrap(), Some(Arc::new(empty_sidecar())));
        }
    }

    #[test]
    fn disk_get_all_blobs() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(3);
        let txs = tx_hashes(&blobs);
        store.insert_all(blobs.clone()).unwrap();

        for (tx, blob) in store.get_all(txs.clone()).unwrap() {
            assert!(blobs.iter().any(|(hash, sidecar)| *hash == tx && *sidecar == *blob));
        }

        store.delete_all(txs).unwrap();
        store.cleanup();
    }

    #[test]
    fn disk_get_exact_blobs_success() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(3);
        let txs = tx_hashes(&blobs);
        store.insert_all(blobs.clone()).unwrap();

        let retrieved = store.get_exact(txs).unwrap();
        for (retrieved_blob, (_, original_blob)) in retrieved.iter().zip(&blobs) {
            assert_eq!(**retrieved_blob, *original_blob);
        }
    }

    #[test]
    fn disk_get_exact_blobs_failure() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(2);
        let txs = tx_hashes(&blobs);
        store.insert_all(blobs).unwrap();

        // One known and one unknown transaction: the exact lookup has to fail.
        let missing_tx = TxHash::random();
        assert!(store.get_exact(vec![txs[0], missing_tx]).is_err());
    }

    #[test]
    fn disk_data_size_hint() {
        let (store, _dir) = tmp_store();
        assert_eq!(store.data_size_hint(), Some(0));

        store.insert_all(rng_blobs(2)).unwrap();
        assert!(store.data_size_hint().unwrap() > 0);
    }

    #[test]
    fn disk_cleanup_stat() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(3);
        let txs = tx_hashes(&blobs);
        store.insert_all(blobs).unwrap();

        store.delete_all(txs).unwrap();
        let stat = store.cleanup();
        assert_eq!(stat.delete_succeed, 3);
        assert_eq!(stat.delete_failed, 0);
    }

    #[test]
    fn disk_get_blobs_v3_returns_partial_results() {
        let (store, _dir) = tmp_store();

        let (sidecar, versioned_hash, expected) = eip7594_single_blob_sidecar();
        store.insert(TxHash::random(), sidecar).unwrap();

        assert_ne!(versioned_hash, B256::ZERO);

        // One known hash plus one that no sidecar carries.
        let request = vec![versioned_hash, B256::ZERO];
        let v2 = store.get_by_versioned_hashes_v2(&request).unwrap();
        assert!(v2.is_none(), "v2 must return null if any requested blob is missing");

        let v3 = store.get_by_versioned_hashes_v3(&request).unwrap();
        assert_eq!(v3, vec![Some(expected), None]);
    }

    #[test]
    fn disk_get_blobs_v4_returns_requested_cells() {
        let (store, _dir) = tmp_store();

        let (sidecar, versioned_hash, _) = eip7594_single_blob_sidecar();
        store.insert(TxHash::random(), sidecar).unwrap();

        // Request the cells with index 0 and 7.
        let indices_bitarray = B128::from((1u128 << 0) | (1u128 << 7));
        let request = vec![versioned_hash, B256::ZERO];

        let v4 = store.get_by_versioned_hashes_v4(&request, indices_bitarray).unwrap();
        assert_eq!(v4.len(), request.len());
        assert!(v4[1].is_none());

        let cells_and_proofs = v4[0].as_ref().unwrap();
        assert_eq!(cells_and_proofs.blob_cells.len(), 2);
        assert_eq!(cells_and_proofs.proofs.len(), 2);
        assert!(cells_and_proofs.blob_cells.iter().all(Option::is_some));
        assert_eq!(cells_and_proofs.proofs, vec![Some(Bytes48::default()); 2]);
    }

    #[test]
    fn disk_get_blobs_v3_can_fallback_to_disk() {
        let (store, _dir) = tmp_store();

        let (sidecar, versioned_hash, expected) = eip7594_single_blob_sidecar();
        store.insert(TxHash::random(), sidecar).unwrap();
        // Force the lookup to go through the blob file.
        store.clear_cache();

        let v3 = store.get_by_versioned_hashes_v3(&[versioned_hash]).unwrap();
        assert_eq!(v3, vec![Some(expected)]);
    }

    #[test]
    fn disk_get_blobs_v4_can_fallback_to_disk() {
        let (store, _dir) = tmp_store();

        let (sidecar, versioned_hash, _) = eip7594_single_blob_sidecar();
        store.insert(TxHash::random(), sidecar).unwrap();
        // Force the lookup to go through the blob file.
        store.clear_cache();

        let v4 = store.get_by_versioned_hashes_v4(&[versioned_hash], B128::from(1u128)).unwrap();
        let cells_and_proofs = v4[0].as_ref().unwrap();
        assert_eq!(cells_and_proofs.blob_cells.len(), 1);
        assert_eq!(cells_and_proofs.proofs, vec![Some(Bytes48::default())]);
    }

    #[test]
    fn disk_get_cells_can_fallback_to_disk() {
        let (store, _dir) = tmp_store();

        let tx_hash = TxHash::random();
        let (sidecar, versioned_hash, _) = eip7594_single_blob_sidecar();
        store.insert(tx_hash, sidecar).unwrap();

        // Take the expected cells from the v4 lookup while the sidecar is still cached.
        let indices_bitarray = B128::from((1u128 << 0) | (1u128 << 7));
        let mut v4 =
            store.get_by_versioned_hashes_v4(&[versioned_hash], indices_bitarray).unwrap();
        let cells_and_proofs = v4.pop().unwrap().unwrap();
        let expected = cells_and_proofs.blob_cells.into_iter().collect::<Option<Vec<_>>>().unwrap();

        store.clear_cache();

        assert_eq!(store.get_cells(tx_hash, indices_bitarray).unwrap(), Some(expected));
    }

    #[test]
    fn disk_double_cleanup_no_failure() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(5);
        let all_hashes = tx_hashes(&blobs);
        store.insert_all(blobs).unwrap();
        store.clear_cache();

        store.delete_all(all_hashes.clone()).unwrap();

        // First cleanup removes the files.
        let first = store.cleanup();
        assert_eq!(first.delete_succeed, 5);
        assert_eq!(first.delete_failed, 0);

        // Schedule the same, now missing, files again.
        store.inner.txs_to_delete.write().extend(all_hashes);

        // Already-missing files count as successful deletions, not as failures.
        let second = store.cleanup();
        assert_eq!(second.delete_succeed, 5);
        assert_eq!(second.delete_failed, 0);
    }
}