1use crate::blobstore::{BlobStore, BlobStoreCleanupStat, BlobStoreError, BlobStoreSize};
4use alloy_eips::{
5 eip4844::{BlobAndProofV1, BlobAndProofV2},
6 eip7594::BlobTransactionSidecarVariant,
7 eip7840::BlobParams,
8 merge::EPOCH_SLOTS,
9};
10use alloy_primitives::{TxHash, B256};
11use parking_lot::{Mutex, RwLock};
12use schnellru::{ByLength, LruMap};
13use std::{collections::HashSet, fmt, fs, io, path::PathBuf, sync::Arc};
14use tracing::{debug, trace};
15
16pub const DEFAULT_MAX_CACHED_BLOBS: u32 = 100;
18
19const VERSIONED_HASH_TO_TX_HASH_CACHE_SIZE: u64 =
24 BlobParams::bpo2().max_blobs_per_tx * BlobParams::bpo2().max_blob_count * EPOCH_SLOTS * 16;
25
26#[derive(Clone, Debug)]
32pub struct DiskFileBlobStore {
33 inner: Arc<DiskFileBlobStoreInner>,
34}
35
36impl DiskFileBlobStore {
37 pub fn open(
39 blob_dir: impl Into<PathBuf>,
40 opts: DiskFileBlobStoreConfig,
41 ) -> Result<Self, DiskFileBlobStoreError> {
42 let blob_dir = blob_dir.into();
43 let DiskFileBlobStoreConfig { max_cached_entries, .. } = opts;
44 let inner = DiskFileBlobStoreInner::new(blob_dir, max_cached_entries);
45
46 inner.delete_all()?;
48 inner.create_blob_dir()?;
49
50 Ok(Self { inner: Arc::new(inner) })
51 }
52
53 #[cfg(test)]
54 fn is_cached(&self, tx: &B256) -> bool {
55 self.inner.blob_cache.lock().get(tx).is_some()
56 }
57
58 #[cfg(test)]
59 fn clear_cache(&self) {
60 self.inner.blob_cache.lock().clear()
61 }
62
63 fn get_by_versioned_hashes_eip7594(
72 &self,
73 versioned_hashes: &[B256],
74 ) -> Result<Vec<Option<BlobAndProofV2>>, BlobStoreError> {
75 let mut result = vec![None; versioned_hashes.len()];
78 let mut missing_count = result.len();
79 for (_tx_hash, blob_sidecar) in self.inner.blob_cache.lock().iter() {
81 if let Some(blob_sidecar) = blob_sidecar.as_eip7594() {
82 for (hash_idx, match_result) in
83 blob_sidecar.match_versioned_hashes(versioned_hashes)
84 {
85 result[hash_idx] = Some(match_result);
86 missing_count -= 1;
87 }
88 }
89
90 if missing_count == 0 {
92 if result.iter().all(|blob| blob.is_some()) {
94 return Ok(result);
95 }
96 }
97 }
98
99 let mut missing_tx_hashes = Vec::new();
101
102 {
103 let mut versioned_to_txhashes = self.inner.versioned_hashes_to_txhash.lock();
104 for (idx, _) in
105 result.iter().enumerate().filter(|(_, blob_and_proof)| blob_and_proof.is_none())
106 {
107 let versioned_hash = versioned_hashes[idx];
109 if let Some(tx_hash) = versioned_to_txhashes.get(&versioned_hash).copied() {
110 missing_tx_hashes.push(tx_hash);
111 }
112 }
113 }
114
115 if !missing_tx_hashes.is_empty() {
117 let blobs_from_disk = self.inner.read_many_decoded(missing_tx_hashes);
118 for (_, blob_sidecar) in blobs_from_disk {
119 if let Some(blob_sidecar) = blob_sidecar.as_eip7594() {
120 for (hash_idx, match_result) in
121 blob_sidecar.match_versioned_hashes(versioned_hashes)
122 {
123 if result[hash_idx].is_none() {
124 result[hash_idx] = Some(match_result);
125 }
126 }
127 }
128 }
129 }
130
131 Ok(result)
132 }
133}
134
135impl BlobStore for DiskFileBlobStore {
136 fn insert(&self, tx: B256, data: BlobTransactionSidecarVariant) -> Result<(), BlobStoreError> {
137 self.inner.insert_one(tx, data)
138 }
139
140 fn insert_all(
141 &self,
142 txs: Vec<(B256, BlobTransactionSidecarVariant)>,
143 ) -> Result<(), BlobStoreError> {
144 if txs.is_empty() {
145 return Ok(())
146 }
147 self.inner.insert_many(txs)
148 }
149
150 fn delete(&self, tx: B256) -> Result<(), BlobStoreError> {
151 if self.inner.contains(tx)? {
152 self.inner.txs_to_delete.write().insert(tx);
153 }
154 Ok(())
155 }
156
157 fn delete_all(&self, txs: Vec<B256>) -> Result<(), BlobStoreError> {
158 if txs.is_empty() {
159 return Ok(())
160 }
161 let txs = self.inner.retain_existing(txs)?;
162 self.inner.txs_to_delete.write().extend(txs);
163 Ok(())
164 }
165
166 fn cleanup(&self) -> BlobStoreCleanupStat {
167 let txs_to_delete = std::mem::take(&mut *self.inner.txs_to_delete.write());
168 let mut stat = BlobStoreCleanupStat::default();
169 let mut subsize = 0;
170 debug!(target:"txpool::blob", num_blobs=%txs_to_delete.len(), "Removing blobs from disk");
171 for tx in txs_to_delete {
172 let path = self.inner.blob_disk_file(tx);
173 let filesize = fs::metadata(&path).map_or(0, |meta| meta.len());
174 match fs::remove_file(&path) {
175 Ok(_) => {
176 stat.delete_succeed += 1;
177 subsize += filesize;
178 }
179 Err(e) => {
180 stat.delete_failed += 1;
181 let err = DiskFileBlobStoreError::DeleteFile(tx, path, e);
182 debug!(target:"txpool::blob", %err);
183 }
184 };
185 }
186 self.inner.size_tracker.sub_size(subsize as usize);
187 self.inner.size_tracker.sub_len(stat.delete_succeed);
188 stat
189 }
190
191 fn get(&self, tx: B256) -> Result<Option<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
192 self.inner.get_one(tx)
193 }
194
195 fn contains(&self, tx: B256) -> Result<bool, BlobStoreError> {
196 self.inner.contains(tx)
197 }
198
199 fn get_all(
200 &self,
201 txs: Vec<B256>,
202 ) -> Result<Vec<(B256, Arc<BlobTransactionSidecarVariant>)>, BlobStoreError> {
203 if txs.is_empty() {
204 return Ok(Vec::new())
205 }
206 self.inner.get_all(txs)
207 }
208
209 fn get_exact(
210 &self,
211 txs: Vec<B256>,
212 ) -> Result<Vec<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
213 if txs.is_empty() {
214 return Ok(Vec::new())
215 }
216 self.inner.get_exact(txs)
217 }
218
219 fn get_by_versioned_hashes_v1(
220 &self,
221 versioned_hashes: &[B256],
222 ) -> Result<Vec<Option<BlobAndProofV1>>, BlobStoreError> {
223 let mut result = vec![None; versioned_hashes.len()];
225
226 for (_tx_hash, blob_sidecar) in self.inner.blob_cache.lock().iter() {
228 if let Some(blob_sidecar) = blob_sidecar.as_eip4844() {
229 for (hash_idx, match_result) in
230 blob_sidecar.match_versioned_hashes(versioned_hashes)
231 {
232 result[hash_idx] = Some(match_result);
233 }
234 }
235
236 if result.iter().all(|blob| blob.is_some()) {
238 return Ok(result);
239 }
240 }
241
242 let mut missing_tx_hashes = Vec::new();
245
246 {
247 let mut versioned_to_txhashes = self.inner.versioned_hashes_to_txhash.lock();
248 for (idx, _) in
249 result.iter().enumerate().filter(|(_, blob_and_proof)| blob_and_proof.is_none())
250 {
251 let versioned_hash = versioned_hashes[idx];
253 if let Some(tx_hash) = versioned_to_txhashes.get(&versioned_hash).copied() {
254 missing_tx_hashes.push(tx_hash);
255 }
256 }
257 }
258
259 if !missing_tx_hashes.is_empty() {
261 let blobs_from_disk = self.inner.read_many_decoded(missing_tx_hashes);
262 for (_, blob_sidecar) in blobs_from_disk {
263 if let Some(blob_sidecar) = blob_sidecar.as_eip4844() {
264 for (hash_idx, match_result) in
265 blob_sidecar.match_versioned_hashes(versioned_hashes)
266 {
267 if result[hash_idx].is_none() {
268 result[hash_idx] = Some(match_result);
269 }
270 }
271 }
272 }
273 }
274
275 Ok(result)
276 }
277
278 fn get_by_versioned_hashes_v2(
279 &self,
280 versioned_hashes: &[B256],
281 ) -> Result<Option<Vec<BlobAndProofV2>>, BlobStoreError> {
282 let result = self.get_by_versioned_hashes_eip7594(versioned_hashes)?;
283
284 if result.iter().all(|blob| blob.is_some()) {
286 Ok(Some(result.into_iter().map(Option::unwrap).collect()))
287 } else {
288 Ok(None)
289 }
290 }
291
292 fn get_by_versioned_hashes_v3(
293 &self,
294 versioned_hashes: &[B256],
295 ) -> Result<Vec<Option<BlobAndProofV2>>, BlobStoreError> {
296 self.get_by_versioned_hashes_eip7594(versioned_hashes)
297 }
298
299 fn data_size_hint(&self) -> Option<usize> {
300 Some(self.inner.size_tracker.data_size())
301 }
302
303 fn blobs_len(&self) -> usize {
304 self.inner.size_tracker.blobs_len()
305 }
306}
307
308struct DiskFileBlobStoreInner {
309 blob_dir: PathBuf,
310 blob_cache: Mutex<LruMap<TxHash, Arc<BlobTransactionSidecarVariant>, ByLength>>,
311 size_tracker: BlobStoreSize,
312 file_lock: RwLock<()>,
313 txs_to_delete: RwLock<HashSet<B256>>,
314 versioned_hashes_to_txhash: Mutex<LruMap<B256, B256>>,
319}
320
321impl DiskFileBlobStoreInner {
322 fn new(blob_dir: PathBuf, max_length: u32) -> Self {
324 Self {
325 blob_dir,
326 blob_cache: Mutex::new(LruMap::new(ByLength::new(max_length))),
327 size_tracker: Default::default(),
328 file_lock: Default::default(),
329 txs_to_delete: Default::default(),
330 versioned_hashes_to_txhash: Mutex::new(LruMap::new(ByLength::new(
331 VERSIONED_HASH_TO_TX_HASH_CACHE_SIZE as u32,
332 ))),
333 }
334 }
335
336 fn create_blob_dir(&self) -> Result<(), DiskFileBlobStoreError> {
338 debug!(target:"txpool::blob", blob_dir = ?self.blob_dir, "Creating blob store");
339 fs::create_dir_all(&self.blob_dir)
340 .map_err(|e| DiskFileBlobStoreError::Open(self.blob_dir.clone(), e))
341 }
342
343 fn delete_all(&self) -> Result<(), DiskFileBlobStoreError> {
345 match fs::remove_dir_all(&self.blob_dir) {
346 Ok(_) => {
347 debug!(target:"txpool::blob", blob_dir = ?self.blob_dir, "Removed blob store directory");
348 }
349 Err(err) if err.kind() == io::ErrorKind::NotFound => {}
350 Err(err) => return Err(DiskFileBlobStoreError::Open(self.blob_dir.clone(), err)),
351 }
352 Ok(())
353 }
354
355 fn insert_one(
357 &self,
358 tx: B256,
359 data: BlobTransactionSidecarVariant,
360 ) -> Result<(), BlobStoreError> {
361 let mut buf = Vec::with_capacity(data.rlp_encoded_fields_length());
362 data.rlp_encode_fields(&mut buf);
363
364 {
365 let mut map = self.versioned_hashes_to_txhash.lock();
367 data.versioned_hashes().for_each(|hash| {
368 map.insert(hash, tx);
369 });
370 }
371
372 self.blob_cache.lock().insert(tx, Arc::new(data));
373
374 let size = self.write_one_encoded(tx, &buf)?;
375
376 self.size_tracker.add_size(size);
377 self.size_tracker.inc_len(1);
378 Ok(())
379 }
380
381 fn insert_many(
383 &self,
384 txs: Vec<(B256, BlobTransactionSidecarVariant)>,
385 ) -> Result<(), BlobStoreError> {
386 let raw = txs
387 .iter()
388 .map(|(tx, data)| {
389 let mut buf = Vec::with_capacity(data.rlp_encoded_fields_length());
390 data.rlp_encode_fields(&mut buf);
391 (self.blob_disk_file(*tx), buf)
392 })
393 .collect::<Vec<_>>();
394
395 {
396 let mut map = self.versioned_hashes_to_txhash.lock();
398 for (tx, data) in &txs {
399 data.versioned_hashes().for_each(|hash| {
400 map.insert(hash, *tx);
401 });
402 }
403 }
404
405 {
406 let mut cache = self.blob_cache.lock();
408 for (tx, data) in txs {
409 cache.insert(tx, Arc::new(data));
410 }
411 }
412
413 let mut add = 0;
414 let mut num = 0;
415 {
416 let _lock = self.file_lock.write();
417 for (path, data) in raw {
418 if path.exists() {
419 debug!(target:"txpool::blob", ?path, "Blob already exists");
420 } else if let Err(err) = fs::write(&path, &data) {
421 debug!(target:"txpool::blob", %err, ?path, "Failed to write blob file");
422 } else {
423 add += data.len();
424 num += 1;
425 }
426 }
427 }
428 self.size_tracker.add_size(add);
429 self.size_tracker.inc_len(num);
430
431 Ok(())
432 }
433
434 fn contains(&self, tx: B256) -> Result<bool, BlobStoreError> {
436 if self.blob_cache.lock().get(&tx).is_some() {
437 return Ok(true)
438 }
439 Ok(self.blob_disk_file(tx).is_file())
441 }
442
443 fn retain_existing(&self, txs: Vec<B256>) -> Result<Vec<B256>, BlobStoreError> {
445 let (in_cache, not_in_cache): (Vec<B256>, Vec<B256>) = {
446 let mut cache = self.blob_cache.lock();
447 txs.into_iter().partition(|tx| cache.get(tx).is_some())
448 };
449
450 let mut existing = in_cache;
451 for tx in not_in_cache {
452 if self.blob_disk_file(tx).is_file() {
453 existing.push(tx);
454 }
455 }
456
457 Ok(existing)
458 }
459
460 fn get_one(
462 &self,
463 tx: B256,
464 ) -> Result<Option<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
465 if let Some(blob) = self.blob_cache.lock().get(&tx) {
466 return Ok(Some(blob.clone()))
467 }
468
469 if let Some(blob) = self.read_one(tx)? {
470 let blob_arc = Arc::new(blob);
471 self.blob_cache.lock().insert(tx, blob_arc.clone());
472 return Ok(Some(blob_arc))
473 }
474
475 Ok(None)
476 }
477
478 #[inline]
480 fn blob_disk_file(&self, tx: B256) -> PathBuf {
481 self.blob_dir.join(format!("{tx:x}"))
482 }
483
484 #[inline]
486 fn read_one(&self, tx: B256) -> Result<Option<BlobTransactionSidecarVariant>, BlobStoreError> {
487 let path = self.blob_disk_file(tx);
488 let data = {
489 let _lock = self.file_lock.read();
490 match fs::read(&path) {
491 Ok(data) => data,
492 Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(None),
493 Err(e) => {
494 return Err(BlobStoreError::Other(Box::new(DiskFileBlobStoreError::ReadFile(
495 tx, path, e,
496 ))))
497 }
498 }
499 };
500 BlobTransactionSidecarVariant::rlp_decode_fields(&mut data.as_slice())
501 .map(Some)
502 .map_err(BlobStoreError::DecodeError)
503 }
504
505 fn read_many_decoded(&self, txs: Vec<TxHash>) -> Vec<(TxHash, BlobTransactionSidecarVariant)> {
509 self.read_many_raw(txs)
510 .into_iter()
511 .filter_map(|(tx, data)| {
512 BlobTransactionSidecarVariant::rlp_decode_fields(&mut data.as_slice())
513 .map(|sidecar| (tx, sidecar))
514 .ok()
515 })
516 .collect()
517 }
518
519 #[inline]
523 fn read_many_raw(&self, txs: Vec<TxHash>) -> Vec<(TxHash, Vec<u8>)> {
524 let mut res = Vec::with_capacity(txs.len());
525 let _lock = self.file_lock.read();
526 for tx in txs {
527 let path = self.blob_disk_file(tx);
528 match fs::read(&path) {
529 Ok(data) => {
530 res.push((tx, data));
531 }
532 Err(err) => {
533 debug!(target:"txpool::blob", %err, ?tx, "Failed to read blob file");
534 }
535 };
536 }
537 res
538 }
539
540 #[inline]
542 fn write_one_encoded(&self, tx: B256, data: &[u8]) -> Result<usize, DiskFileBlobStoreError> {
543 trace!(target:"txpool::blob", "[{:?}] writing blob file", tx);
544 let mut add = 0;
545 let path = self.blob_disk_file(tx);
546 {
547 let _lock = self.file_lock.write();
548 if !path.exists() {
549 fs::write(&path, data)
550 .map_err(|e| DiskFileBlobStoreError::WriteFile(tx, path, e))?;
551 add = data.len();
552 }
553 }
554 Ok(add)
555 }
556
557 #[inline]
562 fn get_all(
563 &self,
564 txs: Vec<B256>,
565 ) -> Result<Vec<(B256, Arc<BlobTransactionSidecarVariant>)>, BlobStoreError> {
566 let mut res = Vec::with_capacity(txs.len());
567 let mut cache_miss = Vec::new();
568 {
569 let mut cache = self.blob_cache.lock();
570 for tx in txs {
571 if let Some(blob) = cache.get(&tx) {
572 res.push((tx, blob.clone()));
573 } else {
574 cache_miss.push(tx)
575 }
576 }
577 }
578 if cache_miss.is_empty() {
579 return Ok(res)
580 }
581 let from_disk = self.read_many_decoded(cache_miss);
582 if from_disk.is_empty() {
583 return Ok(res)
584 }
585 let from_disk = from_disk
586 .into_iter()
587 .map(|(tx, data)| {
588 let data = Arc::new(data);
589 res.push((tx, data.clone()));
590 (tx, data)
591 })
592 .collect::<Vec<_>>();
593
594 let mut cache = self.blob_cache.lock();
595 for (tx, data) in from_disk {
596 cache.insert(tx, data);
597 }
598
599 Ok(res)
600 }
601
602 #[inline]
606 fn get_exact(
607 &self,
608 txs: Vec<B256>,
609 ) -> Result<Vec<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
610 txs.into_iter()
611 .map(|tx| self.get_one(tx)?.ok_or(BlobStoreError::MissingSidecar(tx)))
612 .collect()
613 }
614}
615
616impl fmt::Debug for DiskFileBlobStoreInner {
617 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
618 f.debug_struct("DiskFileBlobStoreInner")
619 .field("blob_dir", &self.blob_dir)
620 .field("cached_blobs", &self.blob_cache.try_lock().map(|lock| lock.len()))
621 .field("txs_to_delete", &self.txs_to_delete.try_read())
622 .finish()
623 }
624}
625
626#[derive(Debug, thiserror::Error)]
628pub enum DiskFileBlobStoreError {
629 #[error("failed to open blobstore at {0}: {1}")]
631 Open(PathBuf, io::Error),
633 #[error("[{0}] failed to read blob file at {1}: {2}")]
635 ReadFile(TxHash, PathBuf, io::Error),
637 #[error("[{0}] failed to write blob file at {1}: {2}")]
639 WriteFile(TxHash, PathBuf, io::Error),
641 #[error("[{0}] failed to delete blob file at {1}: {2}")]
643 DeleteFile(TxHash, PathBuf, io::Error),
645}
646
647impl From<DiskFileBlobStoreError> for BlobStoreError {
648 fn from(value: DiskFileBlobStoreError) -> Self {
649 Self::Other(Box::new(value))
650 }
651}
652
653#[derive(Debug, Clone)]
655pub struct DiskFileBlobStoreConfig {
656 pub max_cached_entries: u32,
658 pub open: OpenDiskFileBlobStore,
660}
661
662impl Default for DiskFileBlobStoreConfig {
663 fn default() -> Self {
664 Self { max_cached_entries: DEFAULT_MAX_CACHED_BLOBS, open: Default::default() }
665 }
666}
667
668impl DiskFileBlobStoreConfig {
669 pub const fn with_max_cached_entries(mut self, max_cached_entries: u32) -> Self {
671 self.max_cached_entries = max_cached_entries;
672 self
673 }
674}
675
676#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
678pub enum OpenDiskFileBlobStore {
679 #[default]
681 Clear,
682 ReIndex,
684}
685
686#[cfg(test)]
687mod tests {
688 use alloy_consensus::BlobTransactionSidecar;
689 use alloy_eips::{
690 eip4844::{kzg_to_versioned_hash, Blob, BlobAndProofV2, Bytes48},
691 eip7594::{
692 BlobTransactionSidecarEip7594, BlobTransactionSidecarVariant, CELLS_PER_EXT_BLOB,
693 },
694 };
695
696 use super::*;
697 use std::sync::atomic::Ordering;
698
699 fn tmp_store() -> (DiskFileBlobStore, tempfile::TempDir) {
700 let dir = tempfile::tempdir().unwrap();
701 let store = DiskFileBlobStore::open(dir.path(), Default::default()).unwrap();
702 (store, dir)
703 }
704
705 fn rng_blobs(num: usize) -> Vec<(TxHash, BlobTransactionSidecarVariant)> {
706 let mut rng = rand::rng();
707 (0..num)
708 .map(|_| {
709 let tx = TxHash::random_with(&mut rng);
710 let blob = BlobTransactionSidecarVariant::Eip4844(BlobTransactionSidecar {
711 blobs: vec![],
712 commitments: vec![],
713 proofs: vec![],
714 });
715 (tx, blob)
716 })
717 .collect()
718 }
719
720 fn eip7594_single_blob_sidecar() -> (BlobTransactionSidecarVariant, B256, BlobAndProofV2) {
721 let blob = Blob::default();
722 let commitment = Bytes48::default();
723 let cell_proofs = vec![Bytes48::default(); CELLS_PER_EXT_BLOB];
724
725 let versioned_hash = kzg_to_versioned_hash(commitment.as_slice());
726
727 let expected =
728 BlobAndProofV2 { blob: Box::new(Blob::default()), proofs: cell_proofs.clone() };
729 let sidecar = BlobTransactionSidecarEip7594::new(vec![blob], vec![commitment], cell_proofs);
730
731 (BlobTransactionSidecarVariant::Eip7594(sidecar), versioned_hash, expected)
732 }
733
734 #[test]
735 fn disk_insert_all_get_all() {
736 let (store, _dir) = tmp_store();
737
738 let blobs = rng_blobs(10);
739 let all_hashes = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
740 store.insert_all(blobs.clone()).unwrap();
741
742 for (tx, blob) in &blobs {
744 assert!(store.is_cached(tx));
745 let b = store.get(*tx).unwrap().map(Arc::unwrap_or_clone).unwrap();
746 assert_eq!(b, *blob);
747 }
748
749 let all = store.get_all(all_hashes.clone()).unwrap();
750 for (tx, blob) in all {
751 assert!(blobs.contains(&(tx, Arc::unwrap_or_clone(blob))), "missing blob {tx:?}");
752 }
753
754 assert!(store.contains(all_hashes[0]).unwrap());
755 store.delete_all(all_hashes.clone()).unwrap();
756 assert!(store.inner.txs_to_delete.read().contains(&all_hashes[0]));
757 store.clear_cache();
758 store.cleanup();
759
760 assert!(store.get(blobs[0].0).unwrap().is_none());
761
762 let all = store.get_all(all_hashes.clone()).unwrap();
763 assert!(all.is_empty());
764
765 assert!(!store.contains(all_hashes[0]).unwrap());
766 assert!(store.get_exact(all_hashes).is_err());
767
768 assert_eq!(store.data_size_hint(), Some(0));
769 assert_eq!(store.inner.size_tracker.num_blobs.load(Ordering::Relaxed), 0);
770 }
771
772 #[test]
773 fn disk_insert_and_retrieve() {
774 let (store, _dir) = tmp_store();
775
776 let (tx, blob) = rng_blobs(1).into_iter().next().unwrap();
777 store.insert(tx, blob.clone()).unwrap();
778
779 assert!(store.is_cached(&tx));
780 let retrieved_blob = store.get(tx).unwrap().map(Arc::unwrap_or_clone).unwrap();
781 assert_eq!(retrieved_blob, blob);
782 }
783
784 #[test]
785 fn disk_delete_blob() {
786 let (store, _dir) = tmp_store();
787
788 let (tx, blob) = rng_blobs(1).into_iter().next().unwrap();
789 store.insert(tx, blob).unwrap();
790 assert!(store.is_cached(&tx));
791
792 store.delete(tx).unwrap();
793 assert!(store.inner.txs_to_delete.read().contains(&tx));
794 store.cleanup();
795
796 let result = store.get(tx).unwrap();
797 assert_eq!(
798 result,
799 Some(Arc::new(BlobTransactionSidecarVariant::Eip4844(BlobTransactionSidecar {
800 blobs: vec![],
801 commitments: vec![],
802 proofs: vec![]
803 })))
804 );
805 }
806
807 #[test]
808 fn disk_insert_all_and_delete_all() {
809 let (store, _dir) = tmp_store();
810
811 let blobs = rng_blobs(5);
812 let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
813 store.insert_all(blobs.clone()).unwrap();
814
815 for (tx, _) in &blobs {
816 assert!(store.is_cached(tx));
817 }
818
819 store.delete_all(txs.clone()).unwrap();
820 store.cleanup();
821
822 for tx in txs {
823 let result = store.get(tx).unwrap();
824 assert_eq!(
825 result,
826 Some(Arc::new(BlobTransactionSidecarVariant::Eip4844(BlobTransactionSidecar {
827 blobs: vec![],
828 commitments: vec![],
829 proofs: vec![]
830 })))
831 );
832 }
833 }
834
835 #[test]
836 fn disk_get_all_blobs() {
837 let (store, _dir) = tmp_store();
838
839 let blobs = rng_blobs(3);
840 let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
841 store.insert_all(blobs.clone()).unwrap();
842
843 let retrieved_blobs = store.get_all(txs.clone()).unwrap();
844 for (tx, blob) in retrieved_blobs {
845 assert!(blobs.contains(&(tx, Arc::unwrap_or_clone(blob))));
846 }
847
848 store.delete_all(txs).unwrap();
849 store.cleanup();
850 }
851
852 #[test]
853 fn disk_get_exact_blobs_success() {
854 let (store, _dir) = tmp_store();
855
856 let blobs = rng_blobs(3);
857 let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
858 store.insert_all(blobs.clone()).unwrap();
859
860 let retrieved_blobs = store.get_exact(txs).unwrap();
861 for (retrieved_blob, (_, original_blob)) in retrieved_blobs.into_iter().zip(blobs) {
862 assert_eq!(Arc::unwrap_or_clone(retrieved_blob), original_blob);
863 }
864 }
865
866 #[test]
867 fn disk_get_exact_blobs_failure() {
868 let (store, _dir) = tmp_store();
869
870 let blobs = rng_blobs(2);
871 let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
872 store.insert_all(blobs).unwrap();
873
874 let missing_tx = TxHash::random();
876 let result = store.get_exact(vec![txs[0], missing_tx]);
877 assert!(result.is_err());
878 }
879
880 #[test]
881 fn disk_data_size_hint() {
882 let (store, _dir) = tmp_store();
883 assert_eq!(store.data_size_hint(), Some(0));
884
885 let blobs = rng_blobs(2);
886 store.insert_all(blobs).unwrap();
887 assert!(store.data_size_hint().unwrap() > 0);
888 }
889
890 #[test]
891 fn disk_cleanup_stat() {
892 let (store, _dir) = tmp_store();
893
894 let blobs = rng_blobs(3);
895 let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
896 store.insert_all(blobs).unwrap();
897
898 store.delete_all(txs).unwrap();
899 let stat = store.cleanup();
900 assert_eq!(stat.delete_succeed, 3);
901 assert_eq!(stat.delete_failed, 0);
902 }
903
904 #[test]
905 fn disk_get_blobs_v3_returns_partial_results() {
906 let (store, _dir) = tmp_store();
907
908 let (sidecar, versioned_hash, expected) = eip7594_single_blob_sidecar();
909 store.insert(TxHash::random(), sidecar).unwrap();
910
911 assert_ne!(versioned_hash, B256::ZERO);
912
913 let request = vec![versioned_hash, B256::ZERO];
914 let v2 = store.get_by_versioned_hashes_v2(&request).unwrap();
915 assert!(v2.is_none(), "v2 must return null if any requested blob is missing");
916
917 let v3 = store.get_by_versioned_hashes_v3(&request).unwrap();
918 assert_eq!(v3, vec![Some(expected), None]);
919 }
920
921 #[test]
922 fn disk_get_blobs_v3_can_fallback_to_disk() {
923 let (store, _dir) = tmp_store();
924
925 let (sidecar, versioned_hash, expected) = eip7594_single_blob_sidecar();
926 store.insert(TxHash::random(), sidecar).unwrap();
927 store.clear_cache();
928
929 let v3 = store.get_by_versioned_hashes_v3(&[versioned_hash]).unwrap();
930 assert_eq!(v3, vec![Some(expected)]);
931 }
932}