reth_transaction_pool/blobstore/
mod.rs1use alloy_eips::{
4 eip4844::{BlobAndProofV1, BlobAndProofV2},
5 eip7594::BlobTransactionSidecarVariant,
6};
7use alloy_primitives::B256;
8pub use converter::BlobSidecarConverter;
9pub use disk::{DiskFileBlobStore, DiskFileBlobStoreConfig, OpenDiskFileBlobStore};
10pub use mem::InMemoryBlobStore;
11pub use noop::NoopBlobStore;
12use std::{
13 fmt,
14 sync::{
15 atomic::{AtomicUsize, Ordering},
16 Arc,
17 },
18};
19pub use tracker::{BlobStoreCanonTracker, BlobStoreUpdates};
20
21mod converter;
22pub mod disk;
23mod mem;
24mod noop;
25mod tracker;
26
27pub trait BlobStore: fmt::Debug + Send + Sync + 'static {
34 fn insert(&self, tx: B256, data: BlobTransactionSidecarVariant) -> Result<(), BlobStoreError>;
36
37 fn insert_all(
39 &self,
40 txs: Vec<(B256, BlobTransactionSidecarVariant)>,
41 ) -> Result<(), BlobStoreError>;
42
43 fn delete(&self, tx: B256) -> Result<(), BlobStoreError>;
45
46 fn delete_all(&self, txs: Vec<B256>) -> Result<(), BlobStoreError>;
48
49 fn cleanup(&self) -> BlobStoreCleanupStat;
55
56 fn get(&self, tx: B256) -> Result<Option<Arc<BlobTransactionSidecarVariant>>, BlobStoreError>;
58
59 fn contains(&self, tx: B256) -> Result<bool, BlobStoreError>;
61
62 fn get_all(
69 &self,
70 txs: Vec<B256>,
71 ) -> Result<Vec<(B256, Arc<BlobTransactionSidecarVariant>)>, BlobStoreError>;
72
73 fn get_exact(
78 &self,
79 txs: Vec<B256>,
80 ) -> Result<Vec<Arc<BlobTransactionSidecarVariant>>, BlobStoreError>;
81
82 fn get_by_versioned_hashes_v1(
84 &self,
85 versioned_hashes: &[B256],
86 ) -> Result<Vec<Option<BlobAndProofV1>>, BlobStoreError>;
87
88 fn get_by_versioned_hashes_v2(
99 &self,
100 versioned_hashes: &[B256],
101 ) -> Result<Option<Vec<BlobAndProofV2>>, BlobStoreError>;
102
103 fn data_size_hint(&self) -> Option<usize>;
105
106 fn blobs_len(&self) -> usize;
108}
109
110#[derive(Debug, thiserror::Error)]
112pub enum BlobStoreError {
113 #[error("blob sidecar not found for transaction {0:?}")]
115 MissingSidecar(B256),
116 #[error("failed to decode blob data: {0}")]
118 DecodeError(#[from] alloy_rlp::Error),
119 #[error(transparent)]
121 Other(Box<dyn core::error::Error + Send + Sync>),
122}
123
/// Pair of atomic counters tracking a blob store's data size and blob count.
///
/// Every access uses [`Ordering::Relaxed`]: each counter is updated atomically on its own, but no
/// ordering is established between the two counters or with other memory operations.
///
/// NOTE(review): the unit of `data_size` is whatever callers pass to `add_size`/`sub_size`
/// (presumably bytes); confirm against the call sites.
#[derive(Debug, Default)]
pub(crate) struct BlobStoreSize {
    data_size: AtomicUsize,
    num_blobs: AtomicUsize,
}

impl BlobStoreSize {
    /// Subtracts `by` from `counter`, clamping at zero instead of wrapping around.
    #[inline]
    fn saturating_dec(counter: &AtomicUsize, by: usize) {
        // The closure always returns `Some`, so `fetch_update` cannot fail; the previous value
        // it hands back is not needed.
        let _ = counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
            Some(current.saturating_sub(by))
        });
    }

    /// Adds `add` to the tracked data size.
    #[inline]
    pub(crate) fn add_size(&self, add: usize) {
        self.data_size.fetch_add(add, Ordering::Relaxed);
    }

    /// Subtracts `sub` from the tracked data size, saturating at zero.
    #[inline]
    pub(crate) fn sub_size(&self, sub: usize) {
        Self::saturating_dec(&self.data_size, sub);
    }

    /// Overwrites the tracked blob count with `len`.
    #[inline]
    pub(crate) fn update_len(&self, len: usize) {
        self.num_blobs.store(len, Ordering::Relaxed);
    }

    /// Adds `add` to the tracked blob count.
    #[inline]
    pub(crate) fn inc_len(&self, add: usize) {
        self.num_blobs.fetch_add(add, Ordering::Relaxed);
    }

    /// Subtracts `sub` from the tracked blob count, saturating at zero.
    #[inline]
    pub(crate) fn sub_len(&self, sub: usize) {
        Self::saturating_dec(&self.num_blobs, sub);
    }

    /// Returns the currently tracked data size.
    #[inline]
    pub(crate) fn data_size(&self) -> usize {
        self.data_size.load(Ordering::Relaxed)
    }

    /// Returns the currently tracked blob count.
    #[inline]
    pub(crate) fn blobs_len(&self) -> usize {
        self.num_blobs.load(Ordering::Relaxed)
    }
}

/// Compares the current values of both counters.
///
/// The two loads are independent relaxed reads, so the comparison is not a consistent snapshot
/// if either side is being updated concurrently.
impl PartialEq for BlobStoreSize {
    fn eq(&self, other: &Self) -> bool {
        self.data_size() == other.data_size() && self.blobs_len() == other.blobs_len()
    }
}
178
/// Outcome counters returned by [`BlobStore::cleanup`].
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct BlobStoreCleanupStat {
    /// Number of deletions that succeeded.
    pub delete_succeed: usize,
    /// Number of deletions that failed.
    pub delete_failed: usize,
}
187
#[cfg(test)]
mod tests {
    use super::*;

    /// Compile-time check only: a `Box<dyn BlobStore>` field type-checks only while
    /// [`BlobStore`] remains usable as a trait object.
    ///
    /// The struct is never constructed, hence `#[expect(dead_code)]`.
    #[expect(dead_code)]
    struct DynStore {
        store: Box<dyn BlobStore>,
    }
}