reth_storage_api/
database_provider.rs

1use alloc::vec::Vec;
2use core::ops::{Bound, RangeBounds};
3use reth_db_api::{
4    common::KeyValue,
5    cursor::DbCursorRO,
6    database::Database,
7    table::Table,
8    transaction::{DbTx, DbTxMut},
9    DatabaseError,
10};
11use reth_prune_types::PruneModes;
12use reth_storage_errors::provider::ProviderResult;
13
14/// Database provider.
15pub trait DBProvider: Sized {
16    /// Underlying database transaction held by the provider.
17    type Tx: DbTx;
18
19    /// Returns a reference to the underlying transaction.
20    fn tx_ref(&self) -> &Self::Tx;
21
22    /// Returns a mutable reference to the underlying transaction.
23    fn tx_mut(&mut self) -> &mut Self::Tx;
24
25    /// Consumes the provider and returns the underlying transaction.
26    fn into_tx(self) -> Self::Tx;
27
28    /// Disables long-lived read transaction safety guarantees for leaks prevention and
29    /// observability improvements.
30    ///
31    /// CAUTION: In most of the cases, you want the safety guarantees for long read transactions
32    /// enabled. Use this only if you're sure that no write transaction is open in parallel, meaning
33    /// that Reth as a node is offline and not progressing.
34    fn disable_long_read_transaction_safety(mut self) -> Self {
35        self.tx_mut().disable_long_read_transaction_safety();
36        self
37    }
38
39    /// Commit database transaction
40    fn commit(self) -> ProviderResult<bool> {
41        Ok(self.into_tx().commit()?)
42    }
43
44    /// Returns a reference to prune modes.
45    fn prune_modes_ref(&self) -> &PruneModes;
46
47    /// Return full table as Vec
48    fn table<T: Table>(&self) -> Result<Vec<KeyValue<T>>, DatabaseError>
49    where
50        T::Key: Default + Ord,
51    {
52        self.tx_ref()
53            .cursor_read::<T>()?
54            .walk(Some(T::Key::default()))?
55            .collect::<Result<Vec<_>, DatabaseError>>()
56    }
57
58    /// Return a list of entries from the table, based on the given range.
59    #[inline]
60    fn get<T: Table>(
61        &self,
62        range: impl RangeBounds<T::Key>,
63    ) -> Result<Vec<KeyValue<T>>, DatabaseError> {
64        self.tx_ref().cursor_read::<T>()?.walk_range(range)?.collect::<Result<Vec<_>, _>>()
65    }
66
67    /// Iterates over read only values in the given table and collects them into a vector.
68    ///
69    /// Early-returns if the range is empty, without opening a cursor transaction.
70    fn cursor_read_collect<T: Table<Key = u64>>(
71        &self,
72        range: impl RangeBounds<T::Key>,
73    ) -> ProviderResult<Vec<T::Value>> {
74        let capacity = match range_size_hint(&range) {
75            Some(0) | None => return Ok(Vec::new()),
76            Some(capacity) => capacity,
77        };
78        let mut cursor = self.tx_ref().cursor_read::<T>()?;
79        self.cursor_collect_with_capacity(&mut cursor, range, capacity)
80    }
81
82    /// Iterates over read only values in the given table and collects them into a vector.
83    fn cursor_collect<T: Table<Key = u64>>(
84        &self,
85        cursor: &mut impl DbCursorRO<T>,
86        range: impl RangeBounds<T::Key>,
87    ) -> ProviderResult<Vec<T::Value>> {
88        let capacity = range_size_hint(&range).unwrap_or(0);
89        self.cursor_collect_with_capacity(cursor, range, capacity)
90    }
91
92    /// Iterates over read only values in the given table and collects them into a vector with
93    /// capacity.
94    fn cursor_collect_with_capacity<T: Table<Key = u64>>(
95        &self,
96        cursor: &mut impl DbCursorRO<T>,
97        range: impl RangeBounds<T::Key>,
98        capacity: usize,
99    ) -> ProviderResult<Vec<T::Value>> {
100        let mut items = Vec::with_capacity(capacity);
101        for entry in cursor.walk_range(range)? {
102            items.push(entry?.1);
103        }
104        Ok(items)
105    }
106
107    /// Remove list of entries from the table. Returns the number of entries removed.
108    #[inline]
109    fn remove<T: Table>(&self, range: impl RangeBounds<T::Key>) -> Result<usize, DatabaseError>
110    where
111        Self::Tx: DbTxMut,
112    {
113        let mut entries = 0;
114        let mut cursor_write = self.tx_ref().cursor_write::<T>()?;
115        let mut walker = cursor_write.walk_range(range)?;
116        while walker.next().transpose()?.is_some() {
117            walker.delete_current()?;
118            entries += 1;
119        }
120        Ok(entries)
121    }
122
123    /// Return a list of entries from the table, and remove them, based on the given range.
124    #[inline]
125    fn take<T: Table>(
126        &self,
127        range: impl RangeBounds<T::Key>,
128    ) -> Result<Vec<KeyValue<T>>, DatabaseError>
129    where
130        Self::Tx: DbTxMut,
131    {
132        let mut cursor_write = self.tx_ref().cursor_write::<T>()?;
133        let mut walker = cursor_write.walk_range(range)?;
134        let mut items = Vec::new();
135        while let Some(i) = walker.next().transpose()? {
136            walker.delete_current()?;
137            items.push(i)
138        }
139        Ok(items)
140    }
141}
142
143/// Database provider factory.
144#[auto_impl::auto_impl(&, Arc)]
145pub trait DatabaseProviderFactory: Send + Sync {
146    /// Database this factory produces providers for.
147    type DB: Database;
148
149    /// Provider type returned by the factory.
150    type Provider: DBProvider<Tx = <Self::DB as Database>::TX>;
151
152    /// Read-write provider type returned by the factory.
153    type ProviderRW: DBProvider<Tx = <Self::DB as Database>::TXMut>;
154
155    /// Create new read-only database provider.
156    fn database_provider_ro(&self) -> ProviderResult<Self::Provider>;
157
158    /// Create new read-write database provider.
159    fn database_provider_rw(&self) -> ProviderResult<Self::ProviderRW>;
160}
161
162/// Helper type alias to get the associated transaction type from a [`DatabaseProviderFactory`].
163pub type FactoryTx<F> = <<F as DatabaseProviderFactory>::DB as Database>::TX;
164
/// Computes how many `u64` keys `range` can contain, for use as a preallocation hint.
///
/// Returns `None` when no hint can be given:
/// - the end of the range is unbounded,
/// - the (exclusive) end lies below the start, i.e. the bounds are inverted,
/// - the start is excluded at `u64::MAX`, so no key can follow it.
///
/// `Some(0)` denotes an empty range. A count that does not fit into `usize` saturates to
/// `usize::MAX` instead of wrapping.
fn range_size_hint(range: &impl RangeBounds<u64>) -> Option<usize> {
    let start = match range.start_bound() {
        Bound::Included(&start) => start,
        Bound::Excluded(&start) => start.checked_add(1)?,
        Bound::Unbounded => 0,
    };
    // Widen to `u128` so that an inclusive end of `u64::MAX` can be turned into an exclusive
    // bound exactly; saturating in `u64` would under-count such ranges by one.
    let end_exclusive = match range.end_bound() {
        Bound::Included(&end) => u128::from(end) + 1,
        Bound::Excluded(&end) => u128::from(end),
        Bound::Unbounded => return None,
    };
    let len = end_exclusive.checked_sub(u128::from(start))?;
    // Clamp before narrowing: after the `min` the cast is lossless on every target.
    Some(len.min(usize::MAX as u128) as usize)
}
177}