reth_transaction_pool/blobstore/
mod.rs
1use alloy_eips::eip4844::{BlobAndProofV1, BlobTransactionSidecar};
4use alloy_primitives::B256;
5pub use disk::{DiskFileBlobStore, DiskFileBlobStoreConfig, OpenDiskFileBlobStore};
6pub use mem::InMemoryBlobStore;
7pub use noop::NoopBlobStore;
8use std::{
9 fmt,
10 sync::{
11 atomic::{AtomicUsize, Ordering},
12 Arc,
13 },
14};
15pub use tracker::{BlobStoreCanonTracker, BlobStoreUpdates};
16
17pub mod disk;
18mod mem;
19mod noop;
20mod tracker;
21
22pub trait BlobStore: fmt::Debug + Send + Sync + 'static {
29 fn insert(&self, tx: B256, data: BlobTransactionSidecar) -> Result<(), BlobStoreError>;
31
32 fn insert_all(&self, txs: Vec<(B256, BlobTransactionSidecar)>) -> Result<(), BlobStoreError>;
34
35 fn delete(&self, tx: B256) -> Result<(), BlobStoreError>;
37
38 fn delete_all(&self, txs: Vec<B256>) -> Result<(), BlobStoreError>;
40
41 fn cleanup(&self) -> BlobStoreCleanupStat;
47
48 fn get(&self, tx: B256) -> Result<Option<Arc<BlobTransactionSidecar>>, BlobStoreError>;
50
51 fn contains(&self, tx: B256) -> Result<bool, BlobStoreError>;
53
54 fn get_all(
61 &self,
62 txs: Vec<B256>,
63 ) -> Result<Vec<(B256, Arc<BlobTransactionSidecar>)>, BlobStoreError>;
64
65 fn get_exact(&self, txs: Vec<B256>)
70 -> Result<Vec<Arc<BlobTransactionSidecar>>, BlobStoreError>;
71
72 fn get_by_versioned_hashes(
74 &self,
75 versioned_hashes: &[B256],
76 ) -> Result<Vec<Option<BlobAndProofV1>>, BlobStoreError>;
77
78 fn data_size_hint(&self) -> Option<usize>;
80
81 fn blobs_len(&self) -> usize;
83}
84
85#[derive(Debug, thiserror::Error)]
87pub enum BlobStoreError {
88 #[error("blob sidecar not found for transaction {0:?}")]
90 MissingSidecar(B256),
91 #[error("failed to decode blob data: {0}")]
93 DecodeError(#[from] alloy_rlp::Error),
94 #[error(transparent)]
96 Other(Box<dyn core::error::Error + Send + Sync>),
97}
98
/// Two independent atomic counters describing how much a blob store currently holds.
///
/// Every access uses [`Ordering::Relaxed`]: each counter is updated atomically on its own, but
/// no ordering is established between the two counters or with other memory.
#[derive(Debug, Default)]
pub(crate) struct BlobStoreSize {
    /// Accumulated data size (NOTE(review): presumably in bytes — confirm with callers).
    data_size: AtomicUsize,
    /// Number of blobs.
    num_blobs: AtomicUsize,
}

impl BlobStoreSize {
    /// Lowers `counter` by `amount`, clamping at zero instead of wrapping around.
    #[inline]
    fn saturating_decrement(counter: &AtomicUsize, amount: usize) {
        // The closure always yields `Some`, so `fetch_update` never reports failure; the
        // previous value it hands back is not needed either.
        let _ = counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |value| {
            Some(value.saturating_sub(amount))
        });
    }

    /// Adds `add` to the tracked data size.
    #[inline]
    pub(crate) fn add_size(&self, add: usize) {
        self.data_size.fetch_add(add, Ordering::Relaxed);
    }

    /// Subtracts `sub` from the tracked data size, stopping at zero.
    #[inline]
    pub(crate) fn sub_size(&self, sub: usize) {
        Self::saturating_decrement(&self.data_size, sub);
    }

    /// Overwrites the blob count with `len`.
    #[inline]
    pub(crate) fn update_len(&self, len: usize) {
        self.num_blobs.store(len, Ordering::Relaxed);
    }

    /// Adds `add` to the blob count.
    #[inline]
    pub(crate) fn inc_len(&self, add: usize) {
        self.num_blobs.fetch_add(add, Ordering::Relaxed);
    }

    /// Subtracts `sub` from the blob count, stopping at zero.
    #[inline]
    pub(crate) fn sub_len(&self, sub: usize) {
        Self::saturating_decrement(&self.num_blobs, sub);
    }

    /// Returns the currently tracked data size.
    #[inline]
    pub(crate) fn data_size(&self) -> usize {
        self.data_size.load(Ordering::Relaxed)
    }

    /// Returns the currently tracked blob count.
    #[inline]
    pub(crate) fn blobs_len(&self) -> usize {
        self.num_blobs.load(Ordering::Relaxed)
    }
}

impl PartialEq for BlobStoreSize {
    /// Two values are equal when both counters hold the same numbers at the time of the loads.
    fn eq(&self, other: &Self) -> bool {
        // Compare the data sizes first and bail out early, so the blob counts are only
        // loaded when the sizes already match.
        if self.data_size.load(Ordering::Relaxed) != other.data_size.load(Ordering::Relaxed) {
            return false;
        }
        self.num_blobs.load(Ordering::Relaxed) == other.num_blobs.load(Ordering::Relaxed)
    }
}
153
/// Outcome counters for a blob store cleanup pass, as returned by `BlobStore::cleanup`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct BlobStoreCleanupStat {
    /// Number of deletions that succeeded.
    pub delete_succeed: usize,
    /// Number of deletions that failed.
    pub delete_failed: usize,
}
162
#[cfg(test)]
mod tests {
    use super::*;

    // Compile-time check only: a `Box<dyn BlobStore>` field compiles only while `BlobStore`
    // remains usable as a trait object. The struct is never constructed, which is why the
    // `dead_code` lint is expected here.
    #[expect(dead_code)]
    struct DynStore {
        store: Box<dyn BlobStore>,
    }
}