// reth_db/implementation/mdbx/cursor.rs
1use super::utils::*;
4use crate::{
5 metrics::{DatabaseEnvMetrics, Operation},
6 DatabaseError,
7};
8use reth_db_api::{
9 common::{PairResult, ValueOnlyResult},
10 cursor::{
11 DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW, DupWalker, RangeWalker,
12 ReverseWalker, Walker,
13 },
14 table::{Compress, Decode, Decompress, DupSort, Encode, Table},
15};
16use reth_libmdbx::{Error as MDBXError, TransactionKind, WriteFlags, RO, RW};
17use reth_storage_errors::db::{DatabaseErrorInfo, DatabaseWriteError, DatabaseWriteOperation};
18use std::{borrow::Cow, collections::Bound, marker::PhantomData, ops::RangeBounds, sync::Arc};
19
20pub type CursorRO<T> = Cursor<RO, T>;
22pub type CursorRW<T> = Cursor<RW, T>;
24
25#[derive(Debug)]
27pub struct Cursor<K: TransactionKind, T: Table> {
28 pub(crate) inner: reth_libmdbx::Cursor<K>,
30 buf: Vec<u8>,
32 metrics: Option<Arc<DatabaseEnvMetrics>>,
34 _dbi: PhantomData<T>,
36}
37
38impl<K: TransactionKind, T: Table> Cursor<K, T> {
39 pub(crate) const fn new_with_metrics(
40 inner: reth_libmdbx::Cursor<K>,
41 metrics: Option<Arc<DatabaseEnvMetrics>>,
42 ) -> Self {
43 Self { inner, buf: Vec::new(), metrics, _dbi: PhantomData }
44 }
45
46 fn execute_with_operation_metric<R>(
51 &mut self,
52 operation: Operation,
53 value_size: Option<usize>,
54 f: impl FnOnce(&mut Self) -> R,
55 ) -> R {
56 if let Some(metrics) = self.metrics.clone() {
57 metrics.record_operation(T::NAME, operation, value_size, || f(self))
58 } else {
59 f(self)
60 }
61 }
62}
63
64#[expect(clippy::type_complexity)]
66pub fn decode<T>(
67 res: Result<Option<(Cow<'_, [u8]>, Cow<'_, [u8]>)>, impl Into<DatabaseErrorInfo>>,
68) -> PairResult<T>
69where
70 T: Table,
71 T::Key: Decode,
72 T::Value: Decompress,
73{
74 res.map_err(|e| DatabaseError::Read(e.into()))?.map(decoder::<T>).transpose()
75}
76
/// Prepares `$value` for writing through the cursor `$self`.
///
/// Evaluates to `Some(bytes)` when the value exposes its bytes directly through
/// `uncompressable_ref()`. Otherwise `$self.buf` is cleared, the value is compressed into it,
/// and the macro evaluates to `None`; callers then read the bytes from `$self.buf`.
macro_rules! compress_to_buf_or_ref {
    ($self:expr, $value:expr) => {
        match $value.uncompressable_ref() {
            Some(raw) => Some(raw),
            None => {
                $self.buf.clear();
                $value.compress_to_buf(&mut $self.buf);
                None
            }
        }
    };
}
90
91impl<K: TransactionKind, T: Table> DbCursorRO<T> for Cursor<K, T> {
92 fn first(&mut self) -> PairResult<T> {
93 decode::<T>(self.inner.first())
94 }
95
96 fn seek_exact(&mut self, key: <T as Table>::Key) -> PairResult<T> {
97 decode::<T>(self.inner.set_key(key.encode().as_ref()))
98 }
99
100 fn seek(&mut self, key: <T as Table>::Key) -> PairResult<T> {
101 decode::<T>(self.inner.set_range(key.encode().as_ref()))
102 }
103
104 fn next(&mut self) -> PairResult<T> {
105 decode::<T>(self.inner.next())
106 }
107
108 fn prev(&mut self) -> PairResult<T> {
109 decode::<T>(self.inner.prev())
110 }
111
112 fn last(&mut self) -> PairResult<T> {
113 decode::<T>(self.inner.last())
114 }
115
116 fn current(&mut self) -> PairResult<T> {
117 decode::<T>(self.inner.get_current())
118 }
119
120 fn walk(&mut self, start_key: Option<T::Key>) -> Result<Walker<'_, T, Self>, DatabaseError> {
121 let start = if let Some(start_key) = start_key {
122 decode::<T>(self.inner.set_range(start_key.encode().as_ref())).transpose()
123 } else {
124 self.first().transpose()
125 };
126
127 Ok(Walker::new(self, start))
128 }
129
130 fn walk_range(
131 &mut self,
132 range: impl RangeBounds<T::Key>,
133 ) -> Result<RangeWalker<'_, T, Self>, DatabaseError> {
134 let start = match range.start_bound().cloned() {
135 Bound::Included(key) => self.inner.set_range(key.encode().as_ref()),
136 Bound::Excluded(_key) => {
137 unreachable!("Rust doesn't allow for Bound::Excluded in starting bounds");
138 }
139 Bound::Unbounded => self.inner.first(),
140 };
141 let start = decode::<T>(start).transpose();
142 Ok(RangeWalker::new(self, start, range.end_bound().cloned()))
143 }
144
145 fn walk_back(
146 &mut self,
147 start_key: Option<T::Key>,
148 ) -> Result<ReverseWalker<'_, T, Self>, DatabaseError> {
149 let start = if let Some(start_key) = start_key {
150 decode::<T>(self.inner.set_range(start_key.encode().as_ref()))
151 } else {
152 self.last()
153 }
154 .transpose();
155
156 Ok(ReverseWalker::new(self, start))
157 }
158}
159
160impl<K: TransactionKind, T: DupSort> DbDupCursorRO<T> for Cursor<K, T> {
161 fn next_dup(&mut self) -> PairResult<T> {
163 decode::<T>(self.inner.next_dup())
164 }
165
166 fn next_no_dup(&mut self) -> PairResult<T> {
168 decode::<T>(self.inner.next_nodup())
169 }
170
171 fn next_dup_val(&mut self) -> ValueOnlyResult<T> {
173 self.inner
174 .next_dup()
175 .map_err(|e| DatabaseError::Read(e.into()))?
176 .map(decode_value::<T>)
177 .transpose()
178 }
179
180 fn seek_by_key_subkey(
181 &mut self,
182 key: <T as Table>::Key,
183 subkey: <T as DupSort>::SubKey,
184 ) -> ValueOnlyResult<T> {
185 self.inner
186 .get_both_range(key.encode().as_ref(), subkey.encode().as_ref())
187 .map_err(|e| DatabaseError::Read(e.into()))?
188 .map(decode_one::<T>)
189 .transpose()
190 }
191
192 fn walk_dup(
198 &mut self,
199 key: Option<T::Key>,
200 subkey: Option<T::SubKey>,
201 ) -> Result<DupWalker<'_, T, Self>, DatabaseError> {
202 let start = match (key, subkey) {
203 (Some(key), Some(subkey)) => {
204 let key: Vec<u8> = key.encode().into();
206 self.inner
207 .get_both_range(key.as_ref(), subkey.encode().as_ref())
208 .map_err(|e| DatabaseError::Read(e.into()))?
209 .map(|val| decoder::<T>((Cow::Owned(key), val)))
210 }
211 (Some(key), None) => {
212 let key: Vec<u8> = key.encode().into();
213 self.inner
214 .set(key.as_ref())
215 .map_err(|e| DatabaseError::Read(e.into()))?
216 .map(|val| decoder::<T>((Cow::Owned(key), val)))
217 }
218 (None, Some(subkey)) => {
219 if let Some((key, _)) = self.first()? {
220 let key: Vec<u8> = key.encode().into();
221 self.inner
222 .get_both_range(key.as_ref(), subkey.encode().as_ref())
223 .map_err(|e| DatabaseError::Read(e.into()))?
224 .map(|val| decoder::<T>((Cow::Owned(key), val)))
225 } else {
226 Some(Err(DatabaseError::Read(MDBXError::NotFound.into())))
227 }
228 }
229 (None, None) => self.first().transpose(),
230 };
231
232 Ok(DupWalker::<'_, T, Self> { cursor: self, start })
233 }
234}
235
236impl<T: Table> DbCursorRW<T> for Cursor<RW, T> {
237 fn upsert(&mut self, key: T::Key, value: &T::Value) -> Result<(), DatabaseError> {
245 let key = key.encode();
246 let value = compress_to_buf_or_ref!(self, value);
247 self.execute_with_operation_metric(
248 Operation::CursorUpsert,
249 Some(value.unwrap_or(&self.buf).len()),
250 |this| {
251 this.inner
252 .put(key.as_ref(), value.unwrap_or(&this.buf), WriteFlags::UPSERT)
253 .map_err(|e| {
254 DatabaseWriteError {
255 info: e.into(),
256 operation: DatabaseWriteOperation::CursorUpsert,
257 table_name: T::NAME,
258 key: key.into(),
259 }
260 .into()
261 })
262 },
263 )
264 }
265
266 fn insert(&mut self, key: T::Key, value: &T::Value) -> Result<(), DatabaseError> {
267 let key = key.encode();
268 let value = compress_to_buf_or_ref!(self, value);
269 self.execute_with_operation_metric(
270 Operation::CursorInsert,
271 Some(value.unwrap_or(&self.buf).len()),
272 |this| {
273 this.inner
274 .put(key.as_ref(), value.unwrap_or(&this.buf), WriteFlags::NO_OVERWRITE)
275 .map_err(|e| {
276 DatabaseWriteError {
277 info: e.into(),
278 operation: DatabaseWriteOperation::CursorInsert,
279 table_name: T::NAME,
280 key: key.into(),
281 }
282 .into()
283 })
284 },
285 )
286 }
287
288 fn append(&mut self, key: T::Key, value: &T::Value) -> Result<(), DatabaseError> {
291 let key = key.encode();
292 let value = compress_to_buf_or_ref!(self, value);
293 self.execute_with_operation_metric(
294 Operation::CursorAppend,
295 Some(value.unwrap_or(&self.buf).len()),
296 |this| {
297 this.inner
298 .put(key.as_ref(), value.unwrap_or(&this.buf), WriteFlags::APPEND)
299 .map_err(|e| {
300 DatabaseWriteError {
301 info: e.into(),
302 operation: DatabaseWriteOperation::CursorAppend,
303 table_name: T::NAME,
304 key: key.into(),
305 }
306 .into()
307 })
308 },
309 )
310 }
311
312 fn delete_current(&mut self) -> Result<(), DatabaseError> {
313 self.execute_with_operation_metric(Operation::CursorDeleteCurrent, None, |this| {
314 this.inner.del(WriteFlags::CURRENT).map_err(|e| DatabaseError::Delete(e.into()))
315 })
316 }
317}
318
319impl<T: DupSort> DbDupCursorRW<T> for Cursor<RW, T> {
320 fn delete_current_duplicates(&mut self) -> Result<(), DatabaseError> {
321 self.execute_with_operation_metric(Operation::CursorDeleteCurrentDuplicates, None, |this| {
322 this.inner.del(WriteFlags::NO_DUP_DATA).map_err(|e| DatabaseError::Delete(e.into()))
323 })
324 }
325
326 fn append_dup(&mut self, key: T::Key, value: T::Value) -> Result<(), DatabaseError> {
327 let key = key.encode();
328 let value = compress_to_buf_or_ref!(self, value);
329 self.execute_with_operation_metric(
330 Operation::CursorAppendDup,
331 Some(value.unwrap_or(&self.buf).len()),
332 |this| {
333 this.inner
334 .put(key.as_ref(), value.unwrap_or(&this.buf), WriteFlags::APPEND_DUP)
335 .map_err(|e| {
336 DatabaseWriteError {
337 info: e.into(),
338 operation: DatabaseWriteOperation::CursorAppendDup,
339 table_name: T::NAME,
340 key: key.into(),
341 }
342 .into()
343 })
344 },
345 )
346 }
347}