reth_transaction_pool/blobstore/
mod.rs1use alloy_eips::{
4 eip4844::{BlobAndProofV1, BlobAndProofV2},
5 eip7594::BlobTransactionSidecarVariant,
6};
7use alloy_primitives::B256;
8pub use converter::BlobSidecarConverter;
9pub use disk::{DiskFileBlobStore, DiskFileBlobStoreConfig, OpenDiskFileBlobStore};
10pub use mem::InMemoryBlobStore;
11pub use noop::NoopBlobStore;
12use std::{
13 fmt,
14 sync::{
15 atomic::{AtomicUsize, Ordering},
16 Arc,
17 },
18};
19pub use tracker::{BlobStoreCanonTracker, BlobStoreUpdates};
20
21mod converter;
22pub mod disk;
23mod mem;
24mod noop;
25mod tracker;
26
27pub trait BlobStore: fmt::Debug + Send + Sync + 'static {
34 fn insert(&self, tx: B256, data: BlobTransactionSidecarVariant) -> Result<(), BlobStoreError>;
36
37 fn insert_all(
39 &self,
40 txs: Vec<(B256, BlobTransactionSidecarVariant)>,
41 ) -> Result<(), BlobStoreError>;
42
43 fn delete(&self, tx: B256) -> Result<(), BlobStoreError>;
45
46 fn delete_all(&self, txs: Vec<B256>) -> Result<(), BlobStoreError>;
48
49 fn cleanup(&self) -> BlobStoreCleanupStat;
55
56 fn get(&self, tx: B256) -> Result<Option<Arc<BlobTransactionSidecarVariant>>, BlobStoreError>;
58
59 fn contains(&self, tx: B256) -> Result<bool, BlobStoreError>;
61
62 fn get_all(
69 &self,
70 txs: Vec<B256>,
71 ) -> Result<Vec<(B256, Arc<BlobTransactionSidecarVariant>)>, BlobStoreError>;
72
73 fn get_exact(
78 &self,
79 txs: Vec<B256>,
80 ) -> Result<Vec<Arc<BlobTransactionSidecarVariant>>, BlobStoreError>;
81
82 fn get_by_versioned_hashes_v1(
84 &self,
85 versioned_hashes: &[B256],
86 ) -> Result<Vec<Option<BlobAndProofV1>>, BlobStoreError>;
87
88 fn get_by_versioned_hashes_v2(
99 &self,
100 versioned_hashes: &[B256],
101 ) -> Result<Option<Vec<BlobAndProofV2>>, BlobStoreError>;
102
103 fn get_by_versioned_hashes_v3(
108 &self,
109 versioned_hashes: &[B256],
110 ) -> Result<Vec<Option<BlobAndProofV2>>, BlobStoreError>;
111
112 fn data_size_hint(&self) -> Option<usize>;
114
115 fn blobs_len(&self) -> usize;
117}
118
119#[derive(Debug, thiserror::Error)]
121pub enum BlobStoreError {
122 #[error("blob sidecar not found for transaction {0:?}")]
124 MissingSidecar(B256),
125 #[error("failed to decode blob data: {0}")]
127 DecodeError(#[from] alloy_rlp::Error),
128 #[error(transparent)]
130 Other(Box<dyn core::error::Error + Send + Sync>),
131}
132
/// Tracks the aggregate data size and the number of blobs of a blob store.
///
/// Both values live in atomics, so every method takes `&self`. All accesses use
/// `Ordering::Relaxed`: each counter is updated atomically, but no ordering is established
/// between the two counters or with any other memory.
#[derive(Debug, Default)]
pub(crate) struct BlobStoreSize {
    /// Tracked data size.
    data_size: AtomicUsize,
    /// Tracked number of blobs.
    num_blobs: AtomicUsize,
}

impl BlobStoreSize {
    /// Atomically subtracts `sub` from `counter`, clamping at zero instead of wrapping.
    #[inline]
    fn saturating_decrement(counter: &AtomicUsize, sub: usize) {
        // The closure always returns `Some`, so `fetch_update` cannot report failure and its
        // result (the previous value) is intentionally discarded.
        let _ = counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
            Some(current.saturating_sub(sub))
        });
    }

    /// Adds `add` to the tracked data size.
    #[inline]
    pub(crate) fn add_size(&self, add: usize) {
        self.data_size.fetch_add(add, Ordering::Relaxed);
    }

    /// Subtracts `sub` from the tracked data size, saturating at zero.
    #[inline]
    pub(crate) fn sub_size(&self, sub: usize) {
        Self::saturating_decrement(&self.data_size, sub);
    }

    /// Overwrites the tracked number of blobs with `len`.
    #[inline]
    pub(crate) fn update_len(&self, len: usize) {
        self.num_blobs.store(len, Ordering::Relaxed);
    }

    /// Adds `add` to the tracked number of blobs.
    #[inline]
    pub(crate) fn inc_len(&self, add: usize) {
        self.num_blobs.fetch_add(add, Ordering::Relaxed);
    }

    /// Subtracts `sub` from the tracked number of blobs, saturating at zero.
    #[inline]
    pub(crate) fn sub_len(&self, sub: usize) {
        Self::saturating_decrement(&self.num_blobs, sub);
    }

    /// Returns the currently tracked data size.
    #[inline]
    pub(crate) fn data_size(&self) -> usize {
        self.data_size.load(Ordering::Relaxed)
    }

    /// Returns the currently tracked number of blobs.
    #[inline]
    pub(crate) fn blobs_len(&self) -> usize {
        self.num_blobs.load(Ordering::Relaxed)
    }
}

impl PartialEq for BlobStoreSize {
    /// Compares the current values of both counters.
    ///
    /// The counters are loaded one after another, not as a single atomic snapshot.
    fn eq(&self, other: &Self) -> bool {
        self.data_size() == other.data_size() && self.blobs_len() == other.blobs_len()
    }
}
187
/// Outcome counters reported by [`BlobStore::cleanup`].
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct BlobStoreCleanupStat {
    /// Number of deletions that succeeded.
    pub delete_succeed: usize,
    /// Number of deletions that failed.
    pub delete_failed: usize,
}
196
#[cfg(test)]
mod tests {
    use super::*;

    // Compile-time check only: this struct is never constructed (hence `expect(dead_code)`),
    // but it only type-checks while `BlobStore` can be used as a trait object.
    #[expect(dead_code)]
    struct DynStore {
        store: Box<dyn BlobStore>,
    }
}