Skip to main content

reth_db/implementation/mdbx/
mod.rs

1//! Module that interacts with MDBX.
2
3use crate::{
4    lockfile::StorageLock,
5    metrics::DatabaseEnvMetrics,
6    tables::{self, Tables},
7    utils::default_page_size,
8    DatabaseError, TableSet,
9};
10use eyre::Context;
11use metrics::{gauge, Label};
12use reth_db_api::{
13    cursor::{DbCursorRO, DbCursorRW},
14    database::Database,
15    database_metrics::DatabaseMetrics,
16    models::ClientVersion,
17    transaction::{DbTx, DbTxMut},
18};
19use reth_libmdbx::{
20    ffi, DatabaseFlags, Environment, EnvironmentFlags, Geometry, HandleSlowReadersReturnCode,
21    MaxReadTransactionDuration, Mode, PageSize, SyncMode, RO, RW,
22};
23use reth_storage_errors::db::LogLevel;
24use reth_tracing::tracing::error;
25use std::{
26    collections::HashMap,
27    ops::{Deref, Range},
28    path::{Path, PathBuf},
29    sync::Arc,
30    time::{SystemTime, UNIX_EPOCH},
31};
32use tx::Tx;
33
34pub mod cursor;
35pub mod tx;
36
37mod utils;
38
/// Number of bytes in one kibibyte (2^10).
pub const KILOBYTE: usize = 1 << 10;
/// Number of bytes in one mebibyte (2^20).
pub const MEGABYTE: usize = KILOBYTE << 10;
/// Number of bytes in one gibibyte (2^30).
pub const GIGABYTE: usize = MEGABYTE << 10;
/// Number of bytes in one tebibyte (2^40).
pub const TERABYTE: usize = GIGABYTE << 10;

/// Default reader slot count. MDBX itself caps readers at 32767
/// (`MDBX_READERS_LIMIT`); we stay slightly below that ceiling.
const DEFAULT_MAX_READERS: u64 = 32_000;

/// Amount of space a single read-only transaction may pin before the
/// slow-reader warning fires.
/// See [`reth_libmdbx::EnvironmentBuilder::set_handle_slow_readers`] for more information.
const MAX_SAFE_READER_SPACE: usize = 10 * GIGABYTE;
54
/// Environment used when opening a MDBX environment. RO/RW.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DatabaseEnvKind {
    /// Read-only MDBX environment.
    RO,
    /// Read-write MDBX environment.
    RW,
}

impl DatabaseEnvKind {
    /// Returns `true` if the environment is read-write.
    pub const fn is_rw(&self) -> bool {
        // Exhaustive match so adding a variant forces this to be revisited.
        match self {
            Self::RO => false,
            Self::RW => true,
        }
    }
}
70
/// Arguments for database initialization.
#[derive(Clone, Debug)]
pub struct DatabaseArguments {
    /// Client version that accesses the database.
    client_version: ClientVersion,
    /// Database geometry settings.
    geometry: Geometry<Range<usize>>,
    /// Database log level. If [None], the default value is used.
    log_level: Option<LogLevel>,
    /// Maximum duration of a read transaction. If [None], the default value is used.
    max_read_transaction_duration: Option<MaxReadTransactionDuration>,
    /// Open environment in exclusive/monopolistic mode. If [None], the default value is used.
    ///
    /// This can be used as a replacement for `MDB_NOLOCK`, which isn't supported by MDBX. In this
    /// way, you can get the minimal overhead, but with the correct multi-process and multi-thread
    /// locking.
    ///
    /// If `true` = open environment in exclusive/monopolistic mode or return `MDBX_BUSY` if
    /// environment already used by other process. The main feature of the exclusive mode is the
    /// ability to open the environment placed on a network share.
    ///
    /// If `false` = open environment in cooperative mode, i.e. for multi-process
    /// access/interaction/cooperation. The main requirements of the cooperative mode are:
    /// - Data files MUST be placed in the LOCAL file system, but NOT on a network share.
    /// - Environment MUST be opened only by LOCAL processes, but NOT over a network.
    /// - OS kernel (i.e. file system and memory mapping implementation) and all processes that
    ///   open the given environment MUST be running in the physically single RAM with
    ///   cache-coherency. The only exception for cache-consistency requirement is Linux on MIPS
    ///   architecture (but this case has not been tested for a long time).
    ///
    /// This flag affects only at environment opening but can't be changed after.
    exclusive: Option<bool>,
    /// MDBX allows up to 32767 readers (`MDBX_READERS_LIMIT`). This arg is to configure the max
    /// readers.
    max_readers: Option<u64>,
    /// Defines the synchronization strategy used by the MDBX database when writing data to disk.
    ///
    /// This determines how aggressively MDBX ensures data durability versus prioritizing
    /// performance. The available modes are:
    ///
    /// - [`SyncMode::Durable`]: Ensures all transactions are fully flushed to disk before they are
    ///   considered committed. This provides the highest level of durability and crash safety
    ///   but may have a performance cost.
    /// - [`SyncMode::SafeNoSync`]: Skips certain fsync operations to improve write performance.
    ///   This mode still maintains database integrity but may lose the most recent transactions if
    ///   the system crashes unexpectedly.
    ///
    /// Choose `Durable` if consistency and crash safety are critical (e.g., production
    /// environments). Choose `SafeNoSync` if performance is more important and occasional data
    /// loss is acceptable (e.g., testing or ephemeral data).
    sync_mode: SyncMode,
}
123
impl Default for DatabaseArguments {
    fn default() -> Self {
        // Defaults to an empty client version; all other settings come from `Self::new`.
        Self::new(ClientVersion::default())
    }
}
129
130impl DatabaseArguments {
131    /// Create new database arguments with given client version.
132    pub fn new(client_version: ClientVersion) -> Self {
133        Self {
134            client_version,
135            geometry: Geometry {
136                size: Some(0..(8 * TERABYTE)),
137                growth_step: Some(4 * GIGABYTE as isize),
138                shrink_threshold: Some(0),
139                page_size: Some(PageSize::Set(default_page_size())),
140            },
141            log_level: None,
142            max_read_transaction_duration: None,
143            exclusive: None,
144            max_readers: None,
145            sync_mode: SyncMode::Durable,
146        }
147    }
148
149    /// Create database arguments suitable for testing.
150    ///
151    /// Uses a small geometry (64MB max, 4MB growth) to avoid exhausting the system's
152    /// virtual memory map limit (`vm.max_map_count`) when many test databases are open
153    /// concurrently.
154    pub fn test() -> Self {
155        Self {
156            geometry: Geometry {
157                size: Some(0..(64 * MEGABYTE)),
158                growth_step: Some(4 * MEGABYTE as isize),
159                shrink_threshold: Some(0),
160                page_size: Some(PageSize::Set(default_page_size())),
161            },
162            max_read_transaction_duration: Some(MaxReadTransactionDuration::Unbounded),
163            ..Self::new(ClientVersion::default())
164        }
165    }
166
167    /// Sets the upper size limit of the db environment, the maximum database size in bytes.
168    pub const fn with_geometry_max_size(mut self, max_size: Option<usize>) -> Self {
169        if let Some(max_size) = max_size {
170            self.geometry.size = Some(0..max_size);
171        }
172        self
173    }
174
175    /// Sets the database page size value.
176    pub const fn with_geometry_page_size(mut self, page_size: Option<usize>) -> Self {
177        if let Some(size) = page_size {
178            self.geometry.page_size = Some(reth_libmdbx::PageSize::Set(size));
179        }
180
181        self
182    }
183
184    /// Sets the database sync mode.
185    pub const fn with_sync_mode(mut self, sync_mode: Option<SyncMode>) -> Self {
186        if let Some(sync_mode) = sync_mode {
187            self.sync_mode = sync_mode;
188        }
189
190        self
191    }
192
193    /// Configures the database growth step in bytes.
194    pub const fn with_growth_step(mut self, growth_step: Option<usize>) -> Self {
195        if let Some(growth_step) = growth_step {
196            self.geometry.growth_step = Some(growth_step as isize);
197        }
198        self
199    }
200
201    /// Set the log level.
202    pub const fn with_log_level(mut self, log_level: Option<LogLevel>) -> Self {
203        self.log_level = log_level;
204        self
205    }
206
207    /// Set the maximum duration of a read transaction.
208    pub const fn max_read_transaction_duration(
209        &mut self,
210        max_read_transaction_duration: Option<MaxReadTransactionDuration>,
211    ) {
212        self.max_read_transaction_duration = max_read_transaction_duration;
213    }
214
215    /// Set the maximum duration of a read transaction.
216    pub const fn with_max_read_transaction_duration(
217        mut self,
218        max_read_transaction_duration: Option<MaxReadTransactionDuration>,
219    ) -> Self {
220        self.max_read_transaction_duration(max_read_transaction_duration);
221        self
222    }
223
224    /// Set the mdbx exclusive flag.
225    pub const fn with_exclusive(mut self, exclusive: Option<bool>) -> Self {
226        self.exclusive = exclusive;
227        self
228    }
229
230    /// Set `max_readers` flag.
231    pub const fn with_max_readers(mut self, max_readers: Option<u64>) -> Self {
232        self.max_readers = max_readers;
233        self
234    }
235
236    /// Returns the client version if any.
237    pub const fn client_version(&self) -> &ClientVersion {
238        &self.client_version
239    }
240}
241
/// Wrapper for the libmdbx environment: [Environment]
#[derive(Debug, Clone)]
pub struct DatabaseEnv {
    /// Libmdbx-sys environment.
    inner: Environment,
    /// Path to the database directory.
    path: PathBuf,
    /// Opened DBIs for reuse.
    /// Important: Do not manually close these DBIs, like via `mdbx_dbi_close`.
    /// More generally, do not dynamically create, re-open, or drop tables at
    /// runtime. It's better to perform table creation and migration only once
    /// at startup.
    dbis: Arc<HashMap<&'static str, ffi::MDBX_dbi>>,
    /// Cache for metric handles. If `None`, metrics are not recorded.
    metrics: Option<Arc<DatabaseEnvMetrics>>,
    /// Write lock for when dealing with a read-write environment.
    /// Held for the lifetime of this env; never read, only kept alive (hence the underscore).
    _lock_file: Option<StorageLock>,
}
260
261impl Database for DatabaseEnv {
262    type TX = tx::Tx<RO>;
263    type TXMut = tx::Tx<RW>;
264
265    fn tx(&self) -> Result<Self::TX, DatabaseError> {
266        Tx::new(
267            self.inner.begin_ro_txn().map_err(|e| DatabaseError::InitTx(e.into()))?,
268            self.dbis.clone(),
269            self.metrics.clone(),
270        )
271        .map_err(|e| DatabaseError::InitTx(e.into()))
272    }
273
274    fn tx_mut(&self) -> Result<Self::TXMut, DatabaseError> {
275        Tx::new(
276            self.inner.begin_rw_txn().map_err(|e| DatabaseError::InitTx(e.into()))?,
277            self.dbis.clone(),
278            self.metrics.clone(),
279        )
280        .map_err(|e| DatabaseError::InitTx(e.into()))
281    }
282
283    fn path(&self) -> PathBuf {
284        self.path.clone()
285    }
286}
287
impl DatabaseMetrics for DatabaseEnv {
    fn report_metrics(&self) {
        // Push every collected gauge into the global metrics recorder.
        for (name, value, labels) in self.gauge_metrics() {
            gauge!(name, labels).set(value);
        }
    }

    fn gauge_metrics(&self) -> Vec<(&'static str, f64, Vec<Label>)> {
        let mut metrics = Vec::new();

        // Collect per-table stats inside a single read-only view. Any error is
        // logged and swallowed so that metrics collection is best-effort and
        // never fails the caller.
        let _ = self
            .view(|tx| {
                for table in Tables::ALL.iter().map(Tables::name) {
                    let table_db =
                        tx.inner().open_db(Some(table)).wrap_err("Could not open db.")?;

                    let stats = tx
                        .inner()
                        .db_stat(table_db.dbi())
                        .wrap_err(format!("Could not find table: {table}"))?;

                    // Table footprint = page size * (leaf + branch + overflow pages).
                    let page_size = stats.page_size() as usize;
                    let leaf_pages = stats.leaf_pages();
                    let branch_pages = stats.branch_pages();
                    let overflow_pages = stats.overflow_pages();
                    let num_pages = leaf_pages + branch_pages + overflow_pages;
                    let table_size = page_size * num_pages;
                    let entries = stats.entries();

                    metrics.push((
                        "db.table_size",
                        table_size as f64,
                        vec![Label::new("table", table)],
                    ));
                    // Page counts are split by page type via the "type" label.
                    metrics.push((
                        "db.table_pages",
                        leaf_pages as f64,
                        vec![Label::new("table", table), Label::new("type", "leaf")],
                    ));
                    metrics.push((
                        "db.table_pages",
                        branch_pages as f64,
                        vec![Label::new("table", table), Label::new("type", "branch")],
                    ));
                    metrics.push((
                        "db.table_pages",
                        overflow_pages as f64,
                        vec![Label::new("table", table), Label::new("type", "overflow")],
                    ));
                    metrics.push((
                        "db.table_entries",
                        entries as f64,
                        vec![Label::new("table", table)],
                    ));
                }

                Ok::<(), eyre::Report>(())
            })
            .map_err(|error| error!(%error, "Failed to read db table stats"));

        // Environment-wide gauges; same best-effort, log-and-continue handling.
        if let Ok(freelist) =
            self.freelist().map_err(|error| error!(%error, "Failed to read db.freelist"))
        {
            metrics.push(("db.freelist", freelist as f64, vec![]));
        }

        if let Ok(stat) = self.stat().map_err(|error| error!(%error, "Failed to read db.stat")) {
            metrics.push(("db.page_size", stat.page_size() as f64, vec![]));
        }

        metrics.push((
            "db.timed_out_not_aborted_transactions",
            self.timed_out_not_aborted_transactions() as f64,
            vec![],
        ));

        metrics
    }
}
367
impl DatabaseEnv {
    /// Opens the database at the specified path with the given `EnvKind`.
    ///
    /// It does not create the tables, for that call [`DatabaseEnv::create_tables`].
    pub fn open(
        path: &Path,
        kind: DatabaseEnvKind,
        args: DatabaseArguments,
    ) -> Result<Self, DatabaseError> {
        // A read-write environment takes the datadir storage lock; read-only
        // environments skip it so they can coexist with a running writer.
        let _lock_file = if kind.is_rw() {
            StorageLock::try_acquire(path)
                .map_err(|err| DatabaseError::Other(err.to_string()))?
                .into()
        } else {
            None
        };

        let mut inner_env = Environment::builder();

        let mode = match kind {
            DatabaseEnvKind::RO => Mode::ReadOnly,
            DatabaseEnvKind::RW => {
                // enable writemap mode in RW mode
                inner_env.write_map();
                Mode::ReadWrite { sync_mode: args.sync_mode }
            }
        };

        // Note: We set max dbs to 256 here to allow for custom tables. This needs to be set on
        // environment creation.
        debug_assert!(Tables::ALL.len() <= 256, "number of tables exceed max dbs");
        inner_env.set_max_dbs(256);
        inner_env.set_geometry(args.geometry);

        // Returns `true` if `id` refers to this process (or, on unix, its parent).
        fn is_current_process(id: u32) -> bool {
            #[cfg(unix)]
            {
                id == std::os::unix::process::parent_id() || id == std::process::id()
            }

            #[cfg(not(unix))]
            {
                id == std::process::id()
            }
        }

        // MDBX slow-reader callback: emits a warning once a stalled reader pins
        // more than `MAX_SAFE_READER_SPACE`, but never kills the reader.
        extern "C" fn handle_slow_readers(
            _env: *const ffi::MDBX_env,
            _txn: *const ffi::MDBX_txn,
            process_id: ffi::mdbx_pid_t,
            thread_id: ffi::mdbx_tid_t,
            read_txn_id: u64,
            gap: std::ffi::c_uint,
            space: usize,
            retry: std::ffi::c_int,
        ) -> HandleSlowReadersReturnCode {
            if space > MAX_SAFE_READER_SPACE {
                let message = if is_current_process(process_id as u32) {
                    "Current process has a long-lived database transaction that grows the database file."
                } else {
                    "External process has a long-lived database transaction that grows the database file. \
                     Use shorter-lived read transactions or shut down the node."
                };
                reth_tracing::tracing::warn!(
                    target: "storage::db::mdbx",
                    ?process_id,
                    ?thread_id,
                    ?read_txn_id,
                    ?gap,
                    ?space,
                    ?retry,
                    "{message}"
                )
            }

            reth_libmdbx::HandleSlowReadersReturnCode::ProceedWithoutKillingReader
        }
        inner_env.set_handle_slow_readers(handle_slow_readers);

        inner_env.set_flags(EnvironmentFlags {
            mode,
            // We disable readahead because it improves performance for linear scans, but
            // worsens it for random access (which is our access pattern outside of sync)
            no_rdahead: true,
            coalesce: true,
            exclusive: args.exclusive.unwrap_or_default(),
            ..Default::default()
        });
        // Configure more readers
        inner_env.set_max_readers(args.max_readers.unwrap_or(DEFAULT_MAX_READERS));
        // This parameter sets the maximum size of the "reclaimed list", and the unit of measurement
        // is "pages". Reclaimed list is the list of freed pages that's populated during the
        // lifetime of DB transaction, and through which MDBX searches when it needs to insert new
        // record with overflow pages. The flow is roughly the following:
        // 0. We need to insert a record that requires N number of overflow pages (in consecutive
        //    sequence inside the DB file).
        // 1. Get some pages from the freelist, put them into the reclaimed list.
        // 2. Search through the reclaimed list for the sequence of size N.
        // 3. a. If found, return the sequence.
        // 3. b. If not found, repeat steps 1-3. If the reclaimed list size is larger than
        //    the `rp augment limit`, stop the search and allocate new pages at the end of the file:
        //    https://github.com/paradigmxyz/reth/blob/2a4c78759178f66e30c8976ec5d243b53102fc9a/crates/storage/libmdbx-rs/mdbx-sys/libmdbx/mdbx.c#L11479-L11480.
        //
        // Basically, this parameter controls for how long do we search through the freelist before
        // trying to allocate new pages. Smaller value will make MDBX to fallback to
        // allocation faster, higher value will force MDBX to search through the freelist
        // longer until the sequence of pages is found.
        //
        // The default value of this parameter is set depending on the DB size. The bigger the
        // database, the larger is `rp augment limit`.
        // https://github.com/paradigmxyz/reth/blob/2a4c78759178f66e30c8976ec5d243b53102fc9a/crates/storage/libmdbx-rs/mdbx-sys/libmdbx/mdbx.c#L10018-L10024.
        //
        // Previously, MDBX set this value as `256 * 1024` constant. Let's fallback to this,
        // because we want to prioritize freelist lookup speed over database growth.
        // https://github.com/paradigmxyz/reth/blob/fa2b9b685ed9787636d962f4366caf34a9186e66/crates/storage/libmdbx-rs/mdbx-sys/libmdbx/mdbx.c#L16017.
        inner_env.set_rp_augment_limit(256 * 1024);

        if let Some(log_level) = args.log_level {
            // Levels higher than [LogLevel::Notice] require libmdbx built with `MDBX_DEBUG` option.
            let is_log_level_available = if cfg!(debug_assertions) {
                true
            } else {
                matches!(
                    log_level,
                    LogLevel::Fatal | LogLevel::Error | LogLevel::Warn | LogLevel::Notice
                )
            };
            if is_log_level_available {
                // Map our log level enum onto libmdbx's numeric levels (0 = fatal .. 7 = extra).
                inner_env.set_log_level(match log_level {
                    LogLevel::Fatal => 0,
                    LogLevel::Error => 1,
                    LogLevel::Warn => 2,
                    LogLevel::Notice => 3,
                    LogLevel::Verbose => 4,
                    LogLevel::Debug => 5,
                    LogLevel::Trace => 6,
                    LogLevel::Extra => 7,
                });
            } else {
                return Err(DatabaseError::LogLevelUnavailable(log_level))
            }
        }

        if let Some(max_read_transaction_duration) = args.max_read_transaction_duration {
            inner_env.set_max_read_transaction_duration(max_read_transaction_duration);
        }

        let env = Self {
            inner: inner_env.open(path).map_err(|e| DatabaseError::Open(e.into()))?,
            path: path.to_path_buf(),
            // DBI cache starts empty; populated via `create_tables` and friends.
            dbis: Arc::default(),
            metrics: None,
            _lock_file,
        };

        Ok(env)
    }

    /// Enables metrics on the database.
    pub fn with_metrics(mut self) -> Self {
        self.metrics = Some(DatabaseEnvMetrics::new().into());
        self
    }

    /// Creates all the tables defined in [`Tables`], if necessary.
    ///
    /// This keeps tracks of the created table handles and stores them for better efficiency.
    pub fn create_tables(&mut self) -> Result<(), DatabaseError> {
        self.create_and_track_tables_for::<Tables>()
    }

    /// Creates all the tables defined in the given [`TableSet`], if necessary.
    ///
    /// This keeps tracks of the created table handles and stores them for better efficiency.
    pub fn create_and_track_tables_for<TS: TableSet>(&mut self) -> Result<(), DatabaseError> {
        let handles = self._create_tables::<TS>()?;
        // Note: This is okay because self has mutable access here and `DatabaseEnv` must be Arc'ed
        // before it can be shared.
        let dbis = Arc::make_mut(&mut self.dbis);
        dbis.extend(handles);

        Ok(())
    }

    /// Creates all the tables defined in [`Tables`], if necessary.
    ///
    /// If this type is unique the created handle for the tables will be updated.
    ///
    /// This is recommended to be called during initialization to create and track additional tables
    /// after the default [`Self::create_tables`] are created.
    pub fn create_tables_for<TS: TableSet>(self: &mut Arc<Self>) -> Result<(), DatabaseError> {
        let handles = self._create_tables::<TS>()?;
        // If the Arc is shared, the handle cache is left untouched; the tables
        // themselves were still created above.
        if let Some(db) = Arc::get_mut(self) {
            // Note: The db is unique and the dbis as well, and they can also be cloned.
            let dbis = Arc::make_mut(&mut db.dbis);
            dbis.extend(handles);
        }
        Ok(())
    }

    /// Creates the tables and returns the identifiers of the tables.
    fn _create_tables<TS: TableSet>(
        &self,
    ) -> Result<Vec<(&'static str, ffi::MDBX_dbi)>, DatabaseError> {
        let mut handles = Vec::new();
        let tx = self.inner.begin_rw_txn().map_err(|e| DatabaseError::InitTx(e.into()))?;

        for table in TS::tables() {
            // Dupsort tables require the DUP_SORT flag at creation time.
            let flags =
                if table.is_dupsort() { DatabaseFlags::DUP_SORT } else { DatabaseFlags::default() };

            let db = tx
                .create_db(Some(table.name()), flags)
                .map_err(|e| DatabaseError::CreateTable(e.into()))?;
            handles.push((table.name(), db.dbi()));
        }

        tx.commit().map_err(|e| DatabaseError::Commit(e.into()))?;
        Ok(handles)
    }

    /// Drops an orphaned table by name.
    ///
    /// This is used to clean up tables that are no longer defined in the schema but may still
    /// exist on disk from previous versions.
    ///
    /// Returns `Ok(true)` if the table existed and was dropped, `Ok(false)` if the table was not
    /// found.
    ///
    /// # Safety
    /// This permanently deletes the table and all its data. Only use for tables that are
    /// confirmed to be obsolete.
    pub fn drop_orphan_table(&self, name: &str) -> Result<bool, DatabaseError> {
        let tx = self.inner.begin_rw_txn().map_err(|e| DatabaseError::InitTx(e.into()))?;

        match tx.open_db(Some(name)) {
            Ok(db) => {
                // SAFETY: We just opened the db handle and will commit immediately after dropping.
                // No other cursors or handles exist for this table.
                unsafe {
                    tx.drop_db(db.dbi()).map_err(|e| DatabaseError::Delete(e.into()))?;
                }
                tx.commit().map_err(|e| DatabaseError::Commit(e.into()))?;
                Ok(true)
            }
            Err(reth_libmdbx::Error::NotFound) => Ok(false),
            Err(e) => Err(DatabaseError::Open(e.into())),
        }
    }

    /// Records version that accesses the database with write privileges.
    pub fn record_client_version(&self, version: ClientVersion) -> Result<(), DatabaseError> {
        if version.is_empty() {
            return Ok(())
        }

        let tx = self.tx_mut()?;
        let mut version_cursor = tx.cursor_write::<tables::VersionHistory>()?;

        // Only append a new row (keyed by unix timestamp) when the version
        // actually changed; otherwise the tx is dropped without committing.
        let last_version = version_cursor.last()?.map(|(_, v)| v);
        if Some(&version) != last_version.as_ref() {
            version_cursor.upsert(
                SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs(),
                &version,
            )?;
            tx.commit()?;
        }

        Ok(())
    }
}
639
impl Deref for DatabaseEnv {
    type Target = Environment;

    // Lets callers invoke raw [`Environment`] methods (e.g. `stat`, `freelist`)
    // directly on a `DatabaseEnv`.
    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}
647
648#[cfg(test)]
649mod tests {
650    use super::*;
651    use crate::{
652        tables::{
653            AccountsHistory, CanonicalHeaders, Headers, PlainAccountState, PlainStorageState,
654        },
655        test_utils::*,
656        AccountChangeSets,
657    };
658    use alloy_consensus::Header;
659    use alloy_primitives::{address, Address, B256, U256};
660    use reth_db_api::{
661        cursor::{DbDupCursorRO, DbDupCursorRW, ReverseWalker, Walker},
662        models::{AccountBeforeTx, IntegerList, ShardedKey},
663        table::{Encode, Table},
664    };
665    use reth_libmdbx::Error;
666    use reth_primitives_traits::{Account, StorageEntry};
667    use reth_storage_errors::db::{DatabaseWriteError, DatabaseWriteOperation};
668    use std::str::FromStr;
669    use tempfile::TempDir;
670
671    /// Create database for testing. Returns the `TempDir` to prevent cleanup until test ends.
672    fn create_test_db(kind: DatabaseEnvKind) -> (TempDir, DatabaseEnv) {
673        let tempdir = tempfile::TempDir::new().expect(ERROR_TEMPDIR);
674        let env = create_test_db_with_path(kind, tempdir.path());
675        (tempdir, env)
676    }
677
678    /// Create database for testing with specified path
679    fn create_test_db_with_path(kind: DatabaseEnvKind, path: &Path) -> DatabaseEnv {
680        let mut env =
681            DatabaseEnv::open(path, kind, DatabaseArguments::new(ClientVersion::default()))
682                .expect(ERROR_DB_CREATION);
683        env.create_tables().expect(ERROR_TABLE_CREATION);
684        env
685    }
686
    // Shared expect-messages for the tests below.
    const ERROR_DB_CREATION: &str = "Not able to create the mdbx file.";
    const ERROR_PUT: &str = "Not able to insert value into table.";
    const ERROR_APPEND: &str = "Not able to append the value to the table.";
    const ERROR_UPSERT: &str = "Not able to upsert the value to the table.";
    const ERROR_GET: &str = "Not able to get value from table.";
    const ERROR_DEL: &str = "Not able to delete from table.";
    const ERROR_COMMIT: &str = "Not able to commit transaction.";
    const ERROR_RETURN_VALUE: &str = "Mismatching result.";
    const ERROR_INIT_TX: &str = "Failed to create a MDBX transaction.";
    const ERROR_ETH_ADDRESS: &str = "Invalid address.";
697
    #[test]
    fn db_creation() {
        // Binding the returned tuple keeps the `TempDir` (and thus the db files)
        // alive until the end of the test.
        let _tempdir = create_test_db(DatabaseEnvKind::RW);
    }
702
703    #[test]
704    fn db_drop_orphan_table() {
705        let tempdir = tempfile::TempDir::new().expect(ERROR_TEMPDIR);
706        let db = create_test_db_with_path(DatabaseEnvKind::RW, tempdir.path());
707
708        // Create an orphan table by manually creating it
709        let orphan_table_name = "OrphanTestTable";
710        {
711            let tx = db.inner.begin_rw_txn().expect(ERROR_INIT_TX);
712            tx.create_db(Some(orphan_table_name), DatabaseFlags::empty())
713                .expect("Failed to create orphan table");
714            tx.commit().expect(ERROR_COMMIT);
715        }
716
717        // Verify the table exists by opening it
718        {
719            let tx = db.inner.begin_ro_txn().expect(ERROR_INIT_TX);
720            assert!(tx.open_db(Some(orphan_table_name)).is_ok(), "Orphan table should exist");
721        }
722
723        // Drop the orphan table
724        let result = db.drop_orphan_table(orphan_table_name);
725        assert!(result.is_ok(), "drop_orphan_table should succeed");
726        assert!(result.unwrap(), "drop_orphan_table should return true for existing table");
727
728        // Verify the table no longer exists
729        {
730            let tx = db.inner.begin_ro_txn().expect(ERROR_INIT_TX);
731            assert!(
732                tx.open_db(Some(orphan_table_name)).is_err(),
733                "Orphan table should no longer exist"
734            );
735        }
736
737        // Dropping a non-existent table should return Ok(false)
738        let result = db.drop_orphan_table("NonExistentTable");
739        assert!(result.is_ok(), "drop_orphan_table should succeed for non-existent table");
740        assert!(!result.unwrap(), "drop_orphan_table should return false for non-existent table");
741    }
742
743    #[test]
744    fn db_manual_put_get() {
745        let (_tempdir, env) = create_test_db(DatabaseEnvKind::RW);
746
747        let value = Header::default();
748        let key = 1u64;
749
750        // PUT
751        let tx = env.tx_mut().expect(ERROR_INIT_TX);
752        tx.put::<Headers>(key, value.clone()).expect(ERROR_PUT);
753        tx.commit().expect(ERROR_COMMIT);
754
755        // GET
756        let tx = env.tx().expect(ERROR_INIT_TX);
757        let result = tx.get::<Headers>(key).expect(ERROR_GET);
758        assert_eq!(result.expect(ERROR_RETURN_VALUE), value);
759        tx.commit().expect(ERROR_COMMIT);
760    }
761
    #[test]
    fn db_dup_cursor_delete_first() {
        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);
        let tx = db.tx_mut().expect(ERROR_INIT_TX);

        let mut dup_cursor = tx.cursor_dup_write::<PlainStorageState>().unwrap();

        // Two dup-sort entries under the same address key, differing only in value.
        let entry_0 = StorageEntry { key: B256::with_last_byte(1), value: U256::from(0) };
        let entry_1 = StorageEntry { key: B256::with_last_byte(1), value: U256::from(1) };

        dup_cursor.upsert(Address::with_last_byte(1), &entry_0).expect(ERROR_UPSERT);
        dup_cursor.upsert(Address::with_last_byte(1), &entry_1).expect(ERROR_UPSERT);

        // Both duplicates are visible before any deletion.
        assert_eq!(
            dup_cursor.walk(None).unwrap().collect::<Result<Vec<_>, _>>().unwrap(),
            vec![(Address::with_last_byte(1), entry_0), (Address::with_last_byte(1), entry_1),]
        );

        // A fresh walker starts at the first entry; delete_current removes entry_0.
        let mut walker = dup_cursor.walk(None).unwrap();
        walker.delete_current().expect(ERROR_DEL);

        assert_eq!(walker.next().unwrap().unwrap(), (Address::with_last_byte(1), entry_1));

        // Check the tx view - it correctly holds entry_1
        assert_eq!(
            tx.cursor_dup_read::<PlainStorageState>()
                .unwrap()
                .walk(None)
                .unwrap()
                .collect::<Result<Vec<_>, _>>()
                .unwrap(),
            vec![
                (Address::with_last_byte(1), entry_1), // This is ok - we removed entry_0
            ]
        );

        // Check the remainder of walker
        assert!(walker.next().is_none());
    }
801
802    #[test]
803    fn db_cursor_walk() {
804        let (_tempdir, env) = create_test_db(DatabaseEnvKind::RW);
805
806        let value = Header::default();
807        let key = 1u64;
808
809        // PUT
810        let tx = env.tx_mut().expect(ERROR_INIT_TX);
811        tx.put::<Headers>(key, value.clone()).expect(ERROR_PUT);
812        tx.commit().expect(ERROR_COMMIT);
813
814        // Cursor
815        let tx = env.tx().expect(ERROR_INIT_TX);
816        let mut cursor = tx.cursor_read::<Headers>().unwrap();
817
818        let first = cursor.first().unwrap();
819        assert!(first.is_some(), "First should be our put");
820
821        // Walk
822        let walk = cursor.walk(Some(key)).unwrap();
823        let first = walk.into_iter().next().unwrap().unwrap();
824        assert_eq!(first.1, value, "First next should be put value");
825    }
826
827    #[test]
828    fn db_cursor_walk_range() {
829        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);
830
831        // PUT (0, 0), (1, 0), (2, 0), (3, 0)
832        let tx = db.tx_mut().expect(ERROR_INIT_TX);
833        vec![0, 1, 2, 3]
834            .into_iter()
835            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
836            .expect(ERROR_PUT);
837        tx.commit().expect(ERROR_COMMIT);
838
839        let tx = db.tx().expect(ERROR_INIT_TX);
840        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
841
842        // [1, 3)
843        let mut walker = cursor.walk_range(1..3).unwrap();
844        assert_eq!(walker.next().unwrap().unwrap(), (1, B256::ZERO));
845        assert_eq!(walker.next().unwrap().unwrap(), (2, B256::ZERO));
846        assert!(walker.next().is_none());
847        // next() returns None after walker is done
848        assert!(walker.next().is_none());
849
850        // [1, 2]
851        let mut walker = cursor.walk_range(1..=2).unwrap();
852        assert_eq!(walker.next().unwrap().unwrap(), (1, B256::ZERO));
853        assert_eq!(walker.next().unwrap().unwrap(), (2, B256::ZERO));
854        // next() returns None after walker is done
855        assert!(walker.next().is_none());
856
857        // [1, ∞)
858        let mut walker = cursor.walk_range(1..).unwrap();
859        assert_eq!(walker.next().unwrap().unwrap(), (1, B256::ZERO));
860        assert_eq!(walker.next().unwrap().unwrap(), (2, B256::ZERO));
861        assert_eq!(walker.next().unwrap().unwrap(), (3, B256::ZERO));
862        // next() returns None after walker is done
863        assert!(walker.next().is_none());
864
865        // [2, 4)
866        let mut walker = cursor.walk_range(2..4).unwrap();
867        assert_eq!(walker.next().unwrap().unwrap(), (2, B256::ZERO));
868        assert_eq!(walker.next().unwrap().unwrap(), (3, B256::ZERO));
869        assert!(walker.next().is_none());
870        // next() returns None after walker is done
871        assert!(walker.next().is_none());
872
873        // (∞, 3)
874        let mut walker = cursor.walk_range(..3).unwrap();
875        assert_eq!(walker.next().unwrap().unwrap(), (0, B256::ZERO));
876        assert_eq!(walker.next().unwrap().unwrap(), (1, B256::ZERO));
877        assert_eq!(walker.next().unwrap().unwrap(), (2, B256::ZERO));
878        // next() returns None after walker is done
879        assert!(walker.next().is_none());
880
881        // (∞, ∞)
882        let mut walker = cursor.walk_range(..).unwrap();
883        assert_eq!(walker.next().unwrap().unwrap(), (0, B256::ZERO));
884        assert_eq!(walker.next().unwrap().unwrap(), (1, B256::ZERO));
885        assert_eq!(walker.next().unwrap().unwrap(), (2, B256::ZERO));
886        assert_eq!(walker.next().unwrap().unwrap(), (3, B256::ZERO));
887        // next() returns None after walker is done
888        assert!(walker.next().is_none());
889    }
890
891    #[test]
892    fn db_cursor_walk_range_on_dup_table() {
893        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);
894
895        let address0 = Address::ZERO;
896        let address1 = Address::with_last_byte(1);
897        let address2 = Address::with_last_byte(2);
898
899        let tx = db.tx_mut().expect(ERROR_INIT_TX);
900        tx.put::<AccountChangeSets>(0, AccountBeforeTx { address: address0, info: None })
901            .expect(ERROR_PUT);
902        tx.put::<AccountChangeSets>(0, AccountBeforeTx { address: address1, info: None })
903            .expect(ERROR_PUT);
904        tx.put::<AccountChangeSets>(0, AccountBeforeTx { address: address2, info: None })
905            .expect(ERROR_PUT);
906        tx.put::<AccountChangeSets>(1, AccountBeforeTx { address: address0, info: None })
907            .expect(ERROR_PUT);
908        tx.put::<AccountChangeSets>(1, AccountBeforeTx { address: address1, info: None })
909            .expect(ERROR_PUT);
910        tx.put::<AccountChangeSets>(1, AccountBeforeTx { address: address2, info: None })
911            .expect(ERROR_PUT);
912        tx.put::<AccountChangeSets>(2, AccountBeforeTx { address: address0, info: None }) // <- should not be returned by the walker
913            .expect(ERROR_PUT);
914        tx.commit().expect(ERROR_COMMIT);
915
916        let tx = db.tx().expect(ERROR_INIT_TX);
917        let mut cursor = tx.cursor_read::<AccountChangeSets>().unwrap();
918
919        let entries = cursor.walk_range(..).unwrap().collect::<Result<Vec<_>, _>>().unwrap();
920        assert_eq!(entries.len(), 7);
921
922        let mut walker = cursor.walk_range(0..=1).unwrap();
923        assert_eq!(
924            walker.next().unwrap().unwrap(),
925            (0, AccountBeforeTx { address: address0, info: None })
926        );
927        assert_eq!(
928            walker.next().unwrap().unwrap(),
929            (0, AccountBeforeTx { address: address1, info: None })
930        );
931        assert_eq!(
932            walker.next().unwrap().unwrap(),
933            (0, AccountBeforeTx { address: address2, info: None })
934        );
935        assert_eq!(
936            walker.next().unwrap().unwrap(),
937            (1, AccountBeforeTx { address: address0, info: None })
938        );
939        assert_eq!(
940            walker.next().unwrap().unwrap(),
941            (1, AccountBeforeTx { address: address1, info: None })
942        );
943        assert_eq!(
944            walker.next().unwrap().unwrap(),
945            (1, AccountBeforeTx { address: address2, info: None })
946        );
947        assert!(walker.next().is_none());
948    }
949
950    #[expect(clippy::reversed_empty_ranges)]
951    #[test]
952    fn db_cursor_walk_range_invalid() {
953        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);
954
955        // PUT (0, 0), (1, 0), (2, 0), (3, 0)
956        let tx = db.tx_mut().expect(ERROR_INIT_TX);
957        vec![0, 1, 2, 3]
958            .into_iter()
959            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
960            .expect(ERROR_PUT);
961        tx.commit().expect(ERROR_COMMIT);
962
963        let tx = db.tx().expect(ERROR_INIT_TX);
964        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
965
966        // start bound greater than end bound
967        let mut res = cursor.walk_range(3..1).unwrap();
968        assert!(res.next().is_none());
969
970        // start bound greater than end bound
971        let mut res = cursor.walk_range(15..=2).unwrap();
972        assert!(res.next().is_none());
973
974        // returning nothing
975        let mut walker = cursor.walk_range(1..1).unwrap();
976        assert!(walker.next().is_none());
977    }
978
979    #[test]
980    fn db_walker() {
981        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);
982
983        // PUT (0, 0), (1, 0), (3, 0)
984        let tx = db.tx_mut().expect(ERROR_INIT_TX);
985        vec![0, 1, 3]
986            .into_iter()
987            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
988            .expect(ERROR_PUT);
989        tx.commit().expect(ERROR_COMMIT);
990
991        let tx = db.tx().expect(ERROR_INIT_TX);
992        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
993
994        let mut walker = Walker::new(&mut cursor, None);
995
996        assert_eq!(walker.next().unwrap().unwrap(), (0, B256::ZERO));
997        assert_eq!(walker.next().unwrap().unwrap(), (1, B256::ZERO));
998        assert_eq!(walker.next().unwrap().unwrap(), (3, B256::ZERO));
999        assert!(walker.next().is_none());
1000
1001        // transform to ReverseWalker
1002        let mut reverse_walker = walker.rev();
1003        assert_eq!(reverse_walker.next().unwrap().unwrap(), (3, B256::ZERO));
1004        assert_eq!(reverse_walker.next().unwrap().unwrap(), (1, B256::ZERO));
1005        assert_eq!(reverse_walker.next().unwrap().unwrap(), (0, B256::ZERO));
1006        assert!(reverse_walker.next().is_none());
1007    }
1008
1009    #[test]
1010    fn db_reverse_walker() {
1011        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);
1012
1013        // PUT (0, 0), (1, 0), (3, 0)
1014        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1015        vec![0, 1, 3]
1016            .into_iter()
1017            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
1018            .expect(ERROR_PUT);
1019        tx.commit().expect(ERROR_COMMIT);
1020
1021        let tx = db.tx().expect(ERROR_INIT_TX);
1022        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
1023
1024        let mut reverse_walker = ReverseWalker::new(&mut cursor, None);
1025
1026        assert_eq!(reverse_walker.next().unwrap().unwrap(), (3, B256::ZERO));
1027        assert_eq!(reverse_walker.next().unwrap().unwrap(), (1, B256::ZERO));
1028        assert_eq!(reverse_walker.next().unwrap().unwrap(), (0, B256::ZERO));
1029        assert!(reverse_walker.next().is_none());
1030
1031        // transform to Walker
1032        let mut walker = reverse_walker.forward();
1033        assert_eq!(walker.next().unwrap().unwrap(), (0, B256::ZERO));
1034        assert_eq!(walker.next().unwrap().unwrap(), (1, B256::ZERO));
1035        assert_eq!(walker.next().unwrap().unwrap(), (3, B256::ZERO));
1036        assert!(walker.next().is_none());
1037    }
1038
1039    #[test]
1040    fn db_walk_back() {
1041        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);
1042
1043        // PUT (0, 0), (1, 0), (3, 0)
1044        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1045        vec![0, 1, 3]
1046            .into_iter()
1047            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
1048            .expect(ERROR_PUT);
1049        tx.commit().expect(ERROR_COMMIT);
1050
1051        let tx = db.tx().expect(ERROR_INIT_TX);
1052        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
1053
1054        let mut reverse_walker = cursor.walk_back(Some(1)).unwrap();
1055        assert_eq!(reverse_walker.next().unwrap().unwrap(), (1, B256::ZERO));
1056        assert_eq!(reverse_walker.next().unwrap().unwrap(), (0, B256::ZERO));
1057        assert!(reverse_walker.next().is_none());
1058
1059        let mut reverse_walker = cursor.walk_back(Some(2)).unwrap();
1060        assert_eq!(reverse_walker.next().unwrap().unwrap(), (3, B256::ZERO));
1061        assert_eq!(reverse_walker.next().unwrap().unwrap(), (1, B256::ZERO));
1062        assert_eq!(reverse_walker.next().unwrap().unwrap(), (0, B256::ZERO));
1063        assert!(reverse_walker.next().is_none());
1064
1065        let mut reverse_walker = cursor.walk_back(Some(4)).unwrap();
1066        assert_eq!(reverse_walker.next().unwrap().unwrap(), (3, B256::ZERO));
1067        assert_eq!(reverse_walker.next().unwrap().unwrap(), (1, B256::ZERO));
1068        assert_eq!(reverse_walker.next().unwrap().unwrap(), (0, B256::ZERO));
1069        assert!(reverse_walker.next().is_none());
1070
1071        let mut reverse_walker = cursor.walk_back(None).unwrap();
1072        assert_eq!(reverse_walker.next().unwrap().unwrap(), (3, B256::ZERO));
1073        assert_eq!(reverse_walker.next().unwrap().unwrap(), (1, B256::ZERO));
1074        assert_eq!(reverse_walker.next().unwrap().unwrap(), (0, B256::ZERO));
1075        assert!(reverse_walker.next().is_none());
1076    }
1077
1078    #[test]
1079    fn db_cursor_seek_exact_or_previous_key() {
1080        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);
1081
1082        // PUT
1083        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1084        vec![0, 1, 3]
1085            .into_iter()
1086            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
1087            .expect(ERROR_PUT);
1088        tx.commit().expect(ERROR_COMMIT);
1089
1090        // Cursor
1091        let missing_key = 2;
1092        let tx = db.tx().expect(ERROR_INIT_TX);
1093        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
1094        assert!(cursor.current().unwrap().is_none());
1095
1096        // Seek exact
1097        let exact = cursor.seek_exact(missing_key).unwrap();
1098        assert_eq!(exact, None);
1099        assert!(cursor.current().unwrap().is_none());
1100    }
1101
1102    #[test]
1103    fn db_cursor_insert() {
1104        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);
1105
1106        // PUT
1107        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1108        vec![0, 1, 3, 4, 5]
1109            .into_iter()
1110            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
1111            .expect(ERROR_PUT);
1112        tx.commit().expect(ERROR_COMMIT);
1113
1114        let key_to_insert = 2;
1115        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1116        let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
1117
1118        // INSERT
1119        assert!(cursor.insert(key_to_insert, &B256::ZERO).is_ok());
1120        assert_eq!(cursor.current().unwrap(), Some((key_to_insert, B256::ZERO)));
1121        // INSERT (failure)
1122        assert!(matches!(
1123        cursor.insert(key_to_insert, &B256::ZERO).unwrap_err(),
1124        DatabaseError::Write(err) if *err == DatabaseWriteError {
1125            info: Error::KeyExist.into(),
1126            operation: DatabaseWriteOperation::CursorInsert,
1127            table_name: CanonicalHeaders::NAME,
1128            key: key_to_insert.encode().into(),
1129        }));
1130        assert_eq!(cursor.current().unwrap(), Some((key_to_insert, B256::ZERO)));
1131
1132        tx.commit().expect(ERROR_COMMIT);
1133
1134        // Confirm the result
1135        let tx = db.tx().expect(ERROR_INIT_TX);
1136        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
1137        let res = cursor.walk(None).unwrap().map(|res| res.unwrap().0).collect::<Vec<_>>();
1138        assert_eq!(res, vec![0, 1, 2, 3, 4, 5]);
1139        tx.commit().expect(ERROR_COMMIT);
1140    }
1141
1142    #[test]
1143    fn db_cursor_insert_dup() {
1144        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);
1145        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1146
1147        let mut dup_cursor = tx.cursor_dup_write::<PlainStorageState>().unwrap();
1148        let key = Address::random();
1149        let subkey1 = B256::random();
1150        let subkey2 = B256::random();
1151
1152        let entry1 = StorageEntry { key: subkey1, value: U256::ZERO };
1153        assert!(dup_cursor.insert(key, &entry1).is_ok());
1154
1155        // Can't insert
1156        let entry2 = StorageEntry { key: subkey2, value: U256::ZERO };
1157        assert!(dup_cursor.insert(key, &entry2).is_err());
1158    }
1159
    /// Verifies `delete_current` behavior: deleting the entry under the cursor
    /// works once, and calling it again when the cursor no longer points at an
    /// entry fails with `Error::NoData` without touching neighboring keys.
    #[test]
    fn db_cursor_delete_current_non_existent() {
        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);
        let tx = db.tx_mut().expect(ERROR_INIT_TX);

        let key1 = Address::with_last_byte(1);
        let key2 = Address::with_last_byte(2);
        let key3 = Address::with_last_byte(3);
        let mut cursor = tx.cursor_write::<PlainAccountState>().unwrap();

        assert!(cursor.insert(key1, &Account::default()).is_ok());
        assert!(cursor.insert(key2, &Account::default()).is_ok());
        assert!(cursor.insert(key3, &Account::default()).is_ok());

        // Seek & delete key2
        cursor.seek_exact(key2).unwrap();
        assert!(cursor.delete_current().is_ok());
        assert!(cursor.seek_exact(key2).unwrap().is_none());

        // Seek & delete key2 again
        assert!(cursor.seek_exact(key2).unwrap().is_none());
        // With no current entry, the second delete reports `NoData`.
        assert!(matches!(
            cursor.delete_current().unwrap_err(),
            DatabaseError::Delete(err) if err == reth_libmdbx::Error::NoData.into()));
        // Assert that key1 is still there
        assert_eq!(cursor.seek_exact(key1).unwrap(), Some((key1, Account::default())));
        // Assert that key3 is still there
        assert_eq!(cursor.seek_exact(key3).unwrap(), Some((key3, Account::default())));
    }
1189
1190    #[test]
1191    fn db_cursor_insert_wherever_cursor_is() {
1192        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);
1193        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1194
1195        // PUT
1196        vec![0, 1, 3, 5, 7, 9]
1197            .into_iter()
1198            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
1199            .expect(ERROR_PUT);
1200        tx.commit().expect(ERROR_COMMIT);
1201
1202        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1203        let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
1204
1205        // INSERT (cursor starts at last)
1206        cursor.last().unwrap();
1207        assert_eq!(cursor.current().unwrap(), Some((9, B256::ZERO)));
1208
1209        for pos in (2..=8).step_by(2) {
1210            assert!(cursor.insert(pos, &B256::ZERO).is_ok());
1211            assert_eq!(cursor.current().unwrap(), Some((pos, B256::ZERO)));
1212        }
1213        tx.commit().expect(ERROR_COMMIT);
1214
1215        // Confirm the result
1216        let tx = db.tx().expect(ERROR_INIT_TX);
1217        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
1218        let res = cursor.walk(None).unwrap().map(|res| res.unwrap().0).collect::<Vec<_>>();
1219        assert_eq!(res, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
1220        tx.commit().expect(ERROR_COMMIT);
1221    }
1222
1223    #[test]
1224    fn db_cursor_append() {
1225        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);
1226
1227        // PUT
1228        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1229        vec![0, 1, 2, 3, 4]
1230            .into_iter()
1231            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
1232            .expect(ERROR_PUT);
1233        tx.commit().expect(ERROR_COMMIT);
1234
1235        // APPEND
1236        let key_to_append = 5;
1237        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1238        let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
1239        assert!(cursor.append(key_to_append, &B256::ZERO).is_ok());
1240        tx.commit().expect(ERROR_COMMIT);
1241
1242        // Confirm the result
1243        let tx = db.tx().expect(ERROR_INIT_TX);
1244        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
1245        let res = cursor.walk(None).unwrap().map(|res| res.unwrap().0).collect::<Vec<_>>();
1246        assert_eq!(res, vec![0, 1, 2, 3, 4, 5]);
1247        tx.commit().expect(ERROR_COMMIT);
1248    }
1249
1250    #[test]
1251    fn db_cursor_append_failure() {
1252        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);
1253
1254        // PUT
1255        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1256        vec![0, 1, 3, 4, 5]
1257            .into_iter()
1258            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
1259            .expect(ERROR_PUT);
1260        tx.commit().expect(ERROR_COMMIT);
1261
1262        // APPEND
1263        let key_to_append = 2;
1264        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1265        let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
1266        assert!(matches!(
1267        cursor.append(key_to_append, &B256::ZERO).unwrap_err(),
1268        DatabaseError::Write(err) if *err == DatabaseWriteError {
1269            info: Error::KeyMismatch.into(),
1270            operation: DatabaseWriteOperation::CursorAppend,
1271            table_name: CanonicalHeaders::NAME,
1272            key: key_to_append.encode().into(),
1273        }));
1274        assert_eq!(cursor.current().unwrap(), Some((5, B256::ZERO))); // the end of table
1275        tx.commit().expect(ERROR_COMMIT);
1276
1277        // Confirm the result
1278        let tx = db.tx().expect(ERROR_INIT_TX);
1279        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
1280        let res = cursor.walk(None).unwrap().map(|res| res.unwrap().0).collect::<Vec<_>>();
1281        assert_eq!(res, vec![0, 1, 3, 4, 5]);
1282        tx.commit().expect(ERROR_COMMIT);
1283    }
1284
    /// Checks `upsert` semantics on both table kinds: on a plain table it
    /// replaces the value for an existing key; on a dupsort table it keeps the
    /// old entry and adds the new one as an additional duplicate.
    #[test]
    fn db_cursor_upsert() {
        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);
        let tx = db.tx_mut().expect(ERROR_INIT_TX);

        let mut cursor = tx.cursor_write::<PlainAccountState>().unwrap();
        let key = Address::random();

        // First upsert inserts the account.
        let account = Account::default();
        cursor.upsert(key, &account).expect(ERROR_UPSERT);
        assert_eq!(cursor.seek_exact(key).unwrap(), Some((key, account)));

        // Subsequent upserts replace the value under the same key.
        let account = Account { nonce: 1, ..Default::default() };
        cursor.upsert(key, &account).expect(ERROR_UPSERT);
        assert_eq!(cursor.seek_exact(key).unwrap(), Some((key, account)));

        let account = Account { nonce: 2, ..Default::default() };
        cursor.upsert(key, &account).expect(ERROR_UPSERT);
        assert_eq!(cursor.seek_exact(key).unwrap(), Some((key, account)));

        let mut dup_cursor = tx.cursor_dup_write::<PlainStorageState>().unwrap();
        let subkey = B256::random();

        let value = U256::from(1);
        let entry1 = StorageEntry { key: subkey, value };
        dup_cursor.upsert(key, &entry1).expect(ERROR_UPSERT);
        assert_eq!(dup_cursor.seek_by_key_subkey(key, subkey).unwrap(), Some(entry1));

        // On the dupsort table the second upsert does not overwrite: the seek
        // still finds entry1 first and entry2 follows as the next duplicate.
        let value = U256::from(2);
        let entry2 = StorageEntry { key: subkey, value };
        dup_cursor.upsert(key, &entry2).expect(ERROR_UPSERT);
        assert_eq!(dup_cursor.seek_by_key_subkey(key, subkey).unwrap(), Some(entry1));
        assert_eq!(dup_cursor.next_dup_val().unwrap(), Some(entry2));
    }
1319
    /// Checks append ordering on a dupsort table: `append_dup` rejects an
    /// out-of-order duplicate with `KeyMismatch`, `append` rejects a key smaller
    /// than the current last key, and `append` at the last key succeeds.
    #[test]
    fn db_cursor_dupsort_append() {
        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);

        let transition_id = 2;

        // Seed duplicates with addresses 0, 1, 3, 4, 5 under a single key.
        let tx = db.tx_mut().expect(ERROR_INIT_TX);
        let mut cursor = tx.cursor_write::<AccountChangeSets>().unwrap();
        vec![0, 1, 3, 4, 5]
            .into_iter()
            .try_for_each(|val| {
                cursor.append(
                    transition_id,
                    &AccountBeforeTx { address: Address::with_last_byte(val), info: None },
                )
            })
            .expect(ERROR_APPEND);
        tx.commit().expect(ERROR_COMMIT);

        // APPEND DUP & APPEND
        let subkey_to_append = 2;
        let tx = db.tx_mut().expect(ERROR_INIT_TX);
        let mut cursor = tx.cursor_write::<AccountChangeSets>().unwrap();
        // `append_dup` with address 2 fails: it would sort before the existing
        // duplicates 3, 4, 5.
        assert!(matches!(
        cursor
            .append_dup(
                transition_id,
                AccountBeforeTx {
                    address: Address::with_last_byte(subkey_to_append),
                    info: None
                }
            )
            .unwrap_err(),
        DatabaseError::Write(err) if *err == DatabaseWriteError {
            info: Error::KeyMismatch.into(),
            operation: DatabaseWriteOperation::CursorAppendDup,
            table_name: AccountChangeSets::NAME,
            key: transition_id.encode().into(),
        }));
        // `append` with a key smaller than the current last key also fails.
        assert!(matches!(
            cursor
                .append(
                    transition_id - 1,
                    &AccountBeforeTx {
                        address: Address::with_last_byte(subkey_to_append),
                        info: None
                    }
                )
                .unwrap_err(),
            DatabaseError::Write(err) if *err == DatabaseWriteError {
                info: Error::KeyMismatch.into(),
                operation: DatabaseWriteOperation::CursorAppend,
                table_name: AccountChangeSets::NAME,
                key: (transition_id - 1).encode().into(),
            }
        ));
        // Plain `append` at the current last key is accepted.
        assert!(cursor
            .append(
                transition_id,
                &AccountBeforeTx { address: Address::with_last_byte(subkey_to_append), info: None }
            )
            .is_ok());
    }
1383
1384    #[test]
1385    fn db_closure_put_get() {
1386        let tempdir = TempDir::new().expect(ERROR_TEMPDIR);
1387        let path = tempdir.path();
1388
1389        let value = Account {
1390            nonce: 18446744073709551615,
1391            bytecode_hash: Some(B256::random()),
1392            balance: U256::MAX,
1393        };
1394        let key = Address::from_str("0xa2c122be93b0074270ebee7f6b7292c7deb45047")
1395            .expect(ERROR_ETH_ADDRESS);
1396
1397        {
1398            let env = create_test_db_with_path(DatabaseEnvKind::RW, path);
1399
1400            // PUT
1401            let result = env.update(|tx| {
1402                tx.put::<PlainAccountState>(key, value).expect(ERROR_PUT);
1403                200
1404            });
1405            assert_eq!(result.expect(ERROR_RETURN_VALUE), 200);
1406        }
1407
1408        let env = DatabaseEnv::open(
1409            path,
1410            DatabaseEnvKind::RO,
1411            DatabaseArguments::new(ClientVersion::default()),
1412        )
1413        .expect(ERROR_DB_CREATION);
1414
1415        // GET
1416        let result =
1417            env.view(|tx| tx.get::<PlainAccountState>(key).expect(ERROR_GET)).expect(ERROR_GET);
1418
1419        assert_eq!(result, Some(value))
1420    }
1421
    /// Checks that duplicates in a dupsort table are stored sorted by subkey
    /// regardless of insertion order, and that `walk_dup` can start at an exact
    /// subkey.
    #[test]
    fn db_dup_sort() {
        let (_tempdir, env) = create_test_db(DatabaseEnvKind::RW);
        let key = Address::from_str("0xa2c122be93b0074270ebee7f6b7292c7deb45047")
            .expect(ERROR_ETH_ADDRESS);

        // PUT (0,0)
        let value00 = StorageEntry::default();
        env.update(|tx| tx.put::<PlainStorageState>(key, value00).expect(ERROR_PUT)).unwrap();

        // PUT (2,2) — inserted before (1,1) on purpose.
        let value22 = StorageEntry { key: B256::with_last_byte(2), value: U256::from(2) };
        env.update(|tx| tx.put::<PlainStorageState>(key, value22).expect(ERROR_PUT)).unwrap();

        // PUT (1,1)
        let value11 = StorageEntry { key: B256::with_last_byte(1), value: U256::from(1) };
        env.update(|tx| tx.put::<PlainStorageState>(key, value11).expect(ERROR_PUT)).unwrap();

        // Iterate with cursor
        {
            let tx = env.tx().expect(ERROR_INIT_TX);
            let mut cursor = tx.cursor_dup_read::<PlainStorageState>().unwrap();

            // Notice that value11 and value22 have been ordered in the DB.
            assert_eq!(Some(value00), cursor.next_dup_val().unwrap());
            assert_eq!(Some(value11), cursor.next_dup_val().unwrap());
            assert_eq!(Some(value22), cursor.next_dup_val().unwrap());
        }

        // Seek value with exact subkey
        {
            let tx = env.tx().expect(ERROR_INIT_TX);
            let mut cursor = tx.cursor_dup_read::<PlainStorageState>().unwrap();
            let mut walker = cursor.walk_dup(Some(key), Some(B256::with_last_byte(1))).unwrap();
            assert_eq!(
                (key, value11),
                walker
                    .next()
                    .expect("element should exist.")
                    .expect("should be able to retrieve it.")
            );
        }
    }
1465
1466    #[test]
1467    fn db_walk_dup_with_not_existing_key() {
1468        let (_tempdir, env) = create_test_db(DatabaseEnvKind::RW);
1469        let key = Address::from_str("0xa2c122be93b0074270ebee7f6b7292c7deb45047")
1470            .expect(ERROR_ETH_ADDRESS);
1471
1472        // PUT (0,0)
1473        let value00 = StorageEntry::default();
1474        env.update(|tx| tx.put::<PlainStorageState>(key, value00).expect(ERROR_PUT)).unwrap();
1475
1476        // PUT (2,2)
1477        let value22 = StorageEntry { key: B256::with_last_byte(2), value: U256::from(2) };
1478        env.update(|tx| tx.put::<PlainStorageState>(key, value22).expect(ERROR_PUT)).unwrap();
1479
1480        // PUT (1,1)
1481        let value11 = StorageEntry { key: B256::with_last_byte(1), value: U256::from(1) };
1482        env.update(|tx| tx.put::<PlainStorageState>(key, value11).expect(ERROR_PUT)).unwrap();
1483
1484        // Try to walk_dup with not existing key should immediately return None
1485        {
1486            let tx = env.tx().expect(ERROR_INIT_TX);
1487            let mut cursor = tx.cursor_dup_read::<PlainStorageState>().unwrap();
1488            let not_existing_key = Address::ZERO;
1489            let mut walker = cursor.walk_dup(Some(not_existing_key), None).unwrap();
1490            assert!(walker.next().is_none());
1491        }
1492    }
1493
    /// Contrasts `walk_dup` (stays within one key's duplicates) with `walk`
    /// (crosses key boundaries) on a dupsort table.
    #[test]
    fn db_iterate_over_all_dup_values() {
        let (_tempdir, env) = create_test_db(DatabaseEnvKind::RW);
        let key1 = Address::from_str("0x1111111111111111111111111111111111111111")
            .expect(ERROR_ETH_ADDRESS);
        let key2 = Address::from_str("0x2222222222222222222222222222222222222222")
            .expect(ERROR_ETH_ADDRESS);

        // PUT key1 (0,0)
        let value00 = StorageEntry::default();
        env.update(|tx| tx.put::<PlainStorageState>(key1, value00).expect(ERROR_PUT)).unwrap();

        // PUT key1 (1,1)
        let value11 = StorageEntry { key: B256::with_last_byte(1), value: U256::from(1) };
        env.update(|tx| tx.put::<PlainStorageState>(key1, value11).expect(ERROR_PUT)).unwrap();

        // PUT key2 (2,2)
        let value22 = StorageEntry { key: B256::with_last_byte(2), value: U256::from(2) };
        env.update(|tx| tx.put::<PlainStorageState>(key2, value22).expect(ERROR_PUT)).unwrap();

        // Iterate with walk_dup
        {
            let tx = env.tx().expect(ERROR_INIT_TX);
            let mut cursor = tx.cursor_dup_read::<PlainStorageState>().unwrap();
            let mut walker = cursor.walk_dup(None, None).unwrap();

            // Notice that value11 and value22 have been ordered in the DB.
            assert_eq!((key1, value00), walker.next().unwrap().unwrap());
            assert_eq!((key1, value11), walker.next().unwrap().unwrap());
            // NOTE: the dup walker does NOT iterate over all values, only over the duplicates of
            // the same key, so `(key2, value22)` is not yielded here.
            assert!(walker.next().is_none());
        }

        // Iterate by using `walk`, which does cross from key1 into key2.
        {
            let tx = env.tx().expect(ERROR_INIT_TX);
            let mut cursor = tx.cursor_dup_read::<PlainStorageState>().unwrap();
            let first = cursor.first().unwrap().unwrap();
            let mut walker = cursor.walk(Some(first.0)).unwrap();
            assert_eq!((key1, value00), walker.next().unwrap().unwrap());
            assert_eq!((key1, value11), walker.next().unwrap().unwrap());
            assert_eq!((key2, value22), walker.next().unwrap().unwrap());
        }
    }
1539
1540    #[test]
1541    fn dup_value_with_same_subkey() {
1542        let (_tempdir, env) = create_test_db(DatabaseEnvKind::RW);
1543        let key1 = Address::new([0x11; 20]);
1544        let key2 = Address::new([0x22; 20]);
1545
1546        // PUT key1 (0,1)
1547        let value01 = StorageEntry { key: B256::with_last_byte(0), value: U256::from(1) };
1548        env.update(|tx| tx.put::<PlainStorageState>(key1, value01).expect(ERROR_PUT)).unwrap();
1549
1550        // PUT key1 (0,0)
1551        let value00 = StorageEntry::default();
1552        env.update(|tx| tx.put::<PlainStorageState>(key1, value00).expect(ERROR_PUT)).unwrap();
1553
1554        // PUT key2 (2,2)
1555        let value22 = StorageEntry { key: B256::with_last_byte(2), value: U256::from(2) };
1556        env.update(|tx| tx.put::<PlainStorageState>(key2, value22).expect(ERROR_PUT)).unwrap();
1557
1558        // Iterate with walk
1559        {
1560            let tx = env.tx().expect(ERROR_INIT_TX);
1561            let mut cursor = tx.cursor_dup_read::<PlainStorageState>().unwrap();
1562            let first = cursor.first().unwrap().unwrap();
1563            let mut walker = cursor.walk(Some(first.0)).unwrap();
1564
1565            // NOTE: Both values are present
1566            assert_eq!((key1, value00), walker.next().unwrap().unwrap());
1567            assert_eq!((key1, value01), walker.next().unwrap().unwrap());
1568            assert_eq!((key2, value22), walker.next().unwrap().unwrap());
1569        }
1570
1571        // seek_by_key_subkey
1572        {
1573            let tx = env.tx().expect(ERROR_INIT_TX);
1574            let mut cursor = tx.cursor_dup_read::<PlainStorageState>().unwrap();
1575
1576            // NOTE: There are two values with same SubKey but only first one is shown
1577            assert_eq!(value00, cursor.seek_by_key_subkey(key1, value00.key).unwrap().unwrap());
1578            // key1 but value is greater than the one in the DB
1579            assert_eq!(None, cursor.seek_by_key_subkey(key1, value22.key).unwrap());
1580        }
1581    }
1582
1583    #[test]
1584    fn db_sharded_key() {
1585        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);
1586        let real_key = address!("0xa2c122be93b0074270ebee7f6b7292c7deb45047");
1587
1588        let shards = 5;
1589        for i in 1..=shards {
1590            let key = ShardedKey::new(real_key, if i == shards { u64::MAX } else { i * 100 });
1591            let list = IntegerList::new_pre_sorted([i * 100u64]);
1592
1593            db.update(|tx| tx.put::<AccountsHistory>(key.clone(), list.clone()).expect(""))
1594                .unwrap();
1595        }
1596
1597        // Seek value with non existing key.
1598        {
1599            let tx = db.tx().expect(ERROR_INIT_TX);
1600            let mut cursor = tx.cursor_read::<AccountsHistory>().unwrap();
1601
1602            // It will seek the one greater or equal to the query. Since we have `Address | 100`,
1603            // `Address | 200` in the database and we're querying `Address | 150` it will return us
1604            // `Address | 200`.
1605            let mut walker = cursor.walk(Some(ShardedKey::new(real_key, 150))).unwrap();
1606            let (key, list) = walker
1607                .next()
1608                .expect("element should exist.")
1609                .expect("should be able to retrieve it.");
1610
1611            assert_eq!(ShardedKey::new(real_key, 200), key);
1612            let list200 = IntegerList::new_pre_sorted([200u64]);
1613            assert_eq!(list200, list);
1614        }
1615        // Seek greatest index
1616        {
1617            let tx = db.tx().expect(ERROR_INIT_TX);
1618            let mut cursor = tx.cursor_read::<AccountsHistory>().unwrap();
1619
1620            // It will seek the MAX value of transition index and try to use prev to get first
1621            // biggers.
1622            let _unknown = cursor.seek_exact(ShardedKey::new(real_key, u64::MAX)).unwrap();
1623            let (key, list) = cursor
1624                .prev()
1625                .expect("element should exist.")
1626                .expect("should be able to retrieve it.");
1627
1628            assert_eq!(ShardedKey::new(real_key, 400), key);
1629            let list400 = IntegerList::new_pre_sorted([400u64]);
1630            assert_eq!(list400, list);
1631        }
1632    }
1633}