use crate::blobstore::{BlobStore, BlobStoreCleanupStat, BlobStoreError, BlobStoreSize};
use alloy_eips::eip4844::{BlobAndProofV1, BlobTransactionSidecar};
use alloy_primitives::{TxHash, B256};
use parking_lot::{Mutex, RwLock};
use schnellru::{ByLength, LruMap};
use std::{collections::HashSet, fmt, fs, io, path::PathBuf, sync::Arc};
use tracing::{debug, trace};

/// How many [`BlobTransactionSidecar`] entries to cache in memory by default.
pub const DEFAULT_MAX_CACHED_BLOBS: u32 = 100;

/// A blob store that persists blob sidecars as files on disk, with an in-memory LRU cache for
/// recently accessed entries.
///
/// Deletion is deferred: [`BlobStore::delete`] only marks a blob for removal, and the files are
/// removed from disk when [`BlobStore::cleanup`] is called.
#[derive(Clone, Debug)]
pub struct DiskFileBlobStore {
    inner: Arc<DiskFileBlobStoreInner>,
}

impl DiskFileBlobStore {
    /// Opens a blob store at the given directory, wiping any blob files left over from a
    /// previous run before re-creating the directory.
    pub fn open(
        blob_dir: impl Into<PathBuf>,
        opts: DiskFileBlobStoreConfig,
    ) -> Result<Self, DiskFileBlobStoreError> {
        let blob_dir = blob_dir.into();
        let DiskFileBlobStoreConfig { max_cached_entries, .. } = opts;
        let inner = DiskFileBlobStoreInner::new(blob_dir, max_cached_entries);

        // clear any leftover blob files, then re-create the empty blob directory
        inner.delete_all()?;
        inner.create_blob_dir()?;

        Ok(Self { inner: Arc::new(inner) })
    }

    /// Returns whether the blob for the given transaction hash is currently cached.
    #[cfg(test)]
    fn is_cached(&self, tx: &B256) -> bool {
        self.inner.blob_cache.lock().get(tx).is_some()
    }

    /// Clears the in-memory blob cache.
    #[cfg(test)]
    fn clear_cache(&self) {
        self.inner.blob_cache.lock().clear()
    }
}

impl BlobStore for DiskFileBlobStore {
    fn insert(&self, tx: B256, data: BlobTransactionSidecar) -> Result<(), BlobStoreError> {
        self.inner.insert_one(tx, data)
    }

    fn insert_all(&self, txs: Vec<(B256, BlobTransactionSidecar)>) -> Result<(), BlobStoreError> {
        if txs.is_empty() {
            return Ok(())
        }
        self.inner.insert_many(txs)
    }

    fn delete(&self, tx: B256) -> Result<(), BlobStoreError> {
        if self.inner.contains(tx)? {
            self.inner.txs_to_delete.write().insert(tx);
        }
        Ok(())
    }

    fn delete_all(&self, txs: Vec<B256>) -> Result<(), BlobStoreError> {
        let txs = self.inner.retain_existing(txs)?;
        self.inner.txs_to_delete.write().extend(txs);
        Ok(())
    }

    fn cleanup(&self) -> BlobStoreCleanupStat {
        let txs_to_delete = std::mem::take(&mut *self.inner.txs_to_delete.write());
        let mut stat = BlobStoreCleanupStat::default();
        let mut subsize = 0;
        debug!(target:"txpool::blob", num_blobs=%txs_to_delete.len(), "Removing blobs from disk");
        for tx in txs_to_delete {
            let path = self.inner.blob_disk_file(tx);
            let filesize = fs::metadata(&path).map_or(0, |meta| meta.len());
            match fs::remove_file(&path) {
                Ok(_) => {
                    stat.delete_succeed += 1;
                    subsize += filesize;
                }
                Err(e) => {
                    stat.delete_failed += 1;
                    let err = DiskFileBlobStoreError::DeleteFile(tx, path, e);
                    debug!(target:"txpool::blob", %err);
                }
            };
        }
        self.inner.size_tracker.sub_size(subsize as usize);
        self.inner.size_tracker.sub_len(stat.delete_succeed);
        stat
    }

    fn get(&self, tx: B256) -> Result<Option<Arc<BlobTransactionSidecar>>, BlobStoreError> {
        self.inner.get_one(tx)
    }

    fn contains(&self, tx: B256) -> Result<bool, BlobStoreError> {
        self.inner.contains(tx)
    }

    fn get_all(
        &self,
        txs: Vec<B256>,
    ) -> Result<Vec<(B256, Arc<BlobTransactionSidecar>)>, BlobStoreError> {
        if txs.is_empty() {
            return Ok(Vec::new())
        }
        self.inner.get_all(txs)
    }

    fn get_exact(
        &self,
        txs: Vec<B256>,
    ) -> Result<Vec<Arc<BlobTransactionSidecar>>, BlobStoreError> {
        if txs.is_empty() {
            return Ok(Vec::new())
        }
        self.inner.get_exact(txs)
    }

    fn get_by_versioned_hashes(
        &self,
        versioned_hashes: &[B256],
    ) -> Result<Vec<Option<BlobAndProofV1>>, BlobStoreError> {
        // note: this only scans the in-memory cache; sidecars that exist only on disk
        // are not considered
        let mut result = vec![None; versioned_hashes.len()];
        for (_tx_hash, blob_sidecar) in self.inner.blob_cache.lock().iter() {
            for (hash_idx, match_result) in blob_sidecar.match_versioned_hashes(versioned_hashes) {
                result[hash_idx] = Some(match_result);
            }
            // stop scanning once every requested hash has been matched
            if result.iter().all(|blob| blob.is_some()) {
                break;
            }
        }
        Ok(result)
    }

    fn data_size_hint(&self) -> Option<usize> {
        Some(self.inner.size_tracker.data_size())
    }

    fn blobs_len(&self) -> usize {
        self.inner.size_tracker.blobs_len()
    }
}

/// Intermediary type holding the blob directory, cache, and bookkeeping for a
/// [`DiskFileBlobStore`].
struct DiskFileBlobStoreInner {
    /// Directory in which blob files are stored, one file per transaction hash.
    blob_dir: PathBuf,
    /// LRU cache of recently accessed blob sidecars.
    blob_cache: Mutex<LruMap<TxHash, Arc<BlobTransactionSidecar>, ByLength>>,
    /// Tracks the total size and number of blob files on disk.
    size_tracker: BlobStoreSize,
    /// Guards file system access to the blob files.
    file_lock: RwLock<()>,
    /// Transactions whose blob files are scheduled for deletion on the next cleanup.
    txs_to_delete: RwLock<HashSet<B256>>,
}

impl DiskFileBlobStoreInner {
    /// Creates a new store that caches up to `max_length` blob sidecars in memory.
    fn new(blob_dir: PathBuf, max_length: u32) -> Self {
        Self {
            blob_dir,
            blob_cache: Mutex::new(LruMap::new(ByLength::new(max_length))),
            size_tracker: Default::default(),
            file_lock: Default::default(),
            txs_to_delete: Default::default(),
        }
    }

    /// Creates the directory where blob files will be stored, if it does not exist.
    fn create_blob_dir(&self) -> Result<(), DiskFileBlobStoreError> {
        debug!(target:"txpool::blob", blob_dir = ?self.blob_dir, "Creating blob store");
        fs::create_dir_all(&self.blob_dir)
            .map_err(|e| DiskFileBlobStoreError::Open(self.blob_dir.clone(), e))
    }

    /// Deletes the entire blob store directory, ignoring the case where it does not exist.
    fn delete_all(&self) -> Result<(), DiskFileBlobStoreError> {
        match fs::remove_dir_all(&self.blob_dir) {
            Ok(_) => {
                debug!(target:"txpool::blob", blob_dir = ?self.blob_dir, "Removed blob store directory");
            }
            Err(err) if err.kind() == io::ErrorKind::NotFound => {}
            Err(err) => return Err(DiskFileBlobStoreError::Open(self.blob_dir.clone(), err)),
        }
        Ok(())
    }

    /// Encodes the sidecar, caches it, and writes it to disk, updating the size tracker.
    fn insert_one(&self, tx: B256, data: BlobTransactionSidecar) -> Result<(), BlobStoreError> {
        let mut buf = Vec::with_capacity(data.rlp_encoded_fields_length());
        data.rlp_encode_fields(&mut buf);
        self.blob_cache.lock().insert(tx, Arc::new(data));
        let size = self.write_one_encoded(tx, &buf)?;

        self.size_tracker.add_size(size);
        self.size_tracker.inc_len(1);
        Ok(())
    }

    /// Encodes, caches, and writes multiple sidecars to disk.
    ///
    /// Files that already exist on disk or that fail to write are skipped; only successful
    /// writes are counted by the size tracker.
    fn insert_many(&self, txs: Vec<(B256, BlobTransactionSidecar)>) -> Result<(), BlobStoreError> {
        let raw = txs
            .iter()
            .map(|(tx, data)| {
                let mut buf = Vec::with_capacity(data.rlp_encoded_fields_length());
                data.rlp_encode_fields(&mut buf);
                (self.blob_disk_file(*tx), buf)
            })
            .collect::<Vec<_>>();

        {
            let mut cache = self.blob_cache.lock();
            for (tx, data) in txs {
                cache.insert(tx, Arc::new(data));
            }
        }
        let mut add = 0;
        let mut num = 0;
        {
            let _lock = self.file_lock.write();
            for (path, data) in raw {
                if path.exists() {
                    debug!(target:"txpool::blob", ?path, "Blob already exists");
                } else if let Err(err) = fs::write(&path, &data) {
                    debug!(target:"txpool::blob", %err, ?path, "Failed to write blob file");
                } else {
                    add += data.len();
                    num += 1;
                }
            }
        }
        self.size_tracker.add_size(add);
        self.size_tracker.inc_len(num);

        Ok(())
    }

    /// Returns true if the blob for the given transaction is cached or stored on disk.
    fn contains(&self, tx: B256) -> Result<bool, BlobStoreError> {
        if self.blob_cache.lock().get(&tx).is_some() {
            return Ok(true)
        }
        // not cached: fall back to checking whether the blob file exists on disk
        Ok(self.blob_disk_file(tx).is_file())
    }

    /// Retains only the transactions for which a blob exists in the cache or on disk.
    fn retain_existing(&self, txs: Vec<B256>) -> Result<Vec<B256>, BlobStoreError> {
        let (in_cache, not_in_cache): (Vec<B256>, Vec<B256>) = {
            let mut cache = self.blob_cache.lock();
            txs.into_iter().partition(|tx| cache.get(tx).is_some())
        };

        let mut existing = in_cache;
        for tx in not_in_cache {
            if self.blob_disk_file(tx).is_file() {
                existing.push(tx);
            }
        }

        Ok(existing)
    }

    /// Retrieves the blob for the given transaction, checking the cache first and falling back
    /// to a disk read; a blob read from disk is inserted into the cache.
    fn get_one(&self, tx: B256) -> Result<Option<Arc<BlobTransactionSidecar>>, BlobStoreError> {
        if let Some(blob) = self.blob_cache.lock().get(&tx) {
            return Ok(Some(blob.clone()))
        }
        if let Some(blob) = self.read_one(tx)? {
            let blob = Arc::new(blob);
            self.blob_cache.lock().insert(tx, blob.clone());
            return Ok(Some(blob))
        }

        Ok(None)
    }

    /// Returns the path to the blob file for the given transaction hash.
    #[inline]
    fn blob_disk_file(&self, tx: B256) -> PathBuf {
        self.blob_dir.join(format!("{tx:x}"))
    }

    /// Reads and decodes the blob file for the given transaction from disk, returning `None`
    /// if the file does not exist.
    #[inline]
    fn read_one(&self, tx: B256) -> Result<Option<BlobTransactionSidecar>, BlobStoreError> {
        let path = self.blob_disk_file(tx);
        let data = {
            let _lock = self.file_lock.read();
            match fs::read(&path) {
                Ok(data) => data,
                Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(None),
                Err(e) => {
                    return Err(BlobStoreError::Other(Box::new(DiskFileBlobStoreError::ReadFile(
                        tx, path, e,
                    ))))
                }
            }
        };
        BlobTransactionSidecar::rlp_decode_fields(&mut data.as_slice())
            .map(Some)
            .map_err(BlobStoreError::DecodeError)
    }

    /// Reads and decodes the blob files for the given transactions; entries that fail to read
    /// or decode are skipped.
    fn read_many_decoded(&self, txs: Vec<TxHash>) -> Vec<(TxHash, BlobTransactionSidecar)> {
        self.read_many_raw(txs)
            .into_iter()
            .filter_map(|(tx, data)| {
                BlobTransactionSidecar::rlp_decode_fields(&mut data.as_slice())
                    .map(|sidecar| (tx, sidecar))
                    .ok()
            })
            .collect()
    }

    /// Reads the raw blob files for the given transactions while holding the file lock.
    ///
    /// Files that are missing or fail to read are skipped.
    #[inline]
    fn read_many_raw(&self, txs: Vec<TxHash>) -> Vec<(TxHash, Vec<u8>)> {
        let mut res = Vec::with_capacity(txs.len());
        let _lock = self.file_lock.read();
        for tx in txs {
            let path = self.blob_disk_file(tx);
            match fs::read(&path) {
                Ok(data) => {
                    res.push((tx, data));
                }
                Err(err) => {
                    debug!(target:"txpool::blob", %err, ?tx, "Failed to read blob file");
                }
            };
        }
        res
    }

    /// Writes the RLP-encoded blob to disk, unless a file for the transaction already exists,
    /// and returns the number of bytes written.
    #[inline]
    fn write_one_encoded(&self, tx: B256, data: &[u8]) -> Result<usize, DiskFileBlobStoreError> {
        trace!(target:"txpool::blob", "[{:?}] writing blob file", tx);
        let mut add = 0;
        let path = self.blob_disk_file(tx);
        {
            let _lock = self.file_lock.write();
            if !path.exists() {
                fs::write(&path, data)
                    .map_err(|e| DiskFileBlobStoreError::WriteFile(tx, path, e))?;
                add = data.len();
            }
        }
        Ok(add)
    }

    /// Retrieves the blobs for the given transactions, checking the cache first and reading
    /// the remainder from disk; blobs read from disk are inserted into the cache.
    ///
    /// Transactions for which no blob can be found are omitted from the result.
    #[inline]
    fn get_all(
        &self,
        txs: Vec<B256>,
    ) -> Result<Vec<(B256, Arc<BlobTransactionSidecar>)>, BlobStoreError> {
        let mut res = Vec::with_capacity(txs.len());
        let mut cache_miss = Vec::new();
        {
            let mut cache = self.blob_cache.lock();
            for tx in txs {
                if let Some(blob) = cache.get(&tx) {
                    res.push((tx, blob.clone()));
                } else {
                    cache_miss.push(tx)
                }
            }
        }
        if cache_miss.is_empty() {
            return Ok(res)
        }
        let from_disk = self.read_many_decoded(cache_miss);
        if from_disk.is_empty() {
            return Ok(res)
        }
        let mut cache = self.blob_cache.lock();
        for (tx, data) in from_disk {
            let data = Arc::new(data);
            cache.insert(tx, data.clone());
            res.push((tx, data));
        }

        Ok(res)
    }

    /// Retrieves the blobs for exactly the given transactions, erroring with
    /// [`BlobStoreError::MissingSidecar`] if any blob is missing.
    #[inline]
    fn get_exact(
        &self,
        txs: Vec<B256>,
    ) -> Result<Vec<Arc<BlobTransactionSidecar>>, BlobStoreError> {
        txs.into_iter()
            .map(|tx| self.get_one(tx)?.ok_or(BlobStoreError::MissingSidecar(tx)))
            .collect()
    }
}

impl fmt::Debug for DiskFileBlobStoreInner {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("DiskFileBlobStoreInner")
            .field("blob_dir", &self.blob_dir)
            .field("cached_blobs", &self.blob_cache.try_lock().map(|lock| lock.len()))
            .field("txs_to_delete", &self.txs_to_delete.try_read())
            .finish()
    }
}

/// Errors that can occur when interacting with a [`DiskFileBlobStore`].
#[derive(Debug, thiserror::Error)]
pub enum DiskFileBlobStoreError {
    /// Failed to open or create the blob store directory.
    #[error("failed to open blobstore at {0}: {1}")]
    Open(PathBuf, io::Error),
    /// Failed to read a blob file from disk.
    #[error("[{0}] failed to read blob file at {1}: {2}")]
    ReadFile(TxHash, PathBuf, io::Error),
    /// Failed to write a blob file to disk.
    #[error("[{0}] failed to write blob file at {1}: {2}")]
    WriteFile(TxHash, PathBuf, io::Error),
    /// Failed to delete a blob file from disk.
    #[error("[{0}] failed to delete blob file at {1}: {2}")]
    DeleteFile(TxHash, PathBuf, io::Error),
}

impl From<DiskFileBlobStoreError> for BlobStoreError {
    fn from(value: DiskFileBlobStoreError) -> Self {
        Self::Other(Box::new(value))
    }
}

/// Configuration for a [`DiskFileBlobStore`].
#[derive(Debug, Clone)]
pub struct DiskFileBlobStoreConfig {
    /// Maximum number of blob sidecars to cache in memory.
    pub max_cached_entries: u32,
    /// How to handle existing blob files when the store is opened.
    pub open: OpenDiskFileBlobStore,
}

impl Default for DiskFileBlobStoreConfig {
    fn default() -> Self {
        Self { max_cached_entries: DEFAULT_MAX_CACHED_BLOBS, open: Default::default() }
    }
}

impl DiskFileBlobStoreConfig {
    /// Sets the maximum number of blob sidecars to cache in memory.
    pub const fn with_max_cached_entries(mut self, max_cached_entries: u32) -> Self {
        self.max_cached_entries = max_cached_entries;
        self
    }
}

/// How to open a [`DiskFileBlobStore`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum OpenDiskFileBlobStore {
    /// Clear all existing blob files on open (the default).
    #[default]
    Clear,
    /// Keep the existing blob files and index them on open.
    ReIndex,
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::Ordering;

    fn tmp_store() -> (DiskFileBlobStore, tempfile::TempDir) {
        let dir = tempfile::tempdir().unwrap();
        let store = DiskFileBlobStore::open(dir.path(), Default::default()).unwrap();
        (store, dir)
    }

    fn rng_blobs(num: usize) -> Vec<(TxHash, BlobTransactionSidecar)> {
        let mut rng = rand::thread_rng();
        (0..num)
            .map(|_| {
                let tx = TxHash::random_with(&mut rng);
                let blob =
                    BlobTransactionSidecar { blobs: vec![], commitments: vec![], proofs: vec![] };
                (tx, blob)
            })
            .collect()
    }

    #[test]
    fn disk_insert_all_get_all() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(10);
        let all_hashes = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        // all inserted blobs should be cached and individually retrievable
        for (tx, blob) in &blobs {
            assert!(store.is_cached(tx));
            let b = store.get(*tx).unwrap().map(Arc::unwrap_or_clone).unwrap();
            assert_eq!(b, *blob);
        }

        let all = store.get_all(all_hashes.clone()).unwrap();
        for (tx, blob) in all {
            assert!(blobs.contains(&(tx, Arc::unwrap_or_clone(blob))), "missing blob {tx:?}");
        }

        assert!(store.contains(all_hashes[0]).unwrap());
        store.delete_all(all_hashes.clone()).unwrap();
        assert!(store.inner.txs_to_delete.read().contains(&all_hashes[0]));
        store.clear_cache();
        store.cleanup();

        assert!(store.get(blobs[0].0).unwrap().is_none());

        let all = store.get_all(all_hashes.clone()).unwrap();
        assert!(all.is_empty());

        assert!(!store.contains(all_hashes[0]).unwrap());
        assert!(store.get_exact(all_hashes).is_err());

        assert_eq!(store.data_size_hint(), Some(0));
        assert_eq!(store.inner.size_tracker.num_blobs.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn disk_insert_and_retrieve() {
        let (store, _dir) = tmp_store();

        let (tx, blob) = rng_blobs(1).into_iter().next().unwrap();
        store.insert(tx, blob.clone()).unwrap();

        assert!(store.is_cached(&tx));
        let retrieved_blob = store.get(tx).unwrap().map(Arc::unwrap_or_clone).unwrap();
        assert_eq!(retrieved_blob, blob);
    }
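
    // Additional coverage sketch: after clearing the cache, `get` must fall back to the blob
    // file on disk and re-populate the cache on a successful read. The test name is ours.
    #[test]
    fn disk_get_after_cache_clear() {
        let (store, _dir) = tmp_store();

        let (tx, blob) = rng_blobs(1).into_iter().next().unwrap();
        store.insert(tx, blob.clone()).unwrap();
        assert!(store.is_cached(&tx));

        store.clear_cache();
        assert!(!store.is_cached(&tx));

        // the blob is read back from disk and cached again
        let retrieved = store.get(tx).unwrap().map(Arc::unwrap_or_clone).unwrap();
        assert_eq!(retrieved, blob);
        assert!(store.is_cached(&tx));
    }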

    #[test]
    fn disk_delete_blob() {
        let (store, _dir) = tmp_store();

        let (tx, blob) = rng_blobs(1).into_iter().next().unwrap();
        store.insert(tx, blob).unwrap();
        assert!(store.is_cached(&tx));

        store.delete(tx).unwrap();
        assert!(store.inner.txs_to_delete.read().contains(&tx));
        store.cleanup();

        // cleanup removes the file on disk, but the blob is still served from the cache
        let result = store.get(tx).unwrap();
        assert_eq!(
            result,
            Some(Arc::new(BlobTransactionSidecar {
                blobs: vec![],
                commitments: vec![],
                proofs: vec![]
            }))
        );
    }

    #[test]
    fn disk_insert_all_and_delete_all() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(5);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        for (tx, _) in &blobs {
            assert!(store.is_cached(tx));
        }

        store.delete_all(txs.clone()).unwrap();
        store.cleanup();

        // the files are gone, but the blobs are still served from the cache
        for tx in txs {
            let result = store.get(tx).unwrap();
            assert_eq!(
                result,
                Some(Arc::new(BlobTransactionSidecar {
                    blobs: vec![],
                    commitments: vec![],
                    proofs: vec![]
                }))
            );
        }
    }

    #[test]
    fn disk_get_all_blobs() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(3);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        let retrieved_blobs = store.get_all(txs.clone()).unwrap();
        for (tx, blob) in retrieved_blobs {
            assert!(blobs.contains(&(tx, Arc::unwrap_or_clone(blob))));
        }

        store.delete_all(txs).unwrap();
        store.cleanup();
    }

    #[test]
    fn disk_get_exact_blobs_success() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(3);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        let retrieved_blobs = store.get_exact(txs).unwrap();
        for (retrieved_blob, (_, original_blob)) in retrieved_blobs.into_iter().zip(blobs) {
            assert_eq!(Arc::unwrap_or_clone(retrieved_blob), original_blob);
        }
    }

    #[test]
    fn disk_get_exact_blobs_failure() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(2);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs).unwrap();

        // requesting a blob that was never inserted must fail the entire call
        let missing_tx = TxHash::random();
        let result = store.get_exact(vec![txs[0], missing_tx]);
        assert!(result.is_err());
    }
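
    // Additional coverage sketch: `get_by_versioned_hashes` matches against the cached
    // sidecars, so hashes that belong to no stored blob must come back as `None`. The test
    // name is ours.
    #[test]
    fn disk_get_by_versioned_hashes_unknown() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(2);
        store.insert_all(blobs).unwrap();

        let result = store.get_by_versioned_hashes(&[B256::random(), B256::random()]).unwrap();
        assert_eq!(result.len(), 2);
        assert!(result.iter().all(|blob| blob.is_none()));
    }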

    #[test]
    fn disk_data_size_hint() {
        let (store, _dir) = tmp_store();
        assert_eq!(store.data_size_hint(), Some(0));

        let blobs = rng_blobs(2);
        store.insert_all(blobs).unwrap();
        assert!(store.data_size_hint().unwrap() > 0);
    }
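
    // Additional coverage sketch: the blob counter should track successful writes and
    // cleanup removals. The test name is ours.
    #[test]
    fn disk_blobs_len() {
        let (store, _dir) = tmp_store();
        assert_eq!(store.blobs_len(), 0);

        let blobs = rng_blobs(4);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs).unwrap();
        assert_eq!(store.blobs_len(), 4);

        store.delete_all(txs).unwrap();
        store.cleanup();
        assert_eq!(store.blobs_len(), 0);
    }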

    #[test]
    fn disk_cleanup_stat() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(3);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs).unwrap();

        store.delete_all(txs).unwrap();
        let stat = store.cleanup();
        assert_eq!(stat.delete_succeed, 3);
        assert_eq!(stat.delete_failed, 0);
    }
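
    // Additional coverage sketch, assuming schnellru's `ByLength` evicts the least recently
    // used entry once the limit is exceeded: the evicted blob must remain retrievable from
    // its file on disk. The test name is ours.
    #[test]
    fn disk_cache_eviction() {
        let dir = tempfile::tempdir().unwrap();
        let opts = DiskFileBlobStoreConfig::default().with_max_cached_entries(2);
        let store = DiskFileBlobStore::open(dir.path(), opts).unwrap();

        let blobs = rng_blobs(3);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs).unwrap();

        // the first insert was evicted by the two more recent ones
        assert!(!store.is_cached(&txs[0]));
        assert!(store.is_cached(&txs[1]));
        assert!(store.is_cached(&txs[2]));

        // a cache miss still resolves via the file on disk
        assert!(store.get(txs[0]).unwrap().is_some());
    }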
}