reth_storage_api/
database_provider.rs
1use alloc::vec::Vec;
2use core::ops::{Bound, RangeBounds};
3use reth_db_api::{
4 common::KeyValue,
5 cursor::DbCursorRO,
6 database::Database,
7 table::Table,
8 transaction::{DbTx, DbTxMut},
9 DatabaseError,
10};
11use reth_prune_types::PruneModes;
12use reth_storage_errors::provider::ProviderResult;
13
14pub trait DBProvider: Sized {
16 type Tx: DbTx;
18
19 fn tx_ref(&self) -> &Self::Tx;
21
22 fn tx_mut(&mut self) -> &mut Self::Tx;
24
25 fn into_tx(self) -> Self::Tx;
27
28 fn disable_long_read_transaction_safety(mut self) -> Self {
35 self.tx_mut().disable_long_read_transaction_safety();
36 self
37 }
38
39 fn commit(self) -> ProviderResult<bool> {
41 Ok(self.into_tx().commit()?)
42 }
43
44 fn prune_modes_ref(&self) -> &PruneModes;
46
47 fn table<T: Table>(&self) -> Result<Vec<KeyValue<T>>, DatabaseError>
49 where
50 T::Key: Default + Ord,
51 {
52 self.tx_ref()
53 .cursor_read::<T>()?
54 .walk(Some(T::Key::default()))?
55 .collect::<Result<Vec<_>, DatabaseError>>()
56 }
57
58 #[inline]
60 fn get<T: Table>(
61 &self,
62 range: impl RangeBounds<T::Key>,
63 ) -> Result<Vec<KeyValue<T>>, DatabaseError> {
64 self.tx_ref().cursor_read::<T>()?.walk_range(range)?.collect::<Result<Vec<_>, _>>()
65 }
66
67 fn cursor_read_collect<T: Table<Key = u64>>(
71 &self,
72 range: impl RangeBounds<T::Key>,
73 ) -> ProviderResult<Vec<T::Value>> {
74 let capacity = match range_size_hint(&range) {
75 Some(0) | None => return Ok(Vec::new()),
76 Some(capacity) => capacity,
77 };
78 let mut cursor = self.tx_ref().cursor_read::<T>()?;
79 self.cursor_collect_with_capacity(&mut cursor, range, capacity)
80 }
81
82 fn cursor_collect<T: Table<Key = u64>>(
84 &self,
85 cursor: &mut impl DbCursorRO<T>,
86 range: impl RangeBounds<T::Key>,
87 ) -> ProviderResult<Vec<T::Value>> {
88 let capacity = range_size_hint(&range).unwrap_or(0);
89 self.cursor_collect_with_capacity(cursor, range, capacity)
90 }
91
92 fn cursor_collect_with_capacity<T: Table<Key = u64>>(
95 &self,
96 cursor: &mut impl DbCursorRO<T>,
97 range: impl RangeBounds<T::Key>,
98 capacity: usize,
99 ) -> ProviderResult<Vec<T::Value>> {
100 let mut items = Vec::with_capacity(capacity);
101 for entry in cursor.walk_range(range)? {
102 items.push(entry?.1);
103 }
104 Ok(items)
105 }
106
107 #[inline]
109 fn remove<T: Table>(&self, range: impl RangeBounds<T::Key>) -> Result<usize, DatabaseError>
110 where
111 Self::Tx: DbTxMut,
112 {
113 let mut entries = 0;
114 let mut cursor_write = self.tx_ref().cursor_write::<T>()?;
115 let mut walker = cursor_write.walk_range(range)?;
116 while walker.next().transpose()?.is_some() {
117 walker.delete_current()?;
118 entries += 1;
119 }
120 Ok(entries)
121 }
122
123 #[inline]
125 fn take<T: Table>(
126 &self,
127 range: impl RangeBounds<T::Key>,
128 ) -> Result<Vec<KeyValue<T>>, DatabaseError>
129 where
130 Self::Tx: DbTxMut,
131 {
132 let mut cursor_write = self.tx_ref().cursor_write::<T>()?;
133 let mut walker = cursor_write.walk_range(range)?;
134 let mut items = Vec::new();
135 while let Some(i) = walker.next().transpose()? {
136 walker.delete_current()?;
137 items.push(i)
138 }
139 Ok(items)
140 }
141}
142
143#[auto_impl::auto_impl(&, Arc)]
145pub trait DatabaseProviderFactory: Send + Sync {
146 type DB: Database;
148
149 type Provider: DBProvider<Tx = <Self::DB as Database>::TX>;
151
152 type ProviderRW: DBProvider<Tx = <Self::DB as Database>::TXMut>;
154
155 fn database_provider_ro(&self) -> ProviderResult<Self::Provider>;
157
158 fn database_provider_rw(&self) -> ProviderResult<Self::ProviderRW>;
160}
161
162pub type FactoryTx<F> = <<F as DatabaseProviderFactory>::DB as Database>::TX;
164
/// Computes how many `u64` keys `range` spans, for use as a preallocation hint.
///
/// Returns:
/// - `Some(n)` when both ends are known and the range is not inverted; `n` saturates at
///   `usize::MAX` (and at `u64::MAX` for the full `..=u64::MAX` span) instead of wrapping or
///   truncating.
/// - `Some(0)` when the exclusive end equals the start (for example `4..4` or `5..=4`).
/// - `None` when the end is unbounded, when the start is `Excluded(u64::MAX)`, or when the
///   exclusive end lies before the start.
fn range_size_hint(range: &impl RangeBounds<u64>) -> Option<usize> {
    let start = match range.start_bound() {
        Bound::Included(&start) => start,
        // Nothing can follow an excluded `u64::MAX`.
        Bound::Excluded(&start) => start.checked_add(1)?,
        Bound::Unbounded => 0,
    };
    let len = match range.end_bound() {
        Bound::Included(&end) => match end.checked_add(1) {
            Some(end_exclusive) => end_exclusive.checked_sub(start)?,
            // `end == u64::MAX`: the exclusive end is not representable, so count
            // `start..=u64::MAX` directly. Saturating `end + 1` instead would report the
            // single-key range `u64::MAX..=u64::MAX` as empty.
            None => (u64::MAX - start).saturating_add(1),
        },
        Bound::Excluded(&end) => end.checked_sub(start)?,
        Bound::Unbounded => return None,
    };
    // Saturate rather than truncate on targets where `usize` is narrower than `u64`.
    Some(usize::try_from(len).unwrap_or(usize::MAX))
}