use crate::blobstore::{BlobStore, BlobStoreCleanupStat, BlobStoreError, BlobStoreSize};
use alloy_eips::{
    eip4844::{BlobAndProofV1, BlobAndProofV2},
    eip7594::BlobTransactionSidecarVariant,
    eip7840::BlobParams,
    merge::EPOCH_SLOTS,
};
use alloy_primitives::{map::B256Set, TxHash, B256};
use parking_lot::{Mutex, RwLock};
use schnellru::{ByLength, LruMap};
use std::{fmt, fs, io, path::PathBuf, sync::Arc};
use tracing::{debug, trace};

/// The default maximum number of blob sidecars to keep in the in-memory cache.
pub const DEFAULT_MAX_CACHED_BLOBS: u32 = 100;

/// Size of the LRU cache mapping blob versioned hashes to transaction hashes, derived from the
/// BPO2 per-transaction and per-block blob limits over 16 epochs worth of slots.
const VERSIONED_HASH_TO_TX_HASH_CACHE_SIZE: u64 =
    BlobParams::bpo2().max_blobs_per_tx * BlobParams::bpo2().max_blob_count * EPOCH_SLOTS * 16;

/// A blob store that persists blob sidecars as files on disk.
///
/// Cheaply cloneable; all clones share the same inner state. Recently used sidecars are kept in
/// an in-memory LRU cache, and deletes are deferred until [`BlobStore::cleanup`] is invoked.
#[derive(Clone, Debug)]
pub struct DiskFileBlobStore {
    inner: Arc<DiskFileBlobStoreInner>,
}

impl DiskFileBlobStore {
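    /// Opens and initializes a new disk file blob store under the given directory.
    ///
    /// With the current options handling, any pre-existing blob files are removed before the
    /// directory is (re)created.
    ///
    /// A minimal usage sketch (the path is illustrative):
    ///
    /// ```ignore
    /// let store = DiskFileBlobStore::open("/tmp/blobs", DiskFileBlobStoreConfig::default())?;
    /// ```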
    pub fn open(
        blob_dir: impl Into<PathBuf>,
        opts: DiskFileBlobStoreConfig,
    ) -> Result<Self, DiskFileBlobStoreError> {
        let blob_dir = blob_dir.into();
        let DiskFileBlobStoreConfig { max_cached_entries, .. } = opts;
        let inner = DiskFileBlobStoreInner::new(blob_dir, max_cached_entries);

        inner.delete_all()?;
        inner.create_blob_dir()?;

        Ok(Self { inner: Arc::new(inner) })
    }

    #[cfg(test)]
    fn is_cached(&self, tx: &B256) -> bool {
        self.inner.blob_cache.lock().get(tx).is_some()
    }

    #[cfg(test)]
    fn clear_cache(&self) {
        self.inner.blob_cache.lock().clear()
    }

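    /// Shared lookup for the EIP-7594 (`BlobAndProofV2`) queries.
    ///
    /// Resolution happens in three stages: first the in-memory blob cache is scanned, then any
    /// still-missing versioned hashes are mapped to transaction hashes via the
    /// `versioned_hashes_to_txhash` index, and finally those transactions' sidecars are read and
    /// decoded from disk. Hashes that cannot be resolved stay `None` in the returned vector.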
    fn get_by_versioned_hashes_eip7594(
        &self,
        versioned_hashes: &[B256],
    ) -> Result<Vec<Option<BlobAndProofV2>>, BlobStoreError> {
        let mut result = vec![None; versioned_hashes.len()];

        // First scan the in-memory cache and track how many requested hashes are still missing.
        let mut missing_count = result.len();
        for (_tx_hash, blob_sidecar) in self.inner.blob_cache.lock().iter() {
            if let Some(blob_sidecar) = blob_sidecar.as_eip7594() {
                for (hash_idx, match_result) in
                    blob_sidecar.match_versioned_hashes(versioned_hashes)
                {
                    let slot = &mut result[hash_idx];
                    if slot.is_none() {
                        missing_count -= 1;
                    }
                    *slot = Some(match_result);
                }
            }

            // Everything was found in the cache, no need to hit the disk.
            if missing_count == 0 {
                return Ok(result);
            }
        }

        // Map the still-missing versioned hashes to the transactions that contain them.
        let mut missing_tx_hashes = Vec::new();

        {
            let mut versioned_to_txhashes = self.inner.versioned_hashes_to_txhash.lock();
            for (idx, _) in
                result.iter().enumerate().filter(|(_, blob_and_proof)| blob_and_proof.is_none())
            {
                let versioned_hash = versioned_hashes[idx];
                if let Some(tx_hash) = versioned_to_txhashes.get(&versioned_hash).copied() {
                    missing_tx_hashes.push(tx_hash);
                }
            }
        }

        // Read the remaining sidecars from disk and fill in whatever matches.
        if !missing_tx_hashes.is_empty() {
            let blobs_from_disk = self.inner.read_many_decoded(missing_tx_hashes);
            for (_, blob_sidecar) in blobs_from_disk {
                if let Some(blob_sidecar) = blob_sidecar.as_eip7594() {
                    for (hash_idx, match_result) in
                        blob_sidecar.match_versioned_hashes(versioned_hashes)
                    {
                        if result[hash_idx].is_none() {
                            result[hash_idx] = Some(match_result);
                        }
                    }
                }
            }
        }

        Ok(result)
    }
}

impl BlobStore for DiskFileBlobStore {
    fn insert(&self, tx: B256, data: BlobTransactionSidecarVariant) -> Result<(), BlobStoreError> {
        self.inner.insert_one(tx, data)
    }

    fn insert_all(
        &self,
        txs: Vec<(B256, BlobTransactionSidecarVariant)>,
    ) -> Result<(), BlobStoreError> {
        if txs.is_empty() {
            return Ok(())
        }
        self.inner.insert_many(txs)
    }

    fn delete(&self, tx: B256) -> Result<(), BlobStoreError> {
        if self.inner.contains(tx)? {
            self.inner.txs_to_delete.write().insert(tx);
        }
        Ok(())
    }

    fn delete_all(&self, txs: Vec<B256>) -> Result<(), BlobStoreError> {
        if txs.is_empty() {
            return Ok(())
        }
        let txs = self.inner.retain_existing(txs)?;
        self.inner.txs_to_delete.write().extend(txs);
        Ok(())
    }

    fn cleanup(&self) -> BlobStoreCleanupStat {
        let txs_to_delete = std::mem::take(&mut *self.inner.txs_to_delete.write());
        let mut stat = BlobStoreCleanupStat::default();
        let mut subsize = 0;
        debug!(target:"txpool::blob", num_blobs=%txs_to_delete.len(), "Removing blobs from disk");
        for tx in txs_to_delete {
            let path = self.inner.blob_disk_file(tx);
            let filesize = fs::metadata(&path).map_or(0, |meta| meta.len());
            match fs::remove_file(&path) {
                Ok(_) => {
                    stat.delete_succeed += 1;
                    subsize += filesize;
                }
                // The file was already gone; treat this as a successful delete.
                Err(e) if e.kind() == io::ErrorKind::NotFound => {
                    stat.delete_succeed += 1;
                }
                Err(e) => {
                    stat.delete_failed += 1;
                    let err = DiskFileBlobStoreError::DeleteFile(tx, path, e);
                    debug!(target:"txpool::blob", %err);
                }
            };
        }
        self.inner.size_tracker.sub_size(subsize as usize);
        self.inner.size_tracker.sub_len(stat.delete_succeed);
        stat
    }

197
198 fn get(&self, tx: B256) -> Result<Option<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
199 self.inner.get_one(tx)
200 }
201
202 fn contains(&self, tx: B256) -> Result<bool, BlobStoreError> {
203 self.inner.contains(tx)
204 }
205
206 fn get_all(
207 &self,
208 txs: Vec<B256>,
209 ) -> Result<Vec<(B256, Arc<BlobTransactionSidecarVariant>)>, BlobStoreError> {
210 if txs.is_empty() {
211 return Ok(Vec::new())
212 }
213 self.inner.get_all(txs)
214 }
215
216 fn get_exact(
217 &self,
218 txs: Vec<B256>,
219 ) -> Result<Vec<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
220 if txs.is_empty() {
221 return Ok(Vec::new())
222 }
223 self.inner.get_exact(txs)
224 }
225
    fn get_by_versioned_hashes_v1(
        &self,
        versioned_hashes: &[B256],
    ) -> Result<Vec<Option<BlobAndProofV1>>, BlobStoreError> {
        let mut result = vec![None; versioned_hashes.len()];

        // Scan the in-memory cache first.
        for (_tx_hash, blob_sidecar) in self.inner.blob_cache.lock().iter() {
            if let Some(blob_sidecar) = blob_sidecar.as_eip4844() {
                for (hash_idx, match_result) in
                    blob_sidecar.match_versioned_hashes(versioned_hashes)
                {
                    result[hash_idx] = Some(match_result);
                }
            }

            // Return early if all requested hashes have been found.
            if result.iter().all(|blob| blob.is_some()) {
                return Ok(result);
            }
        }

        // Map the still-missing versioned hashes to the transactions that contain them.
        let mut missing_tx_hashes = Vec::new();

        {
            let mut versioned_to_txhashes = self.inner.versioned_hashes_to_txhash.lock();
            for (idx, _) in
                result.iter().enumerate().filter(|(_, blob_and_proof)| blob_and_proof.is_none())
            {
                let versioned_hash = versioned_hashes[idx];
                if let Some(tx_hash) = versioned_to_txhashes.get(&versioned_hash).copied() {
                    missing_tx_hashes.push(tx_hash);
                }
            }
        }

        // Read the remaining sidecars from disk and fill in whatever matches.
        if !missing_tx_hashes.is_empty() {
            let blobs_from_disk = self.inner.read_many_decoded(missing_tx_hashes);
            for (_, blob_sidecar) in blobs_from_disk {
                if let Some(blob_sidecar) = blob_sidecar.as_eip4844() {
                    for (hash_idx, match_result) in
                        blob_sidecar.match_versioned_hashes(versioned_hashes)
                    {
                        if result[hash_idx].is_none() {
                            result[hash_idx] = Some(match_result);
                        }
                    }
                }
            }
        }

        Ok(result)
    }

284
285 fn get_by_versioned_hashes_v2(
286 &self,
287 versioned_hashes: &[B256],
288 ) -> Result<Option<Vec<BlobAndProofV2>>, BlobStoreError> {
289 let result = self.get_by_versioned_hashes_eip7594(versioned_hashes)?;
290
291 if result.iter().all(|blob| blob.is_some()) {
293 Ok(Some(result.into_iter().map(Option::unwrap).collect()))
294 } else {
295 Ok(None)
296 }
297 }
298
299 fn get_by_versioned_hashes_v3(
300 &self,
301 versioned_hashes: &[B256],
302 ) -> Result<Vec<Option<BlobAndProofV2>>, BlobStoreError> {
303 self.get_by_versioned_hashes_eip7594(versioned_hashes)
304 }
305
306 fn data_size_hint(&self) -> Option<usize> {
307 Some(self.inner.size_tracker.data_size())
308 }
309
310 fn blobs_len(&self) -> usize {
311 self.inner.size_tracker.blobs_len()
312 }
313}

/// Shared state of the disk file blob store.
struct DiskFileBlobStoreInner {
    /// The directory where blob files are stored.
    blob_dir: PathBuf,
    /// In-memory LRU cache of recently used blob sidecars.
    blob_cache: Mutex<LruMap<TxHash, Arc<BlobTransactionSidecarVariant>, ByLength>>,
    /// Tracks the total size and number of blobs in the store.
    size_tracker: BlobStoreSize,
    /// Guards file system access.
    file_lock: RwLock<()>,
    /// Transactions whose blob files are scheduled for deletion on the next cleanup.
    txs_to_delete: RwLock<B256Set>,
    /// Maps blob versioned hashes to the hash of the transaction that carries them, used by the
    /// versioned-hash lookups to avoid scanning the whole store.
    versioned_hashes_to_txhash: Mutex<LruMap<B256, B256>>,
}

impl DiskFileBlobStoreInner {
    /// Creates a new instance for the given blob directory and in-memory cache capacity.
    fn new(blob_dir: PathBuf, max_length: u32) -> Self {
        Self {
            blob_dir,
            blob_cache: Mutex::new(LruMap::new(ByLength::new(max_length))),
            size_tracker: Default::default(),
            file_lock: Default::default(),
            txs_to_delete: Default::default(),
            versioned_hashes_to_txhash: Mutex::new(LruMap::new(ByLength::new(
                VERSIONED_HASH_TO_TX_HASH_CACHE_SIZE as u32,
            ))),
        }
    }

    /// Creates the directory where blobs will be stored on disk.
    fn create_blob_dir(&self) -> Result<(), DiskFileBlobStoreError> {
        debug!(target:"txpool::blob", blob_dir = ?self.blob_dir, "Creating blob store");
        fs::create_dir_all(&self.blob_dir)
            .map_err(|e| DiskFileBlobStoreError::Open(self.blob_dir.clone(), e))
    }

    /// Deletes the entire blob store directory, ignoring the case where it does not exist.
    fn delete_all(&self) -> Result<(), DiskFileBlobStoreError> {
        match fs::remove_dir_all(&self.blob_dir) {
            Ok(_) => {
                debug!(target:"txpool::blob", blob_dir = ?self.blob_dir, "Removed blob store directory");
            }
            Err(err) if err.kind() == io::ErrorKind::NotFound => {}
            Err(err) => return Err(DiskFileBlobStoreError::Open(self.blob_dir.clone(), err)),
        }
        Ok(())
    }

    /// Adds a new blob sidecar to the store: updates the caches and writes the encoded sidecar
    /// to disk.
    fn insert_one(
        &self,
        tx: B256,
        data: BlobTransactionSidecarVariant,
    ) -> Result<(), BlobStoreError> {
        let mut buf = Vec::with_capacity(data.rlp_encoded_fields_length());
        data.rlp_encode_fields(&mut buf);

        {
            // Index every versioned hash of this sidecar by the transaction hash.
            let mut map = self.versioned_hashes_to_txhash.lock();
            data.versioned_hashes().for_each(|hash| {
                map.insert(hash, tx);
            });
        }

        self.blob_cache.lock().insert(tx, Arc::new(data));

        let size = self.write_one_encoded(tx, &buf)?;

        self.size_tracker.add_size(size);
        self.size_tracker.inc_len(1);
        Ok(())
    }

    /// Adds multiple blob sidecars to the store in one batch.
    fn insert_many(
        &self,
        txs: Vec<(B256, BlobTransactionSidecarVariant)>,
    ) -> Result<(), BlobStoreError> {
        let raw = txs
            .iter()
            .map(|(tx, data)| {
                let mut buf = Vec::with_capacity(data.rlp_encoded_fields_length());
                data.rlp_encode_fields(&mut buf);
                (self.blob_disk_file(*tx), buf)
            })
            .collect::<Vec<_>>();

        {
            // Index every versioned hash of the sidecars by their transaction hash.
            let mut map = self.versioned_hashes_to_txhash.lock();
            for (tx, data) in &txs {
                data.versioned_hashes().for_each(|hash| {
                    map.insert(hash, *tx);
                });
            }
        }

        {
            // Add all sidecars to the in-memory cache.
            let mut cache = self.blob_cache.lock();
            for (tx, data) in txs {
                cache.insert(tx, Arc::new(data));
            }
        }

        let mut add = 0;
        let mut num = 0;
        {
            let _lock = self.file_lock.write();
            for (path, data) in raw {
                if path.exists() {
                    debug!(target:"txpool::blob", ?path, "Blob already exists");
                } else if let Err(err) = fs::write(&path, &data) {
                    debug!(target:"txpool::blob", %err, ?path, "Failed to write blob file");
                } else {
                    add += data.len();
                    num += 1;
                }
            }
        }
        self.size_tracker.add_size(add);
        self.size_tracker.inc_len(num);

        Ok(())
    }

    /// Returns true if the blob for the given transaction is either cached or present on disk.
    fn contains(&self, tx: B256) -> Result<bool, BlobStoreError> {
        if self.blob_cache.lock().get(&tx).is_some() {
            return Ok(true)
        }
        // Not in the in-memory cache, check if it exists on disk.
        Ok(self.blob_disk_file(tx).is_file())
    }

    /// Retains only the transaction hashes for which a blob exists in the cache or on disk.
    fn retain_existing(&self, txs: Vec<B256>) -> Result<Vec<B256>, BlobStoreError> {
        let (in_cache, not_in_cache): (Vec<B256>, Vec<B256>) = {
            let mut cache = self.blob_cache.lock();
            txs.into_iter().partition(|tx| cache.get(tx).is_some())
        };

        let mut existing = in_cache;
        for tx in not_in_cache {
            if self.blob_disk_file(tx).is_file() {
                existing.push(tx);
            }
        }

        Ok(existing)
    }

    /// Retrieves the blob sidecar for the given transaction, reading from disk and caching it on
    /// a cache miss.
    fn get_one(
        &self,
        tx: B256,
    ) -> Result<Option<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
        if let Some(blob) = self.blob_cache.lock().get(&tx) {
            return Ok(Some(blob.clone()))
        }

        if let Some(blob) = self.read_one(tx)? {
            let blob_arc = Arc::new(blob);
            self.blob_cache.lock().insert(tx, blob_arc.clone());
            return Ok(Some(blob_arc))
        }

        Ok(None)
    }

    /// Returns the path to the blob file for the given transaction hash.
    #[inline]
    fn blob_disk_file(&self, tx: B256) -> PathBuf {
        self.blob_dir.join(format!("{tx:x}"))
    }

    /// Reads and decodes the blob sidecar for the given transaction from disk.
    #[inline]
    fn read_one(&self, tx: B256) -> Result<Option<BlobTransactionSidecarVariant>, BlobStoreError> {
        let path = self.blob_disk_file(tx);
        let data = {
            let _lock = self.file_lock.read();
            match fs::read(&path) {
                Ok(data) => data,
                Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(None),
                Err(e) => {
                    return Err(BlobStoreError::Other(Box::new(DiskFileBlobStoreError::ReadFile(
                        tx, path, e,
                    ))))
                }
            }
        };
        BlobTransactionSidecarVariant::rlp_decode_fields(&mut data.as_slice())
            .map(Some)
            .map_err(BlobStoreError::DecodeError)
    }

    /// Reads and decodes the blob sidecars for the given transactions from disk, skipping any
    /// that are missing or fail to decode.
    fn read_many_decoded(&self, txs: Vec<TxHash>) -> Vec<(TxHash, BlobTransactionSidecarVariant)> {
        self.read_many_raw(txs)
            .into_iter()
            .filter_map(|(tx, data)| {
                BlobTransactionSidecarVariant::rlp_decode_fields(&mut data.as_slice())
                    .map(|sidecar| (tx, sidecar))
                    .ok()
            })
            .collect()
    }

    /// Reads the raw blob file contents for the given transactions, skipping any files that
    /// cannot be read.
    #[inline]
    fn read_many_raw(&self, txs: Vec<TxHash>) -> Vec<(TxHash, Vec<u8>)> {
        let mut res = Vec::with_capacity(txs.len());
        let _lock = self.file_lock.read();
        for tx in txs {
            let path = self.blob_disk_file(tx);
            match fs::read(&path) {
                Ok(data) => {
                    res.push((tx, data));
                }
                Err(err) => {
                    debug!(target:"txpool::blob", %err, ?tx, "Failed to read blob file");
                }
            };
        }
        res
    }

    /// Writes the already encoded sidecar to disk, returning the number of bytes written (zero if
    /// the file already exists).
    #[inline]
    fn write_one_encoded(&self, tx: B256, data: &[u8]) -> Result<usize, DiskFileBlobStoreError> {
        trace!(target:"txpool::blob", "[{:?}] writing blob file", tx);
        let mut add = 0;
        let path = self.blob_disk_file(tx);
        {
            let _lock = self.file_lock.write();
            if !path.exists() {
                fs::write(&path, data)
                    .map_err(|e| DiskFileBlobStoreError::WriteFile(tx, path, e))?;
                add = data.len();
            }
        }
        Ok(add)
    }

    /// Retrieves blob sidecars for all given transactions, preferring the in-memory cache and
    /// falling back to disk for cache misses. Missing sidecars are omitted from the result.
    #[inline]
    fn get_all(
        &self,
        txs: Vec<B256>,
    ) -> Result<Vec<(B256, Arc<BlobTransactionSidecarVariant>)>, BlobStoreError> {
        let mut res = Vec::with_capacity(txs.len());
        let mut cache_miss = Vec::new();
        {
            let mut cache = self.blob_cache.lock();
            for tx in txs {
                if let Some(blob) = cache.get(&tx) {
                    res.push((tx, blob.clone()));
                } else {
                    cache_miss.push(tx)
                }
            }
        }
        if cache_miss.is_empty() {
            return Ok(res)
        }
        let from_disk = self.read_many_decoded(cache_miss);
        if from_disk.is_empty() {
            return Ok(res)
        }
        let from_disk = from_disk
            .into_iter()
            .map(|(tx, data)| {
                let data = Arc::new(data);
                res.push((tx, data.clone()));
                (tx, data)
            })
            .collect::<Vec<_>>();

        let mut cache = self.blob_cache.lock();
        for (tx, data) in from_disk {
            cache.insert(tx, data);
        }

        Ok(res)
    }

    /// Retrieves the sidecar for every given transaction, returning an error if any of them is
    /// missing.
    #[inline]
    fn get_exact(
        &self,
        txs: Vec<B256>,
    ) -> Result<Vec<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
        txs.into_iter()
            .map(|tx| self.get_one(tx)?.ok_or(BlobStoreError::MissingSidecar(tx)))
            .collect()
    }
}

impl fmt::Debug for DiskFileBlobStoreInner {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("DiskFileBlobStoreInner")
            .field("blob_dir", &self.blob_dir)
            .field("cached_blobs", &self.blob_cache.try_lock().map(|lock| lock.len()))
            .field("txs_to_delete", &self.txs_to_delete.try_read())
            .finish()
    }
}

/// Errors that can occur when interacting with the disk file blob store.
#[derive(Debug, thiserror::Error)]
pub enum DiskFileBlobStoreError {
    /// Failed to open or create the blob store directory.
    #[error("failed to open blobstore at {0}: {1}")]
    Open(PathBuf, io::Error),
    /// Failed to read a blob file.
    #[error("[{0}] failed to read blob file at {1}: {2}")]
    ReadFile(TxHash, PathBuf, io::Error),
    /// Failed to write a blob file.
    #[error("[{0}] failed to write blob file at {1}: {2}")]
    WriteFile(TxHash, PathBuf, io::Error),
    /// Failed to delete a blob file.
    #[error("[{0}] failed to delete blob file at {1}: {2}")]
    DeleteFile(TxHash, PathBuf, io::Error),
}

impl From<DiskFileBlobStoreError> for BlobStoreError {
    fn from(value: DiskFileBlobStoreError) -> Self {
        Self::Other(Box::new(value))
    }
}

/// Configuration for a disk file blob store.
#[derive(Debug, Clone)]
pub struct DiskFileBlobStoreConfig {
    /// The maximum number of blob sidecars to keep in the in-memory cache.
    pub max_cached_entries: u32,
    /// How to open the blob store (what to do with any pre-existing content).
    pub open: OpenDiskFileBlobStore,
}

impl Default for DiskFileBlobStoreConfig {
    fn default() -> Self {
        Self { max_cached_entries: DEFAULT_MAX_CACHED_BLOBS, open: Default::default() }
    }
}

impl DiskFileBlobStoreConfig {
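    /// Sets the maximum number of blob sidecars kept in the in-memory cache.
    ///
    /// A minimal usage sketch (the capacity value is illustrative):
    ///
    /// ```ignore
    /// let config = DiskFileBlobStoreConfig::default().with_max_cached_entries(256);
    /// ```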
    pub const fn with_max_cached_entries(mut self, max_cached_entries: u32) -> Self {
        self.max_cached_entries = max_cached_entries;
        self
    }
}

/// How to open a pre-existing blob store.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum OpenDiskFileBlobStore {
    /// Clear any existing blob files on open.
    #[default]
    Clear,
    /// Keep the existing blob files and reindex them.
    ReIndex,
}

#[cfg(test)]
mod tests {
    use alloy_consensus::BlobTransactionSidecar;
    use alloy_eips::{
        eip4844::{kzg_to_versioned_hash, Blob, BlobAndProofV2, Bytes48},
        eip7594::{
            BlobTransactionSidecarEip7594, BlobTransactionSidecarVariant, CELLS_PER_EXT_BLOB,
        },
    };

    use super::*;
    use std::sync::atomic::Ordering;

    fn tmp_store() -> (DiskFileBlobStore, tempfile::TempDir) {
        let dir = tempfile::tempdir().unwrap();
        let store = DiskFileBlobStore::open(dir.path(), Default::default()).unwrap();
        (store, dir)
    }

    fn rng_blobs(num: usize) -> Vec<(TxHash, BlobTransactionSidecarVariant)> {
        let mut rng = rand::rng();
        (0..num)
            .map(|_| {
                let tx = TxHash::random_with(&mut rng);
                let blob = BlobTransactionSidecarVariant::Eip4844(BlobTransactionSidecar {
                    blobs: vec![],
                    commitments: vec![],
                    proofs: vec![],
                });
                (tx, blob)
            })
            .collect()
    }

    fn eip7594_single_blob_sidecar() -> (BlobTransactionSidecarVariant, B256, BlobAndProofV2) {
        let blob = Blob::default();
        let commitment = Bytes48::default();
        let cell_proofs = vec![Bytes48::default(); CELLS_PER_EXT_BLOB];

        let versioned_hash = kzg_to_versioned_hash(commitment.as_slice());

        let expected =
            BlobAndProofV2 { blob: Box::new(Blob::default()), proofs: cell_proofs.clone() };
        let sidecar = BlobTransactionSidecarEip7594::new(vec![blob], vec![commitment], cell_proofs);

        (BlobTransactionSidecarVariant::Eip7594(sidecar), versioned_hash, expected)
    }

    #[test]
    fn disk_insert_all_get_all() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(10);
        let all_hashes = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        // All inserted sidecars are cached and retrievable.
        for (tx, blob) in &blobs {
            assert!(store.is_cached(tx));
            let b = store.get(*tx).unwrap().map(Arc::unwrap_or_clone).unwrap();
            assert_eq!(b, *blob);
        }

        let all = store.get_all(all_hashes.clone()).unwrap();
        for (tx, blob) in all {
            assert!(blobs.contains(&(tx, Arc::unwrap_or_clone(blob))), "missing blob {tx:?}");
        }

        assert!(store.contains(all_hashes[0]).unwrap());
        store.delete_all(all_hashes.clone()).unwrap();
        assert!(store.inner.txs_to_delete.read().contains(&all_hashes[0]));
        store.clear_cache();
        store.cleanup();

        assert!(store.get(blobs[0].0).unwrap().is_none());

        let all = store.get_all(all_hashes.clone()).unwrap();
        assert!(all.is_empty());

        assert!(!store.contains(all_hashes[0]).unwrap());
        assert!(store.get_exact(all_hashes).is_err());

        assert_eq!(store.data_size_hint(), Some(0));
        assert_eq!(store.inner.size_tracker.num_blobs.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn disk_insert_and_retrieve() {
        let (store, _dir) = tmp_store();

        let (tx, blob) = rng_blobs(1).into_iter().next().unwrap();
        store.insert(tx, blob.clone()).unwrap();

        assert!(store.is_cached(&tx));
        let retrieved_blob = store.get(tx).unwrap().map(Arc::unwrap_or_clone).unwrap();
        assert_eq!(retrieved_blob, blob);
    }

    #[test]
    fn disk_delete_blob() {
        let (store, _dir) = tmp_store();

        let (tx, blob) = rng_blobs(1).into_iter().next().unwrap();
        store.insert(tx, blob).unwrap();
        assert!(store.is_cached(&tx));

        store.delete(tx).unwrap();
        assert!(store.inner.txs_to_delete.read().contains(&tx));
        store.cleanup();

        // The sidecar is still served from the in-memory cache even though its file was removed.
        let result = store.get(tx).unwrap();
        assert_eq!(
            result,
            Some(Arc::new(BlobTransactionSidecarVariant::Eip4844(BlobTransactionSidecar {
                blobs: vec![],
                commitments: vec![],
                proofs: vec![]
            })))
        );
    }

    #[test]
    fn disk_insert_all_and_delete_all() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(5);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        for (tx, _) in &blobs {
            assert!(store.is_cached(tx));
        }

        store.delete_all(txs.clone()).unwrap();
        store.cleanup();

        // The sidecars are still served from the in-memory cache after their files were removed.
        for tx in txs {
            let result = store.get(tx).unwrap();
            assert_eq!(
                result,
                Some(Arc::new(BlobTransactionSidecarVariant::Eip4844(BlobTransactionSidecar {
                    blobs: vec![],
                    commitments: vec![],
                    proofs: vec![]
                })))
            );
        }
    }

    #[test]
    fn disk_get_all_blobs() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(3);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        let retrieved_blobs = store.get_all(txs.clone()).unwrap();
        for (tx, blob) in retrieved_blobs {
            assert!(blobs.contains(&(tx, Arc::unwrap_or_clone(blob))));
        }

        store.delete_all(txs).unwrap();
        store.cleanup();
    }

    #[test]
    fn disk_get_exact_blobs_success() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(3);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        let retrieved_blobs = store.get_exact(txs).unwrap();
        for (retrieved_blob, (_, original_blob)) in retrieved_blobs.into_iter().zip(blobs) {
            assert_eq!(Arc::unwrap_or_clone(retrieved_blob), original_blob);
        }
    }

    #[test]
    fn disk_get_exact_blobs_failure() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(2);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs).unwrap();

        // One of the requested hashes is not in the store, so the exact lookup must fail.
        let missing_tx = TxHash::random();
        let result = store.get_exact(vec![txs[0], missing_tx]);
        assert!(result.is_err());
    }

    #[test]
    fn disk_data_size_hint() {
        let (store, _dir) = tmp_store();
        assert_eq!(store.data_size_hint(), Some(0));

        let blobs = rng_blobs(2);
        store.insert_all(blobs).unwrap();
        assert!(store.data_size_hint().unwrap() > 0);
    }

    #[test]
    fn disk_cleanup_stat() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(3);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs).unwrap();

        store.delete_all(txs).unwrap();
        let stat = store.cleanup();
        assert_eq!(stat.delete_succeed, 3);
        assert_eq!(stat.delete_failed, 0);
    }

    #[test]
    fn disk_get_blobs_v3_returns_partial_results() {
        let (store, _dir) = tmp_store();

        let (sidecar, versioned_hash, expected) = eip7594_single_blob_sidecar();
        store.insert(TxHash::random(), sidecar).unwrap();

        assert_ne!(versioned_hash, B256::ZERO);

        let request = vec![versioned_hash, B256::ZERO];
        let v2 = store.get_by_versioned_hashes_v2(&request).unwrap();
        assert!(v2.is_none(), "v2 must return null if any requested blob is missing");

        let v3 = store.get_by_versioned_hashes_v3(&request).unwrap();
        assert_eq!(v3, vec![Some(expected), None]);
    }

    #[test]
    fn disk_get_blobs_v3_can_fallback_to_disk() {
        let (store, _dir) = tmp_store();

        let (sidecar, versioned_hash, expected) = eip7594_single_blob_sidecar();
        store.insert(TxHash::random(), sidecar).unwrap();
        store.clear_cache();

        let v3 = store.get_by_versioned_hashes_v3(&[versioned_hash]).unwrap();
        assert_eq!(v3, vec![Some(expected)]);
    }

    #[test]
    fn disk_double_cleanup_no_failure() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(5);
        let all_hashes: Vec<_> = blobs.iter().map(|(tx, _)| *tx).collect();
        store.insert_all(blobs).unwrap();
        store.clear_cache();

        // Schedule all blobs for deletion.
        store.delete_all(all_hashes.clone()).unwrap();

        // The first cleanup removes all blob files.
        let stat1 = store.cleanup();
        assert_eq!(stat1.delete_succeed, 5);
        assert_eq!(stat1.delete_failed, 0);

        // Schedule the same hashes again even though their files are already gone.
        store.inner.txs_to_delete.write().extend(all_hashes);

        // The second cleanup must not report failures for the missing files.
        let stat2 = store.cleanup();
        assert_eq!(stat2.delete_succeed, 5);
        assert_eq!(stat2.delete_failed, 0);
    }
}