Skip to main content

reth_db/implementation/mdbx/
mod.rs

1//! Module that interacts with MDBX.
2
3use crate::{
4    lockfile::StorageLock,
5    metrics::DatabaseEnvMetrics,
6    tables::{self, Tables},
7    utils::default_page_size,
8    DatabaseError, TableSet,
9};
10use eyre::Context;
11use metrics::{gauge, Label};
12use reth_db_api::{
13    cursor::{DbCursorRO, DbCursorRW},
14    database::Database,
15    database_metrics::DatabaseMetrics,
16    models::ClientVersion,
17    transaction::{DbTx, DbTxMut},
18};
19use reth_libmdbx::{
20    ffi, DatabaseFlags, Environment, EnvironmentFlags, Geometry, HandleSlowReadersReturnCode,
21    MaxReadTransactionDuration, Mode, PageSize, SyncMode, RO, RW,
22};
23use reth_storage_errors::db::LogLevel;
24use reth_tracing::tracing::error;
25use std::{
26    collections::HashMap,
27    ops::{Deref, Range},
28    path::{Path, PathBuf},
29    sync::Arc,
30    time::{SystemTime, UNIX_EPOCH},
31};
32use tx::Tx;
33
34pub mod cursor;
35pub mod tx;
36
37mod utils;
38
/// Number of bytes in one kilobyte (1024 bytes).
pub const KILOBYTE: usize = 1024;
/// Number of bytes in one megabyte (1024 [`KILOBYTE`]s).
pub const MEGABYTE: usize = 1024 * KILOBYTE;
/// Number of bytes in one gigabyte (1024 [`MEGABYTE`]s).
pub const GIGABYTE: usize = 1024 * MEGABYTE;
/// Number of bytes in one terabyte (1024 [`GIGABYTE`]s).
pub const TERABYTE: usize = 1024 * GIGABYTE;

/// Reader limit applied when the caller does not configure one.
///
/// MDBX itself caps readers at 32767 (`MDBX_READERS_LIMIT`); this default stays slightly below
/// that ceiling.
const DEFAULT_MAX_READERS: u64 = 32_000;

/// Amount of space a read-only transaction may occupy before a warning is logged.
/// See [`reth_libmdbx::EnvironmentBuilder::set_handle_slow_readers`] for more information.
const MAX_SAFE_READER_SPACE: usize = GIGABYTE * 10;
54
/// Access mode requested when opening an MDBX environment.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DatabaseEnvKind {
    /// The environment is opened for reading only.
    RO,
    /// The environment is opened for reading and writing.
    RW,
}

impl DatabaseEnvKind {
    /// Reports whether this kind is [`DatabaseEnvKind::RW`].
    pub const fn is_rw(&self) -> bool {
        match self {
            Self::RW => true,
            Self::RO => false,
        }
    }
}
70
71/// Arguments for database initialization.
72#[derive(Clone, Debug)]
73pub struct DatabaseArguments {
74    /// Client version that accesses the database.
75    client_version: ClientVersion,
76    /// Database geometry settings.
77    geometry: Geometry<Range<usize>>,
78    /// Database log level. If [None], the default value is used.
79    log_level: Option<LogLevel>,
80    /// Maximum duration of a read transaction. If [None], the default value is used.
81    max_read_transaction_duration: Option<MaxReadTransactionDuration>,
82    /// Open environment in exclusive/monopolistic mode. If [None], the default value is used.
83    ///
84    /// This can be used as a replacement for `MDB_NOLOCK`, which don't supported by MDBX. In this
85    /// way, you can get the minimal overhead, but with the correct multi-process and multi-thread
86    /// locking.
87    ///
88    /// If `true` = open environment in exclusive/monopolistic mode or return `MDBX_BUSY` if
89    /// environment already used by other process. The main feature of the exclusive mode is the
90    /// ability to open the environment placed on a network share.
91    ///
92    /// If `false` = open environment in cooperative mode, i.e. for multi-process
93    /// access/interaction/cooperation. The main requirements of the cooperative mode are:
94    /// - Data files MUST be placed in the LOCAL file system, but NOT on a network share.
95    /// - Environment MUST be opened only by LOCAL processes, but NOT over a network.
96    /// - OS kernel (i.e. file system and memory mapping implementation) and all processes that
97    ///   open the given environment MUST be running in the physically single RAM with
98    ///   cache-coherency. The only exception for cache-consistency requirement is Linux on MIPS
99    ///   architecture, but this case has not been tested for a long time).
100    ///
101    /// This flag affects only at environment opening but can't be changed after.
102    exclusive: Option<bool>,
103    /// MDBX allows up to 32767 readers (`MDBX_READERS_LIMIT`). This arg is to configure the max
104    /// readers.
105    max_readers: Option<u64>,
106    /// Defines the synchronization strategy used by the MDBX database when writing data to disk.
107    ///
108    /// This determines how aggressively MDBX ensures data durability versus prioritizing
109    /// performance. The available modes are:
110    ///
111    /// - [`SyncMode::Durable`]: Ensures all transactions are fully flushed to disk before they are
112    ///   considered committed.   This provides the highest level of durability and crash safety
113    ///   but may have a performance cost.
114    /// - [`SyncMode::SafeNoSync`]: Skips certain fsync operations to improve write performance.
115    ///   This mode still maintains database integrity but may lose the most recent transactions if
116    ///   the system crashes unexpectedly.
117    ///
118    /// Choose `Durable` if consistency and crash safety are critical (e.g., production
119    /// environments). Choose `SafeNoSync` if performance is more important and occasional data
120    /// loss is acceptable (e.g., testing or ephemeral data).
121    sync_mode: SyncMode,
122}
123
124impl Default for DatabaseArguments {
125    fn default() -> Self {
126        Self::new(ClientVersion::default())
127    }
128}
129
130impl DatabaseArguments {
131    /// Create new database arguments with given client version.
132    pub fn new(client_version: ClientVersion) -> Self {
133        Self {
134            client_version,
135            geometry: Geometry {
136                size: Some(0..(8 * TERABYTE)),
137                growth_step: Some(4 * GIGABYTE as isize),
138                shrink_threshold: Some(0),
139                page_size: Some(PageSize::Set(default_page_size())),
140            },
141            log_level: None,
142            max_read_transaction_duration: None,
143            exclusive: None,
144            max_readers: None,
145            sync_mode: SyncMode::Durable,
146        }
147    }
148
149    /// Create database arguments suitable for testing.
150    ///
151    /// Uses a small geometry (64MB max, 4MB growth) to avoid exhausting the system's
152    /// virtual memory map limit (`vm.max_map_count`) when many test databases are open
153    /// concurrently.
154    pub fn test() -> Self {
155        Self {
156            geometry: Geometry {
157                size: Some(0..(64 * MEGABYTE)),
158                growth_step: Some(4 * MEGABYTE as isize),
159                shrink_threshold: Some(0),
160                page_size: Some(PageSize::Set(default_page_size())),
161            },
162            max_read_transaction_duration: Some(MaxReadTransactionDuration::Unbounded),
163            ..Self::new(ClientVersion::default())
164        }
165    }
166
167    /// Sets the upper size limit of the db environment, the maximum database size in bytes.
168    pub const fn with_geometry_max_size(mut self, max_size: Option<usize>) -> Self {
169        if let Some(max_size) = max_size {
170            self.geometry.size = Some(0..max_size);
171        }
172        self
173    }
174
175    /// Sets the database page size value.
176    pub const fn with_geometry_page_size(mut self, page_size: Option<usize>) -> Self {
177        if let Some(size) = page_size {
178            self.geometry.page_size = Some(reth_libmdbx::PageSize::Set(size));
179        }
180
181        self
182    }
183
184    /// Sets the database sync mode.
185    pub const fn with_sync_mode(mut self, sync_mode: Option<SyncMode>) -> Self {
186        if let Some(sync_mode) = sync_mode {
187            self.sync_mode = sync_mode;
188        }
189
190        self
191    }
192
193    /// Configures the database growth step in bytes.
194    pub const fn with_growth_step(mut self, growth_step: Option<usize>) -> Self {
195        if let Some(growth_step) = growth_step {
196            self.geometry.growth_step = Some(growth_step as isize);
197        }
198        self
199    }
200
201    /// Set the log level.
202    pub const fn with_log_level(mut self, log_level: Option<LogLevel>) -> Self {
203        self.log_level = log_level;
204        self
205    }
206
207    /// Set the maximum duration of a read transaction.
208    pub const fn max_read_transaction_duration(
209        &mut self,
210        max_read_transaction_duration: Option<MaxReadTransactionDuration>,
211    ) {
212        self.max_read_transaction_duration = max_read_transaction_duration;
213    }
214
215    /// Set the maximum duration of a read transaction.
216    pub const fn with_max_read_transaction_duration(
217        mut self,
218        max_read_transaction_duration: Option<MaxReadTransactionDuration>,
219    ) -> Self {
220        self.max_read_transaction_duration(max_read_transaction_duration);
221        self
222    }
223
224    /// Set the mdbx exclusive flag.
225    pub const fn with_exclusive(mut self, exclusive: Option<bool>) -> Self {
226        self.exclusive = exclusive;
227        self
228    }
229
230    /// Set `max_readers` flag.
231    pub const fn with_max_readers(mut self, max_readers: Option<u64>) -> Self {
232        self.max_readers = max_readers;
233        self
234    }
235
236    /// Returns the client version if any.
237    pub const fn client_version(&self) -> &ClientVersion {
238        &self.client_version
239    }
240}
241
242/// Wrapper for the libmdbx environment: [Environment]
243#[derive(Debug, Clone)]
244pub struct DatabaseEnv {
245    /// Libmdbx-sys environment.
246    inner: Environment,
247    /// Path to the database directory.
248    path: PathBuf,
249    /// Opened DBIs for reuse.
250    /// Important: Do not manually close these DBIs, like via `mdbx_dbi_close`.
251    /// More generally, do not dynamically create, re-open, or drop tables at
252    /// runtime. It's better to perform table creation and migration only once
253    /// at startup.
254    dbis: Arc<HashMap<&'static str, ffi::MDBX_dbi>>,
255    /// Cache for metric handles. If `None`, metrics are not recorded.
256    metrics: Option<Arc<DatabaseEnvMetrics>>,
257    /// Write lock for when dealing with a read-write environment.
258    _lock_file: Option<StorageLock>,
259}
260
261impl Database for DatabaseEnv {
262    type TX = tx::Tx<RO>;
263    type TXMut = tx::Tx<RW>;
264
265    fn tx(&self) -> Result<Self::TX, DatabaseError> {
266        Tx::new(
267            self.inner.begin_ro_txn().map_err(|e| DatabaseError::InitTx(e.into()))?,
268            self.dbis.clone(),
269            self.metrics.clone(),
270        )
271        .map_err(|e| DatabaseError::InitTx(e.into()))
272    }
273
274    fn tx_mut(&self) -> Result<Self::TXMut, DatabaseError> {
275        Tx::new(
276            self.inner.begin_rw_txn().map_err(|e| DatabaseError::InitTx(e.into()))?,
277            self.dbis.clone(),
278            self.metrics.clone(),
279        )
280        .map_err(|e| DatabaseError::InitTx(e.into()))
281    }
282
283    fn path(&self) -> PathBuf {
284        self.path.clone()
285    }
286
287    fn oldest_reader_txnid(&self) -> Option<u64> {
288        let info = self.inner.info().ok()?;
289        let txnid = info.latter_reader_txnid();
290        if txnid == 0 {
291            None
292        } else {
293            Some(txnid)
294        }
295    }
296
297    fn last_txnid(&self) -> Option<u64> {
298        let info = self.inner.info().ok()?;
299        let txnid = info.last_txnid();
300        if txnid == 0 {
301            None
302        } else {
303            Some(txnid as u64)
304        }
305    }
306}
307
308impl DatabaseMetrics for DatabaseEnv {
309    fn report_metrics(&self) {
310        for (name, value, labels) in self.gauge_metrics() {
311            gauge!(name, labels).set(value);
312        }
313    }
314
315    fn gauge_metrics(&self) -> Vec<(&'static str, f64, Vec<Label>)> {
316        let mut metrics = Vec::new();
317
318        let _ = self
319            .view(|tx| {
320                for table in Tables::ALL.iter().map(Tables::name) {
321                    let table_db =
322                        tx.inner().open_db(Some(table)).wrap_err("Could not open db.")?;
323
324                    let stats = tx
325                        .inner()
326                        .db_stat(table_db.dbi())
327                        .wrap_err(format!("Could not find table: {table}"))?;
328
329                    let page_size = stats.page_size() as usize;
330                    let leaf_pages = stats.leaf_pages();
331                    let branch_pages = stats.branch_pages();
332                    let overflow_pages = stats.overflow_pages();
333                    let num_pages = leaf_pages + branch_pages + overflow_pages;
334                    let table_size = page_size * num_pages;
335                    let entries = stats.entries();
336
337                    metrics.push((
338                        "db.table_size",
339                        table_size as f64,
340                        vec![Label::new("table", table)],
341                    ));
342                    metrics.push((
343                        "db.table_pages",
344                        leaf_pages as f64,
345                        vec![Label::new("table", table), Label::new("type", "leaf")],
346                    ));
347                    metrics.push((
348                        "db.table_pages",
349                        branch_pages as f64,
350                        vec![Label::new("table", table), Label::new("type", "branch")],
351                    ));
352                    metrics.push((
353                        "db.table_pages",
354                        overflow_pages as f64,
355                        vec![Label::new("table", table), Label::new("type", "overflow")],
356                    ));
357                    metrics.push((
358                        "db.table_entries",
359                        entries as f64,
360                        vec![Label::new("table", table)],
361                    ));
362                }
363
364                Ok::<(), eyre::Report>(())
365            })
366            .map_err(|error| error!(%error, "Failed to read db table stats"));
367
368        if let Ok(freelist) =
369            self.freelist().map_err(|error| error!(%error, "Failed to read db.freelist"))
370        {
371            metrics.push(("db.freelist", freelist as f64, vec![]));
372        }
373
374        if let Ok(stat) = self.stat().map_err(|error| error!(%error, "Failed to read db.stat")) {
375            metrics.push(("db.page_size", stat.page_size() as f64, vec![]));
376        }
377
378        metrics.push((
379            "db.timed_out_not_aborted_transactions",
380            self.timed_out_not_aborted_transactions() as f64,
381            vec![],
382        ));
383
384        metrics
385    }
386}
387
388impl DatabaseEnv {
389    /// Opens the database at the specified path with the given `EnvKind`.
390    ///
391    /// It does not create the tables, for that call [`DatabaseEnv::create_tables`].
392    pub fn open(
393        path: &Path,
394        kind: DatabaseEnvKind,
395        args: DatabaseArguments,
396    ) -> Result<Self, DatabaseError> {
397        let _lock_file = if kind.is_rw() {
398            StorageLock::try_acquire(path)
399                .map_err(|err| DatabaseError::Other(err.to_string()))?
400                .into()
401        } else {
402            None
403        };
404
405        let mut inner_env = Environment::builder();
406
407        let mode = match kind {
408            DatabaseEnvKind::RO => Mode::ReadOnly,
409            DatabaseEnvKind::RW => {
410                // enable writemap mode in RW mode
411                inner_env.write_map();
412                Mode::ReadWrite { sync_mode: args.sync_mode }
413            }
414        };
415
416        // Note: We set max dbs to 256 here to allow for custom tables. This needs to be set on
417        // environment creation.
418        debug_assert!(Tables::ALL.len() <= 256, "number of tables exceed max dbs");
419        inner_env.set_max_dbs(256);
420        inner_env.set_geometry(args.geometry);
421
422        fn is_current_process(id: u32) -> bool {
423            #[cfg(unix)]
424            {
425                id == std::os::unix::process::parent_id() || id == std::process::id()
426            }
427
428            #[cfg(not(unix))]
429            {
430                id == std::process::id()
431            }
432        }
433
434        extern "C" fn handle_slow_readers(
435            _env: *const ffi::MDBX_env,
436            _txn: *const ffi::MDBX_txn,
437            process_id: ffi::mdbx_pid_t,
438            thread_id: ffi::mdbx_tid_t,
439            read_txn_id: u64,
440            gap: std::ffi::c_uint,
441            space: usize,
442            retry: std::ffi::c_int,
443        ) -> HandleSlowReadersReturnCode {
444            if space > MAX_SAFE_READER_SPACE {
445                let message = if is_current_process(process_id as u32) {
446                    "Current process has a long-lived database transaction that grows the database file."
447                } else {
448                    "External process has a long-lived database transaction that grows the database file. \
449                     Use shorter-lived read transactions or shut down the node."
450                };
451                reth_tracing::tracing::warn!(
452                    target: "storage::db::mdbx",
453                    ?process_id,
454                    ?thread_id,
455                    ?read_txn_id,
456                    ?gap,
457                    ?space,
458                    ?retry,
459                    "{message}"
460                )
461            }
462
463            reth_libmdbx::HandleSlowReadersReturnCode::ProceedWithoutKillingReader
464        }
465        inner_env.set_handle_slow_readers(handle_slow_readers);
466
467        inner_env.set_flags(EnvironmentFlags {
468            mode,
469            // We disable readahead because it improves performance for linear scans, but
470            // worsens it for random access (which is our access pattern outside of sync)
471            no_rdahead: true,
472            coalesce: true,
473            exclusive: args.exclusive.unwrap_or_default(),
474            ..Default::default()
475        });
476        // Configure more readers
477        inner_env.set_max_readers(args.max_readers.unwrap_or(DEFAULT_MAX_READERS));
478        // This parameter sets the maximum size of the "reclaimed list", and the unit of measurement
479        // is "pages". Reclaimed list is the list of freed pages that's populated during the
480        // lifetime of DB transaction, and through which MDBX searches when it needs to insert new
481        // record with overflow pages. The flow is roughly the following:
482        // 0. We need to insert a record that requires N number of overflow pages (in consecutive
483        //    sequence inside the DB file).
484        // 1. Get some pages from the freelist, put them into the reclaimed list.
485        // 2. Search through the reclaimed list for the sequence of size N.
486        // 3. a. If found, return the sequence.
487        // 3. b. If not found, repeat steps 1-3. If the reclaimed list size is larger than
488        //    the `rp augment limit`, stop the search and allocate new pages at the end of the file:
489        //    https://github.com/paradigmxyz/reth/blob/2a4c78759178f66e30c8976ec5d243b53102fc9a/crates/storage/libmdbx-rs/mdbx-sys/libmdbx/mdbx.c#L11479-L11480.
490        //
491        // Basically, this parameter controls for how long do we search through the freelist before
492        // trying to allocate new pages. Smaller value will make MDBX to fallback to
493        // allocation faster, higher value will force MDBX to search through the freelist
494        // longer until the sequence of pages is found.
495        //
496        // The default value of this parameter is set depending on the DB size. The bigger the
497        // database, the larger is `rp augment limit`.
498        // https://github.com/paradigmxyz/reth/blob/2a4c78759178f66e30c8976ec5d243b53102fc9a/crates/storage/libmdbx-rs/mdbx-sys/libmdbx/mdbx.c#L10018-L10024.
499        //
500        // Previously, MDBX set this value as `256 * 1024` constant. Let's fallback to this,
501        // because we want to prioritize freelist lookup speed over database growth.
502        // https://github.com/paradigmxyz/reth/blob/fa2b9b685ed9787636d962f4366caf34a9186e66/crates/storage/libmdbx-rs/mdbx-sys/libmdbx/mdbx.c#L16017.
503        inner_env.set_rp_augment_limit(256 * 1024);
504
505        if let Some(log_level) = args.log_level {
506            // Levels higher than [LogLevel::Notice] require libmdbx built with `MDBX_DEBUG` option.
507            let is_log_level_available = if cfg!(debug_assertions) {
508                true
509            } else {
510                matches!(
511                    log_level,
512                    LogLevel::Fatal | LogLevel::Error | LogLevel::Warn | LogLevel::Notice
513                )
514            };
515            if is_log_level_available {
516                inner_env.set_log_level(match log_level {
517                    LogLevel::Fatal => 0,
518                    LogLevel::Error => 1,
519                    LogLevel::Warn => 2,
520                    LogLevel::Notice => 3,
521                    LogLevel::Verbose => 4,
522                    LogLevel::Debug => 5,
523                    LogLevel::Trace => 6,
524                    LogLevel::Extra => 7,
525                });
526            } else {
527                return Err(DatabaseError::LogLevelUnavailable(log_level))
528            }
529        }
530
531        if let Some(max_read_transaction_duration) = args.max_read_transaction_duration {
532            inner_env.set_max_read_transaction_duration(max_read_transaction_duration);
533        }
534
535        let env = Self {
536            inner: inner_env.open(path).map_err(|e| DatabaseError::Open(e.into()))?,
537            path: path.to_path_buf(),
538            dbis: Arc::default(),
539            metrics: None,
540            _lock_file,
541        };
542
543        Ok(env)
544    }
545
546    /// Enables metrics on the database.
547    pub fn with_metrics(mut self) -> Self {
548        self.metrics = Some(DatabaseEnvMetrics::new().into());
549        self
550    }
551
552    /// Enables metrics on the database if requested.
553    pub fn with_metrics_if(self, enabled: bool) -> Self {
554        if enabled {
555            self.with_metrics()
556        } else {
557            self
558        }
559    }
560
561    /// Creates all the tables defined in [`Tables`], if necessary.
562    ///
563    /// This keeps tracks of the created table handles and stores them for better efficiency.
564    pub fn create_tables(&mut self) -> Result<(), DatabaseError> {
565        self.create_and_track_tables_for::<Tables>()
566    }
567
568    /// Creates all the tables defined in the given [`TableSet`], if necessary.
569    ///
570    /// This keeps tracks of the created table handles and stores them for better efficiency.
571    pub fn create_and_track_tables_for<TS: TableSet>(&mut self) -> Result<(), DatabaseError> {
572        let handles = self._create_tables::<TS>()?;
573        // Note: This is okay because self has mutable access here and `DatabaseEnv` must be Arc'ed
574        // before it can be shared.
575        let dbis = Arc::make_mut(&mut self.dbis);
576        dbis.extend(handles);
577
578        Ok(())
579    }
580
581    /// Creates all the tables defined in [`Tables`], if necessary.
582    ///
583    /// If this type is unique the created handle for the tables will be updated.
584    ///
585    /// This is recommended to be called during initialization to create and track additional tables
586    /// after the default [`Self::create_tables`] are created.
587    pub fn create_tables_for<TS: TableSet>(self: &mut Arc<Self>) -> Result<(), DatabaseError> {
588        let handles = self._create_tables::<TS>()?;
589        if let Some(db) = Arc::get_mut(self) {
590            // Note: The db is unique and the dbis as well, and they can also be cloned.
591            let dbis = Arc::make_mut(&mut db.dbis);
592            dbis.extend(handles);
593        }
594        Ok(())
595    }
596
597    /// Creates the tables and returns the identifiers of the tables.
598    fn _create_tables<TS: TableSet>(
599        &self,
600    ) -> Result<Vec<(&'static str, ffi::MDBX_dbi)>, DatabaseError> {
601        let mut handles = Vec::new();
602        let tx = self.inner.begin_rw_txn().map_err(|e| DatabaseError::InitTx(e.into()))?;
603
604        for table in TS::tables() {
605            let flags =
606                if table.is_dupsort() { DatabaseFlags::DUP_SORT } else { DatabaseFlags::default() };
607
608            let db = tx
609                .create_db(Some(table.name()), flags)
610                .map_err(|e| DatabaseError::CreateTable(e.into()))?;
611            handles.push((table.name(), db.dbi()));
612        }
613
614        tx.commit().map_err(|e| DatabaseError::Commit(e.into()))?;
615        Ok(handles)
616    }
617
618    /// Drops an orphaned table by name.
619    ///
620    /// This is used to clean up tables that are no longer defined in the schema but may still
621    /// exist on disk from previous versions.
622    ///
623    /// Returns `Ok(true)` if the table existed and was dropped, `Ok(false)` if the table was not
624    /// found.
625    ///
626    /// # Safety
627    /// This permanently deletes the table and all its data. Only use for tables that are
628    /// confirmed to be obsolete.
629    pub fn drop_orphan_table(&self, name: &str) -> Result<bool, DatabaseError> {
630        let tx = self.inner.begin_rw_txn().map_err(|e| DatabaseError::InitTx(e.into()))?;
631
632        match tx.open_db(Some(name)) {
633            Ok(db) => {
634                // SAFETY: We just opened the db handle and will commit immediately after dropping.
635                // No other cursors or handles exist for this table.
636                unsafe {
637                    tx.drop_db(db.dbi()).map_err(|e| DatabaseError::Delete(e.into()))?;
638                }
639                tx.commit().map_err(|e| DatabaseError::Commit(e.into()))?;
640                Ok(true)
641            }
642            Err(reth_libmdbx::Error::NotFound) => Ok(false),
643            Err(e) => Err(DatabaseError::Open(e.into())),
644        }
645    }
646
647    /// Records version that accesses the database with write privileges.
648    pub fn record_client_version(&self, version: ClientVersion) -> Result<(), DatabaseError> {
649        if version.is_empty() {
650            return Ok(())
651        }
652
653        let tx = self.tx_mut()?;
654        let mut version_cursor = tx.cursor_write::<tables::VersionHistory>()?;
655
656        let last_version = version_cursor.last()?.map(|(_, v)| v);
657        if Some(&version) != last_version.as_ref() {
658            version_cursor.upsert(
659                SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs(),
660                &version,
661            )?;
662            tx.commit()?;
663        }
664
665        Ok(())
666    }
667}
668
669impl Deref for DatabaseEnv {
670    type Target = Environment;
671
672    fn deref(&self) -> &Self::Target {
673        &self.inner
674    }
675}
676
677#[cfg(test)]
678mod tests {
679    use super::*;
680    use crate::{
681        tables::{
682            AccountsHistory, CanonicalHeaders, Headers, PlainAccountState, PlainStorageState,
683        },
684        test_utils::*,
685        AccountChangeSets,
686    };
687    use alloy_consensus::Header;
688    use alloy_primitives::{address, Address, B256, U256};
689    use reth_db_api::{
690        cursor::{DbDupCursorRO, DbDupCursorRW, ReverseWalker, Walker},
691        models::{AccountBeforeTx, IntegerList, ShardedKey},
692        table::{Encode, Table},
693    };
694    use reth_libmdbx::Error;
695    use reth_primitives_traits::{Account, StorageEntry};
696    use reth_storage_errors::db::{DatabaseWriteError, DatabaseWriteOperation};
697    use std::str::FromStr;
698    use tempfile::TempDir;
699
700    /// Create database for testing. Returns the `TempDir` to prevent cleanup until test ends.
701    fn create_test_db(kind: DatabaseEnvKind) -> (TempDir, DatabaseEnv) {
702        let tempdir = tempfile::TempDir::new().expect(ERROR_TEMPDIR);
703        let env = create_test_db_with_path(kind, tempdir.path());
704        (tempdir, env)
705    }
706
707    /// Create database for testing with specified path
708    fn create_test_db_with_path(kind: DatabaseEnvKind, path: &Path) -> DatabaseEnv {
709        let mut env =
710            DatabaseEnv::open(path, kind, DatabaseArguments::new(ClientVersion::default()))
711                .expect(ERROR_DB_CREATION);
712        env.create_tables().expect(ERROR_TABLE_CREATION);
713        env
714    }
715
// Panic messages shared by the tests in this module.

// Environment and transaction lifecycle.
const ERROR_DB_CREATION: &str = "Not able to create the mdbx file.";
const ERROR_INIT_TX: &str = "Failed to create a MDBX transaction.";
const ERROR_COMMIT: &str = "Not able to commit transaction.";

// Writes.
const ERROR_PUT: &str = "Not able to insert value into table.";
const ERROR_APPEND: &str = "Not able to append the value to the table.";
const ERROR_UPSERT: &str = "Not able to upsert the value to the table.";
const ERROR_DEL: &str = "Not able to delete from table.";

// Reads and result checks.
const ERROR_GET: &str = "Not able to get value from table.";
const ERROR_RETURN_VALUE: &str = "Mismatching result.";
const ERROR_ETH_ADDRESS: &str = "Invalid address.";
726
/// A read-write test database can be opened and its tables created without panicking.
#[test]
fn db_creation() {
    // Bind the returned pair so the temp dir and the environment stay alive until the end.
    let _db = create_test_db(DatabaseEnvKind::RW);
}
731
/// `drop_orphan_table` removes an existing table and reports `false` for an unknown one.
#[test]
fn db_drop_orphan_table() {
    const ORPHAN_TABLE: &str = "OrphanTestTable";

    let tempdir = TempDir::new().expect(ERROR_TEMPDIR);
    let db = create_test_db_with_path(DatabaseEnvKind::RW, tempdir.path());

    // Manually create a table that is not part of the schema.
    {
        let rw_tx = db.inner.begin_rw_txn().expect(ERROR_INIT_TX);
        rw_tx
            .create_db(Some(ORPHAN_TABLE), DatabaseFlags::empty())
            .expect("Failed to create orphan table");
        rw_tx.commit().expect(ERROR_COMMIT);
    }

    // The table can be opened, so it exists.
    {
        let ro_tx = db.inner.begin_ro_txn().expect(ERROR_INIT_TX);
        let opened = ro_tx.open_db(Some(ORPHAN_TABLE));
        assert!(opened.is_ok(), "Orphan table should exist");
    }

    // Dropping it succeeds and reports that the table existed.
    let dropped = db.drop_orphan_table(ORPHAN_TABLE);
    assert!(dropped.is_ok(), "drop_orphan_table should succeed");
    assert!(dropped.unwrap(), "drop_orphan_table should return true for existing table");

    // Afterwards the table can no longer be opened.
    {
        let ro_tx = db.inner.begin_ro_txn().expect(ERROR_INIT_TX);
        let reopened = ro_tx.open_db(Some(ORPHAN_TABLE));
        assert!(reopened.is_err(), "Orphan table should no longer exist");
    }

    // A table that never existed yields Ok(false).
    let missing = db.drop_orphan_table("NonExistentTable");
    assert!(missing.is_ok(), "drop_orphan_table should succeed for non-existent table");
    assert!(!missing.unwrap(), "drop_orphan_table should return false for non-existent table");
}
771
/// A header written in one transaction can be read back in a later one.
#[test]
fn db_manual_put_get() {
    let (_tempdir, env) = create_test_db(DatabaseEnvKind::RW);

    let block_number = 1u64;
    let header = Header::default();

    // Write the header and commit.
    let write_tx = env.tx_mut().expect(ERROR_INIT_TX);
    write_tx.put::<Headers>(block_number, header.clone()).expect(ERROR_PUT);
    write_tx.commit().expect(ERROR_COMMIT);

    // Read it back through a fresh read-only transaction.
    let read_tx = env.tx().expect(ERROR_INIT_TX);
    let stored = read_tx.get::<Headers>(block_number).expect(ERROR_GET);
    assert_eq!(stored.expect(ERROR_RETURN_VALUE), header);
    read_tx.commit().expect(ERROR_COMMIT);
}
790
791    #[test]
       /// Upserts two `PlainStorageState` entries that share the same address and storage key
       /// but differ in value, deletes the first one through a walker, and checks that both the
       /// walker and a fresh read cursor in the same transaction only see the second entry.
792    fn db_dup_cursor_delete_first() {
793        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);
794        let tx = db.tx_mut().expect(ERROR_INIT_TX);
795
796        let mut dup_cursor = tx.cursor_dup_write::<PlainStorageState>().unwrap();
797
798        let entry_0 = StorageEntry { key: B256::with_last_byte(1), value: U256::from(0) };
799        let entry_1 = StorageEntry { key: B256::with_last_byte(1), value: U256::from(1) };
800
801        dup_cursor.upsert(Address::with_last_byte(1), &entry_0).expect(ERROR_UPSERT);
802        dup_cursor.upsert(Address::with_last_byte(1), &entry_1).expect(ERROR_UPSERT);
803
           // Both entries are present before the delete.
804        assert_eq!(
805            dup_cursor.walk(None).unwrap().collect::<Result<Vec<_>, _>>().unwrap(),
806            vec![(Address::with_last_byte(1), entry_0), (Address::with_last_byte(1), entry_1),]
807        );
808
           // Delete through a new walker before pulling any item from it.
809        let mut walker = dup_cursor.walk(None).unwrap();
810        walker.delete_current().expect(ERROR_DEL);
811
812        assert_eq!(walker.next().unwrap().unwrap(), (Address::with_last_byte(1), entry_1));
813
814        // Check the tx view - it correctly holds entry_1
815        assert_eq!(
816            tx.cursor_dup_read::<PlainStorageState>()
817                .unwrap()
818                .walk(None)
819                .unwrap()
820                .collect::<Result<Vec<_>, _>>()
821                .unwrap(),
822            vec![
823                (Address::with_last_byte(1), entry_1), // This is ok - we removed entry_0
824            ]
825        );
826
827        // Check the remainder of walker
828        assert!(walker.next().is_none());
829    }
830
831    #[test]
       /// Stores one `Header`, then checks that a read cursor's `first()` finds an entry and that
       /// walking from the stored key yields the stored value as the first item.
832    fn db_cursor_walk() {
833        let (_tempdir, env) = create_test_db(DatabaseEnvKind::RW);
834
835        let value = Header::default();
836        let key = 1u64;
837
838        // PUT
839        let tx = env.tx_mut().expect(ERROR_INIT_TX);
840        tx.put::<Headers>(key, value.clone()).expect(ERROR_PUT);
841        tx.commit().expect(ERROR_COMMIT);
842
843        // Cursor
844        let tx = env.tx().expect(ERROR_INIT_TX);
845        let mut cursor = tx.cursor_read::<Headers>().unwrap();
846
847        let first = cursor.first().unwrap();
848        assert!(first.is_some(), "First should be our put");
849
850        // Walk
851        let walk = cursor.walk(Some(key)).unwrap();
852        let first = walk.into_iter().next().unwrap().unwrap();
853        assert_eq!(first.1, value, "First next should be put value");
854    }
855
856    #[test]
       /// Fills `CanonicalHeaders` with keys `0..=3` and checks which keys `walk_range` yields for
       /// half-open, inclusive, and unbounded ranges, including that the walker keeps returning
       /// `None` once it is exhausted.
857    fn db_cursor_walk_range() {
858        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);
859
860        // PUT (0, 0), (1, 0), (2, 0), (3, 0)
861        let tx = db.tx_mut().expect(ERROR_INIT_TX);
862        vec![0, 1, 2, 3]
863            .into_iter()
864            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
865            .expect(ERROR_PUT);
866        tx.commit().expect(ERROR_COMMIT);
867
868        let tx = db.tx().expect(ERROR_INIT_TX);
869        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
870
871        // [1, 3)
872        let mut walker = cursor.walk_range(1..3).unwrap();
873        assert_eq!(walker.next().unwrap().unwrap(), (1, B256::ZERO));
874        assert_eq!(walker.next().unwrap().unwrap(), (2, B256::ZERO));
875        assert!(walker.next().is_none());
876        // next() returns None after walker is done
877        assert!(walker.next().is_none());
878
879        // [1, 2]
880        let mut walker = cursor.walk_range(1..=2).unwrap();
881        assert_eq!(walker.next().unwrap().unwrap(), (1, B256::ZERO));
882        assert_eq!(walker.next().unwrap().unwrap(), (2, B256::ZERO));
883        // next() returns None after walker is done
884        assert!(walker.next().is_none());
885
886        // [1, ∞)
887        let mut walker = cursor.walk_range(1..).unwrap();
888        assert_eq!(walker.next().unwrap().unwrap(), (1, B256::ZERO));
889        assert_eq!(walker.next().unwrap().unwrap(), (2, B256::ZERO));
890        assert_eq!(walker.next().unwrap().unwrap(), (3, B256::ZERO));
891        // next() returns None after walker is done
892        assert!(walker.next().is_none());
893
894        // [2, 4)
           // The end bound 4 is past the last stored key (3); the walker stops after key 3.
895        let mut walker = cursor.walk_range(2..4).unwrap();
896        assert_eq!(walker.next().unwrap().unwrap(), (2, B256::ZERO));
897        assert_eq!(walker.next().unwrap().unwrap(), (3, B256::ZERO));
898        assert!(walker.next().is_none());
899        // next() returns None after walker is done
900        assert!(walker.next().is_none());
901
902        // (∞, 3)
903        let mut walker = cursor.walk_range(..3).unwrap();
904        assert_eq!(walker.next().unwrap().unwrap(), (0, B256::ZERO));
905        assert_eq!(walker.next().unwrap().unwrap(), (1, B256::ZERO));
906        assert_eq!(walker.next().unwrap().unwrap(), (2, B256::ZERO));
907        // next() returns None after walker is done
908        assert!(walker.next().is_none());
909
910        // (∞, ∞)
911        let mut walker = cursor.walk_range(..).unwrap();
912        assert_eq!(walker.next().unwrap().unwrap(), (0, B256::ZERO));
913        assert_eq!(walker.next().unwrap().unwrap(), (1, B256::ZERO));
914        assert_eq!(walker.next().unwrap().unwrap(), (2, B256::ZERO));
915        assert_eq!(walker.next().unwrap().unwrap(), (3, B256::ZERO));
916        // next() returns None after walker is done
917        assert!(walker.next().is_none());
918    }
919
920    #[test]
       /// Stores seven `AccountChangeSets` entries (three addresses under keys `0` and `1`, one
       /// under key `2`) and checks that `walk_range(0..=1)` yields exactly the six entries for
       /// keys `0` and `1`, in order, and nothing for key `2`.
921    fn db_cursor_walk_range_on_dup_table() {
922        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);
923
924        let address0 = Address::ZERO;
925        let address1 = Address::with_last_byte(1);
926        let address2 = Address::with_last_byte(2);
927
928        let tx = db.tx_mut().expect(ERROR_INIT_TX);
929        tx.put::<AccountChangeSets>(0, AccountBeforeTx { address: address0, info: None })
930            .expect(ERROR_PUT);
931        tx.put::<AccountChangeSets>(0, AccountBeforeTx { address: address1, info: None })
932            .expect(ERROR_PUT);
933        tx.put::<AccountChangeSets>(0, AccountBeforeTx { address: address2, info: None })
934            .expect(ERROR_PUT);
935        tx.put::<AccountChangeSets>(1, AccountBeforeTx { address: address0, info: None })
936            .expect(ERROR_PUT);
937        tx.put::<AccountChangeSets>(1, AccountBeforeTx { address: address1, info: None })
938            .expect(ERROR_PUT);
939        tx.put::<AccountChangeSets>(1, AccountBeforeTx { address: address2, info: None })
940            .expect(ERROR_PUT);
941        tx.put::<AccountChangeSets>(2, AccountBeforeTx { address: address0, info: None }) // <- should not be returned by the walker
942            .expect(ERROR_PUT);
943        tx.commit().expect(ERROR_COMMIT);
944
945        let tx = db.tx().expect(ERROR_INIT_TX);
946        let mut cursor = tx.cursor_read::<AccountChangeSets>().unwrap();
947
           // Sanity check: the unbounded range sees every entry written above.
948        let entries = cursor.walk_range(..).unwrap().collect::<Result<Vec<_>, _>>().unwrap();
949        assert_eq!(entries.len(), 7);
950
951        let mut walker = cursor.walk_range(0..=1).unwrap();
952        assert_eq!(
953            walker.next().unwrap().unwrap(),
954            (0, AccountBeforeTx { address: address0, info: None })
955        );
956        assert_eq!(
957            walker.next().unwrap().unwrap(),
958            (0, AccountBeforeTx { address: address1, info: None })
959        );
960        assert_eq!(
961            walker.next().unwrap().unwrap(),
962            (0, AccountBeforeTx { address: address2, info: None })
963        );
964        assert_eq!(
965            walker.next().unwrap().unwrap(),
966            (1, AccountBeforeTx { address: address0, info: None })
967        );
968        assert_eq!(
969            walker.next().unwrap().unwrap(),
970            (1, AccountBeforeTx { address: address1, info: None })
971        );
972        assert_eq!(
973            walker.next().unwrap().unwrap(),
974            (1, AccountBeforeTx { address: address2, info: None })
975        );
976        assert!(walker.next().is_none());
977    }
978
979    #[expect(clippy::reversed_empty_ranges)]
980    #[test]
       /// Checks that `walk_range` yields nothing for ranges whose start is greater than their
       /// end (`3..1`, `15..=2`) and for the empty range `1..1`.
       ///
       /// The reversed ranges are written on purpose, hence the `reversed_empty_ranges` lint
       /// expectation on this test.
981    fn db_cursor_walk_range_invalid() {
982        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);
983
984        // PUT (0, 0), (1, 0), (2, 0), (3, 0)
985        let tx = db.tx_mut().expect(ERROR_INIT_TX);
986        vec![0, 1, 2, 3]
987            .into_iter()
988            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
989            .expect(ERROR_PUT);
990        tx.commit().expect(ERROR_COMMIT);
991
992        let tx = db.tx().expect(ERROR_INIT_TX);
993        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
994
995        // start bound greater than end bound
996        let mut res = cursor.walk_range(3..1).unwrap();
997        assert!(res.next().is_none());
998
999        // start bound greater than end bound
1000        let mut res = cursor.walk_range(15..=2).unwrap();
1001        assert!(res.next().is_none());
1002
1003        // returning nothing
1004        let mut walker = cursor.walk_range(1..1).unwrap();
1005        assert!(walker.next().is_none());
1006    }
1007
1008    #[test]
        /// Walks keys `0, 1, 3` forward with a `Walker` built directly over a read cursor, then
        /// converts the exhausted walker with `rev()` and checks that it yields the same keys in
        /// descending order.
1009    fn db_walker() {
1010        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);
1011
1012        // PUT (0, 0), (1, 0), (3, 0)
1013        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1014        vec![0, 1, 3]
1015            .into_iter()
1016            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
1017            .expect(ERROR_PUT);
1018        tx.commit().expect(ERROR_COMMIT);
1019
1020        let tx = db.tx().expect(ERROR_INIT_TX);
1021        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
1022
1023        let mut walker = Walker::new(&mut cursor, None);
1024
1025        assert_eq!(walker.next().unwrap().unwrap(), (0, B256::ZERO));
1026        assert_eq!(walker.next().unwrap().unwrap(), (1, B256::ZERO));
1027        assert_eq!(walker.next().unwrap().unwrap(), (3, B256::ZERO));
1028        assert!(walker.next().is_none());
1029
1030        // transform to ReverseWalker
1031        let mut reverse_walker = walker.rev();
1032        assert_eq!(reverse_walker.next().unwrap().unwrap(), (3, B256::ZERO));
1033        assert_eq!(reverse_walker.next().unwrap().unwrap(), (1, B256::ZERO));
1034        assert_eq!(reverse_walker.next().unwrap().unwrap(), (0, B256::ZERO));
1035        assert!(reverse_walker.next().is_none());
1036    }
1037
1038    #[test]
        /// Walks keys `0, 1, 3` backward with a `ReverseWalker` built directly over a read
        /// cursor, then converts the exhausted walker with `forward()` and checks that it yields
        /// the same keys in ascending order.
1039    fn db_reverse_walker() {
1040        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);
1041
1042        // PUT (0, 0), (1, 0), (3, 0)
1043        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1044        vec![0, 1, 3]
1045            .into_iter()
1046            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
1047            .expect(ERROR_PUT);
1048        tx.commit().expect(ERROR_COMMIT);
1049
1050        let tx = db.tx().expect(ERROR_INIT_TX);
1051        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
1052
1053        let mut reverse_walker = ReverseWalker::new(&mut cursor, None);
1054
1055        assert_eq!(reverse_walker.next().unwrap().unwrap(), (3, B256::ZERO));
1056        assert_eq!(reverse_walker.next().unwrap().unwrap(), (1, B256::ZERO));
1057        assert_eq!(reverse_walker.next().unwrap().unwrap(), (0, B256::ZERO));
1058        assert!(reverse_walker.next().is_none());
1059
1060        // transform to Walker
1061        let mut walker = reverse_walker.forward();
1062        assert_eq!(walker.next().unwrap().unwrap(), (0, B256::ZERO));
1063        assert_eq!(walker.next().unwrap().unwrap(), (1, B256::ZERO));
1064        assert_eq!(walker.next().unwrap().unwrap(), (3, B256::ZERO));
1065        assert!(walker.next().is_none());
1066    }
1067
1068    #[test]
        /// Checks what `walk_back` yields over keys `0, 1, 3` for different start keys: an
        /// existing key, a missing key between stored keys, a key past the last stored key, and
        /// no start key at all.
1069    fn db_walk_back() {
1070        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);
1071
1072        // PUT (0, 0), (1, 0), (3, 0)
1073        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1074        vec![0, 1, 3]
1075            .into_iter()
1076            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
1077            .expect(ERROR_PUT);
1078        tx.commit().expect(ERROR_COMMIT);
1079
1080        let tx = db.tx().expect(ERROR_INIT_TX);
1081        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
1082
            // Start key exists: the walk begins at that key.
1083        let mut reverse_walker = cursor.walk_back(Some(1)).unwrap();
1084        assert_eq!(reverse_walker.next().unwrap().unwrap(), (1, B256::ZERO));
1085        assert_eq!(reverse_walker.next().unwrap().unwrap(), (0, B256::ZERO));
1086        assert!(reverse_walker.next().is_none());
1087
            // Start key 2 is not stored: the walk begins at 3, the next greater stored key.
1088        let mut reverse_walker = cursor.walk_back(Some(2)).unwrap();
1089        assert_eq!(reverse_walker.next().unwrap().unwrap(), (3, B256::ZERO));
1090        assert_eq!(reverse_walker.next().unwrap().unwrap(), (1, B256::ZERO));
1091        assert_eq!(reverse_walker.next().unwrap().unwrap(), (0, B256::ZERO));
1092        assert!(reverse_walker.next().is_none());
1093
            // Start key 4 is past the last stored key: the walk begins at the last entry.
1094        let mut reverse_walker = cursor.walk_back(Some(4)).unwrap();
1095        assert_eq!(reverse_walker.next().unwrap().unwrap(), (3, B256::ZERO));
1096        assert_eq!(reverse_walker.next().unwrap().unwrap(), (1, B256::ZERO));
1097        assert_eq!(reverse_walker.next().unwrap().unwrap(), (0, B256::ZERO));
1098        assert!(reverse_walker.next().is_none());
1099
            // No start key: the walk begins at the last entry.
1100        let mut reverse_walker = cursor.walk_back(None).unwrap();
1101        assert_eq!(reverse_walker.next().unwrap().unwrap(), (3, B256::ZERO));
1102        assert_eq!(reverse_walker.next().unwrap().unwrap(), (1, B256::ZERO));
1103        assert_eq!(reverse_walker.next().unwrap().unwrap(), (0, B256::ZERO));
1104        assert!(reverse_walker.next().is_none());
1105    }
1106
1107    #[test]
        /// Checks that `seek_exact` on a key that is not stored returns `None` and that the
        /// cursor's `current()` is `None` both before and after the failed seek.
        ///
        /// NOTE(review): the test name mentions "previous key", but only `seek_exact` is
        /// exercised here.
1108    fn db_cursor_seek_exact_or_previous_key() {
1109        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);
1110
1111        // PUT
1112        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1113        vec![0, 1, 3]
1114            .into_iter()
1115            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
1116            .expect(ERROR_PUT);
1117        tx.commit().expect(ERROR_COMMIT);
1118
1119        // Cursor
            // Key 2 is deliberately absent from the keys written above (0, 1, 3).
1120        let missing_key = 2;
1121        let tx = db.tx().expect(ERROR_INIT_TX);
1122        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
1123        assert!(cursor.current().unwrap().is_none());
1124
1125        // Seek exact
1126        let exact = cursor.seek_exact(missing_key).unwrap();
1127        assert_eq!(exact, None);
1128        assert!(cursor.current().unwrap().is_none());
1129    }
1130
1131    #[test]
        /// Inserts the missing key `2` through a write cursor, checks that the cursor then points
        /// at the inserted entry, and checks that inserting the same key again fails with a
        /// `KeyExist` write error while leaving the cursor position unchanged.
1132    fn db_cursor_insert() {
1133        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);
1134
1135        // PUT
1136        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1137        vec![0, 1, 3, 4, 5]
1138            .into_iter()
1139            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
1140            .expect(ERROR_PUT);
1141        tx.commit().expect(ERROR_COMMIT);
1142
1143        let key_to_insert = 2;
1144        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1145        let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
1146
1147        // INSERT
1148        assert!(cursor.insert(key_to_insert, &B256::ZERO).is_ok());
1149        assert_eq!(cursor.current().unwrap(), Some((key_to_insert, B256::ZERO)));
1150        // INSERT (failure)
1151        assert!(matches!(
1152        cursor.insert(key_to_insert, &B256::ZERO).unwrap_err(),
1153        DatabaseError::Write(err) if *err == DatabaseWriteError {
1154            info: Error::KeyExist.into(),
1155            operation: DatabaseWriteOperation::CursorInsert,
1156            table_name: CanonicalHeaders::NAME,
1157            key: key_to_insert.encode().into(),
1158        }));
1159        assert_eq!(cursor.current().unwrap(), Some((key_to_insert, B256::ZERO)));
1160
1161        tx.commit().expect(ERROR_COMMIT);
1162
1163        // Confirm the result
1164        let tx = db.tx().expect(ERROR_INIT_TX);
1165        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
1166        let res = cursor.walk(None).unwrap().map(|res| res.unwrap().0).collect::<Vec<_>>();
1167        assert_eq!(res, vec![0, 1, 2, 3, 4, 5]);
1168        tx.commit().expect(ERROR_COMMIT);
1169    }
1170
1171    #[test]
        /// Checks that `insert` on a `PlainStorageState` dup cursor succeeds for the first entry
        /// under a key and returns an error for a second entry under the same key, even though
        /// the second entry uses a different subkey.
1172    fn db_cursor_insert_dup() {
1173        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);
1174        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1175
1176        let mut dup_cursor = tx.cursor_dup_write::<PlainStorageState>().unwrap();
1177        let key = Address::random();
1178        let subkey1 = B256::random();
1179        let subkey2 = B256::random();
1180
1181        let entry1 = StorageEntry { key: subkey1, value: U256::ZERO };
1182        assert!(dup_cursor.insert(key, &entry1).is_ok());
1183
1184        // Can't insert
1185        let entry2 = StorageEntry { key: subkey2, value: U256::ZERO };
1186        assert!(dup_cursor.insert(key, &entry2).is_err());
1187    }
1188
1189    #[test]
        /// Deletes one of three `PlainAccountState` entries, then checks that calling
        /// `delete_current` again after a failed `seek_exact` on the deleted key returns a
        /// `NoData` delete error and leaves the two other entries in place.
1190    fn db_cursor_delete_current_non_existent() {
1191        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);
1192        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1193
1194        let key1 = Address::with_last_byte(1);
1195        let key2 = Address::with_last_byte(2);
1196        let key3 = Address::with_last_byte(3);
1197        let mut cursor = tx.cursor_write::<PlainAccountState>().unwrap();
1198
1199        assert!(cursor.insert(key1, &Account::default()).is_ok());
1200        assert!(cursor.insert(key2, &Account::default()).is_ok());
1201        assert!(cursor.insert(key3, &Account::default()).is_ok());
1202
1203        // Seek & delete key2
1204        cursor.seek_exact(key2).unwrap();
1205        assert!(cursor.delete_current().is_ok());
1206        assert!(cursor.seek_exact(key2).unwrap().is_none());
1207
1208        // Seek & delete key2 again
1209        assert!(cursor.seek_exact(key2).unwrap().is_none());
1210        assert!(matches!(
1211            cursor.delete_current().unwrap_err(),
1212            DatabaseError::Delete(err) if err == reth_libmdbx::Error::NoData.into()));
1213        // Assert that key1 is still there
1214        assert_eq!(cursor.seek_exact(key1).unwrap(), Some((key1, Account::default())));
1215        // Assert that key3 is still there
1216        assert_eq!(cursor.seek_exact(key3).unwrap(), Some((key3, Account::default())));
1217    }
1218
1219    #[test]
        /// Positions a write cursor on the last entry and inserts the missing even keys
        /// `2, 4, 6, 8`, checking after each insert that the cursor points at the new entry and
        /// finally that the table holds keys `0..=9` in order.
1220    fn db_cursor_insert_wherever_cursor_is() {
1221        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);
1222        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1223
1224        // PUT
1225        vec![0, 1, 3, 5, 7, 9]
1226            .into_iter()
1227            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
1228            .expect(ERROR_PUT);
1229        tx.commit().expect(ERROR_COMMIT);
1230
1231        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1232        let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
1233
1234        // INSERT (cursor starts at last)
1235        cursor.last().unwrap();
1236        assert_eq!(cursor.current().unwrap(), Some((9, B256::ZERO)));
1237
            // Every inserted key sorts before the cursor's starting position (key 9).
1238        for pos in (2..=8).step_by(2) {
1239            assert!(cursor.insert(pos, &B256::ZERO).is_ok());
1240            assert_eq!(cursor.current().unwrap(), Some((pos, B256::ZERO)));
1241        }
1242        tx.commit().expect(ERROR_COMMIT);
1243
1244        // Confirm the result
1245        let tx = db.tx().expect(ERROR_INIT_TX);
1246        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
1247        let res = cursor.walk(None).unwrap().map(|res| res.unwrap().0).collect::<Vec<_>>();
1248        assert_eq!(res, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
1249        tx.commit().expect(ERROR_COMMIT);
1250    }
1251
1252    #[test]
        /// Appends key `5` through a write cursor to a table already holding keys `0..=4` and
        /// checks that the table then holds keys `0..=5` in order.
1253    fn db_cursor_append() {
1254        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);
1255
1256        // PUT
1257        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1258        vec![0, 1, 2, 3, 4]
1259            .into_iter()
1260            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
1261            .expect(ERROR_PUT);
1262        tx.commit().expect(ERROR_COMMIT);
1263
1264        // APPEND
1265        let key_to_append = 5;
1266        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1267        let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
1268        assert!(cursor.append(key_to_append, &B256::ZERO).is_ok());
1269        tx.commit().expect(ERROR_COMMIT);
1270
1271        // Confirm the result
1272        let tx = db.tx().expect(ERROR_INIT_TX);
1273        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
1274        let res = cursor.walk(None).unwrap().map(|res| res.unwrap().0).collect::<Vec<_>>();
1275        assert_eq!(res, vec![0, 1, 2, 3, 4, 5]);
1276        tx.commit().expect(ERROR_COMMIT);
1277    }
1278
1279    #[test]
        /// Checks that appending key `2` to a table whose last key is `5` fails with a
        /// `KeyMismatch` write error, leaves the cursor on the last entry, and does not change
        /// the stored keys.
1280    fn db_cursor_append_failure() {
1281        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);
1282
1283        // PUT
1284        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1285        vec![0, 1, 3, 4, 5]
1286            .into_iter()
1287            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
1288            .expect(ERROR_PUT);
1289        tx.commit().expect(ERROR_COMMIT);
1290
1291        // APPEND
1292        let key_to_append = 2;
1293        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1294        let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
1295        assert!(matches!(
1296        cursor.append(key_to_append, &B256::ZERO).unwrap_err(),
1297        DatabaseError::Write(err) if *err == DatabaseWriteError {
1298            info: Error::KeyMismatch.into(),
1299            operation: DatabaseWriteOperation::CursorAppend,
1300            table_name: CanonicalHeaders::NAME,
1301            key: key_to_append.encode().into(),
1302        }));
1303        assert_eq!(cursor.current().unwrap(), Some((5, B256::ZERO))); // the end of table
1304        tx.commit().expect(ERROR_COMMIT);
1305
1306        // Confirm the result
1307        let tx = db.tx().expect(ERROR_INIT_TX);
1308        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
1309        let res = cursor.walk(None).unwrap().map(|res| res.unwrap().0).collect::<Vec<_>>();
1310        assert_eq!(res, vec![0, 1, 3, 4, 5]);
1311        tx.commit().expect(ERROR_COMMIT);
1312    }
1313
1314    #[test]
        /// Exercises `upsert` on a plain table and on a dup table.
        ///
        /// On `PlainAccountState`, each upsert under the same key is followed by a `seek_exact`
        /// that must return the most recently written account. On `PlainStorageState`, a second
        /// upsert with the same subkey but a different value is followed by checks that both
        /// entries can be read.
1315    fn db_cursor_upsert() {
1316        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);
1317        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1318
1319        let mut cursor = tx.cursor_write::<PlainAccountState>().unwrap();
1320        let key = Address::random();
1321
1322        let account = Account::default();
1323        cursor.upsert(key, &account).expect(ERROR_UPSERT);
1324        assert_eq!(cursor.seek_exact(key).unwrap(), Some((key, account)));
1325
1326        let account = Account { nonce: 1, ..Default::default() };
1327        cursor.upsert(key, &account).expect(ERROR_UPSERT);
1328        assert_eq!(cursor.seek_exact(key).unwrap(), Some((key, account)));
1329
1330        let account = Account { nonce: 2, ..Default::default() };
1331        cursor.upsert(key, &account).expect(ERROR_UPSERT);
1332        assert_eq!(cursor.seek_exact(key).unwrap(), Some((key, account)));
1333
1334        let mut dup_cursor = tx.cursor_dup_write::<PlainStorageState>().unwrap();
1335        let subkey = B256::random();
1336
1337        let value = U256::from(1);
1338        let entry1 = StorageEntry { key: subkey, value };
1339        dup_cursor.upsert(key, &entry1).expect(ERROR_UPSERT);
1340        assert_eq!(dup_cursor.seek_by_key_subkey(key, subkey).unwrap(), Some(entry1));
1341
1342        let value = U256::from(2);
1343        let entry2 = StorageEntry { key: subkey, value };
1344        dup_cursor.upsert(key, &entry2).expect(ERROR_UPSERT);
            // The subkey lookup still returns `entry1`, and `entry2` follows it as the next
            // duplicate: the second upsert added an entry instead of replacing the first.
1345        assert_eq!(dup_cursor.seek_by_key_subkey(key, subkey).unwrap(), Some(entry1));
1346        assert_eq!(dup_cursor.next_dup_val().unwrap(), Some(entry2));
1347    }
1348
1349    #[test]
        /// Appends five `AccountChangeSets` entries under one key (addresses ending in
        /// `0, 1, 3, 4, 5`), then checks three follow-up writes that use an address ending in
        /// `2`: `append_dup` under the same key fails with `KeyMismatch`, `append` under the
        /// previous key fails with `KeyMismatch`, and `append` under the same key succeeds.
1350    fn db_cursor_dupsort_append() {
1351        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);
1352
1353        let transition_id = 2;
1354
1355        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1356        let mut cursor = tx.cursor_write::<AccountChangeSets>().unwrap();
1357        vec![0, 1, 3, 4, 5]
1358            .into_iter()
1359            .try_for_each(|val| {
1360                cursor.append(
1361                    transition_id,
1362                    &AccountBeforeTx { address: Address::with_last_byte(val), info: None },
1363                )
1364            })
1365            .expect(ERROR_APPEND);
1366        tx.commit().expect(ERROR_COMMIT);
1367
1368        // APPEND DUP & APPEND
            // Address byte 2 falls between the already stored address bytes 1 and 3.
1369        let subkey_to_append = 2;
1370        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1371        let mut cursor = tx.cursor_write::<AccountChangeSets>().unwrap();
1372        assert!(matches!(
1373        cursor
1374            .append_dup(
1375                transition_id,
1376                AccountBeforeTx {
1377                    address: Address::with_last_byte(subkey_to_append),
1378                    info: None
1379                }
1380            )
1381            .unwrap_err(),
1382        DatabaseError::Write(err) if *err == DatabaseWriteError {
1383            info: Error::KeyMismatch.into(),
1384            operation: DatabaseWriteOperation::CursorAppendDup,
1385            table_name: AccountChangeSets::NAME,
1386            key: transition_id.encode().into(),
1387        }));
            // Appending under a key smaller than the stored key is rejected as well.
1388        assert!(matches!(
1389            cursor
1390                .append(
1391                    transition_id - 1,
1392                    &AccountBeforeTx {
1393                        address: Address::with_last_byte(subkey_to_append),
1394                        info: None
1395                    }
1396                )
1397                .unwrap_err(),
1398            DatabaseError::Write(err) if *err == DatabaseWriteError {
1399                info: Error::KeyMismatch.into(),
1400                operation: DatabaseWriteOperation::CursorAppend,
1401                table_name: AccountChangeSets::NAME,
1402                key: (transition_id - 1).encode().into(),
1403            }
1404        ));
            // Plain `append` under the existing key is accepted for the same entry.
1405        assert!(cursor
1406            .append(
1407                transition_id,
1408                &AccountBeforeTx { address: Address::with_last_byte(subkey_to_append), info: None }
1409            )
1410            .is_ok());
1411    }
1412
1413    #[test]
        /// Writes an `Account` through the closure-based `update` API on a read-write
        /// environment, then reopens the same path as a read-only environment and reads the
        /// account back through the closure-based `view` API.
1414    fn db_closure_put_get() {
1415        let tempdir = TempDir::new().expect(ERROR_TEMPDIR);
1416        let path = tempdir.path();
1417
            // Every field is set to a non-default value (maximum nonce and balance, a random
            // bytecode hash).
1418        let value = Account {
1419            nonce: 18446744073709551615,
1420            bytecode_hash: Some(B256::random()),
1421            balance: U256::MAX,
1422        };
1423        let key = Address::from_str("0xa2c122be93b0074270ebee7f6b7292c7deb45047")
1424            .expect(ERROR_ETH_ADDRESS);
1425
1426        {
                // The read-write environment goes out of scope at the end of this block, before
                // the read-only environment is opened below.
1427            let env = create_test_db_with_path(DatabaseEnvKind::RW, path);
1428
1429            // PUT
                // The closure's return value (200) is handed back by `update` and checked below.
1430            let result = env.update(|tx| {
1431                tx.put::<PlainAccountState>(key, value).expect(ERROR_PUT);
1432                200
1433            });
1434            assert_eq!(result.expect(ERROR_RETURN_VALUE), 200);
1435        }
1436
1437        let env = DatabaseEnv::open(
1438            path,
1439            DatabaseEnvKind::RO,
1440            DatabaseArguments::new(ClientVersion::default()),
1441        )
1442        .expect(ERROR_DB_CREATION);
1443
1444        // GET
1445        let result =
1446            env.view(|tx| tx.get::<PlainAccountState>(key).expect(ERROR_GET)).expect(ERROR_GET);
1447
1448        assert_eq!(result, Some(value))
1449    }
1450
1451    #[test]
        /// Stores three `PlainStorageState` entries under one address in the order
        /// `(0,0), (2,2), (1,1)` and checks that a dup cursor returns them as
        /// `(0,0), (1,1), (2,2)`, and that `walk_dup` started at subkey `1` yields `(1,1)` first.
1452    fn db_dup_sort() {
1453        let (_tempdir, env) = create_test_db(DatabaseEnvKind::RW);
1454        let key = Address::from_str("0xa2c122be93b0074270ebee7f6b7292c7deb45047")
1455            .expect(ERROR_ETH_ADDRESS);
1456
1457        // PUT (0,0)
1458        let value00 = StorageEntry::default();
1459        env.update(|tx| tx.put::<PlainStorageState>(key, value00).expect(ERROR_PUT)).unwrap();
1460
1461        // PUT (2,2)
1462        let value22 = StorageEntry { key: B256::with_last_byte(2), value: U256::from(2) };
1463        env.update(|tx| tx.put::<PlainStorageState>(key, value22).expect(ERROR_PUT)).unwrap();
1464
1465        // PUT (1,1)
1466        let value11 = StorageEntry { key: B256::with_last_byte(1), value: U256::from(1) };
1467        env.update(|tx| tx.put::<PlainStorageState>(key, value11).expect(ERROR_PUT)).unwrap();
1468
1469        // Iterate with cursor
1470        {
1471            let tx = env.tx().expect(ERROR_INIT_TX);
1472            let mut cursor = tx.cursor_dup_read::<PlainStorageState>().unwrap();
1473
1474            // Notice that value11 and value22 have been ordered in the DB.
1475            assert_eq!(Some(value00), cursor.next_dup_val().unwrap());
1476            assert_eq!(Some(value11), cursor.next_dup_val().unwrap());
1477            assert_eq!(Some(value22), cursor.next_dup_val().unwrap());
1478        }
1479
1480        // Seek value with exact subkey
1481        {
1482            let tx = env.tx().expect(ERROR_INIT_TX);
1483            let mut cursor = tx.cursor_dup_read::<PlainStorageState>().unwrap();
1484            let mut walker = cursor.walk_dup(Some(key), Some(B256::with_last_byte(1))).unwrap();
1485            assert_eq!(
1486                (key, value11),
1487                walker
1488                    .next()
1489                    .expect("element should exist.")
1490                    .expect("should be able to retrieve it.")
1491            );
1492        }
1493    }
1494
1495    #[test]
        /// Stores three `PlainStorageState` entries under one address and checks that `walk_dup`
        /// started at a different address (`Address::ZERO`), which has no entries, yields
        /// nothing.
1496    fn db_walk_dup_with_not_existing_key() {
1497        let (_tempdir, env) = create_test_db(DatabaseEnvKind::RW);
1498        let key = Address::from_str("0xa2c122be93b0074270ebee7f6b7292c7deb45047")
1499            .expect(ERROR_ETH_ADDRESS);
1500
1501        // PUT (0,0)
1502        let value00 = StorageEntry::default();
1503        env.update(|tx| tx.put::<PlainStorageState>(key, value00).expect(ERROR_PUT)).unwrap();
1504
1505        // PUT (2,2)
1506        let value22 = StorageEntry { key: B256::with_last_byte(2), value: U256::from(2) };
1507        env.update(|tx| tx.put::<PlainStorageState>(key, value22).expect(ERROR_PUT)).unwrap();
1508
1509        // PUT (1,1)
1510        let value11 = StorageEntry { key: B256::with_last_byte(1), value: U256::from(1) };
1511        env.update(|tx| tx.put::<PlainStorageState>(key, value11).expect(ERROR_PUT)).unwrap();
1512
1513        // Try to walk_dup with not existing key should immediately return None
1514        {
1515            let tx = env.tx().expect(ERROR_INIT_TX);
1516            let mut cursor = tx.cursor_dup_read::<PlainStorageState>().unwrap();
1517            let not_existing_key = Address::ZERO;
1518            let mut walker = cursor.walk_dup(Some(not_existing_key), None).unwrap();
1519            assert!(walker.next().is_none());
1520        }
1521    }
1522
1523    #[test]
1524    fn db_iterate_over_all_dup_values() {
1525        let (_tempdir, env) = create_test_db(DatabaseEnvKind::RW);
1526        let key1 = Address::from_str("0x1111111111111111111111111111111111111111")
1527            .expect(ERROR_ETH_ADDRESS);
1528        let key2 = Address::from_str("0x2222222222222222222222222222222222222222")
1529            .expect(ERROR_ETH_ADDRESS);
1530
1531        // PUT key1 (0,0)
1532        let value00 = StorageEntry::default();
1533        env.update(|tx| tx.put::<PlainStorageState>(key1, value00).expect(ERROR_PUT)).unwrap();
1534
1535        // PUT key1 (1,1)
1536        let value11 = StorageEntry { key: B256::with_last_byte(1), value: U256::from(1) };
1537        env.update(|tx| tx.put::<PlainStorageState>(key1, value11).expect(ERROR_PUT)).unwrap();
1538
1539        // PUT key2 (2,2)
1540        let value22 = StorageEntry { key: B256::with_last_byte(2), value: U256::from(2) };
1541        env.update(|tx| tx.put::<PlainStorageState>(key2, value22).expect(ERROR_PUT)).unwrap();
1542
1543        // Iterate with walk_dup
1544        {
1545            let tx = env.tx().expect(ERROR_INIT_TX);
1546            let mut cursor = tx.cursor_dup_read::<PlainStorageState>().unwrap();
1547            let mut walker = cursor.walk_dup(None, None).unwrap();
1548
1549            // Notice that value11 and value22 have been ordered in the DB.
1550            assert_eq!((key1, value00), walker.next().unwrap().unwrap());
1551            assert_eq!((key1, value11), walker.next().unwrap().unwrap());
1552            // NOTE: Dup cursor does NOT iterates on all values but only on duplicated values of the
1553            // same key. assert_eq!(Ok(Some(value22.clone())), walker.next());
1554            assert!(walker.next().is_none());
1555        }
1556
1557        // Iterate by using `walk`
1558        {
1559            let tx = env.tx().expect(ERROR_INIT_TX);
1560            let mut cursor = tx.cursor_dup_read::<PlainStorageState>().unwrap();
1561            let first = cursor.first().unwrap().unwrap();
1562            let mut walker = cursor.walk(Some(first.0)).unwrap();
1563            assert_eq!((key1, value00), walker.next().unwrap().unwrap());
1564            assert_eq!((key1, value11), walker.next().unwrap().unwrap());
1565            assert_eq!((key2, value22), walker.next().unwrap().unwrap());
1566        }
1567    }
1568
1569    #[test]
1570    fn dup_value_with_same_subkey() {
1571        let (_tempdir, env) = create_test_db(DatabaseEnvKind::RW);
1572        let key1 = Address::new([0x11; 20]);
1573        let key2 = Address::new([0x22; 20]);
1574
1575        // PUT key1 (0,1)
1576        let value01 = StorageEntry { key: B256::with_last_byte(0), value: U256::from(1) };
1577        env.update(|tx| tx.put::<PlainStorageState>(key1, value01).expect(ERROR_PUT)).unwrap();
1578
1579        // PUT key1 (0,0)
1580        let value00 = StorageEntry::default();
1581        env.update(|tx| tx.put::<PlainStorageState>(key1, value00).expect(ERROR_PUT)).unwrap();
1582
1583        // PUT key2 (2,2)
1584        let value22 = StorageEntry { key: B256::with_last_byte(2), value: U256::from(2) };
1585        env.update(|tx| tx.put::<PlainStorageState>(key2, value22).expect(ERROR_PUT)).unwrap();
1586
1587        // Iterate with walk
1588        {
1589            let tx = env.tx().expect(ERROR_INIT_TX);
1590            let mut cursor = tx.cursor_dup_read::<PlainStorageState>().unwrap();
1591            let first = cursor.first().unwrap().unwrap();
1592            let mut walker = cursor.walk(Some(first.0)).unwrap();
1593
1594            // NOTE: Both values are present
1595            assert_eq!((key1, value00), walker.next().unwrap().unwrap());
1596            assert_eq!((key1, value01), walker.next().unwrap().unwrap());
1597            assert_eq!((key2, value22), walker.next().unwrap().unwrap());
1598        }
1599
1600        // seek_by_key_subkey
1601        {
1602            let tx = env.tx().expect(ERROR_INIT_TX);
1603            let mut cursor = tx.cursor_dup_read::<PlainStorageState>().unwrap();
1604
1605            // NOTE: There are two values with same SubKey but only first one is shown
1606            assert_eq!(value00, cursor.seek_by_key_subkey(key1, value00.key).unwrap().unwrap());
1607            // key1 but value is greater than the one in the DB
1608            assert_eq!(None, cursor.seek_by_key_subkey(key1, value22.key).unwrap());
1609        }
1610    }
1611
1612    #[test]
1613    fn db_sharded_key() {
1614        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);
1615        let real_key = address!("0xa2c122be93b0074270ebee7f6b7292c7deb45047");
1616
1617        let shards = 5;
1618        for i in 1..=shards {
1619            let key = ShardedKey::new(real_key, if i == shards { u64::MAX } else { i * 100 });
1620            let list = IntegerList::new_pre_sorted([i * 100u64]);
1621
1622            db.update(|tx| tx.put::<AccountsHistory>(key.clone(), list.clone()).expect(""))
1623                .unwrap();
1624        }
1625
1626        // Seek value with non existing key.
1627        {
1628            let tx = db.tx().expect(ERROR_INIT_TX);
1629            let mut cursor = tx.cursor_read::<AccountsHistory>().unwrap();
1630
1631            // It will seek the one greater or equal to the query. Since we have `Address | 100`,
1632            // `Address | 200` in the database and we're querying `Address | 150` it will return us
1633            // `Address | 200`.
1634            let mut walker = cursor.walk(Some(ShardedKey::new(real_key, 150))).unwrap();
1635            let (key, list) = walker
1636                .next()
1637                .expect("element should exist.")
1638                .expect("should be able to retrieve it.");
1639
1640            assert_eq!(ShardedKey::new(real_key, 200), key);
1641            let list200 = IntegerList::new_pre_sorted([200u64]);
1642            assert_eq!(list200, list);
1643        }
1644        // Seek greatest index
1645        {
1646            let tx = db.tx().expect(ERROR_INIT_TX);
1647            let mut cursor = tx.cursor_read::<AccountsHistory>().unwrap();
1648
1649            // It will seek the MAX value of transition index and try to use prev to get first
1650            // biggers.
1651            let _unknown = cursor.seek_exact(ShardedKey::new(real_key, u64::MAX)).unwrap();
1652            let (key, list) = cursor
1653                .prev()
1654                .expect("element should exist.")
1655                .expect("should be able to retrieve it.");
1656
1657            assert_eq!(ShardedKey::new(real_key, 400), key);
1658            let list400 = IntegerList::new_pre_sorted([400u64]);
1659            assert_eq!(list400, list);
1660        }
1661    }
1662}