reth_db/implementation/mdbx/
cursor.rsuse crate::{
metrics::{DatabaseEnvMetrics, Operation},
tables::utils::*,
DatabaseError,
};
use reth_db_api::{
common::{PairResult, ValueOnlyResult},
cursor::{
DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW, DupWalker, RangeWalker,
ReverseWalker, Walker,
},
table::{Compress, Decode, Decompress, DupSort, Encode, Table},
};
use reth_libmdbx::{Error as MDBXError, TransactionKind, WriteFlags, RO, RW};
use reth_storage_errors::db::{DatabaseErrorInfo, DatabaseWriteError, DatabaseWriteOperation};
use std::{borrow::Cow, collections::Bound, marker::PhantomData, ops::RangeBounds, sync::Arc};
pub type CursorRO<T> = Cursor<RO, T>;
pub type CursorRW<T> = Cursor<RW, T>;
#[derive(Debug)]
pub struct Cursor<K: TransactionKind, T: Table> {
pub(crate) inner: reth_libmdbx::Cursor<K>,
buf: Vec<u8>,
metrics: Option<Arc<DatabaseEnvMetrics>>,
_dbi: PhantomData<T>,
}
impl<K: TransactionKind, T: Table> Cursor<K, T> {
pub(crate) const fn new_with_metrics(
inner: reth_libmdbx::Cursor<K>,
metrics: Option<Arc<DatabaseEnvMetrics>>,
) -> Self {
Self { inner, buf: Vec::new(), metrics, _dbi: PhantomData }
}
fn execute_with_operation_metric<R>(
&mut self,
operation: Operation,
value_size: Option<usize>,
f: impl FnOnce(&mut Self) -> R,
) -> R {
if let Some(metrics) = self.metrics.clone() {
metrics.record_operation(T::NAME, operation, value_size, || f(self))
} else {
f(self)
}
}
}
#[allow(clippy::type_complexity)]
pub fn decode<T>(
res: Result<Option<(Cow<'_, [u8]>, Cow<'_, [u8]>)>, impl Into<DatabaseErrorInfo>>,
) -> PairResult<T>
where
T: Table,
T::Key: Decode,
T::Value: Decompress,
{
res.map_err(|e| DatabaseError::Read(e.into()))?.map(decoder::<T>).transpose()
}
macro_rules! compress_to_buf_or_ref {
($self:expr, $value:expr) => {
if let Some(value) = $value.uncompressable_ref() {
Some(value)
} else {
$self.buf.clear();
$value.compress_to_buf(&mut $self.buf);
None
}
};
}
impl<K: TransactionKind, T: Table> DbCursorRO<T> for Cursor<K, T> {
fn first(&mut self) -> PairResult<T> {
decode::<T>(self.inner.first())
}
fn seek_exact(&mut self, key: <T as Table>::Key) -> PairResult<T> {
decode::<T>(self.inner.set_key(key.encode().as_ref()))
}
fn seek(&mut self, key: <T as Table>::Key) -> PairResult<T> {
decode::<T>(self.inner.set_range(key.encode().as_ref()))
}
fn next(&mut self) -> PairResult<T> {
decode::<T>(self.inner.next())
}
fn prev(&mut self) -> PairResult<T> {
decode::<T>(self.inner.prev())
}
fn last(&mut self) -> PairResult<T> {
decode::<T>(self.inner.last())
}
fn current(&mut self) -> PairResult<T> {
decode::<T>(self.inner.get_current())
}
fn walk(&mut self, start_key: Option<T::Key>) -> Result<Walker<'_, T, Self>, DatabaseError> {
let start = if let Some(start_key) = start_key {
decode::<T>(self.inner.set_range(start_key.encode().as_ref())).transpose()
} else {
self.first().transpose()
};
Ok(Walker::new(self, start))
}
fn walk_range(
&mut self,
range: impl RangeBounds<T::Key>,
) -> Result<RangeWalker<'_, T, Self>, DatabaseError> {
let start = match range.start_bound().cloned() {
Bound::Included(key) => self.inner.set_range(key.encode().as_ref()),
Bound::Excluded(_key) => {
unreachable!("Rust doesn't allow for Bound::Excluded in starting bounds");
}
Bound::Unbounded => self.inner.first(),
};
let start = decode::<T>(start).transpose();
Ok(RangeWalker::new(self, start, range.end_bound().cloned()))
}
fn walk_back(
&mut self,
start_key: Option<T::Key>,
) -> Result<ReverseWalker<'_, T, Self>, DatabaseError> {
let start = if let Some(start_key) = start_key {
decode::<T>(self.inner.set_range(start_key.encode().as_ref()))
} else {
self.last()
}
.transpose();
Ok(ReverseWalker::new(self, start))
}
}
impl<K: TransactionKind, T: DupSort> DbDupCursorRO<T> for Cursor<K, T> {
fn next_dup(&mut self) -> PairResult<T> {
decode::<T>(self.inner.next_dup())
}
fn next_no_dup(&mut self) -> PairResult<T> {
decode::<T>(self.inner.next_nodup())
}
fn next_dup_val(&mut self) -> ValueOnlyResult<T> {
self.inner
.next_dup()
.map_err(|e| DatabaseError::Read(e.into()))?
.map(decode_value::<T>)
.transpose()
}
fn seek_by_key_subkey(
&mut self,
key: <T as Table>::Key,
subkey: <T as DupSort>::SubKey,
) -> ValueOnlyResult<T> {
self.inner
.get_both_range(key.encode().as_ref(), subkey.encode().as_ref())
.map_err(|e| DatabaseError::Read(e.into()))?
.map(decode_one::<T>)
.transpose()
}
fn walk_dup(
&mut self,
key: Option<T::Key>,
subkey: Option<T::SubKey>,
) -> Result<DupWalker<'_, T, Self>, DatabaseError> {
let start = match (key, subkey) {
(Some(key), Some(subkey)) => {
let key: Vec<u8> = key.encode().into();
self.inner
.get_both_range(key.as_ref(), subkey.encode().as_ref())
.map_err(|e| DatabaseError::Read(e.into()))?
.map(|val| decoder::<T>((Cow::Owned(key), val)))
}
(Some(key), None) => {
let key: Vec<u8> = key.encode().into();
self.inner
.set(key.as_ref())
.map_err(|e| DatabaseError::Read(e.into()))?
.map(|val| decoder::<T>((Cow::Owned(key), val)))
}
(None, Some(subkey)) => {
if let Some((key, _)) = self.first()? {
let key: Vec<u8> = key.encode().into();
self.inner
.get_both_range(key.as_ref(), subkey.encode().as_ref())
.map_err(|e| DatabaseError::Read(e.into()))?
.map(|val| decoder::<T>((Cow::Owned(key), val)))
} else {
Some(Err(DatabaseError::Read(MDBXError::NotFound.into())))
}
}
(None, None) => self.first().transpose(),
};
Ok(DupWalker::<'_, T, Self> { cursor: self, start })
}
}
impl<T: Table> DbCursorRW<T> for Cursor<RW, T> {
fn upsert(&mut self, key: T::Key, value: T::Value) -> Result<(), DatabaseError> {
let key = key.encode();
let value = compress_to_buf_or_ref!(self, value);
self.execute_with_operation_metric(
Operation::CursorUpsert,
Some(value.unwrap_or(&self.buf).len()),
|this| {
this.inner
.put(key.as_ref(), value.unwrap_or(&this.buf), WriteFlags::UPSERT)
.map_err(|e| {
DatabaseWriteError {
info: e.into(),
operation: DatabaseWriteOperation::CursorUpsert,
table_name: T::NAME,
key: key.into(),
}
.into()
})
},
)
}
fn insert(&mut self, key: T::Key, value: T::Value) -> Result<(), DatabaseError> {
let key = key.encode();
let value = compress_to_buf_or_ref!(self, value);
self.execute_with_operation_metric(
Operation::CursorInsert,
Some(value.unwrap_or(&self.buf).len()),
|this| {
this.inner
.put(key.as_ref(), value.unwrap_or(&this.buf), WriteFlags::NO_OVERWRITE)
.map_err(|e| {
DatabaseWriteError {
info: e.into(),
operation: DatabaseWriteOperation::CursorInsert,
table_name: T::NAME,
key: key.into(),
}
.into()
})
},
)
}
fn append(&mut self, key: T::Key, value: T::Value) -> Result<(), DatabaseError> {
let key = key.encode();
let value = compress_to_buf_or_ref!(self, value);
self.execute_with_operation_metric(
Operation::CursorAppend,
Some(value.unwrap_or(&self.buf).len()),
|this| {
this.inner
.put(key.as_ref(), value.unwrap_or(&this.buf), WriteFlags::APPEND)
.map_err(|e| {
DatabaseWriteError {
info: e.into(),
operation: DatabaseWriteOperation::CursorAppend,
table_name: T::NAME,
key: key.into(),
}
.into()
})
},
)
}
fn delete_current(&mut self) -> Result<(), DatabaseError> {
self.execute_with_operation_metric(Operation::CursorDeleteCurrent, None, |this| {
this.inner.del(WriteFlags::CURRENT).map_err(|e| DatabaseError::Delete(e.into()))
})
}
}
impl<T: DupSort> DbDupCursorRW<T> for Cursor<RW, T> {
fn delete_current_duplicates(&mut self) -> Result<(), DatabaseError> {
self.execute_with_operation_metric(Operation::CursorDeleteCurrentDuplicates, None, |this| {
this.inner.del(WriteFlags::NO_DUP_DATA).map_err(|e| DatabaseError::Delete(e.into()))
})
}
fn append_dup(&mut self, key: T::Key, value: T::Value) -> Result<(), DatabaseError> {
let key = key.encode();
let value = compress_to_buf_or_ref!(self, value);
self.execute_with_operation_metric(
Operation::CursorAppendDup,
Some(value.unwrap_or(&self.buf).len()),
|this| {
this.inner
.put(key.as_ref(), value.unwrap_or(&this.buf), WriteFlags::APPEND_DUP)
.map_err(|e| {
DatabaseWriteError {
info: e.into(),
operation: DatabaseWriteOperation::CursorAppendDup,
table_name: T::NAME,
key: key.into(),
}
.into()
})
},
)
}
}