Skip to main content

reth_db/implementation/mdbx/
mod.rs

1//! Module that interacts with MDBX.
2
3use crate::{
4    lockfile::StorageLock,
5    metrics::DatabaseEnvMetrics,
6    tables::{self, Tables},
7    utils::default_page_size,
8    DatabaseError, TableSet,
9};
10use eyre::Context;
11use metrics::{gauge, Label};
12use reth_db_api::{
13    cursor::{DbCursorRO, DbCursorRW},
14    database::Database,
15    database_metrics::DatabaseMetrics,
16    models::ClientVersion,
17    transaction::{DbTx, DbTxMut},
18};
19use reth_libmdbx::{
20    ffi, DatabaseFlags, Environment, EnvironmentFlags, Geometry, HandleSlowReadersReturnCode,
21    MaxReadTransactionDuration, Mode, PageSize, SyncMode, RO, RW,
22};
23use reth_storage_errors::db::LogLevel;
24use reth_tracing::tracing::error;
25use std::{
26    collections::HashMap,
27    ops::{Deref, Range},
28    path::{Path, PathBuf},
29    sync::Arc,
30    time::{SystemTime, UNIX_EPOCH},
31};
32use tx::Tx;
33
34pub mod cursor;
35pub mod tx;
36
37mod utils;
38
/// Number of bytes in one kilobyte.
pub const KILOBYTE: usize = 1024;
/// Number of bytes in one megabyte.
pub const MEGABYTE: usize = 1024 * KILOBYTE;
/// Number of bytes in one gigabyte.
pub const GIGABYTE: usize = 1024 * MEGABYTE;
/// Number of bytes in one terabyte.
pub const TERABYTE: usize = 1024 * GIGABYTE;

/// Default reader slots to configure, kept slightly below MDBX's hard limit of
/// 32767 readers (`MDBX_READERS_LIMIT`).
const DEFAULT_MAX_READERS: u64 = 32_000;

/// Space that a read-only transaction can occupy until the warning is emitted.
/// See [`reth_libmdbx::EnvironmentBuilder::set_handle_slow_readers`] for more information.
const MAX_SAFE_READER_SPACE: usize = 10 * GIGABYTE;
54
/// Environment used when opening a MDBX environment. RO/RW.
///
/// The kind decides whether the write lock file is acquired and whether the
/// environment is opened in MDBX writemap mode (see [`DatabaseEnv::open`]).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DatabaseEnvKind {
    /// Read-only MDBX environment.
    RO,
    /// Read-write MDBX environment.
    RW,
}
63
64impl DatabaseEnvKind {
65    /// Returns `true` if the environment is read-write.
66    pub const fn is_rw(&self) -> bool {
67        matches!(self, Self::RW)
68    }
69}
70
/// Arguments for database initialization.
#[derive(Clone, Debug)]
pub struct DatabaseArguments {
    /// Client version that accesses the database.
    client_version: ClientVersion,
    /// Database geometry settings (size range, growth step, shrink threshold, page size).
    geometry: Geometry<Range<usize>>,
    /// Database log level. If [None], the default value is used.
    log_level: Option<LogLevel>,
    /// Maximum duration of a read transaction. If [None], the default value is used.
    max_read_transaction_duration: Option<MaxReadTransactionDuration>,
    /// Open environment in exclusive/monopolistic mode. If [None], the default value is used.
    ///
    /// This can be used as a replacement for `MDB_NOLOCK`, which isn't supported by MDBX. In
    /// this way, you can get the minimal overhead, but with the correct multi-process and
    /// multi-thread locking.
    ///
    /// If `true` = open environment in exclusive/monopolistic mode or return `MDBX_BUSY` if
    /// environment already used by other process. The main feature of the exclusive mode is the
    /// ability to open the environment placed on a network share.
    ///
    /// If `false` = open environment in cooperative mode, i.e. for multi-process
    /// access/interaction/cooperation. The main requirements of the cooperative mode are:
    /// - Data files MUST be placed in the LOCAL file system, but NOT on a network share.
    /// - Environment MUST be opened only by LOCAL processes, but NOT over a network.
    /// - OS kernel (i.e. file system and memory mapping implementation) and all processes that
    ///   open the given environment MUST be running in the physically single RAM with
    ///   cache-coherency. The only exception to the cache-consistency requirement is Linux on
    ///   MIPS architecture, but this case has not been tested for a long time.
    ///
    /// This flag affects only at environment opening but can't be changed after.
    exclusive: Option<bool>,
    /// MDBX allows up to 32767 readers (`MDBX_READERS_LIMIT`). This arg is to configure the max
    /// readers.
    max_readers: Option<u64>,
    /// Defines the synchronization strategy used by the MDBX database when writing data to disk.
    ///
    /// This determines how aggressively MDBX ensures data durability versus prioritizing
    /// performance. The available modes are:
    ///
    /// - [`SyncMode::Durable`]: Ensures all transactions are fully flushed to disk before they are
    ///   considered committed. This provides the highest level of durability and crash safety
    ///   but may have a performance cost.
    /// - [`SyncMode::SafeNoSync`]: Skips certain fsync operations to improve write performance.
    ///   This mode still maintains database integrity but may lose the most recent transactions if
    ///   the system crashes unexpectedly.
    ///
    /// Choose `Durable` if consistency and crash safety are critical (e.g., production
    /// environments). Choose `SafeNoSync` if performance is more important and occasional data
    /// loss is acceptable (e.g., testing or ephemeral data).
    sync_mode: SyncMode,
}
123
impl Default for DatabaseArguments {
    /// Defaults to arguments carrying an empty/unknown client version.
    fn default() -> Self {
        Self::new(ClientVersion::default())
    }
}
129
130impl DatabaseArguments {
131    /// Create new database arguments with given client version.
132    pub fn new(client_version: ClientVersion) -> Self {
133        Self {
134            client_version,
135            geometry: Geometry {
136                size: Some(0..(8 * TERABYTE)),
137                growth_step: Some(4 * GIGABYTE as isize),
138                shrink_threshold: Some(0),
139                page_size: Some(PageSize::Set(default_page_size())),
140            },
141            log_level: None,
142            max_read_transaction_duration: None,
143            exclusive: None,
144            max_readers: None,
145            sync_mode: SyncMode::Durable,
146        }
147    }
148
149    /// Create database arguments suitable for testing.
150    ///
151    /// Uses a small geometry (64MB max, 4MB growth) to avoid exhausting the system's
152    /// virtual memory map limit (`vm.max_map_count`) when many test databases are open
153    /// concurrently.
154    pub fn test() -> Self {
155        Self {
156            geometry: Geometry {
157                size: Some(0..(64 * MEGABYTE)),
158                growth_step: Some(4 * MEGABYTE as isize),
159                shrink_threshold: Some(0),
160                page_size: Some(PageSize::Set(default_page_size())),
161            },
162            max_read_transaction_duration: Some(MaxReadTransactionDuration::Unbounded),
163            ..Self::new(ClientVersion::default())
164        }
165    }
166
167    /// Sets the upper size limit of the db environment, the maximum database size in bytes.
168    pub const fn with_geometry_max_size(mut self, max_size: Option<usize>) -> Self {
169        if let Some(max_size) = max_size {
170            self.geometry.size = Some(0..max_size);
171        }
172        self
173    }
174
175    /// Sets the database page size value.
176    pub const fn with_geometry_page_size(mut self, page_size: Option<usize>) -> Self {
177        if let Some(size) = page_size {
178            self.geometry.page_size = Some(reth_libmdbx::PageSize::Set(size));
179        }
180
181        self
182    }
183
184    /// Sets the database sync mode.
185    pub const fn with_sync_mode(mut self, sync_mode: Option<SyncMode>) -> Self {
186        if let Some(sync_mode) = sync_mode {
187            self.sync_mode = sync_mode;
188        }
189
190        self
191    }
192
193    /// Configures the database growth step in bytes.
194    pub const fn with_growth_step(mut self, growth_step: Option<usize>) -> Self {
195        if let Some(growth_step) = growth_step {
196            self.geometry.growth_step = Some(growth_step as isize);
197        }
198        self
199    }
200
201    /// Set the log level.
202    pub const fn with_log_level(mut self, log_level: Option<LogLevel>) -> Self {
203        self.log_level = log_level;
204        self
205    }
206
207    /// Set the maximum duration of a read transaction.
208    pub const fn max_read_transaction_duration(
209        &mut self,
210        max_read_transaction_duration: Option<MaxReadTransactionDuration>,
211    ) {
212        self.max_read_transaction_duration = max_read_transaction_duration;
213    }
214
215    /// Set the maximum duration of a read transaction.
216    pub const fn with_max_read_transaction_duration(
217        mut self,
218        max_read_transaction_duration: Option<MaxReadTransactionDuration>,
219    ) -> Self {
220        self.max_read_transaction_duration(max_read_transaction_duration);
221        self
222    }
223
224    /// Set the mdbx exclusive flag.
225    pub const fn with_exclusive(mut self, exclusive: Option<bool>) -> Self {
226        self.exclusive = exclusive;
227        self
228    }
229
230    /// Set `max_readers` flag.
231    pub const fn with_max_readers(mut self, max_readers: Option<u64>) -> Self {
232        self.max_readers = max_readers;
233        self
234    }
235
236    /// Returns the client version if any.
237    pub const fn client_version(&self) -> &ClientVersion {
238        &self.client_version
239    }
240}
241
/// Wrapper for the libmdbx environment: [Environment]
#[derive(Debug, Clone)]
pub struct DatabaseEnv {
    /// Libmdbx-sys environment.
    inner: Environment,
    /// Path to the database directory.
    path: PathBuf,
    /// Opened DBIs for reuse, keyed by table name.
    /// Important: Do not manually close these DBIs, like via `mdbx_dbi_close`.
    /// More generally, do not dynamically create, re-open, or drop tables at
    /// runtime. It's better to perform table creation and migration only once
    /// at startup.
    dbis: Arc<HashMap<&'static str, ffi::MDBX_dbi>>,
    /// Cache for metric handles. If `None`, metrics are not recorded.
    metrics: Option<Arc<DatabaseEnvMetrics>>,
    /// Write lock for when dealing with a read-write environment.
    /// `None` for read-only environments; held for the lifetime of this value otherwise.
    _lock_file: Option<StorageLock>,
}
260
261impl Database for DatabaseEnv {
262    type TX = tx::Tx<RO>;
263    type TXMut = tx::Tx<RW>;
264
265    fn tx(&self) -> Result<Self::TX, DatabaseError> {
266        Tx::new(
267            self.inner.begin_ro_txn().map_err(|e| DatabaseError::InitTx(e.into()))?,
268            self.dbis.clone(),
269            self.metrics.clone(),
270        )
271        .map_err(|e| DatabaseError::InitTx(e.into()))
272    }
273
274    fn tx_mut(&self) -> Result<Self::TXMut, DatabaseError> {
275        Tx::new(
276            self.inner.begin_rw_txn().map_err(|e| DatabaseError::InitTx(e.into()))?,
277            self.dbis.clone(),
278            self.metrics.clone(),
279        )
280        .map_err(|e| DatabaseError::InitTx(e.into()))
281    }
282
283    fn path(&self) -> PathBuf {
284        self.path.clone()
285    }
286
287    fn oldest_reader_txnid(&self) -> Option<u64> {
288        let info = self.inner.info().ok()?;
289        let txnid = info.latter_reader_txnid();
290        if txnid == 0 {
291            None
292        } else {
293            Some(txnid)
294        }
295    }
296
297    fn last_txnid(&self) -> Option<u64> {
298        let info = self.inner.info().ok()?;
299        let txnid = info.last_txnid();
300        if txnid == 0 {
301            None
302        } else {
303            Some(txnid as u64)
304        }
305    }
306}
307
308impl DatabaseMetrics for DatabaseEnv {
309    fn report_metrics(&self) {
310        for (name, value, labels) in self.gauge_metrics() {
311            gauge!(name, labels).set(value);
312        }
313    }
314
315    fn gauge_metrics(&self) -> Vec<(&'static str, f64, Vec<Label>)> {
316        let mut metrics = Vec::new();
317
318        let _ = self
319            .view(|tx| {
320                for table in Tables::ALL.iter().map(Tables::name) {
321                    let table_db =
322                        tx.inner().open_db(Some(table)).wrap_err("Could not open db.")?;
323
324                    let stats = tx
325                        .inner()
326                        .db_stat(table_db.dbi())
327                        .wrap_err(format!("Could not find table: {table}"))?;
328
329                    let page_size = stats.page_size() as usize;
330                    let leaf_pages = stats.leaf_pages();
331                    let branch_pages = stats.branch_pages();
332                    let overflow_pages = stats.overflow_pages();
333                    let num_pages = leaf_pages + branch_pages + overflow_pages;
334                    let table_size = page_size * num_pages;
335                    let entries = stats.entries();
336
337                    metrics.push((
338                        "db.table_size",
339                        table_size as f64,
340                        vec![Label::new("table", table)],
341                    ));
342                    metrics.push((
343                        "db.table_pages",
344                        leaf_pages as f64,
345                        vec![Label::new("table", table), Label::new("type", "leaf")],
346                    ));
347                    metrics.push((
348                        "db.table_pages",
349                        branch_pages as f64,
350                        vec![Label::new("table", table), Label::new("type", "branch")],
351                    ));
352                    metrics.push((
353                        "db.table_pages",
354                        overflow_pages as f64,
355                        vec![Label::new("table", table), Label::new("type", "overflow")],
356                    ));
357                    metrics.push((
358                        "db.table_entries",
359                        entries as f64,
360                        vec![Label::new("table", table)],
361                    ));
362                }
363
364                Ok::<(), eyre::Report>(())
365            })
366            .map_err(|error| error!(%error, "Failed to read db table stats"));
367
368        if let Ok(freelist) =
369            self.freelist().map_err(|error| error!(%error, "Failed to read db.freelist"))
370        {
371            metrics.push(("db.freelist", freelist as f64, vec![]));
372        }
373
374        if let Ok(stat) = self.stat().map_err(|error| error!(%error, "Failed to read db.stat")) {
375            metrics.push(("db.page_size", stat.page_size() as f64, vec![]));
376        }
377
378        metrics.push((
379            "db.timed_out_not_aborted_transactions",
380            self.timed_out_not_aborted_transactions() as f64,
381            vec![],
382        ));
383
384        metrics
385    }
386}
387
impl DatabaseEnv {
    /// Opens the database at the specified path with the given `EnvKind`.
    ///
    /// It does not create the tables, for that call [`DatabaseEnv::create_tables`].
    pub fn open(
        path: &Path,
        kind: DatabaseEnvKind,
        args: DatabaseArguments,
    ) -> Result<Self, DatabaseError> {
        // Only RW environments take the storage lock; RO opens are lock-free.
        let _lock_file = if kind.is_rw() {
            StorageLock::try_acquire(path)
                .map_err(|err| DatabaseError::Other(err.to_string()))?
                .into()
        } else {
            None
        };

        let mut inner_env = Environment::builder();

        let mode = match kind {
            DatabaseEnvKind::RO => Mode::ReadOnly,
            DatabaseEnvKind::RW => {
                // enable writemap mode in RW mode
                inner_env.write_map();
                Mode::ReadWrite { sync_mode: args.sync_mode }
            }
        };

        // Note: We set max dbs to 256 here to allow for custom tables. This needs to be set on
        // environment creation.
        debug_assert!(Tables::ALL.len() <= 256, "number of tables exceed max dbs");
        inner_env.set_max_dbs(256);
        inner_env.set_geometry(args.geometry);

        // Returns `true` when `id` belongs to this process (or, on unix, its parent).
        fn is_current_process(id: u32) -> bool {
            #[cfg(unix)]
            {
                id == std::os::unix::process::parent_id() || id == std::process::id()
            }

            #[cfg(not(unix))]
            {
                id == std::process::id()
            }
        }

        // Callback MDBX invokes when a long-lived reader blocks page reclamation.
        // It only warns once the pinned space exceeds `MAX_SAFE_READER_SPACE` and
        // never kills the reader.
        extern "C" fn handle_slow_readers(
            _env: *const ffi::MDBX_env,
            _txn: *const ffi::MDBX_txn,
            process_id: ffi::mdbx_pid_t,
            thread_id: ffi::mdbx_tid_t,
            read_txn_id: u64,
            gap: std::ffi::c_uint,
            space: usize,
            retry: std::ffi::c_int,
        ) -> HandleSlowReadersReturnCode {
            if space > MAX_SAFE_READER_SPACE {
                let message = if is_current_process(process_id as u32) {
                    "Current process has a long-lived database transaction that grows the database file."
                } else {
                    "External process has a long-lived database transaction that grows the database file. \
                     Use shorter-lived read transactions or shut down the node."
                };
                reth_tracing::tracing::warn!(
                    target: "storage::db::mdbx",
                    ?process_id,
                    ?thread_id,
                    ?read_txn_id,
                    ?gap,
                    ?space,
                    ?retry,
                    "{message}"
                )
            }

            reth_libmdbx::HandleSlowReadersReturnCode::ProceedWithoutKillingReader
        }
        inner_env.set_handle_slow_readers(handle_slow_readers);

        inner_env.set_flags(EnvironmentFlags {
            mode,
            // We disable readahead because it improves performance for linear scans, but
            // worsens it for random access (which is our access pattern outside of sync)
            no_rdahead: true,
            coalesce: true,
            exclusive: args.exclusive.unwrap_or_default(),
            ..Default::default()
        });
        // Configure more readers
        inner_env.set_max_readers(args.max_readers.unwrap_or(DEFAULT_MAX_READERS));
        // This parameter sets the maximum size of the "reclaimed list", and the unit of measurement
        // is "pages". Reclaimed list is the list of freed pages that's populated during the
        // lifetime of DB transaction, and through which MDBX searches when it needs to insert new
        // record with overflow pages. The flow is roughly the following:
        // 0. We need to insert a record that requires N number of overflow pages (in consecutive
        //    sequence inside the DB file).
        // 1. Get some pages from the freelist, put them into the reclaimed list.
        // 2. Search through the reclaimed list for the sequence of size N.
        // 3. a. If found, return the sequence.
        // 3. b. If not found, repeat steps 1-3. If the reclaimed list size is larger than
        //    the `rp augment limit`, stop the search and allocate new pages at the end of the file:
        //    https://github.com/paradigmxyz/reth/blob/2a4c78759178f66e30c8976ec5d243b53102fc9a/crates/storage/libmdbx-rs/mdbx-sys/libmdbx/mdbx.c#L11479-L11480.
        //
        // Basically, this parameter controls for how long do we search through the freelist before
        // trying to allocate new pages. Smaller value will make MDBX to fallback to
        // allocation faster, higher value will force MDBX to search through the freelist
        // longer until the sequence of pages is found.
        //
        // The default value of this parameter is set depending on the DB size. The bigger the
        // database, the larger is `rp augment limit`.
        // https://github.com/paradigmxyz/reth/blob/2a4c78759178f66e30c8976ec5d243b53102fc9a/crates/storage/libmdbx-rs/mdbx-sys/libmdbx/mdbx.c#L10018-L10024.
        //
        // Previously, MDBX set this value as `256 * 1024` constant. Let's fallback to this,
        // because we want to prioritize freelist lookup speed over database growth.
        // https://github.com/paradigmxyz/reth/blob/fa2b9b685ed9787636d962f4366caf34a9186e66/crates/storage/libmdbx-rs/mdbx-sys/libmdbx/mdbx.c#L16017.
        inner_env.set_rp_augment_limit(256 * 1024);

        if let Some(log_level) = args.log_level {
            // Levels higher than [LogLevel::Notice] require libmdbx built with `MDBX_DEBUG` option.
            let is_log_level_available = if cfg!(debug_assertions) {
                true
            } else {
                matches!(
                    log_level,
                    LogLevel::Fatal | LogLevel::Error | LogLevel::Warn | LogLevel::Notice
                )
            };
            if is_log_level_available {
                // Map our log level enum onto libmdbx's numeric levels (0 = fatal .. 7 = extra).
                inner_env.set_log_level(match log_level {
                    LogLevel::Fatal => 0,
                    LogLevel::Error => 1,
                    LogLevel::Warn => 2,
                    LogLevel::Notice => 3,
                    LogLevel::Verbose => 4,
                    LogLevel::Debug => 5,
                    LogLevel::Trace => 6,
                    LogLevel::Extra => 7,
                });
            } else {
                return Err(DatabaseError::LogLevelUnavailable(log_level))
            }
        }

        if let Some(max_read_transaction_duration) = args.max_read_transaction_duration {
            inner_env.set_max_read_transaction_duration(max_read_transaction_duration);
        }

        let env = Self {
            inner: inner_env.open(path).map_err(|e| DatabaseError::Open(e.into()))?,
            path: path.to_path_buf(),
            // DBI handles are registered later by `create_tables`/`create_and_track_tables_for`.
            dbis: Arc::default(),
            metrics: None,
            _lock_file,
        };

        Ok(env)
    }

    /// Enables metrics on the database.
    pub fn with_metrics(mut self) -> Self {
        self.metrics = Some(DatabaseEnvMetrics::new().into());
        self
    }

    /// Creates all the tables defined in [`Tables`], if necessary.
    ///
    /// This keeps tracks of the created table handles and stores them for better efficiency.
    pub fn create_tables(&mut self) -> Result<(), DatabaseError> {
        self.create_and_track_tables_for::<Tables>()
    }

    /// Creates all the tables defined in the given [`TableSet`], if necessary.
    ///
    /// This keeps tracks of the created table handles and stores them for better efficiency.
    pub fn create_and_track_tables_for<TS: TableSet>(&mut self) -> Result<(), DatabaseError> {
        let handles = self._create_tables::<TS>()?;
        // Note: This is okay because self has mutable access here and `DatabaseEnv` must be Arc'ed
        // before it can be shared.
        let dbis = Arc::make_mut(&mut self.dbis);
        dbis.extend(handles);

        Ok(())
    }

    /// Creates all the tables defined in [`Tables`], if necessary.
    ///
    /// If this type is unique the created handle for the tables will be updated.
    ///
    /// This is recommended to be called during initialization to create and track additional tables
    /// after the default [`Self::create_tables`] are created.
    pub fn create_tables_for<TS: TableSet>(self: &mut Arc<Self>) -> Result<(), DatabaseError> {
        let handles = self._create_tables::<TS>()?;
        // If the env is currently shared (`Arc::get_mut` fails), the new handles are not
        // tracked in the cache, but the tables themselves were still created above.
        if let Some(db) = Arc::get_mut(self) {
            // Note: The db is unique and the dbis as well, and they can also be cloned.
            let dbis = Arc::make_mut(&mut db.dbis);
            dbis.extend(handles);
        }
        Ok(())
    }

    /// Creates the tables and returns the identifiers of the tables.
    fn _create_tables<TS: TableSet>(
        &self,
    ) -> Result<Vec<(&'static str, ffi::MDBX_dbi)>, DatabaseError> {
        let mut handles = Vec::new();
        let tx = self.inner.begin_rw_txn().map_err(|e| DatabaseError::InitTx(e.into()))?;

        for table in TS::tables() {
            // Dupsort tables require the DUP_SORT flag at creation time.
            let flags =
                if table.is_dupsort() { DatabaseFlags::DUP_SORT } else { DatabaseFlags::default() };

            let db = tx
                .create_db(Some(table.name()), flags)
                .map_err(|e| DatabaseError::CreateTable(e.into()))?;
            handles.push((table.name(), db.dbi()));
        }

        tx.commit().map_err(|e| DatabaseError::Commit(e.into()))?;
        Ok(handles)
    }

    /// Drops an orphaned table by name.
    ///
    /// This is used to clean up tables that are no longer defined in the schema but may still
    /// exist on disk from previous versions.
    ///
    /// Returns `Ok(true)` if the table existed and was dropped, `Ok(false)` if the table was not
    /// found.
    ///
    /// # Safety
    /// This permanently deletes the table and all its data. Only use for tables that are
    /// confirmed to be obsolete.
    pub fn drop_orphan_table(&self, name: &str) -> Result<bool, DatabaseError> {
        let tx = self.inner.begin_rw_txn().map_err(|e| DatabaseError::InitTx(e.into()))?;

        match tx.open_db(Some(name)) {
            Ok(db) => {
                // SAFETY: We just opened the db handle and will commit immediately after dropping.
                // No other cursors or handles exist for this table.
                unsafe {
                    tx.drop_db(db.dbi()).map_err(|e| DatabaseError::Delete(e.into()))?;
                }
                tx.commit().map_err(|e| DatabaseError::Commit(e.into()))?;
                Ok(true)
            }
            // Table does not exist: nothing to drop.
            Err(reth_libmdbx::Error::NotFound) => Ok(false),
            Err(e) => Err(DatabaseError::Open(e.into())),
        }
    }

    /// Records version that accesses the database with write privileges.
    ///
    /// A new row is only written (keyed by current unix timestamp) when the version
    /// differs from the most recently recorded one; empty versions are ignored.
    pub fn record_client_version(&self, version: ClientVersion) -> Result<(), DatabaseError> {
        if version.is_empty() {
            return Ok(())
        }

        let tx = self.tx_mut()?;
        let mut version_cursor = tx.cursor_write::<tables::VersionHistory>()?;

        let last_version = version_cursor.last()?.map(|(_, v)| v);
        if Some(&version) != last_version.as_ref() {
            version_cursor.upsert(
                SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs(),
                &version,
            )?;
            tx.commit()?;
        }

        Ok(())
    }
}
659
impl Deref for DatabaseEnv {
    type Target = Environment;

    /// Exposes the raw libmdbx [`Environment`] (e.g. for `info()`/`stat()` calls).
    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}
667
668#[cfg(test)]
669mod tests {
670    use super::*;
671    use crate::{
672        tables::{
673            AccountsHistory, CanonicalHeaders, Headers, PlainAccountState, PlainStorageState,
674        },
675        test_utils::*,
676        AccountChangeSets,
677    };
678    use alloy_consensus::Header;
679    use alloy_primitives::{address, Address, B256, U256};
680    use reth_db_api::{
681        cursor::{DbDupCursorRO, DbDupCursorRW, ReverseWalker, Walker},
682        models::{AccountBeforeTx, IntegerList, ShardedKey},
683        table::{Encode, Table},
684    };
685    use reth_libmdbx::Error;
686    use reth_primitives_traits::{Account, StorageEntry};
687    use reth_storage_errors::db::{DatabaseWriteError, DatabaseWriteOperation};
688    use std::str::FromStr;
689    use tempfile::TempDir;
690
691    /// Create database for testing. Returns the `TempDir` to prevent cleanup until test ends.
692    fn create_test_db(kind: DatabaseEnvKind) -> (TempDir, DatabaseEnv) {
693        let tempdir = tempfile::TempDir::new().expect(ERROR_TEMPDIR);
694        let env = create_test_db_with_path(kind, tempdir.path());
695        (tempdir, env)
696    }
697
698    /// Create database for testing with specified path
699    fn create_test_db_with_path(kind: DatabaseEnvKind, path: &Path) -> DatabaseEnv {
700        let mut env =
701            DatabaseEnv::open(path, kind, DatabaseArguments::new(ClientVersion::default()))
702                .expect(ERROR_DB_CREATION);
703        env.create_tables().expect(ERROR_TABLE_CREATION);
704        env
705    }
706
    // Shared expectation messages used by the tests in this module.
    const ERROR_DB_CREATION: &str = "Not able to create the mdbx file.";
    const ERROR_PUT: &str = "Not able to insert value into table.";
    const ERROR_APPEND: &str = "Not able to append the value to the table.";
    const ERROR_UPSERT: &str = "Not able to upsert the value to the table.";
    const ERROR_GET: &str = "Not able to get value from table.";
    const ERROR_DEL: &str = "Not able to delete from table.";
    const ERROR_COMMIT: &str = "Not able to commit transaction.";
    const ERROR_RETURN_VALUE: &str = "Mismatching result.";
    const ERROR_INIT_TX: &str = "Failed to create a MDBX transaction.";
    const ERROR_ETH_ADDRESS: &str = "Invalid address.";
717
718    #[test]
719    fn db_creation() {
720        let _tempdir = create_test_db(DatabaseEnvKind::RW);
721    }
722
    /// `drop_orphan_table` must delete an existing table (returning `true`) and
    /// return `false` without error for a table that does not exist.
    #[test]
    fn db_drop_orphan_table() {
        let tempdir = tempfile::TempDir::new().expect(ERROR_TEMPDIR);
        let db = create_test_db_with_path(DatabaseEnvKind::RW, tempdir.path());

        // Create an orphan table by manually creating it
        let orphan_table_name = "OrphanTestTable";
        {
            let tx = db.inner.begin_rw_txn().expect(ERROR_INIT_TX);
            tx.create_db(Some(orphan_table_name), DatabaseFlags::empty())
                .expect("Failed to create orphan table");
            tx.commit().expect(ERROR_COMMIT);
        }

        // Verify the table exists by opening it
        {
            let tx = db.inner.begin_ro_txn().expect(ERROR_INIT_TX);
            assert!(tx.open_db(Some(orphan_table_name)).is_ok(), "Orphan table should exist");
        }

        // Drop the orphan table
        let result = db.drop_orphan_table(orphan_table_name);
        assert!(result.is_ok(), "drop_orphan_table should succeed");
        assert!(result.unwrap(), "drop_orphan_table should return true for existing table");

        // Verify the table no longer exists
        {
            let tx = db.inner.begin_ro_txn().expect(ERROR_INIT_TX);
            assert!(
                tx.open_db(Some(orphan_table_name)).is_err(),
                "Orphan table should no longer exist"
            );
        }

        // Dropping a non-existent table should return Ok(false)
        let result = db.drop_orphan_table("NonExistentTable");
        assert!(result.is_ok(), "drop_orphan_table should succeed for non-existent table");
        assert!(!result.unwrap(), "drop_orphan_table should return false for non-existent table");
    }
762
763    #[test]
764    fn db_manual_put_get() {
765        let (_tempdir, env) = create_test_db(DatabaseEnvKind::RW);
766
767        let value = Header::default();
768        let key = 1u64;
769
770        // PUT
771        let tx = env.tx_mut().expect(ERROR_INIT_TX);
772        tx.put::<Headers>(key, value.clone()).expect(ERROR_PUT);
773        tx.commit().expect(ERROR_COMMIT);
774
775        // GET
776        let tx = env.tx().expect(ERROR_INIT_TX);
777        let result = tx.get::<Headers>(key).expect(ERROR_GET);
778        assert_eq!(result.expect(ERROR_RETURN_VALUE), value);
779        tx.commit().expect(ERROR_COMMIT);
780    }
781
    #[test]
    fn db_dup_cursor_delete_first() {
        // Deleting the first duplicate through a dup walker removes only that entry;
        // both the walker itself and a fresh cursor on the same tx then see only the
        // surviving duplicate.
        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);
        let tx = db.tx_mut().expect(ERROR_INIT_TX);

        let mut dup_cursor = tx.cursor_dup_write::<PlainStorageState>().unwrap();

        // Two duplicate entries under the same address, differing only in value.
        let entry_0 = StorageEntry { key: B256::with_last_byte(1), value: U256::from(0) };
        let entry_1 = StorageEntry { key: B256::with_last_byte(1), value: U256::from(1) };

        dup_cursor.upsert(Address::with_last_byte(1), &entry_0).expect(ERROR_UPSERT);
        dup_cursor.upsert(Address::with_last_byte(1), &entry_1).expect(ERROR_UPSERT);

        // Sanity check: both duplicates are present before the deletion.
        assert_eq!(
            dup_cursor.walk(None).unwrap().collect::<Result<Vec<_>, _>>().unwrap(),
            vec![(Address::with_last_byte(1), entry_0), (Address::with_last_byte(1), entry_1),]
        );

        // A fresh walker is positioned on entry_0; delete the current entry.
        let mut walker = dup_cursor.walk(None).unwrap();
        walker.delete_current().expect(ERROR_DEL);

        // The walker continues with the surviving duplicate.
        assert_eq!(walker.next().unwrap().unwrap(), (Address::with_last_byte(1), entry_1));

        // Check the tx view - it correctly holds entry_1
        assert_eq!(
            tx.cursor_dup_read::<PlainStorageState>()
                .unwrap()
                .walk(None)
                .unwrap()
                .collect::<Result<Vec<_>, _>>()
                .unwrap(),
            vec![
                (Address::with_last_byte(1), entry_1), // This is ok - we removed entry_0
            ]
        );

        // Check the remainder of walker
        assert!(walker.next().is_none());
    }
821
822    #[test]
823    fn db_cursor_walk() {
824        let (_tempdir, env) = create_test_db(DatabaseEnvKind::RW);
825
826        let value = Header::default();
827        let key = 1u64;
828
829        // PUT
830        let tx = env.tx_mut().expect(ERROR_INIT_TX);
831        tx.put::<Headers>(key, value.clone()).expect(ERROR_PUT);
832        tx.commit().expect(ERROR_COMMIT);
833
834        // Cursor
835        let tx = env.tx().expect(ERROR_INIT_TX);
836        let mut cursor = tx.cursor_read::<Headers>().unwrap();
837
838        let first = cursor.first().unwrap();
839        assert!(first.is_some(), "First should be our put");
840
841        // Walk
842        let walk = cursor.walk(Some(key)).unwrap();
843        let first = walk.into_iter().next().unwrap().unwrap();
844        assert_eq!(first.1, value, "First next should be put value");
845    }
846
    #[test]
    fn db_cursor_walk_range() {
        // Exercises `walk_range` over every flavor of range bound (exclusive end,
        // inclusive end, unbounded start and/or end) against keys 0..=3, and checks
        // that an exhausted walker keeps returning `None` on repeated calls.
        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);

        // PUT (0, 0), (1, 0), (2, 0), (3, 0)
        let tx = db.tx_mut().expect(ERROR_INIT_TX);
        vec![0, 1, 2, 3]
            .into_iter()
            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
            .expect(ERROR_PUT);
        tx.commit().expect(ERROR_COMMIT);

        let tx = db.tx().expect(ERROR_INIT_TX);
        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();

        // [1, 3)
        let mut walker = cursor.walk_range(1..3).unwrap();
        assert_eq!(walker.next().unwrap().unwrap(), (1, B256::ZERO));
        assert_eq!(walker.next().unwrap().unwrap(), (2, B256::ZERO));
        assert!(walker.next().is_none());
        // next() returns None after walker is done
        assert!(walker.next().is_none());

        // [1, 2]
        let mut walker = cursor.walk_range(1..=2).unwrap();
        assert_eq!(walker.next().unwrap().unwrap(), (1, B256::ZERO));
        assert_eq!(walker.next().unwrap().unwrap(), (2, B256::ZERO));
        // next() returns None after walker is done
        assert!(walker.next().is_none());

        // [1, ∞)
        let mut walker = cursor.walk_range(1..).unwrap();
        assert_eq!(walker.next().unwrap().unwrap(), (1, B256::ZERO));
        assert_eq!(walker.next().unwrap().unwrap(), (2, B256::ZERO));
        assert_eq!(walker.next().unwrap().unwrap(), (3, B256::ZERO));
        // next() returns None after walker is done
        assert!(walker.next().is_none());

        // [2, 4)
        let mut walker = cursor.walk_range(2..4).unwrap();
        assert_eq!(walker.next().unwrap().unwrap(), (2, B256::ZERO));
        assert_eq!(walker.next().unwrap().unwrap(), (3, B256::ZERO));
        assert!(walker.next().is_none());
        // next() returns None after walker is done
        assert!(walker.next().is_none());

        // (∞, 3)
        let mut walker = cursor.walk_range(..3).unwrap();
        assert_eq!(walker.next().unwrap().unwrap(), (0, B256::ZERO));
        assert_eq!(walker.next().unwrap().unwrap(), (1, B256::ZERO));
        assert_eq!(walker.next().unwrap().unwrap(), (2, B256::ZERO));
        // next() returns None after walker is done
        assert!(walker.next().is_none());

        // (∞, ∞)
        let mut walker = cursor.walk_range(..).unwrap();
        assert_eq!(walker.next().unwrap().unwrap(), (0, B256::ZERO));
        assert_eq!(walker.next().unwrap().unwrap(), (1, B256::ZERO));
        assert_eq!(walker.next().unwrap().unwrap(), (2, B256::ZERO));
        assert_eq!(walker.next().unwrap().unwrap(), (3, B256::ZERO));
        // next() returns None after walker is done
        assert!(walker.next().is_none());
    }
910
    #[test]
    fn db_cursor_walk_range_on_dup_table() {
        // On a DUPSORT table, `walk_range` yields every duplicate value of each key in
        // the range: keys 0 and 1 hold three entries apiece, while key 2's single
        // entry lies outside the 0..=1 range. The unbounded walk sees all 7 entries.
        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);

        let address0 = Address::ZERO;
        let address1 = Address::with_last_byte(1);
        let address2 = Address::with_last_byte(2);

        // Three duplicates under key 0, three under key 1, one under key 2.
        let tx = db.tx_mut().expect(ERROR_INIT_TX);
        tx.put::<AccountChangeSets>(0, AccountBeforeTx { address: address0, info: None })
            .expect(ERROR_PUT);
        tx.put::<AccountChangeSets>(0, AccountBeforeTx { address: address1, info: None })
            .expect(ERROR_PUT);
        tx.put::<AccountChangeSets>(0, AccountBeforeTx { address: address2, info: None })
            .expect(ERROR_PUT);
        tx.put::<AccountChangeSets>(1, AccountBeforeTx { address: address0, info: None })
            .expect(ERROR_PUT);
        tx.put::<AccountChangeSets>(1, AccountBeforeTx { address: address1, info: None })
            .expect(ERROR_PUT);
        tx.put::<AccountChangeSets>(1, AccountBeforeTx { address: address2, info: None })
            .expect(ERROR_PUT);
        tx.put::<AccountChangeSets>(2, AccountBeforeTx { address: address0, info: None }) // <- should not be returned by the walker
            .expect(ERROR_PUT);
        tx.commit().expect(ERROR_COMMIT);

        let tx = db.tx().expect(ERROR_INIT_TX);
        let mut cursor = tx.cursor_read::<AccountChangeSets>().unwrap();

        // Unbounded range covers every duplicate of every key.
        let entries = cursor.walk_range(..).unwrap().collect::<Result<Vec<_>, _>>().unwrap();
        assert_eq!(entries.len(), 7);

        // Bounded range 0..=1 yields the six duplicates of keys 0 and 1, in
        // (key, duplicate) order, and excludes key 2.
        let mut walker = cursor.walk_range(0..=1).unwrap();
        assert_eq!(
            walker.next().unwrap().unwrap(),
            (0, AccountBeforeTx { address: address0, info: None })
        );
        assert_eq!(
            walker.next().unwrap().unwrap(),
            (0, AccountBeforeTx { address: address1, info: None })
        );
        assert_eq!(
            walker.next().unwrap().unwrap(),
            (0, AccountBeforeTx { address: address2, info: None })
        );
        assert_eq!(
            walker.next().unwrap().unwrap(),
            (1, AccountBeforeTx { address: address0, info: None })
        );
        assert_eq!(
            walker.next().unwrap().unwrap(),
            (1, AccountBeforeTx { address: address1, info: None })
        );
        assert_eq!(
            walker.next().unwrap().unwrap(),
            (1, AccountBeforeTx { address: address2, info: None })
        );
        assert!(walker.next().is_none());
    }
969
970    #[expect(clippy::reversed_empty_ranges)]
971    #[test]
972    fn db_cursor_walk_range_invalid() {
973        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);
974
975        // PUT (0, 0), (1, 0), (2, 0), (3, 0)
976        let tx = db.tx_mut().expect(ERROR_INIT_TX);
977        vec![0, 1, 2, 3]
978            .into_iter()
979            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
980            .expect(ERROR_PUT);
981        tx.commit().expect(ERROR_COMMIT);
982
983        let tx = db.tx().expect(ERROR_INIT_TX);
984        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
985
986        // start bound greater than end bound
987        let mut res = cursor.walk_range(3..1).unwrap();
988        assert!(res.next().is_none());
989
990        // start bound greater than end bound
991        let mut res = cursor.walk_range(15..=2).unwrap();
992        assert!(res.next().is_none());
993
994        // returning nothing
995        let mut walker = cursor.walk_range(1..1).unwrap();
996        assert!(walker.next().is_none());
997    }
998
999    #[test]
1000    fn db_walker() {
1001        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);
1002
1003        // PUT (0, 0), (1, 0), (3, 0)
1004        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1005        vec![0, 1, 3]
1006            .into_iter()
1007            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
1008            .expect(ERROR_PUT);
1009        tx.commit().expect(ERROR_COMMIT);
1010
1011        let tx = db.tx().expect(ERROR_INIT_TX);
1012        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
1013
1014        let mut walker = Walker::new(&mut cursor, None);
1015
1016        assert_eq!(walker.next().unwrap().unwrap(), (0, B256::ZERO));
1017        assert_eq!(walker.next().unwrap().unwrap(), (1, B256::ZERO));
1018        assert_eq!(walker.next().unwrap().unwrap(), (3, B256::ZERO));
1019        assert!(walker.next().is_none());
1020
1021        // transform to ReverseWalker
1022        let mut reverse_walker = walker.rev();
1023        assert_eq!(reverse_walker.next().unwrap().unwrap(), (3, B256::ZERO));
1024        assert_eq!(reverse_walker.next().unwrap().unwrap(), (1, B256::ZERO));
1025        assert_eq!(reverse_walker.next().unwrap().unwrap(), (0, B256::ZERO));
1026        assert!(reverse_walker.next().is_none());
1027    }
1028
1029    #[test]
1030    fn db_reverse_walker() {
1031        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);
1032
1033        // PUT (0, 0), (1, 0), (3, 0)
1034        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1035        vec![0, 1, 3]
1036            .into_iter()
1037            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
1038            .expect(ERROR_PUT);
1039        tx.commit().expect(ERROR_COMMIT);
1040
1041        let tx = db.tx().expect(ERROR_INIT_TX);
1042        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
1043
1044        let mut reverse_walker = ReverseWalker::new(&mut cursor, None);
1045
1046        assert_eq!(reverse_walker.next().unwrap().unwrap(), (3, B256::ZERO));
1047        assert_eq!(reverse_walker.next().unwrap().unwrap(), (1, B256::ZERO));
1048        assert_eq!(reverse_walker.next().unwrap().unwrap(), (0, B256::ZERO));
1049        assert!(reverse_walker.next().is_none());
1050
1051        // transform to Walker
1052        let mut walker = reverse_walker.forward();
1053        assert_eq!(walker.next().unwrap().unwrap(), (0, B256::ZERO));
1054        assert_eq!(walker.next().unwrap().unwrap(), (1, B256::ZERO));
1055        assert_eq!(walker.next().unwrap().unwrap(), (3, B256::ZERO));
1056        assert!(walker.next().is_none());
1057    }
1058
1059    #[test]
1060    fn db_walk_back() {
1061        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);
1062
1063        // PUT (0, 0), (1, 0), (3, 0)
1064        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1065        vec![0, 1, 3]
1066            .into_iter()
1067            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
1068            .expect(ERROR_PUT);
1069        tx.commit().expect(ERROR_COMMIT);
1070
1071        let tx = db.tx().expect(ERROR_INIT_TX);
1072        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
1073
1074        let mut reverse_walker = cursor.walk_back(Some(1)).unwrap();
1075        assert_eq!(reverse_walker.next().unwrap().unwrap(), (1, B256::ZERO));
1076        assert_eq!(reverse_walker.next().unwrap().unwrap(), (0, B256::ZERO));
1077        assert!(reverse_walker.next().is_none());
1078
1079        let mut reverse_walker = cursor.walk_back(Some(2)).unwrap();
1080        assert_eq!(reverse_walker.next().unwrap().unwrap(), (3, B256::ZERO));
1081        assert_eq!(reverse_walker.next().unwrap().unwrap(), (1, B256::ZERO));
1082        assert_eq!(reverse_walker.next().unwrap().unwrap(), (0, B256::ZERO));
1083        assert!(reverse_walker.next().is_none());
1084
1085        let mut reverse_walker = cursor.walk_back(Some(4)).unwrap();
1086        assert_eq!(reverse_walker.next().unwrap().unwrap(), (3, B256::ZERO));
1087        assert_eq!(reverse_walker.next().unwrap().unwrap(), (1, B256::ZERO));
1088        assert_eq!(reverse_walker.next().unwrap().unwrap(), (0, B256::ZERO));
1089        assert!(reverse_walker.next().is_none());
1090
1091        let mut reverse_walker = cursor.walk_back(None).unwrap();
1092        assert_eq!(reverse_walker.next().unwrap().unwrap(), (3, B256::ZERO));
1093        assert_eq!(reverse_walker.next().unwrap().unwrap(), (1, B256::ZERO));
1094        assert_eq!(reverse_walker.next().unwrap().unwrap(), (0, B256::ZERO));
1095        assert!(reverse_walker.next().is_none());
1096    }
1097
1098    #[test]
1099    fn db_cursor_seek_exact_or_previous_key() {
1100        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);
1101
1102        // PUT
1103        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1104        vec![0, 1, 3]
1105            .into_iter()
1106            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
1107            .expect(ERROR_PUT);
1108        tx.commit().expect(ERROR_COMMIT);
1109
1110        // Cursor
1111        let missing_key = 2;
1112        let tx = db.tx().expect(ERROR_INIT_TX);
1113        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
1114        assert!(cursor.current().unwrap().is_none());
1115
1116        // Seek exact
1117        let exact = cursor.seek_exact(missing_key).unwrap();
1118        assert_eq!(exact, None);
1119        assert!(cursor.current().unwrap().is_none());
1120    }
1121
1122    #[test]
1123    fn db_cursor_insert() {
1124        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);
1125
1126        // PUT
1127        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1128        vec![0, 1, 3, 4, 5]
1129            .into_iter()
1130            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
1131            .expect(ERROR_PUT);
1132        tx.commit().expect(ERROR_COMMIT);
1133
1134        let key_to_insert = 2;
1135        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1136        let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
1137
1138        // INSERT
1139        assert!(cursor.insert(key_to_insert, &B256::ZERO).is_ok());
1140        assert_eq!(cursor.current().unwrap(), Some((key_to_insert, B256::ZERO)));
1141        // INSERT (failure)
1142        assert!(matches!(
1143        cursor.insert(key_to_insert, &B256::ZERO).unwrap_err(),
1144        DatabaseError::Write(err) if *err == DatabaseWriteError {
1145            info: Error::KeyExist.into(),
1146            operation: DatabaseWriteOperation::CursorInsert,
1147            table_name: CanonicalHeaders::NAME,
1148            key: key_to_insert.encode().into(),
1149        }));
1150        assert_eq!(cursor.current().unwrap(), Some((key_to_insert, B256::ZERO)));
1151
1152        tx.commit().expect(ERROR_COMMIT);
1153
1154        // Confirm the result
1155        let tx = db.tx().expect(ERROR_INIT_TX);
1156        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
1157        let res = cursor.walk(None).unwrap().map(|res| res.unwrap().0).collect::<Vec<_>>();
1158        assert_eq!(res, vec![0, 1, 2, 3, 4, 5]);
1159        tx.commit().expect(ERROR_COMMIT);
1160    }
1161
1162    #[test]
1163    fn db_cursor_insert_dup() {
1164        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);
1165        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1166
1167        let mut dup_cursor = tx.cursor_dup_write::<PlainStorageState>().unwrap();
1168        let key = Address::random();
1169        let subkey1 = B256::random();
1170        let subkey2 = B256::random();
1171
1172        let entry1 = StorageEntry { key: subkey1, value: U256::ZERO };
1173        assert!(dup_cursor.insert(key, &entry1).is_ok());
1174
1175        // Can't insert
1176        let entry2 = StorageEntry { key: subkey2, value: U256::ZERO };
1177        assert!(dup_cursor.insert(key, &entry2).is_err());
1178    }
1179
1180    #[test]
1181    fn db_cursor_delete_current_non_existent() {
1182        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);
1183        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1184
1185        let key1 = Address::with_last_byte(1);
1186        let key2 = Address::with_last_byte(2);
1187        let key3 = Address::with_last_byte(3);
1188        let mut cursor = tx.cursor_write::<PlainAccountState>().unwrap();
1189
1190        assert!(cursor.insert(key1, &Account::default()).is_ok());
1191        assert!(cursor.insert(key2, &Account::default()).is_ok());
1192        assert!(cursor.insert(key3, &Account::default()).is_ok());
1193
1194        // Seek & delete key2
1195        cursor.seek_exact(key2).unwrap();
1196        assert!(cursor.delete_current().is_ok());
1197        assert!(cursor.seek_exact(key2).unwrap().is_none());
1198
1199        // Seek & delete key2 again
1200        assert!(cursor.seek_exact(key2).unwrap().is_none());
1201        assert!(matches!(
1202            cursor.delete_current().unwrap_err(),
1203            DatabaseError::Delete(err) if err == reth_libmdbx::Error::NoData.into()));
1204        // Assert that key1 is still there
1205        assert_eq!(cursor.seek_exact(key1).unwrap(), Some((key1, Account::default())));
1206        // Assert that key3 is still there
1207        assert_eq!(cursor.seek_exact(key3).unwrap(), Some((key3, Account::default())));
1208    }
1209
1210    #[test]
1211    fn db_cursor_insert_wherever_cursor_is() {
1212        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);
1213        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1214
1215        // PUT
1216        vec![0, 1, 3, 5, 7, 9]
1217            .into_iter()
1218            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
1219            .expect(ERROR_PUT);
1220        tx.commit().expect(ERROR_COMMIT);
1221
1222        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1223        let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
1224
1225        // INSERT (cursor starts at last)
1226        cursor.last().unwrap();
1227        assert_eq!(cursor.current().unwrap(), Some((9, B256::ZERO)));
1228
1229        for pos in (2..=8).step_by(2) {
1230            assert!(cursor.insert(pos, &B256::ZERO).is_ok());
1231            assert_eq!(cursor.current().unwrap(), Some((pos, B256::ZERO)));
1232        }
1233        tx.commit().expect(ERROR_COMMIT);
1234
1235        // Confirm the result
1236        let tx = db.tx().expect(ERROR_INIT_TX);
1237        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
1238        let res = cursor.walk(None).unwrap().map(|res| res.unwrap().0).collect::<Vec<_>>();
1239        assert_eq!(res, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
1240        tx.commit().expect(ERROR_COMMIT);
1241    }
1242
1243    #[test]
1244    fn db_cursor_append() {
1245        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);
1246
1247        // PUT
1248        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1249        vec![0, 1, 2, 3, 4]
1250            .into_iter()
1251            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
1252            .expect(ERROR_PUT);
1253        tx.commit().expect(ERROR_COMMIT);
1254
1255        // APPEND
1256        let key_to_append = 5;
1257        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1258        let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
1259        assert!(cursor.append(key_to_append, &B256::ZERO).is_ok());
1260        tx.commit().expect(ERROR_COMMIT);
1261
1262        // Confirm the result
1263        let tx = db.tx().expect(ERROR_INIT_TX);
1264        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
1265        let res = cursor.walk(None).unwrap().map(|res| res.unwrap().0).collect::<Vec<_>>();
1266        assert_eq!(res, vec![0, 1, 2, 3, 4, 5]);
1267        tx.commit().expect(ERROR_COMMIT);
1268    }
1269
1270    #[test]
1271    fn db_cursor_append_failure() {
1272        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);
1273
1274        // PUT
1275        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1276        vec![0, 1, 3, 4, 5]
1277            .into_iter()
1278            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
1279            .expect(ERROR_PUT);
1280        tx.commit().expect(ERROR_COMMIT);
1281
1282        // APPEND
1283        let key_to_append = 2;
1284        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1285        let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
1286        assert!(matches!(
1287        cursor.append(key_to_append, &B256::ZERO).unwrap_err(),
1288        DatabaseError::Write(err) if *err == DatabaseWriteError {
1289            info: Error::KeyMismatch.into(),
1290            operation: DatabaseWriteOperation::CursorAppend,
1291            table_name: CanonicalHeaders::NAME,
1292            key: key_to_append.encode().into(),
1293        }));
1294        assert_eq!(cursor.current().unwrap(), Some((5, B256::ZERO))); // the end of table
1295        tx.commit().expect(ERROR_COMMIT);
1296
1297        // Confirm the result
1298        let tx = db.tx().expect(ERROR_INIT_TX);
1299        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
1300        let res = cursor.walk(None).unwrap().map(|res| res.unwrap().0).collect::<Vec<_>>();
1301        assert_eq!(res, vec![0, 1, 3, 4, 5]);
1302        tx.commit().expect(ERROR_COMMIT);
1303    }
1304
    #[test]
    fn db_cursor_upsert() {
        // `upsert` inserts a missing key and overwrites an existing one on a plain
        // table; on a DUPSORT table it instead adds another duplicate for the same
        // subkey rather than replacing it.
        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);
        let tx = db.tx_mut().expect(ERROR_INIT_TX);

        let mut cursor = tx.cursor_write::<PlainAccountState>().unwrap();
        let key = Address::random();

        // First upsert inserts the account.
        let account = Account::default();
        cursor.upsert(key, &account).expect(ERROR_UPSERT);
        assert_eq!(cursor.seek_exact(key).unwrap(), Some((key, account)));

        // Subsequent upserts replace the stored value for the same key.
        let account = Account { nonce: 1, ..Default::default() };
        cursor.upsert(key, &account).expect(ERROR_UPSERT);
        assert_eq!(cursor.seek_exact(key).unwrap(), Some((key, account)));

        let account = Account { nonce: 2, ..Default::default() };
        cursor.upsert(key, &account).expect(ERROR_UPSERT);
        assert_eq!(cursor.seek_exact(key).unwrap(), Some((key, account)));

        // DUPSORT table: upserting two entries with the same subkey keeps both.
        let mut dup_cursor = tx.cursor_dup_write::<PlainStorageState>().unwrap();
        let subkey = B256::random();

        let value = U256::from(1);
        let entry1 = StorageEntry { key: subkey, value };
        dup_cursor.upsert(key, &entry1).expect(ERROR_UPSERT);
        assert_eq!(dup_cursor.seek_by_key_subkey(key, subkey).unwrap(), Some(entry1));

        let value = U256::from(2);
        let entry2 = StorageEntry { key: subkey, value };
        dup_cursor.upsert(key, &entry2).expect(ERROR_UPSERT);
        // `seek_by_key_subkey` still lands on the first duplicate; the second one is
        // reached with `next_dup_val`.
        assert_eq!(dup_cursor.seek_by_key_subkey(key, subkey).unwrap(), Some(entry1));
        assert_eq!(dup_cursor.next_dup_val().unwrap(), Some(entry2));
    }
1339
    #[test]
    fn db_cursor_dupsort_append() {
        // On a DUPSORT table, out-of-order appends are rejected with `KeyMismatch`:
        // `append_dup` with a duplicate that sorts before the key's current last
        // duplicate fails, as does `append` with a key smaller than the last key.
        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);

        let transition_id = 2;

        // Seed five duplicates (addresses 0, 1, 3, 4, 5) under `transition_id`.
        let tx = db.tx_mut().expect(ERROR_INIT_TX);
        let mut cursor = tx.cursor_write::<AccountChangeSets>().unwrap();
        vec![0, 1, 3, 4, 5]
            .into_iter()
            .try_for_each(|val| {
                cursor.append(
                    transition_id,
                    &AccountBeforeTx { address: Address::with_last_byte(val), info: None },
                )
            })
            .expect(ERROR_APPEND);
        tx.commit().expect(ERROR_COMMIT);

        // APPEND DUP & APPEND
        let subkey_to_append = 2;
        let tx = db.tx_mut().expect(ERROR_INIT_TX);
        let mut cursor = tx.cursor_write::<AccountChangeSets>().unwrap();
        // `append_dup` with address 2 sorts before the key's last duplicate (5),
        // so it must fail with `KeyMismatch` on `CursorAppendDup`.
        assert!(matches!(
        cursor
            .append_dup(
                transition_id,
                AccountBeforeTx {
                    address: Address::with_last_byte(subkey_to_append),
                    info: None
                }
            )
            .unwrap_err(),
        DatabaseError::Write(err) if *err == DatabaseWriteError {
            info: Error::KeyMismatch.into(),
            operation: DatabaseWriteOperation::CursorAppendDup,
            table_name: AccountChangeSets::NAME,
            key: transition_id.encode().into(),
        }));
        // `append` with a smaller key (transition_id - 1) also fails with
        // `KeyMismatch`, reported for `CursorAppend`.
        assert!(matches!(
            cursor
                .append(
                    transition_id - 1,
                    &AccountBeforeTx {
                        address: Address::with_last_byte(subkey_to_append),
                        info: None
                    }
                )
                .unwrap_err(),
            DatabaseError::Write(err) if *err == DatabaseWriteError {
                info: Error::KeyMismatch.into(),
                operation: DatabaseWriteOperation::CursorAppend,
                table_name: AccountChangeSets::NAME,
                key: (transition_id - 1).encode().into(),
            }
        ));
        // `append` at the same (last) key with this duplicate succeeds.
        assert!(cursor
            .append(
                transition_id,
                &AccountBeforeTx { address: Address::with_last_byte(subkey_to_append), info: None }
            )
            .is_ok());
    }
1403
1404    #[test]
1405    fn db_closure_put_get() {
1406        let tempdir = TempDir::new().expect(ERROR_TEMPDIR);
1407        let path = tempdir.path();
1408
1409        let value = Account {
1410            nonce: 18446744073709551615,
1411            bytecode_hash: Some(B256::random()),
1412            balance: U256::MAX,
1413        };
1414        let key = Address::from_str("0xa2c122be93b0074270ebee7f6b7292c7deb45047")
1415            .expect(ERROR_ETH_ADDRESS);
1416
1417        {
1418            let env = create_test_db_with_path(DatabaseEnvKind::RW, path);
1419
1420            // PUT
1421            let result = env.update(|tx| {
1422                tx.put::<PlainAccountState>(key, value).expect(ERROR_PUT);
1423                200
1424            });
1425            assert_eq!(result.expect(ERROR_RETURN_VALUE), 200);
1426        }
1427
1428        let env = DatabaseEnv::open(
1429            path,
1430            DatabaseEnvKind::RO,
1431            DatabaseArguments::new(ClientVersion::default()),
1432        )
1433        .expect(ERROR_DB_CREATION);
1434
1435        // GET
1436        let result =
1437            env.view(|tx| tx.get::<PlainAccountState>(key).expect(ERROR_GET)).expect(ERROR_GET);
1438
1439        assert_eq!(result, Some(value))
1440    }
1441
    #[test]
    fn db_dup_sort() {
        // Duplicates inserted out of order ((0,0), then (2,2), then (1,1)) come back
        // sorted from the cursor, and `walk_dup` with an exact subkey positions
        // directly on that entry.
        let (_tempdir, env) = create_test_db(DatabaseEnvKind::RW);
        let key = Address::from_str("0xa2c122be93b0074270ebee7f6b7292c7deb45047")
            .expect(ERROR_ETH_ADDRESS);

        // PUT (0,0)
        let value00 = StorageEntry::default();
        env.update(|tx| tx.put::<PlainStorageState>(key, value00).expect(ERROR_PUT)).unwrap();

        // PUT (2,2)
        let value22 = StorageEntry { key: B256::with_last_byte(2), value: U256::from(2) };
        env.update(|tx| tx.put::<PlainStorageState>(key, value22).expect(ERROR_PUT)).unwrap();

        // PUT (1,1)
        let value11 = StorageEntry { key: B256::with_last_byte(1), value: U256::from(1) };
        env.update(|tx| tx.put::<PlainStorageState>(key, value11).expect(ERROR_PUT)).unwrap();

        // Iterate with cursor
        {
            let tx = env.tx().expect(ERROR_INIT_TX);
            let mut cursor = tx.cursor_dup_read::<PlainStorageState>().unwrap();

            // Notice that value11 and value22 have been ordered in the DB.
            assert_eq!(Some(value00), cursor.next_dup_val().unwrap());
            assert_eq!(Some(value11), cursor.next_dup_val().unwrap());
            assert_eq!(Some(value22), cursor.next_dup_val().unwrap());
        }

        // Seek value with exact subkey
        {
            let tx = env.tx().expect(ERROR_INIT_TX);
            let mut cursor = tx.cursor_dup_read::<PlainStorageState>().unwrap();
            let mut walker = cursor.walk_dup(Some(key), Some(B256::with_last_byte(1))).unwrap();
            // The walker starts at the (key, subkey 1) entry, skipping value00.
            assert_eq!(
                (key, value11),
                walker
                    .next()
                    .expect("element should exist.")
                    .expect("should be able to retrieve it.")
            );
        }
    }
1485
1486    #[test]
1487    fn db_walk_dup_with_not_existing_key() {
1488        let (_tempdir, env) = create_test_db(DatabaseEnvKind::RW);
1489        let key = Address::from_str("0xa2c122be93b0074270ebee7f6b7292c7deb45047")
1490            .expect(ERROR_ETH_ADDRESS);
1491
1492        // PUT (0,0)
1493        let value00 = StorageEntry::default();
1494        env.update(|tx| tx.put::<PlainStorageState>(key, value00).expect(ERROR_PUT)).unwrap();
1495
1496        // PUT (2,2)
1497        let value22 = StorageEntry { key: B256::with_last_byte(2), value: U256::from(2) };
1498        env.update(|tx| tx.put::<PlainStorageState>(key, value22).expect(ERROR_PUT)).unwrap();
1499
1500        // PUT (1,1)
1501        let value11 = StorageEntry { key: B256::with_last_byte(1), value: U256::from(1) };
1502        env.update(|tx| tx.put::<PlainStorageState>(key, value11).expect(ERROR_PUT)).unwrap();
1503
1504        // Try to walk_dup with not existing key should immediately return None
1505        {
1506            let tx = env.tx().expect(ERROR_INIT_TX);
1507            let mut cursor = tx.cursor_dup_read::<PlainStorageState>().unwrap();
1508            let not_existing_key = Address::ZERO;
1509            let mut walker = cursor.walk_dup(Some(not_existing_key), None).unwrap();
1510            assert!(walker.next().is_none());
1511        }
1512    }
1513
1514    #[test]
1515    fn db_iterate_over_all_dup_values() {
1516        let (_tempdir, env) = create_test_db(DatabaseEnvKind::RW);
1517        let key1 = Address::from_str("0x1111111111111111111111111111111111111111")
1518            .expect(ERROR_ETH_ADDRESS);
1519        let key2 = Address::from_str("0x2222222222222222222222222222222222222222")
1520            .expect(ERROR_ETH_ADDRESS);
1521
1522        // PUT key1 (0,0)
1523        let value00 = StorageEntry::default();
1524        env.update(|tx| tx.put::<PlainStorageState>(key1, value00).expect(ERROR_PUT)).unwrap();
1525
1526        // PUT key1 (1,1)
1527        let value11 = StorageEntry { key: B256::with_last_byte(1), value: U256::from(1) };
1528        env.update(|tx| tx.put::<PlainStorageState>(key1, value11).expect(ERROR_PUT)).unwrap();
1529
1530        // PUT key2 (2,2)
1531        let value22 = StorageEntry { key: B256::with_last_byte(2), value: U256::from(2) };
1532        env.update(|tx| tx.put::<PlainStorageState>(key2, value22).expect(ERROR_PUT)).unwrap();
1533
1534        // Iterate with walk_dup
1535        {
1536            let tx = env.tx().expect(ERROR_INIT_TX);
1537            let mut cursor = tx.cursor_dup_read::<PlainStorageState>().unwrap();
1538            let mut walker = cursor.walk_dup(None, None).unwrap();
1539
1540            // Notice that value11 and value22 have been ordered in the DB.
1541            assert_eq!((key1, value00), walker.next().unwrap().unwrap());
1542            assert_eq!((key1, value11), walker.next().unwrap().unwrap());
1543            // NOTE: Dup cursor does NOT iterates on all values but only on duplicated values of the
1544            // same key. assert_eq!(Ok(Some(value22.clone())), walker.next());
1545            assert!(walker.next().is_none());
1546        }
1547
1548        // Iterate by using `walk`
1549        {
1550            let tx = env.tx().expect(ERROR_INIT_TX);
1551            let mut cursor = tx.cursor_dup_read::<PlainStorageState>().unwrap();
1552            let first = cursor.first().unwrap().unwrap();
1553            let mut walker = cursor.walk(Some(first.0)).unwrap();
1554            assert_eq!((key1, value00), walker.next().unwrap().unwrap());
1555            assert_eq!((key1, value11), walker.next().unwrap().unwrap());
1556            assert_eq!((key2, value22), walker.next().unwrap().unwrap());
1557        }
1558    }
1559
1560    #[test]
1561    fn dup_value_with_same_subkey() {
1562        let (_tempdir, env) = create_test_db(DatabaseEnvKind::RW);
1563        let key1 = Address::new([0x11; 20]);
1564        let key2 = Address::new([0x22; 20]);
1565
1566        // PUT key1 (0,1)
1567        let value01 = StorageEntry { key: B256::with_last_byte(0), value: U256::from(1) };
1568        env.update(|tx| tx.put::<PlainStorageState>(key1, value01).expect(ERROR_PUT)).unwrap();
1569
1570        // PUT key1 (0,0)
1571        let value00 = StorageEntry::default();
1572        env.update(|tx| tx.put::<PlainStorageState>(key1, value00).expect(ERROR_PUT)).unwrap();
1573
1574        // PUT key2 (2,2)
1575        let value22 = StorageEntry { key: B256::with_last_byte(2), value: U256::from(2) };
1576        env.update(|tx| tx.put::<PlainStorageState>(key2, value22).expect(ERROR_PUT)).unwrap();
1577
1578        // Iterate with walk
1579        {
1580            let tx = env.tx().expect(ERROR_INIT_TX);
1581            let mut cursor = tx.cursor_dup_read::<PlainStorageState>().unwrap();
1582            let first = cursor.first().unwrap().unwrap();
1583            let mut walker = cursor.walk(Some(first.0)).unwrap();
1584
1585            // NOTE: Both values are present
1586            assert_eq!((key1, value00), walker.next().unwrap().unwrap());
1587            assert_eq!((key1, value01), walker.next().unwrap().unwrap());
1588            assert_eq!((key2, value22), walker.next().unwrap().unwrap());
1589        }
1590
1591        // seek_by_key_subkey
1592        {
1593            let tx = env.tx().expect(ERROR_INIT_TX);
1594            let mut cursor = tx.cursor_dup_read::<PlainStorageState>().unwrap();
1595
1596            // NOTE: There are two values with same SubKey but only first one is shown
1597            assert_eq!(value00, cursor.seek_by_key_subkey(key1, value00.key).unwrap().unwrap());
1598            // key1 but value is greater than the one in the DB
1599            assert_eq!(None, cursor.seek_by_key_subkey(key1, value22.key).unwrap());
1600        }
1601    }
1602
1603    #[test]
1604    fn db_sharded_key() {
1605        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);
1606        let real_key = address!("0xa2c122be93b0074270ebee7f6b7292c7deb45047");
1607
1608        let shards = 5;
1609        for i in 1..=shards {
1610            let key = ShardedKey::new(real_key, if i == shards { u64::MAX } else { i * 100 });
1611            let list = IntegerList::new_pre_sorted([i * 100u64]);
1612
1613            db.update(|tx| tx.put::<AccountsHistory>(key.clone(), list.clone()).expect(""))
1614                .unwrap();
1615        }
1616
1617        // Seek value with non existing key.
1618        {
1619            let tx = db.tx().expect(ERROR_INIT_TX);
1620            let mut cursor = tx.cursor_read::<AccountsHistory>().unwrap();
1621
1622            // It will seek the one greater or equal to the query. Since we have `Address | 100`,
1623            // `Address | 200` in the database and we're querying `Address | 150` it will return us
1624            // `Address | 200`.
1625            let mut walker = cursor.walk(Some(ShardedKey::new(real_key, 150))).unwrap();
1626            let (key, list) = walker
1627                .next()
1628                .expect("element should exist.")
1629                .expect("should be able to retrieve it.");
1630
1631            assert_eq!(ShardedKey::new(real_key, 200), key);
1632            let list200 = IntegerList::new_pre_sorted([200u64]);
1633            assert_eq!(list200, list);
1634        }
1635        // Seek greatest index
1636        {
1637            let tx = db.tx().expect(ERROR_INIT_TX);
1638            let mut cursor = tx.cursor_read::<AccountsHistory>().unwrap();
1639
1640            // It will seek the MAX value of transition index and try to use prev to get first
1641            // biggers.
1642            let _unknown = cursor.seek_exact(ShardedKey::new(real_key, u64::MAX)).unwrap();
1643            let (key, list) = cursor
1644                .prev()
1645                .expect("element should exist.")
1646                .expect("should be able to retrieve it.");
1647
1648            assert_eq!(ShardedKey::new(real_key, 400), key);
1649            let list400 = IntegerList::new_pre_sorted([400u64]);
1650            assert_eq!(list400, list);
1651        }
1652    }
1653}