// reth_db/implementation/mdbx/mod.rs

//! Module that interacts with MDBX.

use crate::{
    lockfile::StorageLock,
    metrics::DatabaseEnvMetrics,
    tables::{self, Tables},
    utils::default_page_size,
    DatabaseError, TableSet,
};
use eyre::Context;
use metrics::{gauge, Label};
use reth_db_api::{
    cursor::{DbCursorRO, DbCursorRW},
    database::Database,
    database_metrics::DatabaseMetrics,
    models::ClientVersion,
    transaction::{DbTx, DbTxMut},
};
use reth_libmdbx::{
    ffi, DatabaseFlags, Environment, EnvironmentFlags, Geometry, HandleSlowReadersReturnCode,
    MaxReadTransactionDuration, Mode, PageSize, SyncMode, RO, RW,
};
use reth_storage_errors::db::LogLevel;
use reth_tracing::tracing::error;
use std::{
    collections::HashMap,
    ops::{Deref, Range},
    path::Path,
    sync::Arc,
    time::{SystemTime, UNIX_EPOCH},
};
use tx::Tx;

pub mod cursor;
pub mod tx;

mod utils;

/// 1 KB in bytes
pub const KILOBYTE: usize = 1024;
/// 1 MB in bytes
pub const MEGABYTE: usize = KILOBYTE * 1024;
/// 1 GB in bytes
pub const GIGABYTE: usize = MEGABYTE * 1024;
/// 1 TB in bytes
pub const TERABYTE: usize = GIGABYTE * 1024;

/// MDBX allows up to 32767 readers (`MDBX_READERS_LIMIT`), but we limit it to slightly below that.
/// Used when [`DatabaseArguments`] does not configure `max_readers` explicitly.
const DEFAULT_MAX_READERS: u64 = 32_000;

/// Space that a read-only transaction can occupy until the warning is emitted.
/// See [`reth_libmdbx::EnvironmentBuilder::set_handle_slow_readers`] for more information.
const MAX_SAFE_READER_SPACE: usize = 10 * GIGABYTE;
54
/// Environment used when opening a MDBX environment. RO/RW.
///
/// Determines the access mode of the environment: read-only ([`Self::RO`]) or
/// read-write ([`Self::RW`]).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DatabaseEnvKind {
    /// Read-only MDBX environment.
    RO,
    /// Read-write MDBX environment.
    RW,
}
63
64impl DatabaseEnvKind {
65    /// Returns `true` if the environment is read-write.
66    pub const fn is_rw(&self) -> bool {
67        matches!(self, Self::RW)
68    }
69}
70
/// Arguments for database initialization.
#[derive(Clone, Debug)]
pub struct DatabaseArguments {
    /// Client version that accesses the database.
    client_version: ClientVersion,
    /// Database geometry settings.
    geometry: Geometry<Range<usize>>,
    /// Database log level. If [None], the default value is used.
    log_level: Option<LogLevel>,
    /// Maximum duration of a read transaction. If [None], the default value is used.
    max_read_transaction_duration: Option<MaxReadTransactionDuration>,
    /// Open environment in exclusive/monopolistic mode. If [None], the default value is used.
    ///
    /// This can be used as a replacement for `MDB_NOLOCK`, which is not supported by MDBX. In this
    /// way, you can get the minimal overhead, but with the correct multi-process and multi-thread
    /// locking.
    ///
    /// If `true` = open environment in exclusive/monopolistic mode or return `MDBX_BUSY` if
    /// environment already used by other process. The main feature of the exclusive mode is the
    /// ability to open the environment placed on a network share.
    ///
    /// If `false` = open environment in cooperative mode, i.e. for multi-process
    /// access/interaction/cooperation. The main requirements of the cooperative mode are:
    /// - Data files MUST be placed in the LOCAL file system, but NOT on a network share.
    /// - Environment MUST be opened only by LOCAL processes, but NOT over a network.
    /// - OS kernel (i.e. file system and memory mapping implementation) and all processes that
    ///   open the given environment MUST be running in the physically single RAM with
    ///   cache-coherency. The only exception for cache-consistency requirement is Linux on MIPS
    ///   architecture (but this case has not been tested for a long time).
    ///
    /// This flag takes effect only at environment opening and can't be changed after.
    exclusive: Option<bool>,
    /// MDBX allows up to 32767 readers (`MDBX_READERS_LIMIT`). This arg is to configure the max
    /// readers.
    max_readers: Option<u64>,
    /// Defines the synchronization strategy used by the MDBX database when writing data to disk.
    ///
    /// This determines how aggressively MDBX ensures data durability versus prioritizing
    /// performance. The available modes are:
    ///
    /// - [`SyncMode::Durable`]: Ensures all transactions are fully flushed to disk before they are
    ///   considered committed. This provides the highest level of durability and crash safety
    ///   but may have a performance cost.
    /// - [`SyncMode::SafeNoSync`]: Skips certain fsync operations to improve write performance.
    ///   This mode still maintains database integrity but may lose the most recent transactions if
    ///   the system crashes unexpectedly.
    ///
    /// Choose `Durable` if consistency and crash safety are critical (e.g., production
    /// environments). Choose `SafeNoSync` if performance is more important and occasional data
    /// loss is acceptable (e.g., testing or ephemeral data).
    sync_mode: SyncMode,
}
123
124impl Default for DatabaseArguments {
125    fn default() -> Self {
126        Self::new(ClientVersion::default())
127    }
128}
129
130impl DatabaseArguments {
131    /// Create new database arguments with given client version.
132    pub fn new(client_version: ClientVersion) -> Self {
133        Self {
134            client_version,
135            geometry: Geometry {
136                size: Some(0..(8 * TERABYTE)),
137                growth_step: Some(4 * GIGABYTE as isize),
138                shrink_threshold: Some(0),
139                page_size: Some(PageSize::Set(default_page_size())),
140            },
141            log_level: None,
142            max_read_transaction_duration: None,
143            exclusive: None,
144            max_readers: None,
145            sync_mode: SyncMode::Durable,
146        }
147    }
148
149    /// Create database arguments suitable for testing.
150    ///
151    /// Uses a small geometry (64MB max, 4MB growth) to avoid exhausting the system's
152    /// virtual memory map limit (`vm.max_map_count`) when many test databases are open
153    /// concurrently.
154    pub fn test() -> Self {
155        Self {
156            geometry: Geometry {
157                size: Some(0..(64 * MEGABYTE)),
158                growth_step: Some(4 * MEGABYTE as isize),
159                shrink_threshold: Some(0),
160                page_size: Some(PageSize::Set(default_page_size())),
161            },
162            max_read_transaction_duration: Some(MaxReadTransactionDuration::Unbounded),
163            ..Self::new(ClientVersion::default())
164        }
165    }
166
167    /// Sets the upper size limit of the db environment, the maximum database size in bytes.
168    pub const fn with_geometry_max_size(mut self, max_size: Option<usize>) -> Self {
169        if let Some(max_size) = max_size {
170            self.geometry.size = Some(0..max_size);
171        }
172        self
173    }
174
175    /// Sets the database page size value.
176    pub const fn with_geometry_page_size(mut self, page_size: Option<usize>) -> Self {
177        if let Some(size) = page_size {
178            self.geometry.page_size = Some(reth_libmdbx::PageSize::Set(size));
179        }
180
181        self
182    }
183
184    /// Sets the database sync mode.
185    pub const fn with_sync_mode(mut self, sync_mode: Option<SyncMode>) -> Self {
186        if let Some(sync_mode) = sync_mode {
187            self.sync_mode = sync_mode;
188        }
189
190        self
191    }
192
193    /// Configures the database growth step in bytes.
194    pub const fn with_growth_step(mut self, growth_step: Option<usize>) -> Self {
195        if let Some(growth_step) = growth_step {
196            self.geometry.growth_step = Some(growth_step as isize);
197        }
198        self
199    }
200
201    /// Set the log level.
202    pub const fn with_log_level(mut self, log_level: Option<LogLevel>) -> Self {
203        self.log_level = log_level;
204        self
205    }
206
207    /// Set the maximum duration of a read transaction.
208    pub const fn max_read_transaction_duration(
209        &mut self,
210        max_read_transaction_duration: Option<MaxReadTransactionDuration>,
211    ) {
212        self.max_read_transaction_duration = max_read_transaction_duration;
213    }
214
215    /// Set the maximum duration of a read transaction.
216    pub const fn with_max_read_transaction_duration(
217        mut self,
218        max_read_transaction_duration: Option<MaxReadTransactionDuration>,
219    ) -> Self {
220        self.max_read_transaction_duration(max_read_transaction_duration);
221        self
222    }
223
224    /// Set the mdbx exclusive flag.
225    pub const fn with_exclusive(mut self, exclusive: Option<bool>) -> Self {
226        self.exclusive = exclusive;
227        self
228    }
229
230    /// Set `max_readers` flag.
231    pub const fn with_max_readers(mut self, max_readers: Option<u64>) -> Self {
232        self.max_readers = max_readers;
233        self
234    }
235
236    /// Returns the client version if any.
237    pub const fn client_version(&self) -> &ClientVersion {
238        &self.client_version
239    }
240}
241
/// Wrapper for the libmdbx environment: [Environment]
#[derive(Debug, Clone)]
pub struct DatabaseEnv {
    /// Libmdbx-sys environment.
    inner: Environment,
    /// Opened DBIs for reuse.
    /// Important: Do not manually close these DBIs, like via `mdbx_dbi_close`.
    /// More generally, do not dynamically create, re-open, or drop tables at
    /// runtime. It's better to perform table creation and migration only once
    /// at startup.
    // Populated by `create_tables`/`create_and_track_tables_for`; shared cheaply
    // across clones of the env via `Arc`.
    dbis: Arc<HashMap<&'static str, ffi::MDBX_dbi>>,
    /// Cache for metric handles. If `None`, metrics are not recorded.
    metrics: Option<Arc<DatabaseEnvMetrics>>,
    /// Write lock for when dealing with a read-write environment.
    // Held only for the lifetime of the env; never read, hence the underscore.
    _lock_file: Option<StorageLock>,
}
258
259impl Database for DatabaseEnv {
260    type TX = tx::Tx<RO>;
261    type TXMut = tx::Tx<RW>;
262
263    fn tx(&self) -> Result<Self::TX, DatabaseError> {
264        Tx::new(
265            self.inner.begin_ro_txn().map_err(|e| DatabaseError::InitTx(e.into()))?,
266            self.dbis.clone(),
267            self.metrics.clone(),
268        )
269        .map_err(|e| DatabaseError::InitTx(e.into()))
270    }
271
272    fn tx_mut(&self) -> Result<Self::TXMut, DatabaseError> {
273        Tx::new(
274            self.inner.begin_rw_txn().map_err(|e| DatabaseError::InitTx(e.into()))?,
275            self.dbis.clone(),
276            self.metrics.clone(),
277        )
278        .map_err(|e| DatabaseError::InitTx(e.into()))
279    }
280}
281
impl DatabaseMetrics for DatabaseEnv {
    /// Pushes all gauge metrics produced by [`Self::gauge_metrics`] to the
    /// global metrics recorder.
    fn report_metrics(&self) {
        for (name, value, labels) in self.gauge_metrics() {
            gauge!(name, labels).set(value);
        }
    }

    /// Collects per-table size/page/entry statistics plus environment-wide
    /// freelist, page size, and slow-reader counters.
    ///
    /// Failures while reading stats are logged via `error!` and otherwise
    /// ignored, so metric collection never fails the caller — it just yields
    /// fewer data points.
    fn gauge_metrics(&self) -> Vec<(&'static str, f64, Vec<Label>)> {
        let mut metrics = Vec::new();

        let _ = self
            .view(|tx| {
                for table in Tables::ALL.iter().map(Tables::name) {
                    let table_db =
                        tx.inner().open_db(Some(table)).wrap_err("Could not open db.")?;

                    let stats = tx
                        .inner()
                        .db_stat(table_db.dbi())
                        .wrap_err(format!("Could not find table: {table}"))?;

                    // Total on-disk footprint of a table is page size times the
                    // sum of all page kinds (leaf + branch + overflow).
                    let page_size = stats.page_size() as usize;
                    let leaf_pages = stats.leaf_pages();
                    let branch_pages = stats.branch_pages();
                    let overflow_pages = stats.overflow_pages();
                    let num_pages = leaf_pages + branch_pages + overflow_pages;
                    let table_size = page_size * num_pages;
                    let entries = stats.entries();

                    metrics.push((
                        "db.table_size",
                        table_size as f64,
                        vec![Label::new("table", table)],
                    ));
                    metrics.push((
                        "db.table_pages",
                        leaf_pages as f64,
                        vec![Label::new("table", table), Label::new("type", "leaf")],
                    ));
                    metrics.push((
                        "db.table_pages",
                        branch_pages as f64,
                        vec![Label::new("table", table), Label::new("type", "branch")],
                    ));
                    metrics.push((
                        "db.table_pages",
                        overflow_pages as f64,
                        vec![Label::new("table", table), Label::new("type", "overflow")],
                    ));
                    metrics.push((
                        "db.table_entries",
                        entries as f64,
                        vec![Label::new("table", table)],
                    ));
                }

                Ok::<(), eyre::Report>(())
            })
            .map_err(|error| error!(%error, "Failed to read db table stats"));

        // Environment-wide stats; each read is best-effort with its own log line.
        if let Ok(freelist) =
            self.freelist().map_err(|error| error!(%error, "Failed to read db.freelist"))
        {
            metrics.push(("db.freelist", freelist as f64, vec![]));
        }

        if let Ok(stat) = self.stat().map_err(|error| error!(%error, "Failed to read db.stat")) {
            metrics.push(("db.page_size", stat.page_size() as f64, vec![]));
        }

        metrics.push((
            "db.timed_out_not_aborted_transactions",
            self.timed_out_not_aborted_transactions() as f64,
            vec![],
        ));

        metrics
    }
}
361
impl DatabaseEnv {
    /// Opens the database at the specified path with the given `EnvKind`.
    ///
    /// It does not create the tables, for that call [`DatabaseEnv::create_tables`].
    pub fn open(
        path: &Path,
        kind: DatabaseEnvKind,
        args: DatabaseArguments,
    ) -> Result<Self, DatabaseError> {
        // Only a read-write environment acquires the storage lock; read-only
        // environments can coexist with a writer.
        let _lock_file = if kind.is_rw() {
            StorageLock::try_acquire(path)
                .map_err(|err| DatabaseError::Other(err.to_string()))?
                .into()
        } else {
            None
        };

        let mut inner_env = Environment::builder();

        let mode = match kind {
            DatabaseEnvKind::RO => Mode::ReadOnly,
            DatabaseEnvKind::RW => {
                // enable writemap mode in RW mode
                inner_env.write_map();
                Mode::ReadWrite { sync_mode: args.sync_mode }
            }
        };

        // Note: We set max dbs to 256 here to allow for custom tables. This needs to be set on
        // environment creation.
        debug_assert!(Tables::ALL.len() <= 256, "number of tables exceed max dbs");
        inner_env.set_max_dbs(256);
        inner_env.set_geometry(args.geometry);

        // Returns `true` when `id` belongs to this process (or, on unix, its
        // parent process) — used below to pick the right slow-reader warning.
        fn is_current_process(id: u32) -> bool {
            #[cfg(unix)]
            {
                id == std::os::unix::process::parent_id() || id == std::process::id()
            }

            #[cfg(not(unix))]
            {
                id == std::process::id()
            }
        }

        // Callback invoked by libmdbx when a reader lags behind the writer.
        // Emits a warning once the reader pins more than `MAX_SAFE_READER_SPACE`
        // of reclaimable space, but never kills the reader.
        extern "C" fn handle_slow_readers(
            _env: *const ffi::MDBX_env,
            _txn: *const ffi::MDBX_txn,
            process_id: ffi::mdbx_pid_t,
            thread_id: ffi::mdbx_tid_t,
            read_txn_id: u64,
            gap: std::ffi::c_uint,
            space: usize,
            retry: std::ffi::c_int,
        ) -> HandleSlowReadersReturnCode {
            if space > MAX_SAFE_READER_SPACE {
                let message = if is_current_process(process_id as u32) {
                    "Current process has a long-lived database transaction that grows the database file."
                } else {
                    "External process has a long-lived database transaction that grows the database file. \
                     Use shorter-lived read transactions or shut down the node."
                };
                reth_tracing::tracing::warn!(
                    target: "storage::db::mdbx",
                    ?process_id,
                    ?thread_id,
                    ?read_txn_id,
                    ?gap,
                    ?space,
                    ?retry,
                    "{message}"
                )
            }

            reth_libmdbx::HandleSlowReadersReturnCode::ProceedWithoutKillingReader
        }
        inner_env.set_handle_slow_readers(handle_slow_readers);

        inner_env.set_flags(EnvironmentFlags {
            mode,
            // We disable readahead because it improves performance for linear scans, but
            // worsens it for random access (which is our access pattern outside of sync)
            no_rdahead: true,
            coalesce: true,
            exclusive: args.exclusive.unwrap_or_default(),
            ..Default::default()
        });
        // Configure more readers
        inner_env.set_max_readers(args.max_readers.unwrap_or(DEFAULT_MAX_READERS));
        // This parameter sets the maximum size of the "reclaimed list", and the unit of measurement
        // is "pages". Reclaimed list is the list of freed pages that's populated during the
        // lifetime of DB transaction, and through which MDBX searches when it needs to insert new
        // record with overflow pages. The flow is roughly the following:
        // 0. We need to insert a record that requires N number of overflow pages (in consecutive
        //    sequence inside the DB file).
        // 1. Get some pages from the freelist, put them into the reclaimed list.
        // 2. Search through the reclaimed list for the sequence of size N.
        // 3. a. If found, return the sequence.
        // 3. b. If not found, repeat steps 1-3. If the reclaimed list size is larger than
        //    the `rp augment limit`, stop the search and allocate new pages at the end of the file:
        //    https://github.com/paradigmxyz/reth/blob/2a4c78759178f66e30c8976ec5d243b53102fc9a/crates/storage/libmdbx-rs/mdbx-sys/libmdbx/mdbx.c#L11479-L11480.
        //
        // Basically, this parameter controls for how long do we search through the freelist before
        // trying to allocate new pages. Smaller value will make MDBX to fallback to
        // allocation faster, higher value will force MDBX to search through the freelist
        // longer until the sequence of pages is found.
        //
        // The default value of this parameter is set depending on the DB size. The bigger the
        // database, the larger is `rp augment limit`.
        // https://github.com/paradigmxyz/reth/blob/2a4c78759178f66e30c8976ec5d243b53102fc9a/crates/storage/libmdbx-rs/mdbx-sys/libmdbx/mdbx.c#L10018-L10024.
        //
        // Previously, MDBX set this value as `256 * 1024` constant. Let's fallback to this,
        // because we want to prioritize freelist lookup speed over database growth.
        // https://github.com/paradigmxyz/reth/blob/fa2b9b685ed9787636d962f4366caf34a9186e66/crates/storage/libmdbx-rs/mdbx-sys/libmdbx/mdbx.c#L16017.
        inner_env.set_rp_augment_limit(256 * 1024);

        if let Some(log_level) = args.log_level {
            // Levels higher than [LogLevel::Notice] require libmdbx built with `MDBX_DEBUG` option.
            let is_log_level_available = if cfg!(debug_assertions) {
                true
            } else {
                matches!(
                    log_level,
                    LogLevel::Fatal | LogLevel::Error | LogLevel::Warn | LogLevel::Notice
                )
            };
            if is_log_level_available {
                // Map our log level enum onto libmdbx's numeric levels (0..=7).
                inner_env.set_log_level(match log_level {
                    LogLevel::Fatal => 0,
                    LogLevel::Error => 1,
                    LogLevel::Warn => 2,
                    LogLevel::Notice => 3,
                    LogLevel::Verbose => 4,
                    LogLevel::Debug => 5,
                    LogLevel::Trace => 6,
                    LogLevel::Extra => 7,
                });
            } else {
                return Err(DatabaseError::LogLevelUnavailable(log_level))
            }
        }

        if let Some(max_read_transaction_duration) = args.max_read_transaction_duration {
            inner_env.set_max_read_transaction_duration(max_read_transaction_duration);
        }

        let env = Self {
            inner: inner_env.open(path).map_err(|e| DatabaseError::Open(e.into()))?,
            // Table handles are registered later via `create_tables` and friends.
            dbis: Arc::default(),
            metrics: None,
            _lock_file,
        };

        Ok(env)
    }

    /// Enables metrics on the database.
    pub fn with_metrics(mut self) -> Self {
        self.metrics = Some(DatabaseEnvMetrics::new().into());
        self
    }

    /// Creates all the tables defined in [`Tables`], if necessary.
    ///
    /// This keeps tracks of the created table handles and stores them for better efficiency.
    pub fn create_tables(&mut self) -> Result<(), DatabaseError> {
        self.create_and_track_tables_for::<Tables>()
    }

    /// Creates all the tables defined in the given [`TableSet`], if necessary.
    ///
    /// This keeps tracks of the created table handles and stores them for better efficiency.
    pub fn create_and_track_tables_for<TS: TableSet>(&mut self) -> Result<(), DatabaseError> {
        let handles = self._create_tables::<TS>()?;
        // Note: This is okay because self has mutable access here and `DatabaseEnv` must be Arc'ed
        // before it can be shared.
        let dbis = Arc::make_mut(&mut self.dbis);
        dbis.extend(handles);

        Ok(())
    }

    /// Creates all the tables defined in [`Tables`], if necessary.
    ///
    /// If this type is unique the created handle for the tables will be updated.
    ///
    /// This is recommended to be called during initialization to create and track additional tables
    /// after the default [`Self::create_tables`] are created.
    pub fn create_tables_for<TS: TableSet>(self: &mut Arc<Self>) -> Result<(), DatabaseError> {
        let handles = self._create_tables::<TS>()?;
        // If the Arc is shared, the handles are created but not tracked in `dbis`.
        if let Some(db) = Arc::get_mut(self) {
            // Note: The db is unique and the dbis as well, and they can also be cloned.
            let dbis = Arc::make_mut(&mut db.dbis);
            dbis.extend(handles);
        }
        Ok(())
    }

    /// Creates the tables and returns the identifiers of the tables.
    fn _create_tables<TS: TableSet>(
        &self,
    ) -> Result<Vec<(&'static str, ffi::MDBX_dbi)>, DatabaseError> {
        let mut handles = Vec::new();
        let tx = self.inner.begin_rw_txn().map_err(|e| DatabaseError::InitTx(e.into()))?;

        for table in TS::tables() {
            // Dupsort tables need the MDBX DUP_SORT flag at creation time.
            let flags =
                if table.is_dupsort() { DatabaseFlags::DUP_SORT } else { DatabaseFlags::default() };

            let db = tx
                .create_db(Some(table.name()), flags)
                .map_err(|e| DatabaseError::CreateTable(e.into()))?;
            handles.push((table.name(), db.dbi()));
        }

        tx.commit().map_err(|e| DatabaseError::Commit(e.into()))?;
        Ok(handles)
    }

    /// Drops an orphaned table by name.
    ///
    /// This is used to clean up tables that are no longer defined in the schema but may still
    /// exist on disk from previous versions.
    ///
    /// Returns `Ok(true)` if the table existed and was dropped, `Ok(false)` if the table was not
    /// found.
    ///
    /// # Safety
    /// This permanently deletes the table and all its data. Only use for tables that are
    /// confirmed to be obsolete.
    pub fn drop_orphan_table(&self, name: &str) -> Result<bool, DatabaseError> {
        let tx = self.inner.begin_rw_txn().map_err(|e| DatabaseError::InitTx(e.into()))?;

        match tx.open_db(Some(name)) {
            Ok(db) => {
                // SAFETY: We just opened the db handle and will commit immediately after dropping.
                // No other cursors or handles exist for this table.
                unsafe {
                    tx.drop_db(db.dbi()).map_err(|e| DatabaseError::Delete(e.into()))?;
                }
                tx.commit().map_err(|e| DatabaseError::Commit(e.into()))?;
                Ok(true)
            }
            Err(reth_libmdbx::Error::NotFound) => Ok(false),
            Err(e) => Err(DatabaseError::Open(e.into())),
        }
    }

    /// Records version that accesses the database with write privileges.
    ///
    /// A new row (keyed by the current unix timestamp in seconds) is written only
    /// when the version differs from the most recently recorded one; empty
    /// versions are ignored entirely.
    pub fn record_client_version(&self, version: ClientVersion) -> Result<(), DatabaseError> {
        if version.is_empty() {
            return Ok(())
        }

        let tx = self.tx_mut()?;
        let mut version_cursor = tx.cursor_write::<tables::VersionHistory>()?;

        let last_version = version_cursor.last()?.map(|(_, v)| v);
        if Some(&version) != last_version.as_ref() {
            version_cursor.upsert(
                SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs(),
                &version,
            )?;
            tx.commit()?;
        }

        Ok(())
    }
}
632
633impl Deref for DatabaseEnv {
634    type Target = Environment;
635
636    fn deref(&self) -> &Self::Target {
637        &self.inner
638    }
639}
640
641#[cfg(test)]
642mod tests {
643    use super::*;
644    use crate::{
645        tables::{
646            AccountsHistory, CanonicalHeaders, Headers, PlainAccountState, PlainStorageState,
647        },
648        test_utils::*,
649        AccountChangeSets,
650    };
651    use alloy_consensus::Header;
652    use alloy_primitives::{address, Address, B256, U256};
653    use reth_db_api::{
654        cursor::{DbDupCursorRO, DbDupCursorRW, ReverseWalker, Walker},
655        models::{AccountBeforeTx, IntegerList, ShardedKey},
656        table::{Encode, Table},
657    };
658    use reth_libmdbx::Error;
659    use reth_primitives_traits::{Account, StorageEntry};
660    use reth_storage_errors::db::{DatabaseWriteError, DatabaseWriteOperation};
661    use std::str::FromStr;
662    use tempfile::TempDir;
663
664    /// Create database for testing. Returns the `TempDir` to prevent cleanup until test ends.
665    fn create_test_db(kind: DatabaseEnvKind) -> (TempDir, DatabaseEnv) {
666        let tempdir = tempfile::TempDir::new().expect(ERROR_TEMPDIR);
667        let env = create_test_db_with_path(kind, tempdir.path());
668        (tempdir, env)
669    }
670
671    /// Create database for testing with specified path
672    fn create_test_db_with_path(kind: DatabaseEnvKind, path: &Path) -> DatabaseEnv {
673        let mut env =
674            DatabaseEnv::open(path, kind, DatabaseArguments::new(ClientVersion::default()))
675                .expect(ERROR_DB_CREATION);
676        env.create_tables().expect(ERROR_TABLE_CREATION);
677        env
678    }
679
680    const ERROR_DB_CREATION: &str = "Not able to create the mdbx file.";
681    const ERROR_PUT: &str = "Not able to insert value into table.";
682    const ERROR_APPEND: &str = "Not able to append the value to the table.";
683    const ERROR_UPSERT: &str = "Not able to upsert the value to the table.";
684    const ERROR_GET: &str = "Not able to get value from table.";
685    const ERROR_DEL: &str = "Not able to delete from table.";
686    const ERROR_COMMIT: &str = "Not able to commit transaction.";
687    const ERROR_RETURN_VALUE: &str = "Mismatching result.";
688    const ERROR_INIT_TX: &str = "Failed to create a MDBX transaction.";
689    const ERROR_ETH_ADDRESS: &str = "Invalid address.";
690
691    #[test]
692    fn db_creation() {
693        let _tempdir = create_test_db(DatabaseEnvKind::RW);
694    }
695
696    #[test]
697    fn db_drop_orphan_table() {
698        let tempdir = tempfile::TempDir::new().expect(ERROR_TEMPDIR);
699        let db = create_test_db_with_path(DatabaseEnvKind::RW, tempdir.path());
700
701        // Create an orphan table by manually creating it
702        let orphan_table_name = "OrphanTestTable";
703        {
704            let tx = db.inner.begin_rw_txn().expect(ERROR_INIT_TX);
705            tx.create_db(Some(orphan_table_name), DatabaseFlags::empty())
706                .expect("Failed to create orphan table");
707            tx.commit().expect(ERROR_COMMIT);
708        }
709
710        // Verify the table exists by opening it
711        {
712            let tx = db.inner.begin_ro_txn().expect(ERROR_INIT_TX);
713            assert!(tx.open_db(Some(orphan_table_name)).is_ok(), "Orphan table should exist");
714        }
715
716        // Drop the orphan table
717        let result = db.drop_orphan_table(orphan_table_name);
718        assert!(result.is_ok(), "drop_orphan_table should succeed");
719        assert!(result.unwrap(), "drop_orphan_table should return true for existing table");
720
721        // Verify the table no longer exists
722        {
723            let tx = db.inner.begin_ro_txn().expect(ERROR_INIT_TX);
724            assert!(
725                tx.open_db(Some(orphan_table_name)).is_err(),
726                "Orphan table should no longer exist"
727            );
728        }
729
730        // Dropping a non-existent table should return Ok(false)
731        let result = db.drop_orphan_table("NonExistentTable");
732        assert!(result.is_ok(), "drop_orphan_table should succeed for non-existent table");
733        assert!(!result.unwrap(), "drop_orphan_table should return false for non-existent table");
734    }
735
736    #[test]
737    fn db_manual_put_get() {
738        let (_tempdir, env) = create_test_db(DatabaseEnvKind::RW);
739
740        let value = Header::default();
741        let key = 1u64;
742
743        // PUT
744        let tx = env.tx_mut().expect(ERROR_INIT_TX);
745        tx.put::<Headers>(key, value.clone()).expect(ERROR_PUT);
746        tx.commit().expect(ERROR_COMMIT);
747
748        // GET
749        let tx = env.tx().expect(ERROR_INIT_TX);
750        let result = tx.get::<Headers>(key).expect(ERROR_GET);
751        assert_eq!(result.expect(ERROR_RETURN_VALUE), value);
752        tx.commit().expect(ERROR_COMMIT);
753    }
754
    #[test]
    fn db_dup_cursor_delete_first() {
        // Deleting the first dup value through a walker must not invalidate the
        // walker: the following `next()` should yield the remaining dup value.
        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);
        let tx = db.tx_mut().expect(ERROR_INIT_TX);

        let mut dup_cursor = tx.cursor_dup_write::<PlainStorageState>().unwrap();

        // Two entries under the same address, distinguished only by `value`.
        let entry_0 = StorageEntry { key: B256::with_last_byte(1), value: U256::from(0) };
        let entry_1 = StorageEntry { key: B256::with_last_byte(1), value: U256::from(1) };

        dup_cursor.upsert(Address::with_last_byte(1), &entry_0).expect(ERROR_UPSERT);
        dup_cursor.upsert(Address::with_last_byte(1), &entry_1).expect(ERROR_UPSERT);

        // Both dup values are visible before the delete.
        assert_eq!(
            dup_cursor.walk(None).unwrap().collect::<Result<Vec<_>, _>>().unwrap(),
            vec![(Address::with_last_byte(1), entry_0), (Address::with_last_byte(1), entry_1),]
        );

        // Delete the first entry via a fresh walker positioned at the start.
        let mut walker = dup_cursor.walk(None).unwrap();
        walker.delete_current().expect(ERROR_DEL);

        assert_eq!(walker.next().unwrap().unwrap(), (Address::with_last_byte(1), entry_1));

        // Check the tx view - it correctly holds entry_1
        assert_eq!(
            tx.cursor_dup_read::<PlainStorageState>()
                .unwrap()
                .walk(None)
                .unwrap()
                .collect::<Result<Vec<_>, _>>()
                .unwrap(),
            vec![
                (Address::with_last_byte(1), entry_1), // This is ok - we removed entry_0
            ]
        );

        // Check the remainder of walker
        assert!(walker.next().is_none());
    }
794
795    #[test]
796    fn db_cursor_walk() {
797        let (_tempdir, env) = create_test_db(DatabaseEnvKind::RW);
798
799        let value = Header::default();
800        let key = 1u64;
801
802        // PUT
803        let tx = env.tx_mut().expect(ERROR_INIT_TX);
804        tx.put::<Headers>(key, value.clone()).expect(ERROR_PUT);
805        tx.commit().expect(ERROR_COMMIT);
806
807        // Cursor
808        let tx = env.tx().expect(ERROR_INIT_TX);
809        let mut cursor = tx.cursor_read::<Headers>().unwrap();
810
811        let first = cursor.first().unwrap();
812        assert!(first.is_some(), "First should be our put");
813
814        // Walk
815        let walk = cursor.walk(Some(key)).unwrap();
816        let first = walk.into_iter().next().unwrap().unwrap();
817        assert_eq!(first.1, value, "First next should be put value");
818    }
819
    #[test]
    fn db_cursor_walk_range() {
        // Exercises `walk_range` over every bound combination: half-open,
        // inclusive, open-ended and fully unbounded. Each walker must yield
        // exactly the keys inside its range and then keep returning `None`.
        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);

        // PUT (0, 0), (1, 0), (2, 0), (3, 0)
        let tx = db.tx_mut().expect(ERROR_INIT_TX);
        vec![0, 1, 2, 3]
            .into_iter()
            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
            .expect(ERROR_PUT);
        tx.commit().expect(ERROR_COMMIT);

        let tx = db.tx().expect(ERROR_INIT_TX);
        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();

        // [1, 3)
        let mut walker = cursor.walk_range(1..3).unwrap();
        assert_eq!(walker.next().unwrap().unwrap(), (1, B256::ZERO));
        assert_eq!(walker.next().unwrap().unwrap(), (2, B256::ZERO));
        assert!(walker.next().is_none());
        // next() returns None after walker is done
        assert!(walker.next().is_none());

        // [1, 2]
        let mut walker = cursor.walk_range(1..=2).unwrap();
        assert_eq!(walker.next().unwrap().unwrap(), (1, B256::ZERO));
        assert_eq!(walker.next().unwrap().unwrap(), (2, B256::ZERO));
        // next() returns None after walker is done
        assert!(walker.next().is_none());

        // [1, ∞)
        let mut walker = cursor.walk_range(1..).unwrap();
        assert_eq!(walker.next().unwrap().unwrap(), (1, B256::ZERO));
        assert_eq!(walker.next().unwrap().unwrap(), (2, B256::ZERO));
        assert_eq!(walker.next().unwrap().unwrap(), (3, B256::ZERO));
        // next() returns None after walker is done
        assert!(walker.next().is_none());

        // [2, 4)
        let mut walker = cursor.walk_range(2..4).unwrap();
        assert_eq!(walker.next().unwrap().unwrap(), (2, B256::ZERO));
        assert_eq!(walker.next().unwrap().unwrap(), (3, B256::ZERO));
        assert!(walker.next().is_none());
        // next() returns None after walker is done
        assert!(walker.next().is_none());

        // (∞, 3)
        let mut walker = cursor.walk_range(..3).unwrap();
        assert_eq!(walker.next().unwrap().unwrap(), (0, B256::ZERO));
        assert_eq!(walker.next().unwrap().unwrap(), (1, B256::ZERO));
        assert_eq!(walker.next().unwrap().unwrap(), (2, B256::ZERO));
        // next() returns None after walker is done
        assert!(walker.next().is_none());

        // (∞, ∞)
        let mut walker = cursor.walk_range(..).unwrap();
        assert_eq!(walker.next().unwrap().unwrap(), (0, B256::ZERO));
        assert_eq!(walker.next().unwrap().unwrap(), (1, B256::ZERO));
        assert_eq!(walker.next().unwrap().unwrap(), (2, B256::ZERO));
        assert_eq!(walker.next().unwrap().unwrap(), (3, B256::ZERO));
        // next() returns None after walker is done
        assert!(walker.next().is_none());
    }
883
884    #[test]
885    fn db_cursor_walk_range_on_dup_table() {
886        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);
887
888        let address0 = Address::ZERO;
889        let address1 = Address::with_last_byte(1);
890        let address2 = Address::with_last_byte(2);
891
892        let tx = db.tx_mut().expect(ERROR_INIT_TX);
893        tx.put::<AccountChangeSets>(0, AccountBeforeTx { address: address0, info: None })
894            .expect(ERROR_PUT);
895        tx.put::<AccountChangeSets>(0, AccountBeforeTx { address: address1, info: None })
896            .expect(ERROR_PUT);
897        tx.put::<AccountChangeSets>(0, AccountBeforeTx { address: address2, info: None })
898            .expect(ERROR_PUT);
899        tx.put::<AccountChangeSets>(1, AccountBeforeTx { address: address0, info: None })
900            .expect(ERROR_PUT);
901        tx.put::<AccountChangeSets>(1, AccountBeforeTx { address: address1, info: None })
902            .expect(ERROR_PUT);
903        tx.put::<AccountChangeSets>(1, AccountBeforeTx { address: address2, info: None })
904            .expect(ERROR_PUT);
905        tx.put::<AccountChangeSets>(2, AccountBeforeTx { address: address0, info: None }) // <- should not be returned by the walker
906            .expect(ERROR_PUT);
907        tx.commit().expect(ERROR_COMMIT);
908
909        let tx = db.tx().expect(ERROR_INIT_TX);
910        let mut cursor = tx.cursor_read::<AccountChangeSets>().unwrap();
911
912        let entries = cursor.walk_range(..).unwrap().collect::<Result<Vec<_>, _>>().unwrap();
913        assert_eq!(entries.len(), 7);
914
915        let mut walker = cursor.walk_range(0..=1).unwrap();
916        assert_eq!(
917            walker.next().unwrap().unwrap(),
918            (0, AccountBeforeTx { address: address0, info: None })
919        );
920        assert_eq!(
921            walker.next().unwrap().unwrap(),
922            (0, AccountBeforeTx { address: address1, info: None })
923        );
924        assert_eq!(
925            walker.next().unwrap().unwrap(),
926            (0, AccountBeforeTx { address: address2, info: None })
927        );
928        assert_eq!(
929            walker.next().unwrap().unwrap(),
930            (1, AccountBeforeTx { address: address0, info: None })
931        );
932        assert_eq!(
933            walker.next().unwrap().unwrap(),
934            (1, AccountBeforeTx { address: address1, info: None })
935        );
936        assert_eq!(
937            walker.next().unwrap().unwrap(),
938            (1, AccountBeforeTx { address: address2, info: None })
939        );
940        assert!(walker.next().is_none());
941    }
942
943    #[expect(clippy::reversed_empty_ranges)]
944    #[test]
945    fn db_cursor_walk_range_invalid() {
946        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);
947
948        // PUT (0, 0), (1, 0), (2, 0), (3, 0)
949        let tx = db.tx_mut().expect(ERROR_INIT_TX);
950        vec![0, 1, 2, 3]
951            .into_iter()
952            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
953            .expect(ERROR_PUT);
954        tx.commit().expect(ERROR_COMMIT);
955
956        let tx = db.tx().expect(ERROR_INIT_TX);
957        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
958
959        // start bound greater than end bound
960        let mut res = cursor.walk_range(3..1).unwrap();
961        assert!(res.next().is_none());
962
963        // start bound greater than end bound
964        let mut res = cursor.walk_range(15..=2).unwrap();
965        assert!(res.next().is_none());
966
967        // returning nothing
968        let mut walker = cursor.walk_range(1..1).unwrap();
969        assert!(walker.next().is_none());
970    }
971
972    #[test]
973    fn db_walker() {
974        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);
975
976        // PUT (0, 0), (1, 0), (3, 0)
977        let tx = db.tx_mut().expect(ERROR_INIT_TX);
978        vec![0, 1, 3]
979            .into_iter()
980            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
981            .expect(ERROR_PUT);
982        tx.commit().expect(ERROR_COMMIT);
983
984        let tx = db.tx().expect(ERROR_INIT_TX);
985        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
986
987        let mut walker = Walker::new(&mut cursor, None);
988
989        assert_eq!(walker.next().unwrap().unwrap(), (0, B256::ZERO));
990        assert_eq!(walker.next().unwrap().unwrap(), (1, B256::ZERO));
991        assert_eq!(walker.next().unwrap().unwrap(), (3, B256::ZERO));
992        assert!(walker.next().is_none());
993
994        // transform to ReverseWalker
995        let mut reverse_walker = walker.rev();
996        assert_eq!(reverse_walker.next().unwrap().unwrap(), (3, B256::ZERO));
997        assert_eq!(reverse_walker.next().unwrap().unwrap(), (1, B256::ZERO));
998        assert_eq!(reverse_walker.next().unwrap().unwrap(), (0, B256::ZERO));
999        assert!(reverse_walker.next().is_none());
1000    }
1001
    #[test]
    fn db_reverse_walker() {
        // A `ReverseWalker` with no start key begins at the last entry, and an
        // exhausted one can be turned back into a forward `Walker`.
        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);

        // PUT (0, 0), (1, 0), (3, 0)
        let tx = db.tx_mut().expect(ERROR_INIT_TX);
        vec![0, 1, 3]
            .into_iter()
            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
            .expect(ERROR_PUT);
        tx.commit().expect(ERROR_COMMIT);

        let tx = db.tx().expect(ERROR_INIT_TX);
        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();

        let mut reverse_walker = ReverseWalker::new(&mut cursor, None);

        // Entries come back in descending key order.
        assert_eq!(reverse_walker.next().unwrap().unwrap(), (3, B256::ZERO));
        assert_eq!(reverse_walker.next().unwrap().unwrap(), (1, B256::ZERO));
        assert_eq!(reverse_walker.next().unwrap().unwrap(), (0, B256::ZERO));
        assert!(reverse_walker.next().is_none());

        // transform to Walker — iteration restarts from the first entry
        let mut walker = reverse_walker.forward();
        assert_eq!(walker.next().unwrap().unwrap(), (0, B256::ZERO));
        assert_eq!(walker.next().unwrap().unwrap(), (1, B256::ZERO));
        assert_eq!(walker.next().unwrap().unwrap(), (3, B256::ZERO));
        assert!(walker.next().is_none());
    }
1031
    #[test]
    fn db_walk_back() {
        // `walk_back` iterates in descending key order from the given start
        // key; the cases below pin its behavior for existing, missing,
        // past-the-end and absent start keys.
        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);

        // PUT (0, 0), (1, 0), (3, 0)
        let tx = db.tx_mut().expect(ERROR_INIT_TX);
        vec![0, 1, 3]
            .into_iter()
            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
            .expect(ERROR_PUT);
        tx.commit().expect(ERROR_COMMIT);

        let tx = db.tx().expect(ERROR_INIT_TX);
        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();

        // Start at existing key 1: yields 1, then 0.
        let mut reverse_walker = cursor.walk_back(Some(1)).unwrap();
        assert_eq!(reverse_walker.next().unwrap().unwrap(), (1, B256::ZERO));
        assert_eq!(reverse_walker.next().unwrap().unwrap(), (0, B256::ZERO));
        assert!(reverse_walker.next().is_none());

        // Start at missing key 2: begins at the next-higher entry (3).
        let mut reverse_walker = cursor.walk_back(Some(2)).unwrap();
        assert_eq!(reverse_walker.next().unwrap().unwrap(), (3, B256::ZERO));
        assert_eq!(reverse_walker.next().unwrap().unwrap(), (1, B256::ZERO));
        assert_eq!(reverse_walker.next().unwrap().unwrap(), (0, B256::ZERO));
        assert!(reverse_walker.next().is_none());

        // Start past the table end (4): begins at the last entry.
        let mut reverse_walker = cursor.walk_back(Some(4)).unwrap();
        assert_eq!(reverse_walker.next().unwrap().unwrap(), (3, B256::ZERO));
        assert_eq!(reverse_walker.next().unwrap().unwrap(), (1, B256::ZERO));
        assert_eq!(reverse_walker.next().unwrap().unwrap(), (0, B256::ZERO));
        assert!(reverse_walker.next().is_none());

        // No start key: also begins at the last entry.
        let mut reverse_walker = cursor.walk_back(None).unwrap();
        assert_eq!(reverse_walker.next().unwrap().unwrap(), (3, B256::ZERO));
        assert_eq!(reverse_walker.next().unwrap().unwrap(), (1, B256::ZERO));
        assert_eq!(reverse_walker.next().unwrap().unwrap(), (0, B256::ZERO));
        assert!(reverse_walker.next().is_none());
    }
1070
    #[test]
    fn db_cursor_seek_exact_or_previous_key() {
        // NOTE(review): despite the name, this only checks that `seek_exact` on
        // a missing key returns `None` and leaves `current()` empty — no
        // "previous key" lookup is exercised here.
        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);

        // PUT keys 0, 1 and 3, leaving a gap at 2
        let tx = db.tx_mut().expect(ERROR_INIT_TX);
        vec![0, 1, 3]
            .into_iter()
            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
            .expect(ERROR_PUT);
        tx.commit().expect(ERROR_COMMIT);

        // Cursor — fresh cursors have no current entry
        let missing_key = 2;
        let tx = db.tx().expect(ERROR_INIT_TX);
        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
        assert!(cursor.current().unwrap().is_none());

        // Seek exact — missing key yields None and does not position the cursor
        let exact = cursor.seek_exact(missing_key).unwrap();
        assert_eq!(exact, None);
        assert!(cursor.current().unwrap().is_none());
    }
1094
    #[test]
    fn db_cursor_insert() {
        // `insert` succeeds for a new key, positions the cursor on the inserted
        // entry, and fails with `KeyExist` when the key is already present.
        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);

        // PUT keys 0, 1, 3, 4, 5, leaving a gap at 2
        let tx = db.tx_mut().expect(ERROR_INIT_TX);
        vec![0, 1, 3, 4, 5]
            .into_iter()
            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
            .expect(ERROR_PUT);
        tx.commit().expect(ERROR_COMMIT);

        let key_to_insert = 2;
        let tx = db.tx_mut().expect(ERROR_INIT_TX);
        let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();

        // INSERT
        assert!(cursor.insert(key_to_insert, &B256::ZERO).is_ok());
        assert_eq!(cursor.current().unwrap(), Some((key_to_insert, B256::ZERO)));
        // INSERT (failure): the error carries the table name, the encoded key
        // and the `CursorInsert` operation
        assert!(matches!(
        cursor.insert(key_to_insert, &B256::ZERO).unwrap_err(),
        DatabaseError::Write(err) if *err == DatabaseWriteError {
            info: Error::KeyExist.into(),
            operation: DatabaseWriteOperation::CursorInsert,
            table_name: CanonicalHeaders::NAME,
            key: key_to_insert.encode().into(),
        }));
        // The failed insert leaves the cursor on the existing entry.
        assert_eq!(cursor.current().unwrap(), Some((key_to_insert, B256::ZERO)));

        tx.commit().expect(ERROR_COMMIT);

        // Confirm the result: key 2 is now part of the ordered key set
        let tx = db.tx().expect(ERROR_INIT_TX);
        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
        let res = cursor.walk(None).unwrap().map(|res| res.unwrap().0).collect::<Vec<_>>();
        assert_eq!(res, vec![0, 1, 2, 3, 4, 5]);
        tx.commit().expect(ERROR_COMMIT);
    }
1134
1135    #[test]
1136    fn db_cursor_insert_dup() {
1137        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);
1138        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1139
1140        let mut dup_cursor = tx.cursor_dup_write::<PlainStorageState>().unwrap();
1141        let key = Address::random();
1142        let subkey1 = B256::random();
1143        let subkey2 = B256::random();
1144
1145        let entry1 = StorageEntry { key: subkey1, value: U256::ZERO };
1146        assert!(dup_cursor.insert(key, &entry1).is_ok());
1147
1148        // Can't insert
1149        let entry2 = StorageEntry { key: subkey2, value: U256::ZERO };
1150        assert!(dup_cursor.insert(key, &entry2).is_err());
1151    }
1152
    #[test]
    fn db_cursor_delete_current_non_existent() {
        // `delete_current` after a failed `seek_exact` must fail with `NoData`
        // rather than deleting a neighboring entry.
        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);
        let tx = db.tx_mut().expect(ERROR_INIT_TX);

        let key1 = Address::with_last_byte(1);
        let key2 = Address::with_last_byte(2);
        let key3 = Address::with_last_byte(3);
        let mut cursor = tx.cursor_write::<PlainAccountState>().unwrap();

        assert!(cursor.insert(key1, &Account::default()).is_ok());
        assert!(cursor.insert(key2, &Account::default()).is_ok());
        assert!(cursor.insert(key3, &Account::default()).is_ok());

        // Seek & delete key2
        cursor.seek_exact(key2).unwrap();
        assert!(cursor.delete_current().is_ok());
        assert!(cursor.seek_exact(key2).unwrap().is_none());

        // Seek & delete key2 again — the cursor is unpositioned, so the delete
        // surfaces `NoData` instead of touching key1 or key3
        assert!(cursor.seek_exact(key2).unwrap().is_none());
        assert!(matches!(
            cursor.delete_current().unwrap_err(),
            DatabaseError::Delete(err) if err == reth_libmdbx::Error::NoData.into()));
        // Assert that key1 is still there
        assert_eq!(cursor.seek_exact(key1).unwrap(), Some((key1, Account::default())));
        // Assert that key3 is still there
        assert_eq!(cursor.seek_exact(key3).unwrap(), Some((key3, Account::default())));
    }
1182
1183    #[test]
1184    fn db_cursor_insert_wherever_cursor_is() {
1185        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);
1186        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1187
1188        // PUT
1189        vec![0, 1, 3, 5, 7, 9]
1190            .into_iter()
1191            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
1192            .expect(ERROR_PUT);
1193        tx.commit().expect(ERROR_COMMIT);
1194
1195        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1196        let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
1197
1198        // INSERT (cursor starts at last)
1199        cursor.last().unwrap();
1200        assert_eq!(cursor.current().unwrap(), Some((9, B256::ZERO)));
1201
1202        for pos in (2..=8).step_by(2) {
1203            assert!(cursor.insert(pos, &B256::ZERO).is_ok());
1204            assert_eq!(cursor.current().unwrap(), Some((pos, B256::ZERO)));
1205        }
1206        tx.commit().expect(ERROR_COMMIT);
1207
1208        // Confirm the result
1209        let tx = db.tx().expect(ERROR_INIT_TX);
1210        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
1211        let res = cursor.walk(None).unwrap().map(|res| res.unwrap().0).collect::<Vec<_>>();
1212        assert_eq!(res, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
1213        tx.commit().expect(ERROR_COMMIT);
1214    }
1215
1216    #[test]
1217    fn db_cursor_append() {
1218        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);
1219
1220        // PUT
1221        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1222        vec![0, 1, 2, 3, 4]
1223            .into_iter()
1224            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
1225            .expect(ERROR_PUT);
1226        tx.commit().expect(ERROR_COMMIT);
1227
1228        // APPEND
1229        let key_to_append = 5;
1230        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1231        let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
1232        assert!(cursor.append(key_to_append, &B256::ZERO).is_ok());
1233        tx.commit().expect(ERROR_COMMIT);
1234
1235        // Confirm the result
1236        let tx = db.tx().expect(ERROR_INIT_TX);
1237        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
1238        let res = cursor.walk(None).unwrap().map(|res| res.unwrap().0).collect::<Vec<_>>();
1239        assert_eq!(res, vec![0, 1, 2, 3, 4, 5]);
1240        tx.commit().expect(ERROR_COMMIT);
1241    }
1242
    #[test]
    fn db_cursor_append_failure() {
        // `append` with a key below the current maximum fails with
        // `KeyMismatch` and leaves the table contents untouched.
        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);

        // PUT keys 0, 1, 3, 4, 5
        let tx = db.tx_mut().expect(ERROR_INIT_TX);
        vec![0, 1, 3, 4, 5]
            .into_iter()
            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
            .expect(ERROR_PUT);
        tx.commit().expect(ERROR_COMMIT);

        // APPEND: key 2 sorts before the existing maximum (5), so it is rejected
        let key_to_append = 2;
        let tx = db.tx_mut().expect(ERROR_INIT_TX);
        let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
        assert!(matches!(
        cursor.append(key_to_append, &B256::ZERO).unwrap_err(),
        DatabaseError::Write(err) if *err == DatabaseWriteError {
            info: Error::KeyMismatch.into(),
            operation: DatabaseWriteOperation::CursorAppend,
            table_name: CanonicalHeaders::NAME,
            key: key_to_append.encode().into(),
        }));
        assert_eq!(cursor.current().unwrap(), Some((5, B256::ZERO))); // the end of table
        tx.commit().expect(ERROR_COMMIT);

        // Confirm the result: no key was added by the failed append
        let tx = db.tx().expect(ERROR_INIT_TX);
        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
        let res = cursor.walk(None).unwrap().map(|res| res.unwrap().0).collect::<Vec<_>>();
        assert_eq!(res, vec![0, 1, 3, 4, 5]);
        tx.commit().expect(ERROR_COMMIT);
    }
1277
    #[test]
    fn db_cursor_upsert() {
        // On a plain table `upsert` overwrites the value of an existing key; on
        // a DUPSORT table it adds another dup value under the same key instead.
        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);
        let tx = db.tx_mut().expect(ERROR_INIT_TX);

        let mut cursor = tx.cursor_write::<PlainAccountState>().unwrap();
        let key = Address::random();

        let account = Account::default();
        cursor.upsert(key, &account).expect(ERROR_UPSERT);
        assert_eq!(cursor.seek_exact(key).unwrap(), Some((key, account)));

        // Repeated upserts replace the previous value for the same key.
        let account = Account { nonce: 1, ..Default::default() };
        cursor.upsert(key, &account).expect(ERROR_UPSERT);
        assert_eq!(cursor.seek_exact(key).unwrap(), Some((key, account)));

        let account = Account { nonce: 2, ..Default::default() };
        cursor.upsert(key, &account).expect(ERROR_UPSERT);
        assert_eq!(cursor.seek_exact(key).unwrap(), Some((key, account)));

        let mut dup_cursor = tx.cursor_dup_write::<PlainStorageState>().unwrap();
        let subkey = B256::random();

        let value = U256::from(1);
        let entry1 = StorageEntry { key: subkey, value };
        dup_cursor.upsert(key, &entry1).expect(ERROR_UPSERT);
        assert_eq!(dup_cursor.seek_by_key_subkey(key, subkey).unwrap(), Some(entry1));

        // Upserting a second entry with the same subkey keeps both: entry1 is
        // still found first and entry2 follows as the next dup value.
        let value = U256::from(2);
        let entry2 = StorageEntry { key: subkey, value };
        dup_cursor.upsert(key, &entry2).expect(ERROR_UPSERT);
        assert_eq!(dup_cursor.seek_by_key_subkey(key, subkey).unwrap(), Some(entry1));
        assert_eq!(dup_cursor.next_dup_val().unwrap(), Some(entry2));
    }
1312
    #[test]
    fn db_cursor_dupsort_append() {
        // On a DUPSORT table, `append_dup` with a dup value that sorts below
        // the current last one fails with `KeyMismatch`, as does `append` with
        // a smaller primary key; appending under the current key still works.
        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);

        let transition_id = 2;

        let tx = db.tx_mut().expect(ERROR_INIT_TX);
        let mut cursor = tx.cursor_write::<AccountChangeSets>().unwrap();
        // Seed dup values with address last-bytes 0, 1, 3, 4, 5 under `transition_id`.
        vec![0, 1, 3, 4, 5]
            .into_iter()
            .try_for_each(|val| {
                cursor.append(
                    transition_id,
                    &AccountBeforeTx { address: Address::with_last_byte(val), info: None },
                )
            })
            .expect(ERROR_APPEND);
        tx.commit().expect(ERROR_COMMIT);

        // APPEND DUP & APPEND
        let subkey_to_append = 2;
        let tx = db.tx_mut().expect(ERROR_INIT_TX);
        let mut cursor = tx.cursor_write::<AccountChangeSets>().unwrap();
        // `append_dup` of subkey 2 is rejected: dup values 3..5 already sort after it.
        assert!(matches!(
        cursor
            .append_dup(
                transition_id,
                AccountBeforeTx {
                    address: Address::with_last_byte(subkey_to_append),
                    info: None
                }
            )
            .unwrap_err(),
        DatabaseError::Write(err) if *err == DatabaseWriteError {
            info: Error::KeyMismatch.into(),
            operation: DatabaseWriteOperation::CursorAppendDup,
            table_name: AccountChangeSets::NAME,
            key: transition_id.encode().into(),
        }));
        // `append` with a primary key below the current one fails the same way.
        assert!(matches!(
            cursor
                .append(
                    transition_id - 1,
                    &AccountBeforeTx {
                        address: Address::with_last_byte(subkey_to_append),
                        info: None
                    }
                )
                .unwrap_err(),
            DatabaseError::Write(err) if *err == DatabaseWriteError {
                info: Error::KeyMismatch.into(),
                operation: DatabaseWriteOperation::CursorAppend,
                table_name: AccountChangeSets::NAME,
                key: (transition_id - 1).encode().into(),
            }
        ));
        // `append` under the current primary key succeeds even though this dup
        // value sorts below the existing ones.
        assert!(cursor
            .append(
                transition_id,
                &AccountBeforeTx { address: Address::with_last_byte(subkey_to_append), info: None }
            )
            .is_ok());
    }
1376
    #[test]
    fn db_closure_put_get() {
        // Writes through `Database::update`, drops the RW environment, then
        // reopens the same path read-only and reads back through `Database::view`.
        let tempdir = TempDir::new().expect(ERROR_TEMPDIR);
        let path = tempdir.path();

        // Max-valued fields to exercise encoding round-tripping.
        let value = Account {
            nonce: 18446744073709551615,
            bytecode_hash: Some(B256::random()),
            balance: U256::MAX,
        };
        let key = Address::from_str("0xa2c122be93b0074270ebee7f6b7292c7deb45047")
            .expect(ERROR_ETH_ADDRESS);

        {
            // Scope the RW environment so it is closed before the RO reopen below.
            let env = create_test_db_with_path(DatabaseEnvKind::RW, path);

            // PUT
            let result = env.update(|tx| {
                tx.put::<PlainAccountState>(key, value).expect(ERROR_PUT);
                200
            });
            // `update` hands back the closure's return value on success.
            assert_eq!(result.expect(ERROR_RETURN_VALUE), 200);
        }

        let env = DatabaseEnv::open(
            path,
            DatabaseEnvKind::RO,
            DatabaseArguments::new(ClientVersion::default()),
        )
        .expect(ERROR_DB_CREATION);

        // GET
        let result =
            env.view(|tx| tx.get::<PlainAccountState>(key).expect(ERROR_GET)).expect(ERROR_GET);

        assert_eq!(result, Some(value))
    }
1414
    #[test]
    fn db_dup_sort() {
        // Dup values come back sorted by subkey regardless of insertion order,
        // and `walk_dup` can seek directly to an exact (key, subkey) pair.
        let (_tempdir, env) = create_test_db(DatabaseEnvKind::RW);
        let key = Address::from_str("0xa2c122be93b0074270ebee7f6b7292c7deb45047")
            .expect(ERROR_ETH_ADDRESS);

        // PUT (0,0)
        let value00 = StorageEntry::default();
        env.update(|tx| tx.put::<PlainStorageState>(key, value00).expect(ERROR_PUT)).unwrap();

        // PUT (2,2) — deliberately inserted before (1,1)
        let value22 = StorageEntry { key: B256::with_last_byte(2), value: U256::from(2) };
        env.update(|tx| tx.put::<PlainStorageState>(key, value22).expect(ERROR_PUT)).unwrap();

        // PUT (1,1)
        let value11 = StorageEntry { key: B256::with_last_byte(1), value: U256::from(1) };
        env.update(|tx| tx.put::<PlainStorageState>(key, value11).expect(ERROR_PUT)).unwrap();

        // Iterate with cursor
        {
            let tx = env.tx().expect(ERROR_INIT_TX);
            let mut cursor = tx.cursor_dup_read::<PlainStorageState>().unwrap();

            // Notice that value11 and value22 have been ordered in the DB.
            assert_eq!(Some(value00), cursor.next_dup_val().unwrap());
            assert_eq!(Some(value11), cursor.next_dup_val().unwrap());
            assert_eq!(Some(value22), cursor.next_dup_val().unwrap());
        }

        // Seek value with exact subkey
        {
            let tx = env.tx().expect(ERROR_INIT_TX);
            let mut cursor = tx.cursor_dup_read::<PlainStorageState>().unwrap();
            let mut walker = cursor.walk_dup(Some(key), Some(B256::with_last_byte(1))).unwrap();
            assert_eq!(
                (key, value11),
                walker
                    .next()
                    .expect("element should exist.")
                    .expect("should be able to retrieve it.")
            );
        }
    }
1458
1459    #[test]
1460    fn db_walk_dup_with_not_existing_key() {
1461        let (_tempdir, env) = create_test_db(DatabaseEnvKind::RW);
1462        let key = Address::from_str("0xa2c122be93b0074270ebee7f6b7292c7deb45047")
1463            .expect(ERROR_ETH_ADDRESS);
1464
1465        // PUT (0,0)
1466        let value00 = StorageEntry::default();
1467        env.update(|tx| tx.put::<PlainStorageState>(key, value00).expect(ERROR_PUT)).unwrap();
1468
1469        // PUT (2,2)
1470        let value22 = StorageEntry { key: B256::with_last_byte(2), value: U256::from(2) };
1471        env.update(|tx| tx.put::<PlainStorageState>(key, value22).expect(ERROR_PUT)).unwrap();
1472
1473        // PUT (1,1)
1474        let value11 = StorageEntry { key: B256::with_last_byte(1), value: U256::from(1) };
1475        env.update(|tx| tx.put::<PlainStorageState>(key, value11).expect(ERROR_PUT)).unwrap();
1476
1477        // Try to walk_dup with not existing key should immediately return None
1478        {
1479            let tx = env.tx().expect(ERROR_INIT_TX);
1480            let mut cursor = tx.cursor_dup_read::<PlainStorageState>().unwrap();
1481            let not_existing_key = Address::ZERO;
1482            let mut walker = cursor.walk_dup(Some(not_existing_key), None).unwrap();
1483            assert!(walker.next().is_none());
1484        }
1485    }
1486
    #[test]
    fn db_iterate_over_all_dup_values() {
        // Contrasts `walk_dup` (stays within one key's dup values) with `walk`
        // (crosses key boundaries and visits every entry).
        let (_tempdir, env) = create_test_db(DatabaseEnvKind::RW);
        let key1 = Address::from_str("0x1111111111111111111111111111111111111111")
            .expect(ERROR_ETH_ADDRESS);
        let key2 = Address::from_str("0x2222222222222222222222222222222222222222")
            .expect(ERROR_ETH_ADDRESS);

        // PUT key1 (0,0)
        let value00 = StorageEntry::default();
        env.update(|tx| tx.put::<PlainStorageState>(key1, value00).expect(ERROR_PUT)).unwrap();

        // PUT key1 (1,1)
        let value11 = StorageEntry { key: B256::with_last_byte(1), value: U256::from(1) };
        env.update(|tx| tx.put::<PlainStorageState>(key1, value11).expect(ERROR_PUT)).unwrap();

        // PUT key2 (2,2)
        let value22 = StorageEntry { key: B256::with_last_byte(2), value: U256::from(2) };
        env.update(|tx| tx.put::<PlainStorageState>(key2, value22).expect(ERROR_PUT)).unwrap();

        // Iterate with walk_dup
        {
            let tx = env.tx().expect(ERROR_INIT_TX);
            let mut cursor = tx.cursor_dup_read::<PlainStorageState>().unwrap();
            let mut walker = cursor.walk_dup(None, None).unwrap();

            assert_eq!((key1, value00), walker.next().unwrap().unwrap());
            assert_eq!((key1, value11), walker.next().unwrap().unwrap());
            // NOTE: the dup walker does NOT iterate over all values, only over
            // the duplicate values of the same key — key2's entry (value22) is
            // not returned here.
            assert!(walker.next().is_none());
        }

        // Iterate by using `walk`, which does cross into key2
        {
            let tx = env.tx().expect(ERROR_INIT_TX);
            let mut cursor = tx.cursor_dup_read::<PlainStorageState>().unwrap();
            let first = cursor.first().unwrap().unwrap();
            let mut walker = cursor.walk(Some(first.0)).unwrap();
            assert_eq!((key1, value00), walker.next().unwrap().unwrap());
            assert_eq!((key1, value11), walker.next().unwrap().unwrap());
            assert_eq!((key2, value22), walker.next().unwrap().unwrap());
        }
    }
1532
1533    #[test]
1534    fn dup_value_with_same_subkey() {
1535        let (_tempdir, env) = create_test_db(DatabaseEnvKind::RW);
1536        let key1 = Address::new([0x11; 20]);
1537        let key2 = Address::new([0x22; 20]);
1538
1539        // PUT key1 (0,1)
1540        let value01 = StorageEntry { key: B256::with_last_byte(0), value: U256::from(1) };
1541        env.update(|tx| tx.put::<PlainStorageState>(key1, value01).expect(ERROR_PUT)).unwrap();
1542
1543        // PUT key1 (0,0)
1544        let value00 = StorageEntry::default();
1545        env.update(|tx| tx.put::<PlainStorageState>(key1, value00).expect(ERROR_PUT)).unwrap();
1546
1547        // PUT key2 (2,2)
1548        let value22 = StorageEntry { key: B256::with_last_byte(2), value: U256::from(2) };
1549        env.update(|tx| tx.put::<PlainStorageState>(key2, value22).expect(ERROR_PUT)).unwrap();
1550
1551        // Iterate with walk
1552        {
1553            let tx = env.tx().expect(ERROR_INIT_TX);
1554            let mut cursor = tx.cursor_dup_read::<PlainStorageState>().unwrap();
1555            let first = cursor.first().unwrap().unwrap();
1556            let mut walker = cursor.walk(Some(first.0)).unwrap();
1557
1558            // NOTE: Both values are present
1559            assert_eq!((key1, value00), walker.next().unwrap().unwrap());
1560            assert_eq!((key1, value01), walker.next().unwrap().unwrap());
1561            assert_eq!((key2, value22), walker.next().unwrap().unwrap());
1562        }
1563
1564        // seek_by_key_subkey
1565        {
1566            let tx = env.tx().expect(ERROR_INIT_TX);
1567            let mut cursor = tx.cursor_dup_read::<PlainStorageState>().unwrap();
1568
1569            // NOTE: There are two values with same SubKey but only first one is shown
1570            assert_eq!(value00, cursor.seek_by_key_subkey(key1, value00.key).unwrap().unwrap());
1571            // key1 but value is greater than the one in the DB
1572            assert_eq!(None, cursor.seek_by_key_subkey(key1, value22.key).unwrap());
1573        }
1574    }
1575
1576    #[test]
1577    fn db_sharded_key() {
1578        let (_tempdir, db) = create_test_db(DatabaseEnvKind::RW);
1579        let real_key = address!("0xa2c122be93b0074270ebee7f6b7292c7deb45047");
1580
1581        let shards = 5;
1582        for i in 1..=shards {
1583            let key = ShardedKey::new(real_key, if i == shards { u64::MAX } else { i * 100 });
1584            let list = IntegerList::new_pre_sorted([i * 100u64]);
1585
1586            db.update(|tx| tx.put::<AccountsHistory>(key.clone(), list.clone()).expect(""))
1587                .unwrap();
1588        }
1589
1590        // Seek value with non existing key.
1591        {
1592            let tx = db.tx().expect(ERROR_INIT_TX);
1593            let mut cursor = tx.cursor_read::<AccountsHistory>().unwrap();
1594
1595            // It will seek the one greater or equal to the query. Since we have `Address | 100`,
1596            // `Address | 200` in the database and we're querying `Address | 150` it will return us
1597            // `Address | 200`.
1598            let mut walker = cursor.walk(Some(ShardedKey::new(real_key, 150))).unwrap();
1599            let (key, list) = walker
1600                .next()
1601                .expect("element should exist.")
1602                .expect("should be able to retrieve it.");
1603
1604            assert_eq!(ShardedKey::new(real_key, 200), key);
1605            let list200 = IntegerList::new_pre_sorted([200u64]);
1606            assert_eq!(list200, list);
1607        }
1608        // Seek greatest index
1609        {
1610            let tx = db.tx().expect(ERROR_INIT_TX);
1611            let mut cursor = tx.cursor_read::<AccountsHistory>().unwrap();
1612
1613            // It will seek the MAX value of transition index and try to use prev to get first
1614            // biggers.
1615            let _unknown = cursor.seek_exact(ShardedKey::new(real_key, u64::MAX)).unwrap();
1616            let (key, list) = cursor
1617                .prev()
1618                .expect("element should exist.")
1619                .expect("should be able to retrieve it.");
1620
1621            assert_eq!(ShardedKey::new(real_key, 400), key);
1622            let list400 = IntegerList::new_pre_sorted([400u64]);
1623            assert_eq!(list400, list);
1624        }
1625    }
1626}