reth_transaction_pool/blobstore/
mem.rs

1use crate::blobstore::{BlobStore, BlobStoreCleanupStat, BlobStoreError, BlobStoreSize};
2use alloy_eips::eip4844::{BlobAndProofV1, BlobTransactionSidecar};
3use alloy_primitives::B256;
4use parking_lot::RwLock;
5use std::{collections::HashMap, sync::Arc};
6
7/// An in-memory blob store.
8#[derive(Clone, Debug, Default, PartialEq)]
9pub struct InMemoryBlobStore {
10    inner: Arc<InMemoryBlobStoreInner>,
11}
12
13#[derive(Debug, Default)]
14struct InMemoryBlobStoreInner {
15    /// Storage for all blob data.
16    store: RwLock<HashMap<B256, Arc<BlobTransactionSidecar>>>,
17    size_tracker: BlobStoreSize,
18}
19
20impl PartialEq for InMemoryBlobStoreInner {
21    fn eq(&self, other: &Self) -> bool {
22        self.store.read().eq(&other.store.read())
23    }
24}
25
26impl BlobStore for InMemoryBlobStore {
27    fn insert(&self, tx: B256, data: BlobTransactionSidecar) -> Result<(), BlobStoreError> {
28        let mut store = self.inner.store.write();
29        self.inner.size_tracker.add_size(insert_size(&mut store, tx, data));
30        self.inner.size_tracker.update_len(store.len());
31        Ok(())
32    }
33
34    fn insert_all(&self, txs: Vec<(B256, BlobTransactionSidecar)>) -> Result<(), BlobStoreError> {
35        if txs.is_empty() {
36            return Ok(())
37        }
38        let mut store = self.inner.store.write();
39        let mut total_add = 0;
40        for (tx, data) in txs {
41            let add = insert_size(&mut store, tx, data);
42            total_add += add;
43        }
44        self.inner.size_tracker.add_size(total_add);
45        self.inner.size_tracker.update_len(store.len());
46        Ok(())
47    }
48
49    fn delete(&self, tx: B256) -> Result<(), BlobStoreError> {
50        let mut store = self.inner.store.write();
51        let sub = remove_size(&mut store, &tx);
52        self.inner.size_tracker.sub_size(sub);
53        self.inner.size_tracker.update_len(store.len());
54        Ok(())
55    }
56
57    fn delete_all(&self, txs: Vec<B256>) -> Result<(), BlobStoreError> {
58        if txs.is_empty() {
59            return Ok(())
60        }
61        let mut store = self.inner.store.write();
62        let mut total_sub = 0;
63        for tx in txs {
64            total_sub += remove_size(&mut store, &tx);
65        }
66        self.inner.size_tracker.sub_size(total_sub);
67        self.inner.size_tracker.update_len(store.len());
68        Ok(())
69    }
70
71    fn cleanup(&self) -> BlobStoreCleanupStat {
72        BlobStoreCleanupStat::default()
73    }
74
75    // Retrieves the decoded blob data for the given transaction hash.
76    fn get(&self, tx: B256) -> Result<Option<Arc<BlobTransactionSidecar>>, BlobStoreError> {
77        Ok(self.inner.store.read().get(&tx).cloned())
78    }
79
80    fn contains(&self, tx: B256) -> Result<bool, BlobStoreError> {
81        Ok(self.inner.store.read().contains_key(&tx))
82    }
83
84    fn get_all(
85        &self,
86        txs: Vec<B256>,
87    ) -> Result<Vec<(B256, Arc<BlobTransactionSidecar>)>, BlobStoreError> {
88        let store = self.inner.store.read();
89        Ok(txs.into_iter().filter_map(|tx| store.get(&tx).map(|item| (tx, item.clone()))).collect())
90    }
91
92    fn get_exact(
93        &self,
94        txs: Vec<B256>,
95    ) -> Result<Vec<Arc<BlobTransactionSidecar>>, BlobStoreError> {
96        let store = self.inner.store.read();
97        Ok(txs.into_iter().filter_map(|tx| store.get(&tx).cloned()).collect())
98    }
99
100    fn get_by_versioned_hashes(
101        &self,
102        versioned_hashes: &[B256],
103    ) -> Result<Vec<Option<BlobAndProofV1>>, BlobStoreError> {
104        let mut result = vec![None; versioned_hashes.len()];
105        for (_tx_hash, blob_sidecar) in self.inner.store.read().iter() {
106            for (hash_idx, match_result) in blob_sidecar.match_versioned_hashes(versioned_hashes) {
107                result[hash_idx] = Some(match_result);
108            }
109
110            // Return early if all blobs are found.
111            if result.iter().all(|blob| blob.is_some()) {
112                break;
113            }
114        }
115        Ok(result)
116    }
117
118    fn data_size_hint(&self) -> Option<usize> {
119        Some(self.inner.size_tracker.data_size())
120    }
121
122    fn blobs_len(&self) -> usize {
123        self.inner.size_tracker.blobs_len()
124    }
125}
126
127/// Removes the given blob from the store and returns the size of the blob that was removed.
128#[inline]
129fn remove_size(store: &mut HashMap<B256, Arc<BlobTransactionSidecar>>, tx: &B256) -> usize {
130    store.remove(tx).map(|rem| rem.size()).unwrap_or_default()
131}
132
133/// Inserts the given blob into the store and returns the size of the blob that was added.
134///
135/// We don't need to handle the size updates for replacements because transactions are unique.
136#[inline]
137fn insert_size(
138    store: &mut HashMap<B256, Arc<BlobTransactionSidecar>>,
139    tx: B256,
140    blob: BlobTransactionSidecar,
141) -> usize {
142    let add = blob.size();
143    store.insert(tx, Arc::new(blob));
144    add
145}