1use crate::blobstore::{BlobStore, BlobStoreCleanupStat, BlobStoreError, BlobStoreSize};
4use alloy_eips::eip4844::{BlobAndProofV1, BlobTransactionSidecar};
5use alloy_primitives::{TxHash, B256};
6use parking_lot::{Mutex, RwLock};
7use schnellru::{ByLength, LruMap};
8use std::{collections::HashSet, fmt, fs, io, path::PathBuf, sync::Arc};
9use tracing::{debug, trace};
10
/// Default maximum number of decoded blob sidecars kept in the in-memory LRU cache.
pub const DEFAULT_MAX_CACHED_BLOBS: u32 = 100;
13
/// A blob store that persists blob sidecars as files on disk, fronted by an
/// in-memory LRU cache of decoded sidecars.
///
/// Cloning is cheap: all state lives behind a shared [`Arc`].
#[derive(Clone, Debug)]
pub struct DiskFileBlobStore {
    // Shared inner state (cache, size tracking, file locking).
    inner: Arc<DiskFileBlobStoreInner>,
}
23
24impl DiskFileBlobStore {
25 pub fn open(
27 blob_dir: impl Into<PathBuf>,
28 opts: DiskFileBlobStoreConfig,
29 ) -> Result<Self, DiskFileBlobStoreError> {
30 let blob_dir = blob_dir.into();
31 let DiskFileBlobStoreConfig { max_cached_entries, .. } = opts;
32 let inner = DiskFileBlobStoreInner::new(blob_dir, max_cached_entries);
33
34 inner.delete_all()?;
36 inner.create_blob_dir()?;
37
38 Ok(Self { inner: Arc::new(inner) })
39 }
40
41 #[cfg(test)]
42 fn is_cached(&self, tx: &B256) -> bool {
43 self.inner.blob_cache.lock().get(tx).is_some()
44 }
45
46 #[cfg(test)]
47 fn clear_cache(&self) {
48 self.inner.blob_cache.lock().clear()
49 }
50}
51
impl BlobStore for DiskFileBlobStore {
    /// Inserts a single blob sidecar, writing it to the cache and to disk.
    fn insert(&self, tx: B256, data: BlobTransactionSidecar) -> Result<(), BlobStoreError> {
        self.inner.insert_one(tx, data)
    }

    /// Inserts a batch of blob sidecars; no-op for an empty batch.
    fn insert_all(&self, txs: Vec<(B256, BlobTransactionSidecar)>) -> Result<(), BlobStoreError> {
        if txs.is_empty() {
            return Ok(())
        }
        self.inner.insert_many(txs)
    }

    /// Queues the transaction's blob for deletion if it exists.
    ///
    /// Deletion is deferred: files are only removed on the next [`Self::cleanup`].
    fn delete(&self, tx: B256) -> Result<(), BlobStoreError> {
        if self.inner.contains(tx)? {
            self.inner.txs_to_delete.write().insert(tx);
        }
        Ok(())
    }

    /// Queues all existing blobs of the given transactions for deferred deletion.
    fn delete_all(&self, txs: Vec<B256>) -> Result<(), BlobStoreError> {
        // Only track hashes that actually exist in the cache or on disk.
        let txs = self.inner.retain_existing(txs)?;
        self.inner.txs_to_delete.write().extend(txs);
        Ok(())
    }

    /// Removes all queued blob files from disk and updates size accounting.
    ///
    /// Note: entries are not evicted from the in-memory cache or the
    /// versioned-hash index here; cached sidecars stay readable until they
    /// age out of the LRU.
    fn cleanup(&self) -> BlobStoreCleanupStat {
        // Take the whole deletion queue, leaving an empty set behind.
        let txs_to_delete = std::mem::take(&mut *self.inner.txs_to_delete.write());
        let mut stat = BlobStoreCleanupStat::default();
        let mut subsize = 0;
        debug!(target:"txpool::blob", num_blobs=%txs_to_delete.len(), "Removing blobs from disk");
        for tx in txs_to_delete {
            let path = self.inner.blob_disk_file(tx);
            // Record the file size before removal so the tracker can be debited.
            let filesize = fs::metadata(&path).map_or(0, |meta| meta.len());
            match fs::remove_file(&path) {
                Ok(_) => {
                    stat.delete_succeed += 1;
                    subsize += filesize;
                }
                Err(e) => {
                    // Failures are logged but never abort the sweep.
                    stat.delete_failed += 1;
                    let err = DiskFileBlobStoreError::DeleteFile(tx, path, e);
                    debug!(target:"txpool::blob", %err);
                }
            };
        }
        self.inner.size_tracker.sub_size(subsize as usize);
        self.inner.size_tracker.sub_len(stat.delete_succeed);
        stat
    }

    /// Retrieves one sidecar, preferring the cache and falling back to disk.
    fn get(&self, tx: B256) -> Result<Option<Arc<BlobTransactionSidecar>>, BlobStoreError> {
        self.inner.get_one(tx)
    }

    /// Returns whether a sidecar exists in the cache or on disk.
    fn contains(&self, tx: B256) -> Result<bool, BlobStoreError> {
        self.inner.contains(tx)
    }

    /// Retrieves all available sidecars for the given hashes.
    ///
    /// Missing entries are silently skipped, so the result may be shorter
    /// than the input.
    fn get_all(
        &self,
        txs: Vec<B256>,
    ) -> Result<Vec<(B256, Arc<BlobTransactionSidecar>)>, BlobStoreError> {
        if txs.is_empty() {
            return Ok(Vec::new())
        }
        self.inner.get_all(txs)
    }

    /// Retrieves sidecars for exactly the given hashes; errors if any is missing.
    fn get_exact(
        &self,
        txs: Vec<B256>,
    ) -> Result<Vec<Arc<BlobTransactionSidecar>>, BlobStoreError> {
        if txs.is_empty() {
            return Ok(Vec::new())
        }
        self.inner.get_exact(txs)
    }

    /// Resolves blob-and-proof pairs for the given versioned hashes.
    ///
    /// Result positions correspond to the input hashes; unresolved entries
    /// are `None`. Scans the in-memory cache first, then falls back to
    /// reading the referenced transactions from disk via the
    /// versioned-hash -> tx-hash index.
    fn get_by_versioned_hashes(
        &self,
        versioned_hashes: &[B256],
    ) -> Result<Vec<Option<BlobAndProofV1>>, BlobStoreError> {
        let mut result = vec![None; versioned_hashes.len()];

        // Pass 1: scan every cached sidecar for matching versioned hashes.
        for (_tx_hash, blob_sidecar) in self.inner.blob_cache.lock().iter() {
            for (hash_idx, match_result) in blob_sidecar.match_versioned_hashes(versioned_hashes) {
                result[hash_idx] = Some(match_result);
            }

            // Stop early once every requested hash has been resolved.
            if result.iter().all(|blob| blob.is_some()) {
                return Ok(result);
            }
        }

        let mut missing_tx_hashes = Vec::new();

        // Map each still-unresolved versioned hash to the transaction that
        // carries it; the index lock is held only for this scope.
        {
            let mut versioned_to_txhashes = self.inner.versioned_hashes_to_txhash.lock();
            for (idx, _) in
                result.iter().enumerate().filter(|(_, blob_and_proof)| blob_and_proof.is_none())
            {
                let versioned_hash = versioned_hashes[idx];
                // Hashes not present in the index are simply left unresolved.
                if let Some(tx_hash) = versioned_to_txhashes.get(&versioned_hash).copied() {
                    missing_tx_hashes.push(tx_hash);
                }
            }
        }

        // Pass 2: decode the missing sidecars from disk and fill any slots
        // that are still empty.
        if !missing_tx_hashes.is_empty() {
            let blobs_from_disk = self.inner.read_many_decoded(missing_tx_hashes);
            for (_, blob_sidecar) in blobs_from_disk {
                for (hash_idx, match_result) in
                    blob_sidecar.match_versioned_hashes(versioned_hashes)
                {
                    if result[hash_idx].is_none() {
                        result[hash_idx] = Some(match_result);
                    }
                }
            }
        }

        Ok(result)
    }

    /// Returns the tracked total byte size of all persisted blobs.
    fn data_size_hint(&self) -> Option<usize> {
        Some(self.inner.size_tracker.data_size())
    }

    /// Returns the tracked number of persisted blobs.
    fn blobs_len(&self) -> usize {
        self.inner.size_tracker.blobs_len()
    }
}
191
/// Shared state of the disk file blob store.
struct DiskFileBlobStoreInner {
    // Directory holding one blob file per transaction hash.
    blob_dir: PathBuf,
    // LRU cache of decoded sidecars, keyed by transaction hash.
    blob_cache: Mutex<LruMap<TxHash, Arc<BlobTransactionSidecar>, ByLength>>,
    // Tracks total byte size and count of blobs persisted on disk.
    size_tracker: BlobStoreSize,
    // Serializes file-system access: writes take the write lock, reads the read lock.
    file_lock: RwLock<()>,
    // Hashes queued for deletion; drained by `cleanup`.
    txs_to_delete: RwLock<HashSet<B256>>,
    // Maps a blob's versioned hash to the transaction hash that carries it.
    versioned_hashes_to_txhash: Mutex<LruMap<B256, B256>>,
}
204
205impl DiskFileBlobStoreInner {
206 fn new(blob_dir: PathBuf, max_length: u32) -> Self {
208 Self {
209 blob_dir,
210 blob_cache: Mutex::new(LruMap::new(ByLength::new(max_length))),
211 size_tracker: Default::default(),
212 file_lock: Default::default(),
213 txs_to_delete: Default::default(),
214 versioned_hashes_to_txhash: Mutex::new(LruMap::new(ByLength::new(max_length * 6))),
215 }
216 }
217
218 fn create_blob_dir(&self) -> Result<(), DiskFileBlobStoreError> {
220 debug!(target:"txpool::blob", blob_dir = ?self.blob_dir, "Creating blob store");
221 fs::create_dir_all(&self.blob_dir)
222 .map_err(|e| DiskFileBlobStoreError::Open(self.blob_dir.clone(), e))
223 }
224
225 fn delete_all(&self) -> Result<(), DiskFileBlobStoreError> {
227 match fs::remove_dir_all(&self.blob_dir) {
228 Ok(_) => {
229 debug!(target:"txpool::blob", blob_dir = ?self.blob_dir, "Removed blob store directory");
230 }
231 Err(err) if err.kind() == io::ErrorKind::NotFound => {}
232 Err(err) => return Err(DiskFileBlobStoreError::Open(self.blob_dir.clone(), err)),
233 }
234 Ok(())
235 }
236
237 fn insert_one(&self, tx: B256, data: BlobTransactionSidecar) -> Result<(), BlobStoreError> {
239 let mut buf = Vec::with_capacity(data.rlp_encoded_fields_length());
240 data.rlp_encode_fields(&mut buf);
241
242 {
243 let mut map = self.versioned_hashes_to_txhash.lock();
245 data.versioned_hashes().for_each(|hash| {
246 map.insert(hash, tx);
247 })
248 }
249
250 self.blob_cache.lock().insert(tx, Arc::new(data));
251
252 let size = self.write_one_encoded(tx, &buf)?;
253
254 self.size_tracker.add_size(size);
255 self.size_tracker.inc_len(1);
256 Ok(())
257 }
258
259 fn insert_many(&self, txs: Vec<(B256, BlobTransactionSidecar)>) -> Result<(), BlobStoreError> {
261 let raw = txs
262 .iter()
263 .map(|(tx, data)| {
264 let mut buf = Vec::with_capacity(data.rlp_encoded_fields_length());
265 data.rlp_encode_fields(&mut buf);
266 (self.blob_disk_file(*tx), buf)
267 })
268 .collect::<Vec<_>>();
269
270 {
271 let mut map = self.versioned_hashes_to_txhash.lock();
273 for (tx, data) in &txs {
274 data.versioned_hashes().for_each(|hash| {
275 map.insert(hash, *tx);
276 })
277 }
278 }
279
280 {
281 let mut cache = self.blob_cache.lock();
283 for (tx, data) in txs {
284 cache.insert(tx, Arc::new(data));
285 }
286 }
287
288 let mut add = 0;
289 let mut num = 0;
290 {
291 let _lock = self.file_lock.write();
292 for (path, data) in raw {
293 if path.exists() {
294 debug!(target:"txpool::blob", ?path, "Blob already exists");
295 } else if let Err(err) = fs::write(&path, &data) {
296 debug!(target:"txpool::blob", %err, ?path, "Failed to write blob file");
297 } else {
298 add += data.len();
299 num += 1;
300 }
301 }
302 }
303 self.size_tracker.add_size(add);
304 self.size_tracker.inc_len(num);
305
306 Ok(())
307 }
308
309 fn contains(&self, tx: B256) -> Result<bool, BlobStoreError> {
311 if self.blob_cache.lock().get(&tx).is_some() {
312 return Ok(true)
313 }
314 Ok(self.blob_disk_file(tx).is_file())
316 }
317
318 fn retain_existing(&self, txs: Vec<B256>) -> Result<Vec<B256>, BlobStoreError> {
320 let (in_cache, not_in_cache): (Vec<B256>, Vec<B256>) = {
321 let mut cache = self.blob_cache.lock();
322 txs.into_iter().partition(|tx| cache.get(tx).is_some())
323 };
324
325 let mut existing = in_cache;
326 for tx in not_in_cache {
327 if self.blob_disk_file(tx).is_file() {
328 existing.push(tx);
329 }
330 }
331
332 Ok(existing)
333 }
334
335 fn get_one(&self, tx: B256) -> Result<Option<Arc<BlobTransactionSidecar>>, BlobStoreError> {
337 if let Some(blob) = self.blob_cache.lock().get(&tx) {
338 return Ok(Some(blob.clone()))
339 }
340 let blob = self.read_one(tx)?;
341
342 if let Some(blob) = &blob {
343 let blob_arc = Arc::new(blob.clone());
344 self.blob_cache.lock().insert(tx, blob_arc.clone());
345 return Ok(Some(blob_arc))
346 }
347
348 Ok(None)
349 }
350
351 #[inline]
353 fn blob_disk_file(&self, tx: B256) -> PathBuf {
354 self.blob_dir.join(format!("{tx:x}"))
355 }
356
357 #[inline]
359 fn read_one(&self, tx: B256) -> Result<Option<BlobTransactionSidecar>, BlobStoreError> {
360 let path = self.blob_disk_file(tx);
361 let data = {
362 let _lock = self.file_lock.read();
363 match fs::read(&path) {
364 Ok(data) => data,
365 Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(None),
366 Err(e) => {
367 return Err(BlobStoreError::Other(Box::new(DiskFileBlobStoreError::ReadFile(
368 tx, path, e,
369 ))))
370 }
371 }
372 };
373 BlobTransactionSidecar::rlp_decode_fields(&mut data.as_slice())
374 .map(Some)
375 .map_err(BlobStoreError::DecodeError)
376 }
377
378 fn read_many_decoded(&self, txs: Vec<TxHash>) -> Vec<(TxHash, BlobTransactionSidecar)> {
382 self.read_many_raw(txs)
383 .into_iter()
384 .filter_map(|(tx, data)| {
385 BlobTransactionSidecar::rlp_decode_fields(&mut data.as_slice())
386 .map(|sidecar| (tx, sidecar))
387 .ok()
388 })
389 .collect()
390 }
391
392 #[inline]
396 fn read_many_raw(&self, txs: Vec<TxHash>) -> Vec<(TxHash, Vec<u8>)> {
397 let mut res = Vec::with_capacity(txs.len());
398 let _lock = self.file_lock.read();
399 for tx in txs {
400 let path = self.blob_disk_file(tx);
401 match fs::read(&path) {
402 Ok(data) => {
403 res.push((tx, data));
404 }
405 Err(err) => {
406 debug!(target:"txpool::blob", %err, ?tx, "Failed to read blob file");
407 }
408 };
409 }
410 res
411 }
412
413 #[inline]
415 fn write_one_encoded(&self, tx: B256, data: &[u8]) -> Result<usize, DiskFileBlobStoreError> {
416 trace!(target:"txpool::blob", "[{:?}] writing blob file", tx);
417 let mut add = 0;
418 let path = self.blob_disk_file(tx);
419 {
420 let _lock = self.file_lock.write();
421 if !path.exists() {
422 fs::write(&path, data)
423 .map_err(|e| DiskFileBlobStoreError::WriteFile(tx, path, e))?;
424 add = data.len();
425 }
426 }
427 Ok(add)
428 }
429
430 #[inline]
435 fn get_all(
436 &self,
437 txs: Vec<B256>,
438 ) -> Result<Vec<(B256, Arc<BlobTransactionSidecar>)>, BlobStoreError> {
439 let mut res = Vec::with_capacity(txs.len());
440 let mut cache_miss = Vec::new();
441 {
442 let mut cache = self.blob_cache.lock();
443 for tx in txs {
444 if let Some(blob) = cache.get(&tx) {
445 res.push((tx, blob.clone()));
446 } else {
447 cache_miss.push(tx)
448 }
449 }
450 }
451 if cache_miss.is_empty() {
452 return Ok(res)
453 }
454 let from_disk = self.read_many_decoded(cache_miss);
455 if from_disk.is_empty() {
456 return Ok(res)
457 }
458 let mut cache = self.blob_cache.lock();
459 for (tx, data) in from_disk {
460 let arc = Arc::new(data.clone());
461 cache.insert(tx, arc.clone());
462 res.push((tx, arc.clone()));
463 }
464
465 Ok(res)
466 }
467
468 #[inline]
472 fn get_exact(
473 &self,
474 txs: Vec<B256>,
475 ) -> Result<Vec<Arc<BlobTransactionSidecar>>, BlobStoreError> {
476 txs.into_iter()
477 .map(|tx| self.get_one(tx)?.ok_or(BlobStoreError::MissingSidecar(tx)))
478 .collect()
479 }
480}
481
482impl fmt::Debug for DiskFileBlobStoreInner {
483 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
484 f.debug_struct("DiskFileBlobStoreInner")
485 .field("blob_dir", &self.blob_dir)
486 .field("cached_blobs", &self.blob_cache.try_lock().map(|lock| lock.len()))
487 .field("txs_to_delete", &self.txs_to_delete.try_read())
488 .finish()
489 }
490}
491
/// Errors that can occur when interacting with the disk file blob store.
#[derive(Debug, thiserror::Error)]
pub enum DiskFileBlobStoreError {
    /// Failed to open/create the blob store directory.
    #[error("failed to open blobstore at {0}: {1}")]
    Open(PathBuf, io::Error),
    /// Failed to read the blob file of a transaction.
    #[error("[{0}] failed to read blob file at {1}: {2}")]
    ReadFile(TxHash, PathBuf, io::Error),
    /// Failed to write the blob file of a transaction.
    #[error("[{0}] failed to write blob file at {1}: {2}")]
    WriteFile(TxHash, PathBuf, io::Error),
    /// Failed to delete the blob file of a transaction.
    #[error("[{0}] failed to delete blob file at {1}: {2}")]
    DeleteFile(TxHash, PathBuf, io::Error),
}
512
513impl From<DiskFileBlobStoreError> for BlobStoreError {
514 fn from(value: DiskFileBlobStoreError) -> Self {
515 Self::Other(Box::new(value))
516 }
517}
518
/// Configuration for a [`DiskFileBlobStore`].
#[derive(Debug, Clone)]
pub struct DiskFileBlobStoreConfig {
    /// Maximum number of decoded sidecars to keep in the in-memory LRU cache.
    pub max_cached_entries: u32,
    /// How to handle a pre-existing blob store directory on open.
    // NOTE(review): `open` appears unused by `DiskFileBlobStore::open`, which
    // always clears the directory — confirm `ReIndex` is intentionally
    // unimplemented.
    pub open: OpenDiskFileBlobStore,
}
527
528impl Default for DiskFileBlobStoreConfig {
529 fn default() -> Self {
530 Self { max_cached_entries: DEFAULT_MAX_CACHED_BLOBS, open: Default::default() }
531 }
532}
533
impl DiskFileBlobStoreConfig {
    /// Builder-style setter for the maximum number of cached blob sidecars.
    pub const fn with_max_cached_entries(mut self, max_cached_entries: u32) -> Self {
        self.max_cached_entries = max_cached_entries;
        self
    }
}
541
/// How a pre-existing blob store directory is handled when opening the store.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum OpenDiskFileBlobStore {
    /// Clear the existing blob store directory and start empty (default).
    #[default]
    Clear,
    /// Keep existing blob files and rebuild the index from them.
    ReIndex,
}
551
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::Ordering;

    // Creates a store backed by a fresh temp dir. The TempDir guard must be
    // kept alive for the store's lifetime or the directory is removed.
    fn tmp_store() -> (DiskFileBlobStore, tempfile::TempDir) {
        let dir = tempfile::tempdir().unwrap();
        let store = DiskFileBlobStore::open(dir.path(), Default::default()).unwrap();
        (store, dir)
    }

    // Generates `num` (random tx hash, EMPTY sidecar) pairs. Note that all
    // sidecars are identical empty sidecars — several tests below rely on that.
    fn rng_blobs(num: usize) -> Vec<(TxHash, BlobTransactionSidecar)> {
        let mut rng = rand::rng();
        (0..num)
            .map(|_| {
                let tx = TxHash::random_with(&mut rng);
                let blob =
                    BlobTransactionSidecar { blobs: vec![], commitments: vec![], proofs: vec![] };
                (tx, blob)
            })
            .collect()
    }

    // End-to-end: insert batch, read back via get/get_all, then delete all
    // and verify the store is empty (cache cleared explicitly first, since
    // cleanup only removes files).
    #[test]
    fn disk_insert_all_get_all() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(10);
        let all_hashes = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        // Every inserted blob is cached and retrievable.
        for (tx, blob) in &blobs {
            assert!(store.is_cached(tx));
            let b = store.get(*tx).unwrap().map(Arc::unwrap_or_clone).unwrap();
            assert_eq!(b, *blob);
        }

        let all = store.get_all(all_hashes.clone()).unwrap();
        for (tx, blob) in all {
            assert!(blobs.contains(&(tx, Arc::unwrap_or_clone(blob))), "missing blob {tx:?}");
        }

        assert!(store.contains(all_hashes[0]).unwrap());
        store.delete_all(all_hashes.clone()).unwrap();
        assert!(store.inner.txs_to_delete.read().contains(&all_hashes[0]));
        // Clear the cache so reads after cleanup must hit the (now empty) disk.
        store.clear_cache();
        store.cleanup();

        assert!(store.get(blobs[0].0).unwrap().is_none());

        let all = store.get_all(all_hashes.clone()).unwrap();
        assert!(all.is_empty());

        assert!(!store.contains(all_hashes[0]).unwrap());
        assert!(store.get_exact(all_hashes).is_err());

        // Size accounting is fully debited after deleting everything.
        assert_eq!(store.data_size_hint(), Some(0));
        assert_eq!(store.inner.size_tracker.num_blobs.load(Ordering::Relaxed), 0);
    }

    // A single insert is cached and retrievable.
    #[test]
    fn disk_insert_and_retrieve() {
        let (store, _dir) = tmp_store();

        let (tx, blob) = rng_blobs(1).into_iter().next().unwrap();
        store.insert(tx, blob.clone()).unwrap();

        assert!(store.is_cached(&tx));
        let retrieved_blob = store.get(tx).unwrap().map(Arc::unwrap_or_clone).unwrap();
        assert_eq!(retrieved_blob, blob);
    }

    // Pins a quirk: after delete + cleanup (without clearing the cache), `get`
    // still returns the sidecar because cleanup only removes the disk file.
    #[test]
    fn disk_delete_blob() {
        let (store, _dir) = tmp_store();

        let (tx, blob) = rng_blobs(1).into_iter().next().unwrap();
        store.insert(tx, blob).unwrap();
        assert!(store.is_cached(&tx));

        store.delete(tx).unwrap();
        assert!(store.inner.txs_to_delete.read().contains(&tx));
        store.cleanup();

        // Still served from the in-memory cache.
        let result = store.get(tx).unwrap();
        assert_eq!(
            result,
            Some(Arc::new(BlobTransactionSidecar {
                blobs: vec![],
                commitments: vec![],
                proofs: vec![]
            }))
        );
    }

    // Same cache-retention quirk as above, exercised via the batch APIs.
    #[test]
    fn disk_insert_all_and_delete_all() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(5);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        for (tx, _) in &blobs {
            assert!(store.is_cached(tx));
        }

        store.delete_all(txs.clone()).unwrap();
        store.cleanup();

        for tx in txs {
            let result = store.get(tx).unwrap();
            assert_eq!(
                result,
                Some(Arc::new(BlobTransactionSidecar {
                    blobs: vec![],
                    commitments: vec![],
                    proofs: vec![]
                }))
            );
        }
    }

    // get_all returns every inserted (tx, blob) pair.
    #[test]
    fn disk_get_all_blobs() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(3);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        let retrieved_blobs = store.get_all(txs.clone()).unwrap();
        for (tx, blob) in retrieved_blobs {
            assert!(blobs.contains(&(tx, Arc::unwrap_or_clone(blob))));
        }

        store.delete_all(txs).unwrap();
        store.cleanup();
    }

    // get_exact preserves input order and returns every requested blob.
    #[test]
    fn disk_get_exact_blobs_success() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(3);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        let retrieved_blobs = store.get_exact(txs).unwrap();
        for (retrieved_blob, (_, original_blob)) in retrieved_blobs.into_iter().zip(blobs) {
            assert_eq!(Arc::unwrap_or_clone(retrieved_blob), original_blob);
        }
    }

    // get_exact fails if any requested hash is missing.
    #[test]
    fn disk_get_exact_blobs_failure() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(2);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs).unwrap();

        let missing_tx = TxHash::random();
        let result = store.get_exact(vec![txs[0], missing_tx]);
        assert!(result.is_err());
    }

    // data_size_hint starts at zero and grows after inserts.
    #[test]
    fn disk_data_size_hint() {
        let (store, _dir) = tmp_store();
        assert_eq!(store.data_size_hint(), Some(0));

        let blobs = rng_blobs(2);
        store.insert_all(blobs).unwrap();
        assert!(store.data_size_hint().unwrap() > 0);
    }

    // cleanup reports one success per deleted file and no failures.
    #[test]
    fn disk_cleanup_stat() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(3);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs).unwrap();

        store.delete_all(txs).unwrap();
        let stat = store.cleanup();
        assert_eq!(stat.delete_succeed, 3);
        assert_eq!(stat.delete_failed, 0);
    }
}