use crate::{
lockfile::StorageLock,
metrics::DatabaseEnvMetrics,
tables::{self, Tables},
utils::default_page_size,
DatabaseError, TableSet,
};
use eyre::Context;
use metrics::{gauge, Label};
use reth_db_api::{
cursor::{DbCursorRO, DbCursorRW},
database::Database,
database_metrics::{DatabaseMetadata, DatabaseMetadataValue, DatabaseMetrics},
models::ClientVersion,
transaction::{DbTx, DbTxMut},
};
use reth_libmdbx::{
ffi, DatabaseFlags, Environment, EnvironmentFlags, Geometry, HandleSlowReadersReturnCode,
MaxReadTransactionDuration, Mode, PageSize, SyncMode, RO, RW,
};
use reth_storage_errors::db::LogLevel;
use reth_tracing::tracing::error;
use std::{
ops::{Deref, Range},
path::Path,
sync::Arc,
time::{SystemTime, UNIX_EPOCH},
};
use tx::Tx;
pub mod cursor;
pub mod tx;
/// Number of bytes in one kilobyte.
pub const KILOBYTE: usize = 1 << 10;
/// Number of bytes in one megabyte.
pub const MEGABYTE: usize = 1024 * KILOBYTE;
/// Number of bytes in one gigabyte.
pub const GIGABYTE: usize = 1024 * MEGABYTE;
/// Number of bytes in one terabyte.
pub const TERABYTE: usize = 1024 * GIGABYTE;
/// Value handed to `set_max_readers` on the environment builder when a database is opened.
const DEFAULT_MAX_READERS: u64 = 32_000;
/// Threshold for the `space` argument of the slow-readers callback: a warning is logged only
/// when the reported value is larger than this.
const MAX_SAFE_READER_SPACE: usize = GIGABYTE * 10;
/// Access mode requested when opening a database environment.
#[derive(Debug)]
pub enum DatabaseEnvKind {
    /// Read-only access.
    RO,
    /// Read-write access.
    RW,
}

impl DatabaseEnvKind {
    /// Returns `true` for [`DatabaseEnvKind::RW`] and `false` for [`DatabaseEnvKind::RO`].
    pub const fn is_rw(&self) -> bool {
        match self {
            Self::RW => true,
            Self::RO => false,
        }
    }
}
/// Settings consumed by [`DatabaseEnv::open`] when configuring the environment.
///
/// Build one with [`DatabaseArguments::new`] (or `Default`) and adjust it through the `with_*`
/// methods.
#[derive(Clone, Debug)]
pub struct DatabaseArguments {
/// Client version supplied at construction; exposed through `client_version()`.
client_version: ClientVersion,
/// Geometry passed to the environment builder via `set_geometry`.
geometry: Geometry<Range<usize>>,
/// Optional log level. When set, `open` translates it to a numeric level and rejects levels
/// above `Notice` unless debug assertions are enabled.
log_level: Option<LogLevel>,
/// Optional limit forwarded to `set_max_read_transaction_duration`; left untouched on `None`.
max_read_transaction_duration: Option<MaxReadTransactionDuration>,
/// Value for the `exclusive` environment flag; `None` is treated as `false`.
exclusive: Option<bool>,
}
impl Default for DatabaseArguments {
    /// Equivalent to [`DatabaseArguments::new`] called with the default client version.
    fn default() -> Self {
        let client_version = ClientVersion::default();
        Self::new(client_version)
    }
}
impl DatabaseArguments {
    /// Creates the default argument set for the given client version.
    ///
    /// The geometry starts as a `0..4 TB` size range with a 4 GB growth step, a zero shrink
    /// threshold and the page size returned by `default_page_size()`. Log level, maximum read
    /// transaction duration and the exclusive flag are all left unset.
    pub fn new(client_version: ClientVersion) -> Self {
        let geometry = Geometry {
            size: Some(0..(4 * TERABYTE)),
            growth_step: Some(4 * GIGABYTE as isize),
            shrink_threshold: Some(0),
            page_size: Some(PageSize::Set(default_page_size())),
        };
        Self {
            client_version,
            geometry,
            log_level: None,
            max_read_transaction_duration: None,
            exclusive: None,
        }
    }

    /// Replaces the geometry size range with `0..max_size` when a value is given; `None` keeps
    /// the current range.
    pub const fn with_geometry_max_size(mut self, max_size: Option<usize>) -> Self {
        let Some(upper_bound) = max_size else { return self };
        self.geometry.size = Some(0..upper_bound);
        self
    }

    /// Overrides the geometry growth step when a value is given; `None` keeps the current step.
    pub const fn with_growth_step(mut self, growth_step: Option<usize>) -> Self {
        let Some(step) = growth_step else { return self };
        self.geometry.growth_step = Some(step as isize);
        self
    }

    /// Sets the log level, replacing any previous value (including with `None`).
    pub const fn with_log_level(mut self, log_level: Option<LogLevel>) -> Self {
        self.log_level = log_level;
        self
    }

    /// Sets the maximum read transaction duration, replacing any previous value (including
    /// with `None`).
    pub const fn with_max_read_transaction_duration(
        mut self,
        max_read_transaction_duration: Option<MaxReadTransactionDuration>,
    ) -> Self {
        self.max_read_transaction_duration = max_read_transaction_duration;
        self
    }

    /// Sets the exclusive flag, replacing any previous value (including with `None`).
    pub const fn with_exclusive(mut self, exclusive: Option<bool>) -> Self {
        self.exclusive = exclusive;
        self
    }

    /// Returns the client version these arguments were created with.
    pub const fn client_version(&self) -> &ClientVersion {
        &self.client_version
    }
}
/// Wrapper around a libmdbx [`Environment`] that implements the [`Database`] trait.
///
/// Dereferences to the inner [`Environment`].
#[derive(Debug)]
pub struct DatabaseEnv {
/// The underlying libmdbx environment.
inner: Environment,
/// Metrics handle cloned into every transaction; `None` until `with_metrics` is called.
metrics: Option<Arc<DatabaseEnvMetrics>>,
/// Storage lock acquired by `open` for read-write environments (`None` for read-only ones).
/// It is never read; it is only kept alive for as long as the environment exists.
_lock_file: Option<StorageLock>,
}
impl Database for DatabaseEnv {
    type TX = tx::Tx<RO>;
    type TXMut = tx::Tx<RW>;

    /// Begins a read-only transaction and wraps it together with a clone of the metrics
    /// handle.
    ///
    /// Failures from either step are reported as [`DatabaseError::InitTx`].
    fn tx(&self) -> Result<Self::TX, DatabaseError> {
        let raw_txn = self.inner.begin_ro_txn().map_err(|e| DatabaseError::InitTx(e.into()))?;
        let env_metrics = self.metrics.clone();
        Tx::new_with_metrics(raw_txn, env_metrics).map_err(|e| DatabaseError::InitTx(e.into()))
    }

    /// Begins a read-write transaction and wraps it together with a clone of the metrics
    /// handle.
    ///
    /// Failures from either step are reported as [`DatabaseError::InitTx`].
    fn tx_mut(&self) -> Result<Self::TXMut, DatabaseError> {
        let raw_txn = self.inner.begin_rw_txn().map_err(|e| DatabaseError::InitTx(e.into()))?;
        let env_metrics = self.metrics.clone();
        Tx::new_with_metrics(raw_txn, env_metrics).map_err(|e| DatabaseError::InitTx(e.into()))
    }
}
impl DatabaseMetrics for DatabaseEnv {
fn report_metrics(&self) {
for (name, value, labels) in self.gauge_metrics() {
gauge!(name, labels).set(value);
}
}
fn gauge_metrics(&self) -> Vec<(&'static str, f64, Vec<Label>)> {
let mut metrics = Vec::new();
let _ = self
.view(|tx| {
for table in Tables::ALL.iter().map(Tables::name) {
let table_db = tx.inner.open_db(Some(table)).wrap_err("Could not open db.")?;
let stats = tx
.inner
.db_stat(&table_db)
.wrap_err(format!("Could not find table: {table}"))?;
let page_size = stats.page_size() as usize;
let leaf_pages = stats.leaf_pages();
let branch_pages = stats.branch_pages();
let overflow_pages = stats.overflow_pages();
let num_pages = leaf_pages + branch_pages + overflow_pages;
let table_size = page_size * num_pages;
let entries = stats.entries();
metrics.push((
"db.table_size",
table_size as f64,
vec![Label::new("table", table)],
));
metrics.push((
"db.table_pages",
leaf_pages as f64,
vec![Label::new("table", table), Label::new("type", "leaf")],
));
metrics.push((
"db.table_pages",
branch_pages as f64,
vec![Label::new("table", table), Label::new("type", "branch")],
));
metrics.push((
"db.table_pages",
overflow_pages as f64,
vec![Label::new("table", table), Label::new("type", "overflow")],
));
metrics.push((
"db.table_entries",
entries as f64,
vec![Label::new("table", table)],
));
}
Ok::<(), eyre::Report>(())
})
.map_err(|error| error!(%error, "Failed to read db table stats"));
if let Ok(freelist) =
self.freelist().map_err(|error| error!(%error, "Failed to read db.freelist"))
{
metrics.push(("db.freelist", freelist as f64, vec![]));
}
if let Ok(stat) = self.stat().map_err(|error| error!(%error, "Failed to read db.stat")) {
metrics.push(("db.page_size", stat.page_size() as f64, vec![]));
}
metrics.push((
"db.timed_out_not_aborted_transactions",
self.timed_out_not_aborted_transactions() as f64,
vec![],
));
metrics
}
}
impl DatabaseMetadata for DatabaseEnv {
    /// Builds the metadata value from the current freelist reading; a failed reading is
    /// passed on as `None`.
    fn metadata(&self) -> DatabaseMetadataValue {
        let freelist = self.freelist().ok();
        DatabaseMetadataValue::new(freelist)
    }
}
impl DatabaseEnv {
/// Opens the database environment located at `path` in the requested mode.
///
/// Metrics are not enabled on the returned environment; call `with_metrics` for that.
///
/// # Errors
///
/// - [`DatabaseError::Other`] if the storage lock cannot be acquired (read-write mode only).
/// - [`DatabaseError::LogLevelUnavailable`] if `args` requests a log level that is not
///   accepted by this build.
/// - [`DatabaseError::Open`] if the underlying environment fails to open.
pub fn open(
path: &Path,
kind: DatabaseEnvKind,
args: DatabaseArguments,
) -> Result<Self, DatabaseError> {
// Only read-write environments take the storage lock; read-only ones keep `None`.
let _lock_file = if kind.is_rw() {
StorageLock::try_acquire(path)
.map_err(|err| DatabaseError::Other(err.to_string()))?
.into()
} else {
None
};
let mut inner_env = Environment::builder();
// Read-write mode additionally enables the write map and uses durable syncing.
let mode = match kind {
DatabaseEnvKind::RO => Mode::ReadOnly,
DatabaseEnvKind::RW => {
inner_env.write_map();
Mode::ReadWrite { sync_mode: SyncMode::Durable }
}
};
// The environment is configured for at most 256 databases, so the built-in table set
// must fit within that limit.
debug_assert!(Tables::ALL.len() <= 256, "number of tables exceed max dbs");
inner_env.set_max_dbs(256);
inner_env.set_geometry(args.geometry);
// Returns whether `id` is this process. On unix the parent process id also counts.
fn is_current_process(id: u32) -> bool {
#[cfg(unix)]
{
id == std::os::unix::process::parent_id() || id == std::process::id()
}
#[cfg(not(unix))]
{
id == std::process::id()
}
}
// Callback registered through `set_handle_slow_readers`. It never asks for the reader to be
// killed; it only logs a warning once `space` exceeds `MAX_SAFE_READER_SPACE`, with a
// message that depends on whether the reader belongs to this process.
extern "C" fn handle_slow_readers(
_env: *const ffi::MDBX_env,
_txn: *const ffi::MDBX_txn,
process_id: ffi::mdbx_pid_t,
thread_id: ffi::mdbx_tid_t,
read_txn_id: u64,
gap: std::ffi::c_uint,
space: usize,
retry: std::ffi::c_int,
) -> HandleSlowReadersReturnCode {
if space > MAX_SAFE_READER_SPACE {
let message = if is_current_process(process_id as u32) {
"Current process has a long-lived database transaction that grows the database file."
} else {
"External process has a long-lived database transaction that grows the database file. \
Use shorter-lived read transactions or shut down the node."
};
reth_tracing::tracing::warn!(
target: "storage::db::mdbx",
?process_id,
?thread_id,
?read_txn_id,
?gap,
?space,
?retry,
"{message}"
)
}
reth_libmdbx::HandleSlowReadersReturnCode::ProceedWithoutKillingReader
}
inner_env.set_handle_slow_readers(handle_slow_readers);
// `exclusive` falls back to `false` when the arguments leave it unset.
inner_env.set_flags(EnvironmentFlags {
mode,
no_rdahead: true,
coalesce: true,
exclusive: args.exclusive.unwrap_or_default(),
..Default::default()
});
inner_env.set_max_readers(DEFAULT_MAX_READERS);
inner_env.set_rp_augment_limit(256 * 1024);
if let Some(log_level) = args.log_level {
// Builds without debug assertions only accept levels up to `Notice`; anything more
// verbose is rejected with an error instead of being silently ignored.
let is_log_level_available = if cfg!(debug_assertions) {
true
} else {
matches!(
log_level,
LogLevel::Fatal | LogLevel::Error | LogLevel::Warn | LogLevel::Notice
)
};
if is_log_level_available {
// Translate the log level into the numeric value expected by `set_log_level`.
inner_env.set_log_level(match log_level {
LogLevel::Fatal => 0,
LogLevel::Error => 1,
LogLevel::Warn => 2,
LogLevel::Notice => 3,
LogLevel::Verbose => 4,
LogLevel::Debug => 5,
LogLevel::Trace => 6,
LogLevel::Extra => 7,
});
} else {
return Err(DatabaseError::LogLevelUnavailable(log_level))
}
}
if let Some(max_read_transaction_duration) = args.max_read_transaction_duration {
inner_env.set_max_read_transaction_duration(max_read_transaction_duration);
}
// The lock (if any) is stored alongside the environment so both share one lifetime.
let env = Self {
inner: inner_env.open(path).map_err(|e| DatabaseError::Open(e.into()))?,
metrics: None,
_lock_file,
};
Ok(env)
}
pub fn with_metrics(mut self) -> Self {
self.metrics = Some(DatabaseEnvMetrics::new().into());
self
}
pub fn create_tables(&self) -> Result<(), DatabaseError> {
self.create_tables_for::<Tables>()
}
/// Creates every table of the given [`TableSet`] inside one read-write transaction.
///
/// Tables reporting `is_dupsort()` are created with `DatabaseFlags::DUP_SORT`, all others
/// with the default flags.
///
/// # Errors
///
/// [`DatabaseError::InitTx`] if the transaction cannot be started,
/// [`DatabaseError::CreateTable`] if a table cannot be created and
/// [`DatabaseError::Commit`] if the final commit fails.
pub fn create_tables_for<TS: TableSet>(&self) -> Result<(), DatabaseError> {
    let rw_txn = self.inner.begin_rw_txn().map_err(|e| DatabaseError::InitTx(e.into()))?;

    for table in TS::tables() {
        let flags = if table.is_dupsort() {
            DatabaseFlags::DUP_SORT
        } else {
            DatabaseFlags::default()
        };
        let table_name = table.name();
        rw_txn
            .create_db(Some(table_name), flags)
            .map_err(|e| DatabaseError::CreateTable(e.into()))?;
    }

    rw_txn.commit().map(|_| ()).map_err(|e| DatabaseError::Commit(e.into()))
}
/// Appends `version` to the `VersionHistory` table unless it equals the most recent entry.
///
/// Does nothing for an empty version. New entries are keyed by the current UNIX time in
/// seconds (zero if the system clock is before the epoch). The transaction is only committed
/// when an entry was written.
pub fn record_client_version(&self, version: ClientVersion) -> Result<(), DatabaseError> {
    if version.is_empty() {
        return Ok(())
    }

    let tx = self.tx_mut()?;
    let mut history = tx.cursor_write::<tables::VersionHistory>()?;

    let latest = history.last()?.map(|(_, recorded)| recorded);
    if latest.as_ref() == Some(&version) {
        // Already the latest recorded version, nothing to write.
        return Ok(())
    }

    let now_secs = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
    history.upsert(now_secs, version)?;
    tx.commit()?;

    Ok(())
}
}
impl Deref for DatabaseEnv {
    type Target = Environment;

    /// Gives access to the wrapped libmdbx [`Environment`].
    fn deref(&self) -> &Self::Target {
        let Self { inner, .. } = self;
        inner
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
tables::{
AccountsHistory, CanonicalHeaders, Headers, PlainAccountState, PlainStorageState,
},
test_utils::*,
AccountChangeSets,
};
use alloy_consensus::Header;
use alloy_primitives::{Address, B256, U256};
use reth_db_api::{
cursor::{DbDupCursorRO, DbDupCursorRW, ReverseWalker, Walker},
models::{AccountBeforeTx, IntegerList, ShardedKey},
table::{Encode, Table},
};
use reth_libmdbx::Error;
use reth_primitives_traits::{Account, StorageEntry};
use reth_storage_errors::db::{DatabaseWriteError, DatabaseWriteOperation};
use std::str::FromStr;
use tempfile::TempDir;
/// Creates a database with all tables in a fresh temporary directory and wraps it in an `Arc`.
fn create_test_db(kind: DatabaseEnvKind) -> Arc<DatabaseEnv> {
    let dir = tempfile::TempDir::new().expect(ERROR_TEMPDIR).into_path();
    let env = create_test_db_with_path(kind, &dir);
    Arc::new(env)
}
/// Opens a database at `path` with default arguments and creates all tables in it.
fn create_test_db_with_path(kind: DatabaseEnvKind, path: &Path) -> DatabaseEnv {
    let args = DatabaseArguments::new(ClientVersion::default());
    let env = DatabaseEnv::open(path, kind, args).expect(ERROR_DB_CREATION);
    env.create_tables().expect(ERROR_TABLE_CREATION);
    env
}
// Panic messages passed to `expect` by the tests below, one per kind of failing operation.

/// Opening or creating the database failed.
const ERROR_DB_CREATION: &str = "Not able to create the mdbx file.";
/// A `put` call failed.
const ERROR_PUT: &str = "Not able to insert value into table.";
/// A cursor `append` call failed.
const ERROR_APPEND: &str = "Not able to append the value to the table.";
/// A cursor `upsert` call failed.
const ERROR_UPSERT: &str = "Not able to upsert the value to the table.";
/// A `get` call failed.
const ERROR_GET: &str = "Not able to get value from table.";
/// A delete call failed.
const ERROR_DEL: &str = "Not able to delete from table.";
/// Committing a transaction failed.
const ERROR_COMMIT: &str = "Not able to commit transaction.";
/// An operation did not produce the expected value.
const ERROR_RETURN_VALUE: &str = "Mismatching result.";
/// Starting a transaction failed.
const ERROR_INIT_TX: &str = "Failed to create a MDBX transaction.";
/// Parsing an address literal failed.
const ERROR_ETH_ADDRESS: &str = "Invalid address.";
#[test]
fn db_creation() {
    // Opening a fresh read-write database and creating its tables must not panic.
    drop(create_test_db(DatabaseEnvKind::RW));
}
#[test]
fn db_manual_put_get() {
    let db = create_test_db(DatabaseEnvKind::RW);

    let header = Header::default();
    let block_number = 1u64;

    // Write the header in its own read-write transaction.
    let write_tx = db.tx_mut().expect(ERROR_INIT_TX);
    write_tx.put::<Headers>(block_number, header.clone()).expect(ERROR_PUT);
    write_tx.commit().expect(ERROR_COMMIT);

    // Read it back through a separate read-only transaction.
    let read_tx = db.tx().expect(ERROR_INIT_TX);
    let stored = read_tx.get::<Headers>(block_number).expect(ERROR_GET);
    assert_eq!(stored.expect(ERROR_RETURN_VALUE), header);
    read_tx.commit().expect(ERROR_COMMIT);
}
#[test]
fn db_dup_cursor_delete_first() {
    let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);
    let tx = db.tx_mut().expect(ERROR_INIT_TX);
    let mut dup_cursor = tx.cursor_dup_write::<PlainStorageState>().unwrap();

    // Two entries under the same address and storage key that differ only in value.
    let address = Address::with_last_byte(1);
    let entry_0 = StorageEntry { key: B256::with_last_byte(1), value: U256::from(0) };
    let entry_1 = StorageEntry { key: B256::with_last_byte(1), value: U256::from(1) };
    for entry in [entry_0, entry_1] {
        dup_cursor.upsert(address, entry).expect(ERROR_UPSERT);
    }

    let stored = dup_cursor.walk(None).unwrap().collect::<Result<Vec<_>, _>>();
    assert_eq!(stored, Ok(vec![(address, entry_0), (address, entry_1)]));

    // Delete the entry the fresh walker is positioned on, then keep walking.
    let mut walker = dup_cursor.walk(None).unwrap();
    walker.delete_current().expect(ERROR_DEL);
    assert_eq!(walker.next(), Some(Ok((address, entry_1))));

    // An independent read cursor on the same transaction only sees `entry_1`.
    let remaining = tx
        .cursor_dup_read::<PlainStorageState>()
        .unwrap()
        .walk(None)
        .unwrap()
        .collect::<Result<Vec<_>, _>>();
    assert_eq!(remaining, Ok(vec![(address, entry_1)]));

    // The original walker has nothing left.
    assert_eq!(walker.next(), None);
}
#[test]
fn db_cursor_walk() {
    let db = create_test_db(DatabaseEnvKind::RW);

    let header = Header::default();
    let block_number = 1u64;

    // Store a single header.
    let write_tx = db.tx_mut().expect(ERROR_INIT_TX);
    write_tx.put::<Headers>(block_number, header.clone()).expect(ERROR_PUT);
    write_tx.commit().expect(ERROR_COMMIT);

    let read_tx = db.tx().expect(ERROR_INIT_TX);
    let mut cursor = read_tx.cursor_read::<Headers>().unwrap();

    assert!(cursor.first().unwrap().is_some(), "First should be our put");

    // Walking from the stored key must yield the stored header first.
    let mut walker = cursor.walk(Some(block_number)).unwrap();
    let (_, walked_header) = walker.next().unwrap().unwrap();
    assert_eq!(walked_header, header, "First next should be put value");
}
#[test]
fn db_cursor_walk_range() {
    let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);

    // Seed keys 0 through 3, all mapped to the zero hash.
    let tx = db.tx_mut().expect(ERROR_INIT_TX);
    for key in [0, 1, 2, 3] {
        tx.put::<CanonicalHeaders>(key, B256::ZERO).expect(ERROR_PUT);
    }
    tx.commit().expect(ERROR_COMMIT);

    let tx = db.tx().expect(ERROR_INIT_TX);
    let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();

    // [1, 3)
    let mut walker = cursor.walk_range(1..3).unwrap();
    for key in [1, 2] {
        assert_eq!(walker.next(), Some(Ok((key, B256::ZERO))));
    }
    assert_eq!(walker.next(), None);
    // An exhausted walker keeps returning `None`.
    assert_eq!(walker.next(), None);

    // [1, 2]
    let mut walker = cursor.walk_range(1..=2).unwrap();
    for key in [1, 2] {
        assert_eq!(walker.next(), Some(Ok((key, B256::ZERO))));
    }
    assert_eq!(walker.next(), None);

    // [1, end)
    let mut walker = cursor.walk_range(1..).unwrap();
    for key in [1, 2, 3] {
        assert_eq!(walker.next(), Some(Ok((key, B256::ZERO))));
    }
    assert_eq!(walker.next(), None);

    // [2, 4)
    let mut walker = cursor.walk_range(2..4).unwrap();
    for key in [2, 3] {
        assert_eq!(walker.next(), Some(Ok((key, B256::ZERO))));
    }
    assert_eq!(walker.next(), None);
    assert_eq!(walker.next(), None);

    // (start, 3)
    let mut walker = cursor.walk_range(..3).unwrap();
    for key in [0, 1, 2] {
        assert_eq!(walker.next(), Some(Ok((key, B256::ZERO))));
    }
    assert_eq!(walker.next(), None);

    // Unbounded on both sides.
    let mut walker = cursor.walk_range(..).unwrap();
    for key in [0, 1, 2, 3] {
        assert_eq!(walker.next(), Some(Ok((key, B256::ZERO))));
    }
    assert_eq!(walker.next(), None);
}
#[test]
fn db_cursor_walk_range_on_dup_table() {
    let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);

    let addresses = [Address::ZERO, Address::with_last_byte(1), Address::with_last_byte(2)];
    let change = |address: Address| AccountBeforeTx { address, info: None };

    // Keys 0 and 1 get one change per address; key 2 gets a single change that lies outside
    // the range walked below.
    let tx = db.tx_mut().expect(ERROR_INIT_TX);
    for key in [0, 1] {
        for address in addresses {
            tx.put::<AccountChangeSets>(key, change(address)).expect(ERROR_PUT);
        }
    }
    tx.put::<AccountChangeSets>(2, change(addresses[0])).expect(ERROR_PUT);
    tx.commit().expect(ERROR_COMMIT);

    let tx = db.tx().expect(ERROR_INIT_TX);
    let mut cursor = tx.cursor_read::<AccountChangeSets>().unwrap();

    // An unbounded walk sees all seven entries.
    let entries = cursor.walk_range(..).unwrap().collect::<Result<Vec<_>, _>>().unwrap();
    assert_eq!(entries.len(), 7);

    // The bounded walk yields every duplicate of keys 0 and 1, in order, and then stops.
    let mut walker = cursor.walk_range(0..=1).unwrap();
    for key in [0, 1] {
        for address in addresses {
            assert_eq!(walker.next(), Some(Ok((key, change(address)))));
        }
    }
    assert_eq!(walker.next(), None);
}
#[allow(clippy::reversed_empty_ranges)]
#[test]
fn db_cursor_walk_range_invalid() {
    let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);

    // Seed keys 0 through 3.
    let tx = db.tx_mut().expect(ERROR_INIT_TX);
    for key in [0, 1, 2, 3] {
        tx.put::<CanonicalHeaders>(key, B256::ZERO).expect(ERROR_PUT);
    }
    tx.commit().expect(ERROR_COMMIT);

    let tx = db.tx().expect(ERROR_INIT_TX);
    let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();

    // Start bound greater than the (exclusive) end bound.
    let mut reversed = cursor.walk_range(3..1).unwrap();
    assert_eq!(reversed.next(), None);

    // Start bound greater than the (inclusive) end bound.
    let mut reversed_inclusive = cursor.walk_range(15..=2).unwrap();
    assert_eq!(reversed_inclusive.next(), None);

    // Empty range.
    let mut empty = cursor.walk_range(1..1).unwrap();
    assert_eq!(empty.next(), None);
}
#[test]
fn db_walker() {
    let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);

    // Seed keys 0, 1 and 3.
    let tx = db.tx_mut().expect(ERROR_INIT_TX);
    for key in [0, 1, 3] {
        tx.put::<CanonicalHeaders>(key, B256::ZERO).expect(ERROR_PUT);
    }
    tx.commit().expect(ERROR_COMMIT);

    let tx = db.tx().expect(ERROR_INIT_TX);
    let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();

    // Forward pass over the whole table.
    let mut walker = Walker::new(&mut cursor, None);
    for key in [0, 1, 3] {
        assert_eq!(walker.next(), Some(Ok((key, B256::ZERO))));
    }
    assert_eq!(walker.next(), None);

    // Turning the exhausted walker around walks the same entries backwards.
    let mut reverse_walker = walker.rev();
    for key in [3, 1, 0] {
        assert_eq!(reverse_walker.next(), Some(Ok((key, B256::ZERO))));
    }
    assert_eq!(reverse_walker.next(), None);
}
#[test]
fn db_reverse_walker() {
    let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);

    // Seed keys 0, 1 and 3.
    let tx = db.tx_mut().expect(ERROR_INIT_TX);
    for key in [0, 1, 3] {
        tx.put::<CanonicalHeaders>(key, B256::ZERO).expect(ERROR_PUT);
    }
    tx.commit().expect(ERROR_COMMIT);

    let tx = db.tx().expect(ERROR_INIT_TX);
    let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();

    // Backward pass over the whole table.
    let mut reverse_walker = ReverseWalker::new(&mut cursor, None);
    for key in [3, 1, 0] {
        assert_eq!(reverse_walker.next(), Some(Ok((key, B256::ZERO))));
    }
    assert_eq!(reverse_walker.next(), None);

    // Turning the exhausted reverse walker around walks the same entries forwards.
    let mut walker = reverse_walker.forward();
    for key in [0, 1, 3] {
        assert_eq!(walker.next(), Some(Ok((key, B256::ZERO))));
    }
    assert_eq!(walker.next(), None);
}
#[test]
fn db_walk_back() {
    let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);

    // Seed keys 0, 1 and 3.
    let tx = db.tx_mut().expect(ERROR_INIT_TX);
    for key in [0, 1, 3] {
        tx.put::<CanonicalHeaders>(key, B256::ZERO).expect(ERROR_PUT);
    }
    tx.commit().expect(ERROR_COMMIT);

    let tx = db.tx().expect(ERROR_INIT_TX);
    let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();

    // Starting at an existing key.
    let mut reverse_walker = cursor.walk_back(Some(1)).unwrap();
    for key in [1, 0] {
        assert_eq!(reverse_walker.next(), Some(Ok((key, B256::ZERO))));
    }
    assert_eq!(reverse_walker.next(), None);

    // Starting at a missing key that lies between stored keys.
    let mut reverse_walker = cursor.walk_back(Some(2)).unwrap();
    for key in [3, 1, 0] {
        assert_eq!(reverse_walker.next(), Some(Ok((key, B256::ZERO))));
    }
    assert_eq!(reverse_walker.next(), None);

    // Starting at a key beyond the last stored one.
    let mut reverse_walker = cursor.walk_back(Some(4)).unwrap();
    for key in [3, 1, 0] {
        assert_eq!(reverse_walker.next(), Some(Ok((key, B256::ZERO))));
    }
    assert_eq!(reverse_walker.next(), None);

    // No start key at all.
    let mut reverse_walker = cursor.walk_back(None).unwrap();
    for key in [3, 1, 0] {
        assert_eq!(reverse_walker.next(), Some(Ok((key, B256::ZERO))));
    }
    assert_eq!(reverse_walker.next(), None);
}
#[test]
fn db_cursor_seek_exact_or_previous_key() {
    let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);

    // Seed keys 0, 1 and 3, leaving a gap at 2.
    let tx = db.tx_mut().expect(ERROR_INIT_TX);
    for key in [0, 1, 3] {
        tx.put::<CanonicalHeaders>(key, B256::ZERO).expect(ERROR_PUT);
    }
    tx.commit().expect(ERROR_COMMIT);

    let missing_key = 2;
    let tx = db.tx().expect(ERROR_INIT_TX);
    let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();

    // A fresh cursor is not positioned on any entry.
    assert_eq!(cursor.current(), Ok(None));

    // The exact seek finds nothing; the assertions below pin where the cursor is afterwards.
    let exact = cursor.seek_exact(missing_key).unwrap();
    assert_eq!(exact, None);
    assert_eq!(cursor.current(), Ok(Some((missing_key + 1, B256::ZERO))));

    // Stepping back twice visits keys 1 and 0.
    for distance in [1, 2] {
        assert_eq!(cursor.prev(), Ok(Some((missing_key - distance, B256::ZERO))));
    }
}
#[test]
fn db_cursor_insert() {
    let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);

    // Seed every key from 0 to 5 except 2.
    let tx = db.tx_mut().expect(ERROR_INIT_TX);
    for key in [0, 1, 3, 4, 5] {
        tx.put::<CanonicalHeaders>(key, B256::ZERO).expect(ERROR_PUT);
    }
    tx.commit().expect(ERROR_COMMIT);

    let key_to_insert = 2;
    let tx = db.tx_mut().expect(ERROR_INIT_TX);
    let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();

    // The first insert succeeds and leaves the cursor on the new entry.
    assert_eq!(cursor.insert(key_to_insert, B256::ZERO), Ok(()));
    assert_eq!(cursor.current(), Ok(Some((key_to_insert, B256::ZERO))));

    // Inserting the same key again fails with `KeyExist` and does not move the cursor.
    let second_attempt = cursor.insert(key_to_insert, B256::ZERO);
    assert_eq!(
        second_attempt,
        Err(DatabaseWriteError {
            info: Error::KeyExist.into(),
            operation: DatabaseWriteOperation::CursorInsert,
            table_name: CanonicalHeaders::NAME,
            key: key_to_insert.encode().into(),
        }
        .into())
    );
    assert_eq!(cursor.current(), Ok(Some((key_to_insert, B256::ZERO))));
    tx.commit().expect(ERROR_COMMIT);

    // The committed table now holds the full key range.
    let tx = db.tx().expect(ERROR_INIT_TX);
    let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
    let keys = cursor.walk(None).unwrap().map(|entry| entry.unwrap().0).collect::<Vec<_>>();
    assert_eq!(keys, vec![0, 1, 2, 3, 4, 5]);
    tx.commit().expect(ERROR_COMMIT);
}
#[test]
fn db_cursor_insert_dup() {
    let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);
    let tx = db.tx_mut().expect(ERROR_INIT_TX);
    let mut dup_cursor = tx.cursor_dup_write::<PlainStorageState>().unwrap();

    let address = Address::random();
    let first_slot = B256::random();
    let second_slot = B256::random();

    // The first insert under the address succeeds.
    let first_entry = StorageEntry { key: first_slot, value: U256::ZERO };
    assert!(dup_cursor.insert(address, first_entry).is_ok());

    // A second insert under the same address is rejected, even with a different subkey.
    let second_entry = StorageEntry { key: second_slot, value: U256::ZERO };
    assert!(dup_cursor.insert(address, second_entry).is_err());
}
// Pins what `delete_current` does when it is called again after the entry the cursor was
// seeked to has already been deleted.
#[test]
fn db_cursor_delete_current_non_existent() {
let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);
let tx = db.tx_mut().expect(ERROR_INIT_TX);
let key1 = Address::with_last_byte(1);
let key2 = Address::with_last_byte(2);
let key3 = Address::with_last_byte(3);
let mut cursor = tx.cursor_write::<PlainAccountState>().unwrap();
// Insert three default accounts.
assert!(cursor.insert(key1, Account::default()).is_ok());
assert!(cursor.insert(key2, Account::default()).is_ok());
assert!(cursor.insert(key3, Account::default()).is_ok());
// Seek to key2 and delete it; it can no longer be found afterwards.
cursor.seek_exact(key2).unwrap();
assert_eq!(cursor.delete_current(), Ok(()));
assert_eq!(cursor.seek_exact(key2), Ok(None));
// Seek to the now-missing key2 again and delete once more; the call still reports success.
assert_eq!(cursor.seek_exact(key2), Ok(None));
assert_eq!(cursor.delete_current(), Ok(()));
// key1 is untouched.
assert_eq!(cursor.seek_exact(key1), Ok(Some((key1, Account::default()))));
// key3 is gone: the second delete removed it.
assert_eq!(cursor.seek_exact(key3), Ok(None));
}
#[test]
fn db_cursor_insert_wherever_cursor_is() {
    let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);

    // Seed 0 and the odd keys up to 9.
    let tx = db.tx_mut().expect(ERROR_INIT_TX);
    for key in [0, 1, 3, 5, 7, 9] {
        tx.put::<CanonicalHeaders>(key, B256::ZERO).expect(ERROR_PUT);
    }
    tx.commit().expect(ERROR_COMMIT);

    let tx = db.tx_mut().expect(ERROR_INIT_TX);
    let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();

    // Park the cursor on the last entry.
    cursor.last().unwrap();
    assert_eq!(cursor.current(), Ok(Some((9, B256::ZERO))));

    // Inserting the missing even keys works from there, and the cursor follows each insert.
    for even_key in [2, 4, 6, 8] {
        assert_eq!(cursor.insert(even_key, B256::ZERO), Ok(()));
        assert_eq!(cursor.current(), Ok(Some((even_key, B256::ZERO))));
    }
    tx.commit().expect(ERROR_COMMIT);

    // The committed table holds every key from 0 to 9 in order.
    let tx = db.tx().expect(ERROR_INIT_TX);
    let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
    let keys = cursor.walk(None).unwrap().map(|entry| entry.unwrap().0).collect::<Vec<_>>();
    assert_eq!(keys, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
    tx.commit().expect(ERROR_COMMIT);
}
#[test]
fn db_cursor_append() {
    let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);

    // Seed keys 0 through 4.
    let tx = db.tx_mut().expect(ERROR_INIT_TX);
    for key in [0, 1, 2, 3, 4] {
        tx.put::<CanonicalHeaders>(key, B256::ZERO).expect(ERROR_PUT);
    }
    tx.commit().expect(ERROR_COMMIT);

    // Appending a key greater than every stored key succeeds.
    let key_to_append = 5;
    let tx = db.tx_mut().expect(ERROR_INIT_TX);
    let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
    assert_eq!(cursor.append(key_to_append, B256::ZERO), Ok(()));
    tx.commit().expect(ERROR_COMMIT);

    // The appended key is the last one in the committed table.
    let tx = db.tx().expect(ERROR_INIT_TX);
    let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
    let keys = cursor.walk(None).unwrap().map(|entry| entry.unwrap().0).collect::<Vec<_>>();
    assert_eq!(keys, vec![0, 1, 2, 3, 4, 5]);
    tx.commit().expect(ERROR_COMMIT);
}
#[test]
fn db_cursor_append_failure() {
    let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);

    // Seed every key from 0 to 5 except 2.
    let tx = db.tx_mut().expect(ERROR_INIT_TX);
    for key in [0, 1, 3, 4, 5] {
        tx.put::<CanonicalHeaders>(key, B256::ZERO).expect(ERROR_PUT);
    }
    tx.commit().expect(ERROR_COMMIT);

    // Appending a key smaller than the current last key fails with `KeyMismatch`.
    let key_to_append = 2;
    let tx = db.tx_mut().expect(ERROR_INIT_TX);
    let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
    let outcome = cursor.append(key_to_append, B256::ZERO);
    assert_eq!(
        outcome,
        Err(DatabaseWriteError {
            info: Error::KeyMismatch.into(),
            operation: DatabaseWriteOperation::CursorAppend,
            table_name: CanonicalHeaders::NAME,
            key: key_to_append.encode().into(),
        }
        .into())
    );

    // After the failed append the cursor sits on the last entry of the table.
    assert_eq!(cursor.current(), Ok(Some((5, B256::ZERO))));
    tx.commit().expect(ERROR_COMMIT);

    // Nothing was added.
    let tx = db.tx().expect(ERROR_INIT_TX);
    let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
    let keys = cursor.walk(None).unwrap().map(|entry| entry.unwrap().0).collect::<Vec<_>>();
    assert_eq!(keys, vec![0, 1, 3, 4, 5]);
    tx.commit().expect(ERROR_COMMIT);
}
#[test]
fn db_cursor_upsert() {
    let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);
    let tx = db.tx_mut().expect(ERROR_INIT_TX);
    let mut cursor = tx.cursor_write::<PlainAccountState>().unwrap();
    let key = Address::random();

    // On a plain table every upsert replaces the value stored under the key.
    let versions = [
        Account::default(),
        Account { nonce: 1, ..Default::default() },
        Account { nonce: 2, ..Default::default() },
    ];
    for account in versions {
        cursor.upsert(key, account).expect(ERROR_UPSERT);
        assert_eq!(cursor.seek_exact(key), Ok(Some((key, account))));
    }

    // On a dup-sort table an upsert with the same subkey but a different value adds a second
    // duplicate instead of replacing the first.
    let mut dup_cursor = tx.cursor_dup_write::<PlainStorageState>().unwrap();
    let subkey = B256::random();

    let entry1 = StorageEntry { key: subkey, value: U256::from(1) };
    dup_cursor.upsert(key, entry1).expect(ERROR_UPSERT);
    assert_eq!(dup_cursor.seek_by_key_subkey(key, subkey), Ok(Some(entry1)));

    let entry2 = StorageEntry { key: subkey, value: U256::from(2) };
    dup_cursor.upsert(key, entry2).expect(ERROR_UPSERT);
    assert_eq!(dup_cursor.seek_by_key_subkey(key, subkey), Ok(Some(entry1)));
    assert_eq!(dup_cursor.next_dup_val(), Ok(Some(entry2)));
}
#[test]
fn db_cursor_dupsort_append() {
    let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);
    let transition_id = 2;

    // Seed one key with account changes for address bytes 0, 1, 3, 4 and 5 (2 is left out).
    let tx = db.tx_mut().expect(ERROR_INIT_TX);
    let mut cursor = tx.cursor_write::<AccountChangeSets>().unwrap();
    for last_byte in [0, 1, 3, 4, 5] {
        let change = AccountBeforeTx { address: Address::with_last_byte(last_byte), info: None };
        cursor.append(transition_id, change).expect(ERROR_APPEND);
    }
    tx.commit().expect(ERROR_COMMIT);

    let subkey_to_append = 2;
    let missing_change =
        || AccountBeforeTx { address: Address::with_last_byte(subkey_to_append), info: None };

    let tx = db.tx_mut().expect(ERROR_INIT_TX);
    let mut cursor = tx.cursor_write::<AccountChangeSets>().unwrap();

    // `append_dup` refuses a duplicate that sorts before the existing ones.
    assert_eq!(
        cursor.append_dup(transition_id, missing_change()),
        Err(DatabaseWriteError {
            info: Error::KeyMismatch.into(),
            operation: DatabaseWriteOperation::CursorAppendDup,
            table_name: AccountChangeSets::NAME,
            key: transition_id.encode().into(),
        }
        .into())
    );

    // `append` refuses a key smaller than the last stored key.
    assert_eq!(
        cursor.append(transition_id - 1, missing_change()),
        Err(DatabaseWriteError {
            info: Error::KeyMismatch.into(),
            operation: DatabaseWriteOperation::CursorAppend,
            table_name: AccountChangeSets::NAME,
            key: (transition_id - 1).encode().into(),
        }
        .into())
    );

    // `append` with the last stored key itself is accepted.
    assert_eq!(cursor.append(transition_id, missing_change()), Ok(()));
}
#[test]
fn db_closure_put_get() {
    let path = TempDir::new().expect(ERROR_TEMPDIR).into_path();

    let value = Account {
        nonce: u64::MAX,
        bytecode_hash: Some(B256::random()),
        balance: U256::MAX,
    };
    let key = Address::from_str("0xa2c122be93b0074270ebee7f6b7292c7deb45047")
        .expect(ERROR_ETH_ADDRESS);

    // Write through `update`, then drop the read-write environment before reopening.
    {
        let env = create_test_db_with_path(DatabaseEnvKind::RW, &path);
        let returned = env.update(|tx| {
            tx.put::<PlainAccountState>(key, value).expect(ERROR_PUT);
            200
        });
        // `update` hands back whatever the closure returned.
        assert_eq!(returned.expect(ERROR_RETURN_VALUE), 200);
    }

    // Reopen the same path read-only and read the account back through `view`.
    let args = DatabaseArguments::new(ClientVersion::default());
    let env = DatabaseEnv::open(&path, DatabaseEnvKind::RO, args).expect(ERROR_DB_CREATION);
    let stored =
        env.view(|tx| tx.get::<PlainAccountState>(key).expect(ERROR_GET)).expect(ERROR_GET);
    assert_eq!(stored, Some(value))
}
#[test]
fn db_dup_sort() {
    let env = create_test_db(DatabaseEnvKind::RW);
    let key = Address::from_str("0xa2c122be93b0074270ebee7f6b7292c7deb45047")
        .expect(ERROR_ETH_ADDRESS);

    let value00 = StorageEntry::default();
    let value22 = StorageEntry { key: B256::with_last_byte(2), value: U256::from(2) };
    let value11 = StorageEntry { key: B256::with_last_byte(1), value: U256::from(1) };

    // Insert out of order (0, 2, 1), each in its own transaction.
    for entry in [value00, value22, value11] {
        env.update(|tx| tx.put::<PlainStorageState>(key, entry).expect(ERROR_PUT)).unwrap();
    }

    // The duplicates come back sorted: 0, 1, 2.
    {
        let tx = env.tx().expect(ERROR_INIT_TX);
        let mut cursor = tx.cursor_dup_read::<PlainStorageState>().unwrap();
        for expected in [value00, value11, value22] {
            assert_eq!(Some(expected), cursor.next_dup_val().unwrap());
        }
    }

    // Walking duplicates from an exact subkey starts at the matching entry.
    {
        let tx = env.tx().expect(ERROR_INIT_TX);
        let mut cursor = tx.cursor_dup_read::<PlainStorageState>().unwrap();
        let mut walker = cursor.walk_dup(Some(key), Some(B256::with_last_byte(1))).unwrap();
        let first = walker
            .next()
            .expect("element should exist.")
            .expect("should be able to retrieve it.");
        assert_eq!((key, value11), first);
    }
}
#[test]
fn db_iterate_over_all_dup_values() {
    let env = create_test_db(DatabaseEnvKind::RW);
    let key1 = Address::from_str("0x1111111111111111111111111111111111111111")
        .expect(ERROR_ETH_ADDRESS);
    let key2 = Address::from_str("0x2222222222222222222222222222222222222222")
        .expect(ERROR_ETH_ADDRESS);

    let value00 = StorageEntry::default();
    let value11 = StorageEntry { key: B256::with_last_byte(1), value: U256::from(1) };
    let value22 = StorageEntry { key: B256::with_last_byte(2), value: U256::from(2) };

    // Two entries under key1 and one under key2, each written in its own transaction.
    for (address, entry) in [(key1, value00), (key1, value11), (key2, value22)] {
        env.update(|tx| tx.put::<PlainStorageState>(address, entry).expect(ERROR_PUT)).unwrap();
    }

    // `walk_dup` without a start position only yields the duplicates of the first key.
    {
        let tx = env.tx().expect(ERROR_INIT_TX);
        let mut cursor = tx.cursor_dup_read::<PlainStorageState>().unwrap();
        let mut walker = cursor.walk_dup(None, None).unwrap();
        for expected in [(key1, value00), (key1, value11)] {
            assert_eq!(Some(Ok(expected)), walker.next());
        }
        assert_eq!(None, walker.next());
    }

    // A plain `walk` from the first key continues across keys.
    {
        let tx = env.tx().expect(ERROR_INIT_TX);
        let mut cursor = tx.cursor_dup_read::<PlainStorageState>().unwrap();
        let (first_key, _) = cursor.first().unwrap().unwrap();
        let mut walker = cursor.walk(Some(first_key)).unwrap();
        for expected in [(key1, value00), (key1, value11), (key2, value22)] {
            assert_eq!(Some(Ok(expected)), walker.next());
        }
    }
}
#[test]
fn dup_value_with_same_subkey() {
    let env = create_test_db(DatabaseEnvKind::RW);
    let key1 = Address::new([0x11; 20]);
    let key2 = Address::new([0x22; 20]);

    // `value01` and `value00` share subkey 0 and differ only in value.
    let value01 = StorageEntry { key: B256::with_last_byte(0), value: U256::from(1) };
    let value00 = StorageEntry::default();
    let value22 = StorageEntry { key: B256::with_last_byte(2), value: U256::from(2) };

    for (address, entry) in [(key1, value01), (key1, value00), (key2, value22)] {
        env.update(|tx| tx.put::<PlainStorageState>(address, entry).expect(ERROR_PUT)).unwrap();
    }

    // Walking shows both same-subkey entries, ordered by value.
    {
        let tx = env.tx().expect(ERROR_INIT_TX);
        let mut cursor = tx.cursor_dup_read::<PlainStorageState>().unwrap();
        let (first_key, _) = cursor.first().unwrap().unwrap();
        let mut walker = cursor.walk(Some(first_key)).unwrap();
        for expected in [(key1, value00), (key1, value01), (key2, value22)] {
            assert_eq!(Some(Ok(expected)), walker.next());
        }
    }

    // `seek_by_key_subkey` returns only the first entry for a shared subkey, and nothing when
    // the requested subkey is greater than every subkey stored under the key.
    {
        let tx = env.tx().expect(ERROR_INIT_TX);
        let mut cursor = tx.cursor_dup_read::<PlainStorageState>().unwrap();
        assert_eq!(Ok(Some(value00)), cursor.seek_by_key_subkey(key1, value00.key));
        assert_eq!(Ok(None), cursor.seek_by_key_subkey(key1, value22.key));
    }
}
#[test]
fn db_sharded_key() {
    let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);
    let real_key = Address::from_str("0xa2c122be93b0074270ebee7f6b7292c7deb45047").unwrap();

    // Store shards 100, 200, 300 and 400, each holding a list with its own index.
    for i in 1..5 {
        let shard_key = ShardedKey::new(real_key, i * 100);
        let shard_list = IntegerList::new_pre_sorted([i * 100u64]);
        db.update(|tx| {
            tx.put::<AccountsHistory>(shard_key.clone(), shard_list.clone()).expect("")
        })
        .unwrap();
    }

    // Walking from a shard index that is not stored (150) starts at the next stored one (200).
    {
        let tx = db.tx().expect(ERROR_INIT_TX);
        let mut cursor = tx.cursor_read::<AccountsHistory>().unwrap();
        let mut walker = cursor.walk(Some(ShardedKey::new(real_key, 150))).unwrap();
        let (found_key, found_list) = walker
            .next()
            .expect("element should exist.")
            .expect("should be able to retrieve it.");
        assert_eq!(ShardedKey::new(real_key, 200), found_key);
        assert_eq!(IntegerList::new_pre_sorted([200u64]), found_list);
    }

    // Seeking the maximum shard index and stepping back lands on the greatest stored shard.
    {
        let tx = db.tx().expect(ERROR_INIT_TX);
        let mut cursor = tx.cursor_read::<AccountsHistory>().unwrap();
        let _unknown = cursor.seek_exact(ShardedKey::new(real_key, u64::MAX)).unwrap();
        let (found_key, found_list) = cursor
            .prev()
            .expect("element should exist.")
            .expect("should be able to retrieve it.");
        assert_eq!(ShardedKey::new(real_key, 400), found_key);
        assert_eq!(IntegerList::new_pre_sorted([400u64]), found_list);
    }
}
}