// reth_transaction_pool/blobstore/mod.rs
//! Storage for blob data of EIP4844 transactions.
use alloy_eips::eip4844::{BlobAndProofV1, BlobTransactionSidecar};
use alloy_primitives::B256;
pub use disk::{DiskFileBlobStore, DiskFileBlobStoreConfig, OpenDiskFileBlobStore};
pub use mem::InMemoryBlobStore;
pub use noop::NoopBlobStore;
use std::{
fmt,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
};
pub use tracker::{BlobStoreCanonTracker, BlobStoreUpdates};
pub mod disk;
mod mem;
mod noop;
mod tracker;
/// A blob store that can be used to store blob data of EIP4844 transactions.
///
/// This type is responsible for keeping track of blob data until it is no longer needed (after
/// finalization).
///
/// Implementors must be `Debug + Send + Sync + 'static`.
///
/// Note: the trait does not require `Clone`. Every method takes `&self`, so one instance can be
/// shared (for example behind an `Arc`) instead of being cloned.
pub trait BlobStore: fmt::Debug + Send + Sync + 'static {
/// Inserts the blob sidecar `data` into the store for the transaction hash `tx`.
fn insert(&self, tx: B256, data: BlobTransactionSidecar) -> Result<(), BlobStoreError>;
/// Inserts multiple blob sidecars into the store, given as `(transaction hash, sidecar)` pairs.
fn insert_all(&self, txs: Vec<(B256, BlobTransactionSidecar)>) -> Result<(), BlobStoreError>;
/// Deletes the blob sidecar of the transaction hash `tx` from the store.
fn delete(&self, tx: B256) -> Result<(), BlobStoreError>;
/// Deletes the blob sidecars of all given transaction hashes from the store.
fn delete_all(&self, txs: Vec<B256>) -> Result<(), BlobStoreError>;
/// A maintenance function that can be called periodically to clean up the blob store, returns
/// the number of successfully deleted blobs and the number of failed deletions.
///
/// This is intended to be called in the background to clean up any old or unused data, in case
/// the store uses deferred cleanup: [`DiskFileBlobStore`]
fn cleanup(&self) -> BlobStoreCleanupStat;
/// Retrieves the decoded blob data for the given transaction hash.
///
/// NOTE(review): presumably yields `Ok(None)` when no sidecar is stored for `tx` — confirm
/// against the implementations.
fn get(&self, tx: B256) -> Result<Option<Arc<BlobTransactionSidecar>>, BlobStoreError>;
/// Checks if the given transaction hash is in the blob store.
fn contains(&self, tx: B256) -> Result<bool, BlobStoreError>;
/// Retrieves all decoded blob data for the given transaction hashes.
///
/// Each returned sidecar is paired with the transaction hash it belongs to.
///
/// This only returns the blobs that were found in the store.
/// If there's no blob it will not be returned.
///
/// Note: this is not guaranteed to return the blobs in the same order as the input.
fn get_all(
&self,
txs: Vec<B256>,
) -> Result<Vec<(B256, Arc<BlobTransactionSidecar>)>, BlobStoreError>;
/// Returns the exact [`BlobTransactionSidecar`] for the given transaction hashes in the exact
/// order they were requested.
///
/// Returns an error if any of the blobs are not found in the blob store.
fn get_exact(&self, txs: Vec<B256>)
-> Result<Vec<Arc<BlobTransactionSidecar>>, BlobStoreError>;
/// Returns the [`BlobAndProofV1`] entries for a list of blob versioned hashes.
///
/// Every element of the result is optional.
///
/// NOTE(review): presumably there is one entry per requested hash, in request order, with
/// `None` for hashes that are not in the store — confirm against the implementations.
fn get_by_versioned_hashes(
&self,
versioned_hashes: &[B256],
) -> Result<Vec<Option<BlobAndProofV1>>, BlobStoreError>;
/// Data size of all transactions in the blob store.
///
/// NOTE(review): `None` presumably means the store does not track its size — confirm.
fn data_size_hint(&self) -> Option<usize>;
/// How many blobs are in the blob store.
fn blobs_len(&self) -> usize;
}
/// Error variants that can occur when interacting with a blob store.
#[derive(Debug, thiserror::Error)]
pub enum BlobStoreError {
/// Thrown if the blob sidecar is not found for a given transaction hash but was required.
///
/// Carries the transaction hash that appears in the error message.
#[error("blob sidecar not found for transaction {0:?}")]
MissingSidecar(B256),
/// Failed to decode the stored blob data.
///
/// Wraps the underlying [`alloy_rlp::Error`]; the `#[from]` attribute provides the conversion,
/// so `?` can turn that error into this variant.
#[error("failed to decode blob data: {0}")]
DecodeError(#[from] alloy_rlp::Error),
/// Other implementation specific error.
///
/// Declared `#[error(transparent)]`, so the displayed message is that of the boxed error.
#[error(transparent)]
Other(Box<dyn core::error::Error + Send + Sync>),
}
/// Size bookkeeping for a blob store: a running data-size total and a blob count.
///
/// Both counters are atomics, so they are updated through a shared reference.
#[derive(Debug, Default)]
pub(crate) struct BlobStoreSize {
    /// Running total maintained by `add_size` / `sub_size`.
    data_size: AtomicUsize,
    /// Blob count maintained by `update_len` / `inc_len` / `sub_len`.
    num_blobs: AtomicUsize,
}

impl BlobStoreSize {
    /// Memory ordering used for every counter access of this type.
    const ORDER: Ordering = Ordering::Relaxed;

    /// Lowers `counter` by `amount`, clamping at zero instead of wrapping around.
    #[inline]
    fn saturating_decrease(counter: &AtomicUsize, amount: usize) {
        // The closure always yields `Some`, so the update is never rejected; the previous value
        // handed back by `fetch_update` is of no interest here.
        let _ = counter.fetch_update(Self::ORDER, Self::ORDER, |value| {
            Some(value.saturating_sub(amount))
        });
    }

    /// Current data-size total.
    #[inline]
    pub(crate) fn data_size(&self) -> usize {
        self.data_size.load(Self::ORDER)
    }

    /// Current blob count.
    #[inline]
    pub(crate) fn blobs_len(&self) -> usize {
        self.num_blobs.load(Self::ORDER)
    }

    /// Raises the data-size total by `add`.
    #[inline]
    pub(crate) fn add_size(&self, add: usize) {
        self.data_size.fetch_add(add, Self::ORDER);
    }

    /// Lowers the data-size total by `sub`, never going below zero.
    #[inline]
    pub(crate) fn sub_size(&self, sub: usize) {
        Self::saturating_decrease(&self.data_size, sub);
    }

    /// Overwrites the blob count with `len`.
    #[inline]
    pub(crate) fn update_len(&self, len: usize) {
        self.num_blobs.store(len, Self::ORDER);
    }

    /// Raises the blob count by `add`.
    #[inline]
    pub(crate) fn inc_len(&self, add: usize) {
        self.num_blobs.fetch_add(add, Self::ORDER);
    }

    /// Lowers the blob count by `sub`, never going below zero.
    #[inline]
    pub(crate) fn sub_len(&self, sub: usize) {
        Self::saturating_decrease(&self.num_blobs, sub);
    }
}

impl PartialEq for BlobStoreSize {
    /// Two trackers are equal when both the data-size totals and the blob counts match.
    fn eq(&self, other: &Self) -> bool {
        let same_size = self.data_size() == other.data_size();
        same_size && self.blobs_len() == other.blobs_len()
    }
}
/// Statistics for the cleanup operation.
///
/// This is the return type of [`BlobStore::cleanup`]. Both counters are zero in the `Default`
/// value.
#[derive(Debug, Clone, Default)]
pub struct BlobStoreCleanupStat {
/// The number of successfully deleted blobs.
pub delete_succeed: usize,
/// The number of failed deletions.
pub delete_failed: usize,
}
#[cfg(test)]
mod tests {
use super::*;
// Compile-time check only: a `Box<dyn BlobStore>` field builds only while `BlobStore` can be
// used as a trait object. The struct is never constructed, hence `allow(dead_code)`.
#[allow(dead_code)]
struct DynStore {
store: Box<dyn BlobStore>,
}
}