reth_transaction_pool/blobstore/
mem.rsuse crate::blobstore::{BlobStore, BlobStoreCleanupStat, BlobStoreError, BlobStoreSize};
use alloy_eips::eip4844::{BlobAndProofV1, BlobTransactionSidecar};
use alloy_primitives::B256;
use parking_lot::RwLock;
use std::{collections::HashMap, sync::Arc};
#[derive(Clone, Debug, Default, PartialEq)]
pub struct InMemoryBlobStore {
inner: Arc<InMemoryBlobStoreInner>,
}
#[derive(Debug, Default)]
struct InMemoryBlobStoreInner {
store: RwLock<HashMap<B256, Arc<BlobTransactionSidecar>>>,
size_tracker: BlobStoreSize,
}
impl PartialEq for InMemoryBlobStoreInner {
fn eq(&self, other: &Self) -> bool {
self.store.read().eq(&other.store.read())
}
}
impl BlobStore for InMemoryBlobStore {
fn insert(&self, tx: B256, data: BlobTransactionSidecar) -> Result<(), BlobStoreError> {
let mut store = self.inner.store.write();
self.inner.size_tracker.add_size(insert_size(&mut store, tx, data));
self.inner.size_tracker.update_len(store.len());
Ok(())
}
fn insert_all(&self, txs: Vec<(B256, BlobTransactionSidecar)>) -> Result<(), BlobStoreError> {
if txs.is_empty() {
return Ok(())
}
let mut store = self.inner.store.write();
let mut total_add = 0;
for (tx, data) in txs {
let add = insert_size(&mut store, tx, data);
total_add += add;
}
self.inner.size_tracker.add_size(total_add);
self.inner.size_tracker.update_len(store.len());
Ok(())
}
fn delete(&self, tx: B256) -> Result<(), BlobStoreError> {
let mut store = self.inner.store.write();
let sub = remove_size(&mut store, &tx);
self.inner.size_tracker.sub_size(sub);
self.inner.size_tracker.update_len(store.len());
Ok(())
}
fn delete_all(&self, txs: Vec<B256>) -> Result<(), BlobStoreError> {
if txs.is_empty() {
return Ok(())
}
let mut store = self.inner.store.write();
let mut total_sub = 0;
for tx in txs {
total_sub += remove_size(&mut store, &tx);
}
self.inner.size_tracker.sub_size(total_sub);
self.inner.size_tracker.update_len(store.len());
Ok(())
}
fn cleanup(&self) -> BlobStoreCleanupStat {
BlobStoreCleanupStat::default()
}
fn get(&self, tx: B256) -> Result<Option<Arc<BlobTransactionSidecar>>, BlobStoreError> {
Ok(self.inner.store.read().get(&tx).cloned())
}
fn contains(&self, tx: B256) -> Result<bool, BlobStoreError> {
Ok(self.inner.store.read().contains_key(&tx))
}
fn get_all(
&self,
txs: Vec<B256>,
) -> Result<Vec<(B256, Arc<BlobTransactionSidecar>)>, BlobStoreError> {
let store = self.inner.store.read();
Ok(txs.into_iter().filter_map(|tx| store.get(&tx).map(|item| (tx, item.clone()))).collect())
}
fn get_exact(
&self,
txs: Vec<B256>,
) -> Result<Vec<Arc<BlobTransactionSidecar>>, BlobStoreError> {
let store = self.inner.store.read();
Ok(txs.into_iter().filter_map(|tx| store.get(&tx).cloned()).collect())
}
fn get_by_versioned_hashes(
&self,
versioned_hashes: &[B256],
) -> Result<Vec<Option<BlobAndProofV1>>, BlobStoreError> {
let mut result = vec![None; versioned_hashes.len()];
for (_tx_hash, blob_sidecar) in self.inner.store.read().iter() {
for (i, blob_versioned_hash) in blob_sidecar.versioned_hashes().enumerate() {
for (j, target_versioned_hash) in versioned_hashes.iter().enumerate() {
if blob_versioned_hash == *target_versioned_hash {
result[j].get_or_insert_with(|| BlobAndProofV1 {
blob: Box::new(blob_sidecar.blobs[i]),
proof: blob_sidecar.proofs[i],
});
}
}
}
if result.iter().all(|blob| blob.is_some()) {
break;
}
}
Ok(result)
}
fn data_size_hint(&self) -> Option<usize> {
Some(self.inner.size_tracker.data_size())
}
fn blobs_len(&self) -> usize {
self.inner.size_tracker.blobs_len()
}
}
#[inline]
fn remove_size(store: &mut HashMap<B256, Arc<BlobTransactionSidecar>>, tx: &B256) -> usize {
store.remove(tx).map(|rem| rem.size()).unwrap_or_default()
}
#[inline]
fn insert_size(
store: &mut HashMap<B256, Arc<BlobTransactionSidecar>>,
tx: B256,
blob: BlobTransactionSidecar,
) -> usize {
let add = blob.size();
store.insert(tx, Arc::new(blob));
add
}