reth_db/implementation/mdbx/
cursor.rs

1//! Cursor wrapper for libmdbx-sys.
2
3use super::utils::*;
4use crate::{
5    metrics::{DatabaseEnvMetrics, Operation},
6    DatabaseError,
7};
8use reth_db_api::{
9    common::{PairResult, ValueOnlyResult},
10    cursor::{
11        DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW, DupWalker, RangeWalker,
12        ReverseWalker, Walker,
13    },
14    table::{Compress, Decode, Decompress, DupSort, Encode, Table},
15};
16use reth_libmdbx::{Error as MDBXError, TransactionKind, WriteFlags, RO, RW};
17use reth_storage_errors::db::{DatabaseErrorInfo, DatabaseWriteError, DatabaseWriteOperation};
18use std::{borrow::Cow, collections::Bound, marker::PhantomData, ops::RangeBounds, sync::Arc};
19
/// Read-only cursor: a [`Cursor`] bound to the [`RO`] transaction kind.
pub type CursorRO<T> = Cursor<RO, T>;
/// Read-write cursor: a [`Cursor`] bound to the [`RW`] transaction kind.
pub type CursorRW<T> = Cursor<RW, T>;
24
/// Cursor wrapper to access KV items.
///
/// Generic over the transaction kind `K` (see the [`CursorRO`] and [`CursorRW`] aliases) and
/// over the table `T` whose key and value types the cursor works with.
#[derive(Debug)]
pub struct Cursor<K: TransactionKind, T: Table> {
    /// Inner `libmdbx` cursor.
    pub(crate) inner: reth_libmdbx::Cursor<K>,
    /// Cache buffer that receives compressed values.
    ///
    /// Write operations clear and refill it through `compress_to_buf_or_ref!` whenever the
    /// value cannot be handed to the inner cursor by reference.
    buf: Vec<u8>,
    /// Reference to metric handles in the DB environment. If `None`, metrics are not recorded.
    metrics: Option<Arc<DatabaseEnvMetrics>>,
    /// Phantom data to enforce encoding/decoding.
    _dbi: PhantomData<T>,
}
37
38impl<K: TransactionKind, T: Table> Cursor<K, T> {
39    pub(crate) const fn new_with_metrics(
40        inner: reth_libmdbx::Cursor<K>,
41        metrics: Option<Arc<DatabaseEnvMetrics>>,
42    ) -> Self {
43        Self { inner, buf: Vec::new(), metrics, _dbi: PhantomData }
44    }
45
46    /// If `self.metrics` is `Some(...)`, record a metric with the provided operation and value
47    /// size.
48    ///
49    /// Otherwise, just execute the closure.
50    fn execute_with_operation_metric<R>(
51        &mut self,
52        operation: Operation,
53        value_size: Option<usize>,
54        f: impl FnOnce(&mut Self) -> R,
55    ) -> R {
56        if let Some(metrics) = self.metrics.clone() {
57            metrics.record_operation(T::NAME, operation, value_size, || f(self))
58        } else {
59            f(self)
60        }
61    }
62}
63
64/// Decodes a `(key, value)` pair from the database.
65#[expect(clippy::type_complexity)]
66pub fn decode<T>(
67    res: Result<Option<(Cow<'_, [u8]>, Cow<'_, [u8]>)>, impl Into<DatabaseErrorInfo>>,
68) -> PairResult<T>
69where
70    T: Table,
71    T::Key: Decode,
72    T::Value: Decompress,
73{
74    res.map_err(|e| DatabaseError::Read(e.into()))?.map(decoder::<T>).transpose()
75}
76
/// Some types don't support compression (eg. B256), and we don't want to be copying them to the
/// allocated buffer when we can just use their reference.
///
/// Evaluates to `Some(bytes)` when `$value.uncompressable_ref()` yields a reference; `$self.buf`
/// is left untouched in that case. Otherwise `$self.buf` is cleared, `$value` is compressed into
/// it, and the macro evaluates to `None`, so the caller reads the bytes from `$self.buf`.
macro_rules! compress_to_buf_or_ref {
    ($self:expr, $value:expr) => {
        match $value.uncompressable_ref() {
            Some(value) => Some(value),
            None => {
                $self.buf.clear();
                $value.compress_to_buf(&mut $self.buf);
                None
            }
        }
    };
}
90
91impl<K: TransactionKind, T: Table> DbCursorRO<T> for Cursor<K, T> {
92    fn first(&mut self) -> PairResult<T> {
93        decode::<T>(self.inner.first())
94    }
95
96    fn seek_exact(&mut self, key: <T as Table>::Key) -> PairResult<T> {
97        decode::<T>(self.inner.set_key(key.encode().as_ref()))
98    }
99
100    fn seek(&mut self, key: <T as Table>::Key) -> PairResult<T> {
101        decode::<T>(self.inner.set_range(key.encode().as_ref()))
102    }
103
104    fn next(&mut self) -> PairResult<T> {
105        decode::<T>(self.inner.next())
106    }
107
108    fn prev(&mut self) -> PairResult<T> {
109        decode::<T>(self.inner.prev())
110    }
111
112    fn last(&mut self) -> PairResult<T> {
113        decode::<T>(self.inner.last())
114    }
115
116    fn current(&mut self) -> PairResult<T> {
117        decode::<T>(self.inner.get_current())
118    }
119
120    fn walk(&mut self, start_key: Option<T::Key>) -> Result<Walker<'_, T, Self>, DatabaseError> {
121        let start = if let Some(start_key) = start_key {
122            decode::<T>(self.inner.set_range(start_key.encode().as_ref())).transpose()
123        } else {
124            self.first().transpose()
125        };
126
127        Ok(Walker::new(self, start))
128    }
129
130    fn walk_range(
131        &mut self,
132        range: impl RangeBounds<T::Key>,
133    ) -> Result<RangeWalker<'_, T, Self>, DatabaseError> {
134        let start = match range.start_bound().cloned() {
135            Bound::Included(key) => self.inner.set_range(key.encode().as_ref()),
136            Bound::Excluded(_key) => {
137                unreachable!("Rust doesn't allow for Bound::Excluded in starting bounds");
138            }
139            Bound::Unbounded => self.inner.first(),
140        };
141        let start = decode::<T>(start).transpose();
142        Ok(RangeWalker::new(self, start, range.end_bound().cloned()))
143    }
144
145    fn walk_back(
146        &mut self,
147        start_key: Option<T::Key>,
148    ) -> Result<ReverseWalker<'_, T, Self>, DatabaseError> {
149        let start = if let Some(start_key) = start_key {
150            decode::<T>(self.inner.set_range(start_key.encode().as_ref()))
151        } else {
152            self.last()
153        }
154        .transpose();
155
156        Ok(ReverseWalker::new(self, start))
157    }
158}
159
160impl<K: TransactionKind, T: DupSort> DbDupCursorRO<T> for Cursor<K, T> {
161    /// Returns the next `(key, value)` pair of a DUPSORT table.
162    fn next_dup(&mut self) -> PairResult<T> {
163        decode::<T>(self.inner.next_dup())
164    }
165
166    /// Returns the next `(key, value)` pair skipping the duplicates.
167    fn next_no_dup(&mut self) -> PairResult<T> {
168        decode::<T>(self.inner.next_nodup())
169    }
170
171    /// Returns the next `value` of a duplicate `key`.
172    fn next_dup_val(&mut self) -> ValueOnlyResult<T> {
173        self.inner
174            .next_dup()
175            .map_err(|e| DatabaseError::Read(e.into()))?
176            .map(decode_value::<T>)
177            .transpose()
178    }
179
180    fn seek_by_key_subkey(
181        &mut self,
182        key: <T as Table>::Key,
183        subkey: <T as DupSort>::SubKey,
184    ) -> ValueOnlyResult<T> {
185        self.inner
186            .get_both_range(key.encode().as_ref(), subkey.encode().as_ref())
187            .map_err(|e| DatabaseError::Read(e.into()))?
188            .map(decode_one::<T>)
189            .transpose()
190    }
191
192    /// Depending on its arguments, returns an iterator starting at:
193    /// - Some(key), Some(subkey): a `key` item whose data is >= than `subkey`
194    /// - Some(key), None: first item of a specified `key`
195    /// - None, Some(subkey): like first case, but in the first key
196    /// - None, None: first item in the table of a DUPSORT table.
197    fn walk_dup(
198        &mut self,
199        key: Option<T::Key>,
200        subkey: Option<T::SubKey>,
201    ) -> Result<DupWalker<'_, T, Self>, DatabaseError> {
202        let start = match (key, subkey) {
203            (Some(key), Some(subkey)) => {
204                // encode key and decode it after.
205                let key: Vec<u8> = key.encode().into();
206                self.inner
207                    .get_both_range(key.as_ref(), subkey.encode().as_ref())
208                    .map_err(|e| DatabaseError::Read(e.into()))?
209                    .map(|val| decoder::<T>((Cow::Owned(key), val)))
210            }
211            (Some(key), None) => {
212                let key: Vec<u8> = key.encode().into();
213                self.inner
214                    .set(key.as_ref())
215                    .map_err(|e| DatabaseError::Read(e.into()))?
216                    .map(|val| decoder::<T>((Cow::Owned(key), val)))
217            }
218            (None, Some(subkey)) => {
219                if let Some((key, _)) = self.first()? {
220                    let key: Vec<u8> = key.encode().into();
221                    self.inner
222                        .get_both_range(key.as_ref(), subkey.encode().as_ref())
223                        .map_err(|e| DatabaseError::Read(e.into()))?
224                        .map(|val| decoder::<T>((Cow::Owned(key), val)))
225                } else {
226                    Some(Err(DatabaseError::Read(MDBXError::NotFound.into())))
227                }
228            }
229            (None, None) => self.first().transpose(),
230        };
231
232        Ok(DupWalker::<'_, T, Self> { cursor: self, start })
233    }
234}
235
236impl<T: Table> DbCursorRW<T> for Cursor<RW, T> {
237    /// Database operation that will update an existing row if a specified value already
238    /// exists in a table, and insert a new row if the specified value doesn't already exist
239    ///
240    /// For a DUPSORT table, `upsert` will not actually update-or-insert. If the key already exists,
241    /// it will append the value to the subkey, even if the subkeys are the same. So if you want
242    /// to properly upsert, you'll need to `seek_exact` & `delete_current` if the key+subkey was
243    /// found, before calling `upsert`.
244    fn upsert(&mut self, key: T::Key, value: &T::Value) -> Result<(), DatabaseError> {
245        let key = key.encode();
246        let value = compress_to_buf_or_ref!(self, value);
247        self.execute_with_operation_metric(
248            Operation::CursorUpsert,
249            Some(value.unwrap_or(&self.buf).len()),
250            |this| {
251                this.inner
252                    .put(key.as_ref(), value.unwrap_or(&this.buf), WriteFlags::UPSERT)
253                    .map_err(|e| {
254                        DatabaseWriteError {
255                            info: e.into(),
256                            operation: DatabaseWriteOperation::CursorUpsert,
257                            table_name: T::NAME,
258                            key: key.into(),
259                        }
260                        .into()
261                    })
262            },
263        )
264    }
265
266    fn insert(&mut self, key: T::Key, value: &T::Value) -> Result<(), DatabaseError> {
267        let key = key.encode();
268        let value = compress_to_buf_or_ref!(self, value);
269        self.execute_with_operation_metric(
270            Operation::CursorInsert,
271            Some(value.unwrap_or(&self.buf).len()),
272            |this| {
273                this.inner
274                    .put(key.as_ref(), value.unwrap_or(&this.buf), WriteFlags::NO_OVERWRITE)
275                    .map_err(|e| {
276                        DatabaseWriteError {
277                            info: e.into(),
278                            operation: DatabaseWriteOperation::CursorInsert,
279                            table_name: T::NAME,
280                            key: key.into(),
281                        }
282                        .into()
283                    })
284            },
285        )
286    }
287
288    /// Appends the data to the end of the table. Consequently, the append operation
289    /// will fail if the inserted key is less than the last table key
290    fn append(&mut self, key: T::Key, value: &T::Value) -> Result<(), DatabaseError> {
291        let key = key.encode();
292        let value = compress_to_buf_or_ref!(self, value);
293        self.execute_with_operation_metric(
294            Operation::CursorAppend,
295            Some(value.unwrap_or(&self.buf).len()),
296            |this| {
297                this.inner
298                    .put(key.as_ref(), value.unwrap_or(&this.buf), WriteFlags::APPEND)
299                    .map_err(|e| {
300                        DatabaseWriteError {
301                            info: e.into(),
302                            operation: DatabaseWriteOperation::CursorAppend,
303                            table_name: T::NAME,
304                            key: key.into(),
305                        }
306                        .into()
307                    })
308            },
309        )
310    }
311
312    fn delete_current(&mut self) -> Result<(), DatabaseError> {
313        self.execute_with_operation_metric(Operation::CursorDeleteCurrent, None, |this| {
314            this.inner.del(WriteFlags::CURRENT).map_err(|e| DatabaseError::Delete(e.into()))
315        })
316    }
317}
318
319impl<T: DupSort> DbDupCursorRW<T> for Cursor<RW, T> {
320    fn delete_current_duplicates(&mut self) -> Result<(), DatabaseError> {
321        self.execute_with_operation_metric(Operation::CursorDeleteCurrentDuplicates, None, |this| {
322            this.inner.del(WriteFlags::NO_DUP_DATA).map_err(|e| DatabaseError::Delete(e.into()))
323        })
324    }
325
326    fn append_dup(&mut self, key: T::Key, value: T::Value) -> Result<(), DatabaseError> {
327        let key = key.encode();
328        let value = compress_to_buf_or_ref!(self, value);
329        self.execute_with_operation_metric(
330            Operation::CursorAppendDup,
331            Some(value.unwrap_or(&self.buf).len()),
332            |this| {
333                this.inner
334                    .put(key.as_ref(), value.unwrap_or(&this.buf), WriteFlags::APPEND_DUP)
335                    .map_err(|e| {
336                        DatabaseWriteError {
337                            info: e.into(),
338                            operation: DatabaseWriteOperation::CursorAppendDup,
339                            table_name: T::NAME,
340                            key: key.into(),
341                        }
342                        .into()
343                    })
344            },
345        )
346    }
347}