reth_db/implementation/mdbx/mod.rs

1//! Module that interacts with MDBX.
2
3use crate::{
4    lockfile::StorageLock,
5    metrics::DatabaseEnvMetrics,
6    tables::{self, Tables},
7    utils::default_page_size,
8    DatabaseError, TableSet,
9};
10use eyre::Context;
11use metrics::{gauge, Label};
12use reth_db_api::{
13    cursor::{DbCursorRO, DbCursorRW},
14    database::Database,
15    database_metrics::DatabaseMetrics,
16    models::ClientVersion,
17    transaction::{DbTx, DbTxMut},
18};
19use reth_libmdbx::{
20    ffi, DatabaseFlags, Environment, EnvironmentFlags, Geometry, HandleSlowReadersReturnCode,
21    MaxReadTransactionDuration, Mode, PageSize, SyncMode, RO, RW,
22};
23use reth_storage_errors::db::LogLevel;
24use reth_tracing::tracing::error;
25use std::{
26    collections::HashMap,
27    ops::{Deref, Range},
28    path::Path,
29    sync::Arc,
30    time::{SystemTime, UNIX_EPOCH},
31};
32use tx::Tx;
33
34pub mod cursor;
35pub mod tx;
36
37mod utils;
38
39/// 1 KB in bytes
40pub const KILOBYTE: usize = 1024;
41/// 1 MB in bytes
42pub const MEGABYTE: usize = KILOBYTE * 1024;
43/// 1 GB in bytes
44pub const GIGABYTE: usize = MEGABYTE * 1024;
45/// 1 TB in bytes
46pub const TERABYTE: usize = GIGABYTE * 1024;
47
48/// MDBX allows up to 32767 readers (`MDBX_READERS_LIMIT`), but we limit it to slightly below that
49const DEFAULT_MAX_READERS: u64 = 32_000;
50
51/// Space that a read-only transaction can occupy until the warning is emitted.
52/// See [`reth_libmdbx::EnvironmentBuilder::set_handle_slow_readers`] for more information.
53const MAX_SAFE_READER_SPACE: usize = 10 * GIGABYTE;
54
/// The kind of access used when opening an MDBX environment: read-only (`RO`) or read-write (`RW`).
56#[derive(Clone, Copy, Debug, Eq, PartialEq)]
57pub enum DatabaseEnvKind {
58    /// Read-only MDBX environment.
59    RO,
60    /// Read-write MDBX environment.
61    RW,
62}
63
64impl DatabaseEnvKind {
65    /// Returns `true` if the environment is read-write.
66    pub const fn is_rw(&self) -> bool {
67        matches!(self, Self::RW)
68    }
69}
70
71/// Arguments for database initialization.
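///
/// # Example
///
/// A minimal sketch of building arguments with the setters below; the values are illustrative
/// and the import paths are assumed:
///
/// ```ignore
/// use reth_db::mdbx::{DatabaseArguments, MEGABYTE, TERABYTE};
/// use reth_db_api::models::ClientVersion;
///
/// let args = DatabaseArguments::new(ClientVersion::default())
///     // cap the database at 4 TB instead of the default 8 TB
///     .with_geometry_max_size(Some(4 * TERABYTE))
///     // grow the data file in 256 MB steps
///     .with_growth_step(Some(256 * MEGABYTE));
/// ```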
72#[derive(Clone, Debug)]
73pub struct DatabaseArguments {
74    /// Client version that accesses the database.
75    client_version: ClientVersion,
76    /// Database geometry settings.
77    geometry: Geometry<Range<usize>>,
78    /// Database log level. If [None], the default value is used.
79    log_level: Option<LogLevel>,
80    /// Maximum duration of a read transaction. If [None], the default value is used.
81    max_read_transaction_duration: Option<MaxReadTransactionDuration>,
82    /// Open environment in exclusive/monopolistic mode. If [None], the default value is used.
83    ///
    /// This can be used as a replacement for `MDB_NOLOCK`, which is not supported by MDBX. This
    /// way you get minimal overhead, but with correct multi-process and multi-thread locking.
    ///
    /// If `true`, the environment is opened in exclusive/monopolistic mode, or `MDBX_BUSY` is
    /// returned if the environment is already used by another process. The main feature of the
    /// exclusive mode is the ability to open an environment placed on a network share.
    ///
    /// If `false`, the environment is opened in cooperative mode, i.e. for multi-process
    /// access/interaction/cooperation. The main requirements of the cooperative mode are:
    /// - Data files MUST be placed in a LOCAL file system, NOT on a network share.
    /// - The environment MUST be opened only by LOCAL processes, NOT over a network.
    /// - The OS kernel (i.e. file system and memory mapping implementation) and all processes
    ///   that open the given environment MUST run on physically shared RAM with cache coherency.
    ///   The only exception to the cache-coherency requirement is Linux on the MIPS architecture,
    ///   but this case has not been tested for a long time.
    ///
    /// This flag only takes effect when the environment is opened and cannot be changed
    /// afterwards.
102    exclusive: Option<bool>,
    /// Maximum number of readers. MDBX allows up to 32767 readers (`MDBX_READERS_LIMIT`); this
    /// argument configures the limit that is actually used.
105    max_readers: Option<u64>,
106    /// Defines the synchronization strategy used by the MDBX database when writing data to disk.
107    ///
108    /// This determines how aggressively MDBX ensures data durability versus prioritizing
109    /// performance. The available modes are:
110    ///
    /// - [`SyncMode::Durable`]: Ensures all transactions are fully flushed to disk before they are
    ///   considered committed. This provides the highest level of durability and crash safety but
    ///   may have a performance cost.
114    /// - [`SyncMode::SafeNoSync`]: Skips certain fsync operations to improve write performance.
115    ///   This mode still maintains database integrity but may lose the most recent transactions if
116    ///   the system crashes unexpectedly.
117    ///
118    /// Choose `Durable` if consistency and crash safety are critical (e.g., production
119    /// environments). Choose `SafeNoSync` if performance is more important and occasional data
120    /// loss is acceptable (e.g., testing or ephemeral data).
121    sync_mode: SyncMode,
122}
123
124impl Default for DatabaseArguments {
125    fn default() -> Self {
126        Self::new(ClientVersion::default())
127    }
128}
129
130impl DatabaseArguments {
131    /// Create new database arguments with given client version.
132    pub fn new(client_version: ClientVersion) -> Self {
133        Self {
134            client_version,
135            geometry: Geometry {
136                size: Some(0..(8 * TERABYTE)),
137                growth_step: Some(4 * GIGABYTE as isize),
138                shrink_threshold: Some(0),
139                page_size: Some(PageSize::Set(default_page_size())),
140            },
141            log_level: None,
142            max_read_transaction_duration: None,
143            exclusive: None,
144            max_readers: None,
145            sync_mode: SyncMode::Durable,
146        }
147    }
148
149    /// Sets the upper size limit of the db environment, the maximum database size in bytes.
150    pub const fn with_geometry_max_size(mut self, max_size: Option<usize>) -> Self {
151        if let Some(max_size) = max_size {
152            self.geometry.size = Some(0..max_size);
153        }
154        self
155    }
156
157    /// Sets the database page size value.
158    pub const fn with_geometry_page_size(mut self, page_size: Option<usize>) -> Self {
159        if let Some(size) = page_size {
160            self.geometry.page_size = Some(reth_libmdbx::PageSize::Set(size));
161        }
162
163        self
164    }
165
166    /// Sets the database sync mode.
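    ///
    /// For example, a test setup might trade durability for write speed (a sketch;
    /// [`SyncMode`] is imported from `reth_libmdbx`):
    ///
    /// ```ignore
    /// use reth_libmdbx::SyncMode;
    ///
    /// let args = DatabaseArguments::default().with_sync_mode(Some(SyncMode::SafeNoSync));
    /// ```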
167    pub const fn with_sync_mode(mut self, sync_mode: Option<SyncMode>) -> Self {
168        if let Some(sync_mode) = sync_mode {
169            self.sync_mode = sync_mode;
170        }
171
172        self
173    }
174
175    /// Configures the database growth step in bytes.
176    pub const fn with_growth_step(mut self, growth_step: Option<usize>) -> Self {
177        if let Some(growth_step) = growth_step {
178            self.geometry.growth_step = Some(growth_step as isize);
179        }
180        self
181    }
182
183    /// Set the log level.
184    pub const fn with_log_level(mut self, log_level: Option<LogLevel>) -> Self {
185        self.log_level = log_level;
186        self
187    }
188
189    /// Set the maximum duration of a read transaction.
190    pub const fn max_read_transaction_duration(
191        &mut self,
192        max_read_transaction_duration: Option<MaxReadTransactionDuration>,
193    ) {
194        self.max_read_transaction_duration = max_read_transaction_duration;
195    }
196
197    /// Set the maximum duration of a read transaction.
198    pub const fn with_max_read_transaction_duration(
199        mut self,
200        max_read_transaction_duration: Option<MaxReadTransactionDuration>,
201    ) -> Self {
202        self.max_read_transaction_duration(max_read_transaction_duration);
203        self
204    }
205
206    /// Set the mdbx exclusive flag.
207    pub const fn with_exclusive(mut self, exclusive: Option<bool>) -> Self {
208        self.exclusive = exclusive;
209        self
210    }
211
    /// Set the `max_readers` flag.
213    pub const fn with_max_readers(mut self, max_readers: Option<u64>) -> Self {
214        self.max_readers = max_readers;
215        self
216    }
217
    /// Returns the client version that accesses the database.
219    pub const fn client_version(&self) -> &ClientVersion {
220        &self.client_version
221    }
222}
223
224/// Wrapper for the libmdbx environment: [Environment]
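///
/// # Example
///
/// A minimal sketch of opening a read-write environment, creating the default tables, and
/// starting a read-only transaction (the path and error handling are illustrative):
///
/// ```ignore
/// use reth_db_api::database::Database;
///
/// let mut env = DatabaseEnv::open(
///     std::path::Path::new("/tmp/reth-db"),
///     DatabaseEnvKind::RW,
///     DatabaseArguments::default(),
/// )?;
/// env.create_tables()?;
///
/// // Read and write transactions can now be created from the environment.
/// let tx = env.tx()?;
/// tx.commit()?;
/// ```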
225#[derive(Debug)]
226pub struct DatabaseEnv {
227    /// Libmdbx-sys environment.
228    inner: Environment,
229    /// Opened DBIs for reuse.
    /// Important: Do not manually close these DBIs, e.g. via `mdbx_dbi_close`.
231    /// More generally, do not dynamically create, re-open, or drop tables at
232    /// runtime. It's better to perform table creation and migration only once
233    /// at startup.
234    dbis: Arc<HashMap<&'static str, ffi::MDBX_dbi>>,
235    /// Cache for metric handles. If `None`, metrics are not recorded.
236    metrics: Option<Arc<DatabaseEnvMetrics>>,
237    /// Write lock for when dealing with a read-write environment.
238    _lock_file: Option<StorageLock>,
239}
240
241impl Database for DatabaseEnv {
242    type TX = tx::Tx<RO>;
243    type TXMut = tx::Tx<RW>;
244
245    fn tx(&self) -> Result<Self::TX, DatabaseError> {
246        Tx::new(
247            self.inner.begin_ro_txn().map_err(|e| DatabaseError::InitTx(e.into()))?,
248            self.dbis.clone(),
249            self.metrics.clone(),
250        )
251        .map_err(|e| DatabaseError::InitTx(e.into()))
252    }
253
254    fn tx_mut(&self) -> Result<Self::TXMut, DatabaseError> {
255        Tx::new(
256            self.inner.begin_rw_txn().map_err(|e| DatabaseError::InitTx(e.into()))?,
257            self.dbis.clone(),
258            self.metrics.clone(),
259        )
260        .map_err(|e| DatabaseError::InitTx(e.into()))
261    }
262}
263
264impl DatabaseMetrics for DatabaseEnv {
265    fn report_metrics(&self) {
266        for (name, value, labels) in self.gauge_metrics() {
267            gauge!(name, labels).set(value);
268        }
269    }
270
271    fn gauge_metrics(&self) -> Vec<(&'static str, f64, Vec<Label>)> {
272        let mut metrics = Vec::new();
273
274        let _ = self
275            .view(|tx| {
276                for table in Tables::ALL.iter().map(Tables::name) {
277                    let table_db = tx.inner.open_db(Some(table)).wrap_err("Could not open db.")?;
278
279                    let stats = tx
280                        .inner
281                        .db_stat(&table_db)
282                        .wrap_err(format!("Could not find table: {table}"))?;
283
284                    let page_size = stats.page_size() as usize;
285                    let leaf_pages = stats.leaf_pages();
286                    let branch_pages = stats.branch_pages();
287                    let overflow_pages = stats.overflow_pages();
288                    let num_pages = leaf_pages + branch_pages + overflow_pages;
289                    let table_size = page_size * num_pages;
290                    let entries = stats.entries();
291
292                    metrics.push((
293                        "db.table_size",
294                        table_size as f64,
295                        vec![Label::new("table", table)],
296                    ));
297                    metrics.push((
298                        "db.table_pages",
299                        leaf_pages as f64,
300                        vec![Label::new("table", table), Label::new("type", "leaf")],
301                    ));
302                    metrics.push((
303                        "db.table_pages",
304                        branch_pages as f64,
305                        vec![Label::new("table", table), Label::new("type", "branch")],
306                    ));
307                    metrics.push((
308                        "db.table_pages",
309                        overflow_pages as f64,
310                        vec![Label::new("table", table), Label::new("type", "overflow")],
311                    ));
312                    metrics.push((
313                        "db.table_entries",
314                        entries as f64,
315                        vec![Label::new("table", table)],
316                    ));
317                }
318
319                Ok::<(), eyre::Report>(())
320            })
321            .map_err(|error| error!(%error, "Failed to read db table stats"));
322
323        if let Ok(freelist) =
324            self.freelist().map_err(|error| error!(%error, "Failed to read db.freelist"))
325        {
326            metrics.push(("db.freelist", freelist as f64, vec![]));
327        }
328
329        if let Ok(stat) = self.stat().map_err(|error| error!(%error, "Failed to read db.stat")) {
330            metrics.push(("db.page_size", stat.page_size() as f64, vec![]));
331        }
332
333        metrics.push((
334            "db.timed_out_not_aborted_transactions",
335            self.timed_out_not_aborted_transactions() as f64,
336            vec![],
337        ));
338
339        metrics
340    }
341}
342
343impl DatabaseEnv {
    /// Opens the database at the specified path with the given [`DatabaseEnvKind`].
    ///
    /// It does not create the tables; for that, call [`DatabaseEnv::create_tables`].
347    pub fn open(
348        path: &Path,
349        kind: DatabaseEnvKind,
350        args: DatabaseArguments,
351    ) -> Result<Self, DatabaseError> {
352        let _lock_file = if kind.is_rw() {
353            StorageLock::try_acquire(path)
354                .map_err(|err| DatabaseError::Other(err.to_string()))?
355                .into()
356        } else {
357            None
358        };
359
360        let mut inner_env = Environment::builder();
361
362        let mode = match kind {
363            DatabaseEnvKind::RO => Mode::ReadOnly,
364            DatabaseEnvKind::RW => {
365                // enable writemap mode in RW mode
366                inner_env.write_map();
367                Mode::ReadWrite { sync_mode: args.sync_mode }
368            }
369        };
370
371        // Note: We set max dbs to 256 here to allow for custom tables. This needs to be set on
372        // environment creation.
373        debug_assert!(Tables::ALL.len() <= 256, "number of tables exceed max dbs");
374        inner_env.set_max_dbs(256);
375        inner_env.set_geometry(args.geometry);
376
377        fn is_current_process(id: u32) -> bool {
378            #[cfg(unix)]
379            {
380                id == std::os::unix::process::parent_id() || id == std::process::id()
381            }
382
383            #[cfg(not(unix))]
384            {
385                id == std::process::id()
386            }
387        }
388
389        extern "C" fn handle_slow_readers(
390            _env: *const ffi::MDBX_env,
391            _txn: *const ffi::MDBX_txn,
392            process_id: ffi::mdbx_pid_t,
393            thread_id: ffi::mdbx_tid_t,
394            read_txn_id: u64,
395            gap: std::ffi::c_uint,
396            space: usize,
397            retry: std::ffi::c_int,
398        ) -> HandleSlowReadersReturnCode {
399            if space > MAX_SAFE_READER_SPACE {
400                let message = if is_current_process(process_id as u32) {
401                    "Current process has a long-lived database transaction that grows the database file."
402                } else {
403                    "External process has a long-lived database transaction that grows the database file. \
404                     Use shorter-lived read transactions or shut down the node."
405                };
406                reth_tracing::tracing::warn!(
407                    target: "storage::db::mdbx",
408                    ?process_id,
409                    ?thread_id,
410                    ?read_txn_id,
411                    ?gap,
412                    ?space,
413                    ?retry,
414                    "{message}"
415                )
416            }
417
418            reth_libmdbx::HandleSlowReadersReturnCode::ProceedWithoutKillingReader
419        }
420        inner_env.set_handle_slow_readers(handle_slow_readers);
421
422        inner_env.set_flags(EnvironmentFlags {
423            mode,
424            // We disable readahead because it improves performance for linear scans, but
425            // worsens it for random access (which is our access pattern outside of sync)
426            no_rdahead: true,
427            coalesce: true,
428            exclusive: args.exclusive.unwrap_or_default(),
429            ..Default::default()
430        });
431        // Configure more readers
432        inner_env.set_max_readers(args.max_readers.unwrap_or(DEFAULT_MAX_READERS));
        // This parameter sets the maximum size of the "reclaimed list", measured in pages. The
        // reclaimed list is the list of freed pages that is populated during the lifetime of a DB
        // transaction, and through which MDBX searches when it needs to insert a new record with
        // overflow pages. The flow is roughly the following:
        // 0. We need to insert a record that requires N overflow pages (a consecutive sequence
        //    inside the DB file).
        // 1. Get some pages from the freelist, put them into the reclaimed list.
        // 2. Search through the reclaimed list for a sequence of size N.
        // 3. a. If found, return the sequence.
        // 3. b. If not found, repeat steps 1-3. If the reclaimed list grows larger than
        //    the `rp augment limit`, stop the search and allocate new pages at the end of the file:
        //    https://github.com/paradigmxyz/reth/blob/2a4c78759178f66e30c8976ec5d243b53102fc9a/crates/storage/libmdbx-rs/mdbx-sys/libmdbx/mdbx.c#L11479-L11480.
        //
        // Basically, this parameter controls how long we search through the freelist before
        // falling back to allocating new pages. A smaller value makes MDBX fall back to
        // allocation sooner, while a higher value forces MDBX to search through the freelist
        // longer until the sequence of pages is found.
        //
        // The default value of this parameter depends on the DB size: the bigger the database,
        // the larger the `rp augment limit`.
        // https://github.com/paradigmxyz/reth/blob/2a4c78759178f66e30c8976ec5d243b53102fc9a/crates/storage/libmdbx-rs/mdbx-sys/libmdbx/mdbx.c#L10018-L10024.
        //
        // Previously, MDBX used the constant `256 * 1024` for this value. We fall back to that,
        // because we want to prioritize freelist lookup speed over database growth.
        // https://github.com/paradigmxyz/reth/blob/fa2b9b685ed9787636d962f4366caf34a9186e66/crates/storage/libmdbx-rs/mdbx-sys/libmdbx/mdbx.c#L16017.
458        inner_env.set_rp_augment_limit(256 * 1024);
459
460        if let Some(log_level) = args.log_level {
            // Levels higher than [`LogLevel::Notice`] require libmdbx to be built with the
            // `MDBX_DEBUG` option.
462            let is_log_level_available = if cfg!(debug_assertions) {
463                true
464            } else {
465                matches!(
466                    log_level,
467                    LogLevel::Fatal | LogLevel::Error | LogLevel::Warn | LogLevel::Notice
468                )
469            };
470            if is_log_level_available {
471                inner_env.set_log_level(match log_level {
472                    LogLevel::Fatal => 0,
473                    LogLevel::Error => 1,
474                    LogLevel::Warn => 2,
475                    LogLevel::Notice => 3,
476                    LogLevel::Verbose => 4,
477                    LogLevel::Debug => 5,
478                    LogLevel::Trace => 6,
479                    LogLevel::Extra => 7,
480                });
481            } else {
482                return Err(DatabaseError::LogLevelUnavailable(log_level))
483            }
484        }
485
486        if let Some(max_read_transaction_duration) = args.max_read_transaction_duration {
487            inner_env.set_max_read_transaction_duration(max_read_transaction_duration);
488        }
489
490        let env = Self {
491            inner: inner_env.open(path).map_err(|e| DatabaseError::Open(e.into()))?,
492            dbis: Arc::default(),
493            metrics: None,
494            _lock_file,
495        };
496
497        Ok(env)
498    }
499
500    /// Enables metrics on the database.
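    ///
    /// Typically chained after [`DatabaseEnv::open`]; a minimal sketch (the `path`, `kind`, and
    /// `args` bindings are assumed to exist):
    ///
    /// ```ignore
    /// let env = DatabaseEnv::open(path, kind, args)?.with_metrics();
    /// ```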
501    pub fn with_metrics(mut self) -> Self {
502        self.metrics = Some(DatabaseEnvMetrics::new().into());
503        self
504    }
505
506    /// Creates all the tables defined in [`Tables`], if necessary.
507    ///
    /// This keeps track of the created table handles and stores them for reuse.
509    pub fn create_tables(&mut self) -> Result<(), DatabaseError> {
510        self.create_and_track_tables_for::<Tables>()
511    }
512
513    /// Creates all the tables defined in the given [`TableSet`], if necessary.
514    ///
    /// This keeps track of the created table handles and stores them for reuse.
516    pub fn create_and_track_tables_for<TS: TableSet>(&mut self) -> Result<(), DatabaseError> {
517        let handles = self._create_tables::<TS>()?;
518        // Note: This is okay because self has mutable access here and `DatabaseEnv` must be Arc'ed
519        // before it can be shared.
520        let dbis = Arc::make_mut(&mut self.dbis);
521        dbis.extend(handles);
522
523        Ok(())
524    }
525
    /// Creates all the tables defined in the given [`TableSet`], if necessary.
    ///
    /// If this [`Arc`] is uniquely owned, the created table handles are tracked for reuse.
    ///
    /// This is intended to be called during initialization to create and track additional tables
    /// after the default tables from [`Self::create_tables`] have been created.
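    ///
    /// A sketch of the intended call pattern, assuming a custom [`TableSet`] implementation
    /// named `ExtraTables` (hypothetical):
    ///
    /// ```ignore
    /// let mut env = std::sync::Arc::new(env);
    /// env.create_tables_for::<ExtraTables>()?;
    /// ```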
532    pub fn create_tables_for<TS: TableSet>(self: &mut Arc<Self>) -> Result<(), DatabaseError> {
533        let handles = self._create_tables::<TS>()?;
534        if let Some(db) = Arc::get_mut(self) {
            // Note: the env is uniquely owned here; `Arc::make_mut` clones the dbis map if it
            // is still shared.
536            let dbis = Arc::make_mut(&mut db.dbis);
537            dbis.extend(handles);
538        }
539        Ok(())
540    }
541
542    /// Creates the tables and returns the identifiers of the tables.
543    fn _create_tables<TS: TableSet>(
544        &self,
545    ) -> Result<Vec<(&'static str, ffi::MDBX_dbi)>, DatabaseError> {
546        let mut handles = Vec::new();
547        let tx = self.inner.begin_rw_txn().map_err(|e| DatabaseError::InitTx(e.into()))?;
548
549        for table in TS::tables() {
550            let flags =
551                if table.is_dupsort() { DatabaseFlags::DUP_SORT } else { DatabaseFlags::default() };
552
553            let db = tx
554                .create_db(Some(table.name()), flags)
555                .map_err(|e| DatabaseError::CreateTable(e.into()))?;
556            handles.push((table.name(), db.dbi()));
557        }
558
559        tx.commit().map_err(|e| DatabaseError::Commit(e.into()))?;
560        Ok(handles)
561    }
562
    /// Records the version of the client that accesses the database with write privileges.
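    ///
    /// A sketch, assuming `version` is a non-empty [`ClientVersion`] (empty versions are
    /// ignored):
    ///
    /// ```ignore
    /// env.record_client_version(version)?;
    /// ```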
564    pub fn record_client_version(&self, version: ClientVersion) -> Result<(), DatabaseError> {
565        if version.is_empty() {
566            return Ok(())
567        }
568
569        let tx = self.tx_mut()?;
570        let mut version_cursor = tx.cursor_write::<tables::VersionHistory>()?;
571
572        let last_version = version_cursor.last()?.map(|(_, v)| v);
573        if Some(&version) != last_version.as_ref() {
574            version_cursor.upsert(
575                SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs(),
576                &version,
577            )?;
578            tx.commit()?;
579        }
580
581        Ok(())
582    }
583}
584
585impl Deref for DatabaseEnv {
586    type Target = Environment;
587
588    fn deref(&self) -> &Self::Target {
589        &self.inner
590    }
591}
592
593#[cfg(test)]
594mod tests {
595    use super::*;
596    use crate::{
597        tables::{
598            AccountsHistory, CanonicalHeaders, Headers, PlainAccountState, PlainStorageState,
599        },
600        test_utils::*,
601        AccountChangeSets,
602    };
603    use alloy_consensus::Header;
604    use alloy_primitives::{address, Address, B256, U256};
605    use reth_db_api::{
606        cursor::{DbDupCursorRO, DbDupCursorRW, ReverseWalker, Walker},
607        models::{AccountBeforeTx, IntegerList, ShardedKey},
608        table::{Encode, Table},
609    };
610    use reth_libmdbx::Error;
611    use reth_primitives_traits::{Account, StorageEntry};
612    use reth_storage_errors::db::{DatabaseWriteError, DatabaseWriteOperation};
613    use std::str::FromStr;
614    use tempfile::TempDir;
615
616    /// Create database for testing
617    fn create_test_db(kind: DatabaseEnvKind) -> Arc<DatabaseEnv> {
618        Arc::new(create_test_db_with_path(
619            kind,
620            &tempfile::TempDir::new().expect(ERROR_TEMPDIR).keep(),
621        ))
622    }
623
624    /// Create database for testing with specified path
625    fn create_test_db_with_path(kind: DatabaseEnvKind, path: &Path) -> DatabaseEnv {
626        let mut env =
627            DatabaseEnv::open(path, kind, DatabaseArguments::new(ClientVersion::default()))
628                .expect(ERROR_DB_CREATION);
629        env.create_tables().expect(ERROR_TABLE_CREATION);
630        env
631    }
632
633    const ERROR_DB_CREATION: &str = "Not able to create the mdbx file.";
634    const ERROR_PUT: &str = "Not able to insert value into table.";
635    const ERROR_APPEND: &str = "Not able to append the value to the table.";
636    const ERROR_UPSERT: &str = "Not able to upsert the value to the table.";
637    const ERROR_GET: &str = "Not able to get value from table.";
638    const ERROR_DEL: &str = "Not able to delete from table.";
639    const ERROR_COMMIT: &str = "Not able to commit transaction.";
640    const ERROR_RETURN_VALUE: &str = "Mismatching result.";
641    const ERROR_INIT_TX: &str = "Failed to create a MDBX transaction.";
642    const ERROR_ETH_ADDRESS: &str = "Invalid address.";
643
644    #[test]
645    fn db_creation() {
646        create_test_db(DatabaseEnvKind::RW);
647    }
648
649    #[test]
650    fn db_manual_put_get() {
651        let env = create_test_db(DatabaseEnvKind::RW);
652
653        let value = Header::default();
654        let key = 1u64;
655
656        // PUT
657        let tx = env.tx_mut().expect(ERROR_INIT_TX);
658        tx.put::<Headers>(key, value.clone()).expect(ERROR_PUT);
659        tx.commit().expect(ERROR_COMMIT);
660
661        // GET
662        let tx = env.tx().expect(ERROR_INIT_TX);
663        let result = tx.get::<Headers>(key).expect(ERROR_GET);
664        assert_eq!(result.expect(ERROR_RETURN_VALUE), value);
665        tx.commit().expect(ERROR_COMMIT);
666    }
667
668    #[test]
669    fn db_dup_cursor_delete_first() {
670        let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);
671        let tx = db.tx_mut().expect(ERROR_INIT_TX);
672
673        let mut dup_cursor = tx.cursor_dup_write::<PlainStorageState>().unwrap();
674
675        let entry_0 = StorageEntry { key: B256::with_last_byte(1), value: U256::from(0) };
676        let entry_1 = StorageEntry { key: B256::with_last_byte(1), value: U256::from(1) };
677
678        dup_cursor.upsert(Address::with_last_byte(1), &entry_0).expect(ERROR_UPSERT);
679        dup_cursor.upsert(Address::with_last_byte(1), &entry_1).expect(ERROR_UPSERT);
680
681        assert_eq!(
682            dup_cursor.walk(None).unwrap().collect::<Result<Vec<_>, _>>(),
683            Ok(vec![(Address::with_last_byte(1), entry_0), (Address::with_last_byte(1), entry_1),])
684        );
685
686        let mut walker = dup_cursor.walk(None).unwrap();
687        walker.delete_current().expect(ERROR_DEL);
688
689        assert_eq!(walker.next(), Some(Ok((Address::with_last_byte(1), entry_1))));
690
691        // Check the tx view - it correctly holds entry_1
692        assert_eq!(
693            tx.cursor_dup_read::<PlainStorageState>()
694                .unwrap()
695                .walk(None)
696                .unwrap()
697                .collect::<Result<Vec<_>, _>>(),
698            Ok(vec![
699                (Address::with_last_byte(1), entry_1), // This is ok - we removed entry_0
700            ])
701        );
702
703        // Check the remainder of walker
704        assert_eq!(walker.next(), None);
705    }
706
707    #[test]
708    fn db_cursor_walk() {
709        let env = create_test_db(DatabaseEnvKind::RW);
710
711        let value = Header::default();
712        let key = 1u64;
713
714        // PUT
715        let tx = env.tx_mut().expect(ERROR_INIT_TX);
716        tx.put::<Headers>(key, value.clone()).expect(ERROR_PUT);
717        tx.commit().expect(ERROR_COMMIT);
718
719        // Cursor
720        let tx = env.tx().expect(ERROR_INIT_TX);
721        let mut cursor = tx.cursor_read::<Headers>().unwrap();
722
723        let first = cursor.first().unwrap();
724        assert!(first.is_some(), "First should be our put");
725
726        // Walk
727        let walk = cursor.walk(Some(key)).unwrap();
728        let first = walk.into_iter().next().unwrap().unwrap();
729        assert_eq!(first.1, value, "First next should be put value");
730    }
731
732    #[test]
733    fn db_cursor_walk_range() {
734        let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);
735
736        // PUT (0, 0), (1, 0), (2, 0), (3, 0)
737        let tx = db.tx_mut().expect(ERROR_INIT_TX);
738        vec![0, 1, 2, 3]
739            .into_iter()
740            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
741            .expect(ERROR_PUT);
742        tx.commit().expect(ERROR_COMMIT);
743
744        let tx = db.tx().expect(ERROR_INIT_TX);
745        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
746
747        // [1, 3)
748        let mut walker = cursor.walk_range(1..3).unwrap();
749        assert_eq!(walker.next(), Some(Ok((1, B256::ZERO))));
750        assert_eq!(walker.next(), Some(Ok((2, B256::ZERO))));
751        assert_eq!(walker.next(), None);
752        // next() returns None after walker is done
753        assert_eq!(walker.next(), None);
754
755        // [1, 2]
756        let mut walker = cursor.walk_range(1..=2).unwrap();
757        assert_eq!(walker.next(), Some(Ok((1, B256::ZERO))));
758        assert_eq!(walker.next(), Some(Ok((2, B256::ZERO))));
759        // next() returns None after walker is done
760        assert_eq!(walker.next(), None);
761
762        // [1, ∞)
763        let mut walker = cursor.walk_range(1..).unwrap();
764        assert_eq!(walker.next(), Some(Ok((1, B256::ZERO))));
765        assert_eq!(walker.next(), Some(Ok((2, B256::ZERO))));
766        assert_eq!(walker.next(), Some(Ok((3, B256::ZERO))));
767        // next() returns None after walker is done
768        assert_eq!(walker.next(), None);
769
770        // [2, 4)
771        let mut walker = cursor.walk_range(2..4).unwrap();
772        assert_eq!(walker.next(), Some(Ok((2, B256::ZERO))));
773        assert_eq!(walker.next(), Some(Ok((3, B256::ZERO))));
774        assert_eq!(walker.next(), None);
775        // next() returns None after walker is done
776        assert_eq!(walker.next(), None);
777
778        // (∞, 3)
779        let mut walker = cursor.walk_range(..3).unwrap();
780        assert_eq!(walker.next(), Some(Ok((0, B256::ZERO))));
781        assert_eq!(walker.next(), Some(Ok((1, B256::ZERO))));
782        assert_eq!(walker.next(), Some(Ok((2, B256::ZERO))));
783        // next() returns None after walker is done
784        assert_eq!(walker.next(), None);
785
786        // (∞, ∞)
787        let mut walker = cursor.walk_range(..).unwrap();
788        assert_eq!(walker.next(), Some(Ok((0, B256::ZERO))));
789        assert_eq!(walker.next(), Some(Ok((1, B256::ZERO))));
790        assert_eq!(walker.next(), Some(Ok((2, B256::ZERO))));
791        assert_eq!(walker.next(), Some(Ok((3, B256::ZERO))));
792        // next() returns None after walker is done
793        assert_eq!(walker.next(), None);
794    }
795
796    #[test]
797    fn db_cursor_walk_range_on_dup_table() {
798        let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);
799
800        let address0 = Address::ZERO;
801        let address1 = Address::with_last_byte(1);
802        let address2 = Address::with_last_byte(2);
803
804        let tx = db.tx_mut().expect(ERROR_INIT_TX);
805        tx.put::<AccountChangeSets>(0, AccountBeforeTx { address: address0, info: None })
806            .expect(ERROR_PUT);
807        tx.put::<AccountChangeSets>(0, AccountBeforeTx { address: address1, info: None })
808            .expect(ERROR_PUT);
809        tx.put::<AccountChangeSets>(0, AccountBeforeTx { address: address2, info: None })
810            .expect(ERROR_PUT);
811        tx.put::<AccountChangeSets>(1, AccountBeforeTx { address: address0, info: None })
812            .expect(ERROR_PUT);
813        tx.put::<AccountChangeSets>(1, AccountBeforeTx { address: address1, info: None })
814            .expect(ERROR_PUT);
815        tx.put::<AccountChangeSets>(1, AccountBeforeTx { address: address2, info: None })
816            .expect(ERROR_PUT);
817        tx.put::<AccountChangeSets>(2, AccountBeforeTx { address: address0, info: None }) // <- should not be returned by the walker
818            .expect(ERROR_PUT);
819        tx.commit().expect(ERROR_COMMIT);
820
821        let tx = db.tx().expect(ERROR_INIT_TX);
822        let mut cursor = tx.cursor_read::<AccountChangeSets>().unwrap();
823
824        let entries = cursor.walk_range(..).unwrap().collect::<Result<Vec<_>, _>>().unwrap();
825        assert_eq!(entries.len(), 7);
826
827        let mut walker = cursor.walk_range(0..=1).unwrap();
828        assert_eq!(walker.next(), Some(Ok((0, AccountBeforeTx { address: address0, info: None }))));
829        assert_eq!(walker.next(), Some(Ok((0, AccountBeforeTx { address: address1, info: None }))));
830        assert_eq!(walker.next(), Some(Ok((0, AccountBeforeTx { address: address2, info: None }))));
831        assert_eq!(walker.next(), Some(Ok((1, AccountBeforeTx { address: address0, info: None }))));
832        assert_eq!(walker.next(), Some(Ok((1, AccountBeforeTx { address: address1, info: None }))));
833        assert_eq!(walker.next(), Some(Ok((1, AccountBeforeTx { address: address2, info: None }))));
834        assert_eq!(walker.next(), None);
835    }
836
837    #[expect(clippy::reversed_empty_ranges)]
838    #[test]
839    fn db_cursor_walk_range_invalid() {
840        let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);
841
842        // PUT (0, 0), (1, 0), (2, 0), (3, 0)
843        let tx = db.tx_mut().expect(ERROR_INIT_TX);
844        vec![0, 1, 2, 3]
845            .into_iter()
846            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
847            .expect(ERROR_PUT);
848        tx.commit().expect(ERROR_COMMIT);
849
850        let tx = db.tx().expect(ERROR_INIT_TX);
851        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
852
853        // start bound greater than end bound
854        let mut res = cursor.walk_range(3..1).unwrap();
855        assert_eq!(res.next(), None);
856
857        // start bound greater than end bound
858        let mut res = cursor.walk_range(15..=2).unwrap();
859        assert_eq!(res.next(), None);
860
861        // returning nothing
862        let mut walker = cursor.walk_range(1..1).unwrap();
863        assert_eq!(walker.next(), None);
864    }
865
866    #[test]
867    fn db_walker() {
868        let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);
869
870        // PUT (0, 0), (1, 0), (3, 0)
871        let tx = db.tx_mut().expect(ERROR_INIT_TX);
872        vec![0, 1, 3]
873            .into_iter()
874            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
875            .expect(ERROR_PUT);
876        tx.commit().expect(ERROR_COMMIT);
877
878        let tx = db.tx().expect(ERROR_INIT_TX);
879        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
880
881        let mut walker = Walker::new(&mut cursor, None);
882
883        assert_eq!(walker.next(), Some(Ok((0, B256::ZERO))));
884        assert_eq!(walker.next(), Some(Ok((1, B256::ZERO))));
885        assert_eq!(walker.next(), Some(Ok((3, B256::ZERO))));
886        assert_eq!(walker.next(), None);
887
888        // transform to ReverseWalker
889        let mut reverse_walker = walker.rev();
890        assert_eq!(reverse_walker.next(), Some(Ok((3, B256::ZERO))));
891        assert_eq!(reverse_walker.next(), Some(Ok((1, B256::ZERO))));
892        assert_eq!(reverse_walker.next(), Some(Ok((0, B256::ZERO))));
893        assert_eq!(reverse_walker.next(), None);
894    }
895
896    #[test]
897    fn db_reverse_walker() {
898        let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);
899
900        // PUT (0, 0), (1, 0), (3, 0)
901        let tx = db.tx_mut().expect(ERROR_INIT_TX);
902        vec![0, 1, 3]
903            .into_iter()
904            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
905            .expect(ERROR_PUT);
906        tx.commit().expect(ERROR_COMMIT);
907
908        let tx = db.tx().expect(ERROR_INIT_TX);
909        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
910
911        let mut reverse_walker = ReverseWalker::new(&mut cursor, None);
912
913        assert_eq!(reverse_walker.next(), Some(Ok((3, B256::ZERO))));
914        assert_eq!(reverse_walker.next(), Some(Ok((1, B256::ZERO))));
915        assert_eq!(reverse_walker.next(), Some(Ok((0, B256::ZERO))));
916        assert_eq!(reverse_walker.next(), None);
917
918        // transform to Walker
919        let mut walker = reverse_walker.forward();
920        assert_eq!(walker.next(), Some(Ok((0, B256::ZERO))));
921        assert_eq!(walker.next(), Some(Ok((1, B256::ZERO))));
922        assert_eq!(walker.next(), Some(Ok((3, B256::ZERO))));
923        assert_eq!(walker.next(), None);
924    }
925
926    #[test]
927    fn db_walk_back() {
928        let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);
929
930        // PUT (0, 0), (1, 0), (3, 0)
931        let tx = db.tx_mut().expect(ERROR_INIT_TX);
932        vec![0, 1, 3]
933            .into_iter()
934            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
935            .expect(ERROR_PUT);
936        tx.commit().expect(ERROR_COMMIT);
937
938        let tx = db.tx().expect(ERROR_INIT_TX);
939        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
940
941        let mut reverse_walker = cursor.walk_back(Some(1)).unwrap();
942        assert_eq!(reverse_walker.next(), Some(Ok((1, B256::ZERO))));
943        assert_eq!(reverse_walker.next(), Some(Ok((0, B256::ZERO))));
944        assert_eq!(reverse_walker.next(), None);
945
946        let mut reverse_walker = cursor.walk_back(Some(2)).unwrap();
947        assert_eq!(reverse_walker.next(), Some(Ok((3, B256::ZERO))));
948        assert_eq!(reverse_walker.next(), Some(Ok((1, B256::ZERO))));
949        assert_eq!(reverse_walker.next(), Some(Ok((0, B256::ZERO))));
950        assert_eq!(reverse_walker.next(), None);
951
952        let mut reverse_walker = cursor.walk_back(Some(4)).unwrap();
953        assert_eq!(reverse_walker.next(), Some(Ok((3, B256::ZERO))));
954        assert_eq!(reverse_walker.next(), Some(Ok((1, B256::ZERO))));
955        assert_eq!(reverse_walker.next(), Some(Ok((0, B256::ZERO))));
956        assert_eq!(reverse_walker.next(), None);
957
958        let mut reverse_walker = cursor.walk_back(None).unwrap();
959        assert_eq!(reverse_walker.next(), Some(Ok((3, B256::ZERO))));
960        assert_eq!(reverse_walker.next(), Some(Ok((1, B256::ZERO))));
961        assert_eq!(reverse_walker.next(), Some(Ok((0, B256::ZERO))));
962        assert_eq!(reverse_walker.next(), None);
963    }
964
965    #[test]
966    fn db_cursor_seek_exact_or_previous_key() {
967        let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);
968
969        // PUT
970        let tx = db.tx_mut().expect(ERROR_INIT_TX);
971        vec![0, 1, 3]
972            .into_iter()
973            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
974            .expect(ERROR_PUT);
975        tx.commit().expect(ERROR_COMMIT);
976
977        // Cursor
978        let missing_key = 2;
979        let tx = db.tx().expect(ERROR_INIT_TX);
980        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
981        assert_eq!(cursor.current(), Ok(None));
982
983        // Seek exact
984        let exact = cursor.seek_exact(missing_key).unwrap();
985        assert_eq!(exact, None);
986        assert_eq!(cursor.current(), Ok(None));
987    }
988
989    #[test]
990    fn db_cursor_insert() {
991        let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);
992
993        // PUT
994        let tx = db.tx_mut().expect(ERROR_INIT_TX);
995        vec![0, 1, 3, 4, 5]
996            .into_iter()
997            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
998            .expect(ERROR_PUT);
999        tx.commit().expect(ERROR_COMMIT);
1000
1001        let key_to_insert = 2;
1002        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1003        let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
1004
1005        // INSERT
1006        assert_eq!(cursor.insert(key_to_insert, &B256::ZERO), Ok(()));
1007        assert_eq!(cursor.current(), Ok(Some((key_to_insert, B256::ZERO))));
1008
1009        // INSERT (failure)
1010        assert_eq!(
1011            cursor.insert(key_to_insert, &B256::ZERO),
1012            Err(DatabaseWriteError {
1013                info: Error::KeyExist.into(),
1014                operation: DatabaseWriteOperation::CursorInsert,
1015                table_name: CanonicalHeaders::NAME,
1016                key: key_to_insert.encode().into(),
1017            }
1018            .into())
1019        );
1020        assert_eq!(cursor.current(), Ok(Some((key_to_insert, B256::ZERO))));
1021
1022        tx.commit().expect(ERROR_COMMIT);
1023
1024        // Confirm the result
1025        let tx = db.tx().expect(ERROR_INIT_TX);
1026        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
1027        let res = cursor.walk(None).unwrap().map(|res| res.unwrap().0).collect::<Vec<_>>();
1028        assert_eq!(res, vec![0, 1, 2, 3, 4, 5]);
1029        tx.commit().expect(ERROR_COMMIT);
1030    }
1031
1032    #[test]
1033    fn db_cursor_insert_dup() {
1034        let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);
1035        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1036
1037        let mut dup_cursor = tx.cursor_dup_write::<PlainStorageState>().unwrap();
1038        let key = Address::random();
1039        let subkey1 = B256::random();
1040        let subkey2 = B256::random();
1041
1042        let entry1 = StorageEntry { key: subkey1, value: U256::ZERO };
1043        assert!(dup_cursor.insert(key, &entry1).is_ok());
1044
1045        // Can't insert
1046        let entry2 = StorageEntry { key: subkey2, value: U256::ZERO };
1047        assert!(dup_cursor.insert(key, &entry2).is_err());
1048    }
1049
1050    #[test]
1051    fn db_cursor_delete_current_non_existent() {
1052        let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);
1053        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1054
1055        let key1 = Address::with_last_byte(1);
1056        let key2 = Address::with_last_byte(2);
1057        let key3 = Address::with_last_byte(3);
1058        let mut cursor = tx.cursor_write::<PlainAccountState>().unwrap();
1059
1060        assert!(cursor.insert(key1, &Account::default()).is_ok());
1061        assert!(cursor.insert(key2, &Account::default()).is_ok());
1062        assert!(cursor.insert(key3, &Account::default()).is_ok());
1063
1064        // Seek & delete key2
1065        cursor.seek_exact(key2).unwrap();
1066        assert_eq!(cursor.delete_current(), Ok(()));
1067        assert_eq!(cursor.seek_exact(key2), Ok(None));
1068
1069        // Seek & delete key2 again
1070        assert_eq!(cursor.seek_exact(key2), Ok(None));
1071        assert_eq!(
1072            cursor.delete_current(),
1073            Err(DatabaseError::Delete(reth_libmdbx::Error::NoData.into()))
1074        );
1075        // Assert that key1 is still there
1076        assert_eq!(cursor.seek_exact(key1), Ok(Some((key1, Account::default()))));
1077        // Assert that key3 is still there
1078        assert_eq!(cursor.seek_exact(key3), Ok(Some((key3, Account::default()))));
1079    }
1080
1081    #[test]
1082    fn db_cursor_insert_wherever_cursor_is() {
1083        let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);
1084        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1085
1086        // PUT
1087        vec![0, 1, 3, 5, 7, 9]
1088            .into_iter()
1089            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
1090            .expect(ERROR_PUT);
1091        tx.commit().expect(ERROR_COMMIT);
1092
1093        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1094        let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
1095
1096        // INSERT (cursor starts at last)
1097        cursor.last().unwrap();
1098        assert_eq!(cursor.current(), Ok(Some((9, B256::ZERO))));
1099
1100        for pos in (2..=8).step_by(2) {
1101            assert_eq!(cursor.insert(pos, &B256::ZERO), Ok(()));
1102            assert_eq!(cursor.current(), Ok(Some((pos, B256::ZERO))));
1103        }
1104        tx.commit().expect(ERROR_COMMIT);
1105
1106        // Confirm the result
1107        let tx = db.tx().expect(ERROR_INIT_TX);
1108        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
1109        let res = cursor.walk(None).unwrap().map(|res| res.unwrap().0).collect::<Vec<_>>();
1110        assert_eq!(res, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
1111        tx.commit().expect(ERROR_COMMIT);
1112    }
1113
1114    #[test]
1115    fn db_cursor_append() {
1116        let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);
1117
1118        // PUT
1119        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1120        vec![0, 1, 2, 3, 4]
1121            .into_iter()
1122            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
1123            .expect(ERROR_PUT);
1124        tx.commit().expect(ERROR_COMMIT);
1125
1126        // APPEND
1127        let key_to_append = 5;
1128        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1129        let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
1130        assert_eq!(cursor.append(key_to_append, &B256::ZERO), Ok(()));
1131        tx.commit().expect(ERROR_COMMIT);
1132
1133        // Confirm the result
1134        let tx = db.tx().expect(ERROR_INIT_TX);
1135        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
1136        let res = cursor.walk(None).unwrap().map(|res| res.unwrap().0).collect::<Vec<_>>();
1137        assert_eq!(res, vec![0, 1, 2, 3, 4, 5]);
1138        tx.commit().expect(ERROR_COMMIT);
1139    }
1140
1141    #[test]
1142    fn db_cursor_append_failure() {
1143        let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);
1144
1145        // PUT
1146        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1147        vec![0, 1, 3, 4, 5]
1148            .into_iter()
1149            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
1150            .expect(ERROR_PUT);
1151        tx.commit().expect(ERROR_COMMIT);
1152
1153        // APPEND
1154        let key_to_append = 2;
1155        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1156        let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
1157        assert_eq!(
1158            cursor.append(key_to_append, &B256::ZERO),
1159            Err(DatabaseWriteError {
1160                info: Error::KeyMismatch.into(),
1161                operation: DatabaseWriteOperation::CursorAppend,
1162                table_name: CanonicalHeaders::NAME,
1163                key: key_to_append.encode().into(),
1164            }
1165            .into())
1166        );
1167        assert_eq!(cursor.current(), Ok(Some((5, B256::ZERO)))); // the end of table
1168        tx.commit().expect(ERROR_COMMIT);
1169
1170        // Confirm the result
1171        let tx = db.tx().expect(ERROR_INIT_TX);
1172        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
1173        let res = cursor.walk(None).unwrap().map(|res| res.unwrap().0).collect::<Vec<_>>();
1174        assert_eq!(res, vec![0, 1, 3, 4, 5]);
1175        tx.commit().expect(ERROR_COMMIT);
1176    }
1177
1178    #[test]
1179    fn db_cursor_upsert() {
1180        let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);
1181        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1182
1183        let mut cursor = tx.cursor_write::<PlainAccountState>().unwrap();
1184        let key = Address::random();
1185
1186        let account = Account::default();
1187        cursor.upsert(key, &account).expect(ERROR_UPSERT);
1188        assert_eq!(cursor.seek_exact(key), Ok(Some((key, account))));
1189
1190        let account = Account { nonce: 1, ..Default::default() };
1191        cursor.upsert(key, &account).expect(ERROR_UPSERT);
1192        assert_eq!(cursor.seek_exact(key), Ok(Some((key, account))));
1193
1194        let account = Account { nonce: 2, ..Default::default() };
1195        cursor.upsert(key, &account).expect(ERROR_UPSERT);
1196        assert_eq!(cursor.seek_exact(key), Ok(Some((key, account))));
1197
1198        let mut dup_cursor = tx.cursor_dup_write::<PlainStorageState>().unwrap();
1199        let subkey = B256::random();
1200
1201        let value = U256::from(1);
1202        let entry1 = StorageEntry { key: subkey, value };
1203        dup_cursor.upsert(key, &entry1).expect(ERROR_UPSERT);
1204        assert_eq!(dup_cursor.seek_by_key_subkey(key, subkey), Ok(Some(entry1)));
1205
1206        let value = U256::from(2);
1207        let entry2 = StorageEntry { key: subkey, value };
1208        dup_cursor.upsert(key, &entry2).expect(ERROR_UPSERT);
1209        assert_eq!(dup_cursor.seek_by_key_subkey(key, subkey), Ok(Some(entry1)));
1210        assert_eq!(dup_cursor.next_dup_val(), Ok(Some(entry2)));
1211    }
1212
1213    #[test]
1214    fn db_cursor_dupsort_append() {
1215        let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);
1216
1217        let transition_id = 2;
1218
1219        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1220        let mut cursor = tx.cursor_write::<AccountChangeSets>().unwrap();
1221        vec![0, 1, 3, 4, 5]
1222            .into_iter()
1223            .try_for_each(|val| {
1224                cursor.append(
1225                    transition_id,
1226                    &AccountBeforeTx { address: Address::with_last_byte(val), info: None },
1227                )
1228            })
1229            .expect(ERROR_APPEND);
1230        tx.commit().expect(ERROR_COMMIT);
1231
1232        // APPEND DUP & APPEND
1233        let subkey_to_append = 2;
1234        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1235        let mut cursor = tx.cursor_write::<AccountChangeSets>().unwrap();
1236        assert_eq!(
1237            cursor.append_dup(
1238                transition_id,
1239                AccountBeforeTx { address: Address::with_last_byte(subkey_to_append), info: None }
1240            ),
1241            Err(DatabaseWriteError {
1242                info: Error::KeyMismatch.into(),
1243                operation: DatabaseWriteOperation::CursorAppendDup,
1244                table_name: AccountChangeSets::NAME,
1245                key: transition_id.encode().into(),
1246            }
1247            .into())
1248        );
1249        assert_eq!(
1250            cursor.append(
1251                transition_id - 1,
1252                &AccountBeforeTx { address: Address::with_last_byte(subkey_to_append), info: None }
1253            ),
1254            Err(DatabaseWriteError {
1255                info: Error::KeyMismatch.into(),
1256                operation: DatabaseWriteOperation::CursorAppend,
1257                table_name: AccountChangeSets::NAME,
1258                key: (transition_id - 1).encode().into(),
1259            }
1260            .into())
1261        );
1262        assert_eq!(
1263            cursor.append(
1264                transition_id,
1265                &AccountBeforeTx { address: Address::with_last_byte(subkey_to_append), info: None }
1266            ),
1267            Ok(())
1268        );
1269    }
1270
1271    #[test]
1272    fn db_closure_put_get() {
1273        let path = TempDir::new().expect(ERROR_TEMPDIR).keep();
1274
1275        let value = Account {
1276            nonce: 18446744073709551615,
1277            bytecode_hash: Some(B256::random()),
1278            balance: U256::MAX,
1279        };
1280        let key = Address::from_str("0xa2c122be93b0074270ebee7f6b7292c7deb45047")
1281            .expect(ERROR_ETH_ADDRESS);
1282
1283        {
1284            let env = create_test_db_with_path(DatabaseEnvKind::RW, &path);
1285
1286            // PUT
1287            let result = env.update(|tx| {
1288                tx.put::<PlainAccountState>(key, value).expect(ERROR_PUT);
1289                200
1290            });
1291            assert_eq!(result.expect(ERROR_RETURN_VALUE), 200);
1292        }
1293
1294        let env = DatabaseEnv::open(
1295            &path,
1296            DatabaseEnvKind::RO,
1297            DatabaseArguments::new(ClientVersion::default()),
1298        )
1299        .expect(ERROR_DB_CREATION);
1300
1301        // GET
1302        let result =
1303            env.view(|tx| tx.get::<PlainAccountState>(key).expect(ERROR_GET)).expect(ERROR_GET);
1304
1305        assert_eq!(result, Some(value))
1306    }
1307
1308    #[test]
1309    fn db_dup_sort() {
1310        let env = create_test_db(DatabaseEnvKind::RW);
1311        let key = Address::from_str("0xa2c122be93b0074270ebee7f6b7292c7deb45047")
1312            .expect(ERROR_ETH_ADDRESS);
1313
1314        // PUT (0,0)
1315        let value00 = StorageEntry::default();
1316        env.update(|tx| tx.put::<PlainStorageState>(key, value00).expect(ERROR_PUT)).unwrap();
1317
1318        // PUT (2,2)
1319        let value22 = StorageEntry { key: B256::with_last_byte(2), value: U256::from(2) };
1320        env.update(|tx| tx.put::<PlainStorageState>(key, value22).expect(ERROR_PUT)).unwrap();
1321
1322        // PUT (1,1)
1323        let value11 = StorageEntry { key: B256::with_last_byte(1), value: U256::from(1) };
1324        env.update(|tx| tx.put::<PlainStorageState>(key, value11).expect(ERROR_PUT)).unwrap();
1325
1326        // Iterate with cursor
1327        {
1328            let tx = env.tx().expect(ERROR_INIT_TX);
1329            let mut cursor = tx.cursor_dup_read::<PlainStorageState>().unwrap();
1330
1331            // Notice that value11 and value22 have been ordered in the DB.
1332            assert_eq!(Some(value00), cursor.next_dup_val().unwrap());
1333            assert_eq!(Some(value11), cursor.next_dup_val().unwrap());
1334            assert_eq!(Some(value22), cursor.next_dup_val().unwrap());
1335        }
1336
1337        // Seek value with exact subkey
1338        {
1339            let tx = env.tx().expect(ERROR_INIT_TX);
1340            let mut cursor = tx.cursor_dup_read::<PlainStorageState>().unwrap();
1341            let mut walker = cursor.walk_dup(Some(key), Some(B256::with_last_byte(1))).unwrap();
1342            assert_eq!(
1343                (key, value11),
1344                walker
1345                    .next()
1346                    .expect("element should exist.")
1347                    .expect("should be able to retrieve it.")
1348            );
1349        }
1350    }
1351
1352    #[test]
1353    fn db_walk_dup_with_not_existing_key() {
1354        let env = create_test_db(DatabaseEnvKind::RW);
1355        let key = Address::from_str("0xa2c122be93b0074270ebee7f6b7292c7deb45047")
1356            .expect(ERROR_ETH_ADDRESS);
1357
1358        // PUT (0,0)
1359        let value00 = StorageEntry::default();
1360        env.update(|tx| tx.put::<PlainStorageState>(key, value00).expect(ERROR_PUT)).unwrap();
1361
1362        // PUT (2,2)
1363        let value22 = StorageEntry { key: B256::with_last_byte(2), value: U256::from(2) };
1364        env.update(|tx| tx.put::<PlainStorageState>(key, value22).expect(ERROR_PUT)).unwrap();
1365
1366        // PUT (1,1)
1367        let value11 = StorageEntry { key: B256::with_last_byte(1), value: U256::from(1) };
1368        env.update(|tx| tx.put::<PlainStorageState>(key, value11).expect(ERROR_PUT)).unwrap();
1369
1370        // walk_dup with a non-existing key should immediately return None
1371        {
1372            let tx = env.tx().expect(ERROR_INIT_TX);
1373            let mut cursor = tx.cursor_dup_read::<PlainStorageState>().unwrap();
1374            let not_existing_key = Address::ZERO;
1375            let mut walker = cursor.walk_dup(Some(not_existing_key), None).unwrap();
1376            assert_eq!(walker.next(), None);
1377        }
1378    }
1379
1380    #[test]
1381    fn db_iterate_over_all_dup_values() {
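        // Contrast `walk_dup`, which stays within a single key's duplicate values, with `walk`,
        // which crosses key boundaries and visits every entry.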
1382        let env = create_test_db(DatabaseEnvKind::RW);
1383        let key1 = Address::from_str("0x1111111111111111111111111111111111111111")
1384            .expect(ERROR_ETH_ADDRESS);
1385        let key2 = Address::from_str("0x2222222222222222222222222222222222222222")
1386            .expect(ERROR_ETH_ADDRESS);
1387
1388        // PUT key1 (0,0)
1389        let value00 = StorageEntry::default();
1390        env.update(|tx| tx.put::<PlainStorageState>(key1, value00).expect(ERROR_PUT)).unwrap();
1391
1392        // PUT key1 (1,1)
1393        let value11 = StorageEntry { key: B256::with_last_byte(1), value: U256::from(1) };
1394        env.update(|tx| tx.put::<PlainStorageState>(key1, value11).expect(ERROR_PUT)).unwrap();
1395
1396        // PUT key2 (2,2)
1397        let value22 = StorageEntry { key: B256::with_last_byte(2), value: U256::from(2) };
1398        env.update(|tx| tx.put::<PlainStorageState>(key2, value22).expect(ERROR_PUT)).unwrap();
1399
1400        // Iterate with walk_dup
1401        {
1402            let tx = env.tx().expect(ERROR_INIT_TX);
1403            let mut cursor = tx.cursor_dup_read::<PlainStorageState>().unwrap();
1404            let mut walker = cursor.walk_dup(None, None).unwrap();
1405
1406            // The walker yields key1's duplicate values, ordered by subkey.
1407            assert_eq!(Some(Ok((key1, value00))), walker.next());
1408            assert_eq!(Some(Ok((key1, value11))), walker.next());
1409            // NOTE: The dup cursor does NOT iterate over all values, only over the duplicates
1410            // of the same key: `assert_eq!(Some(Ok((key2, value22))), walker.next())` would fail.
1411            assert_eq!(None, walker.next());
1412        }
1413
1414        // Iterate by using `walk`
1415        {
1416            let tx = env.tx().expect(ERROR_INIT_TX);
1417            let mut cursor = tx.cursor_dup_read::<PlainStorageState>().unwrap();
1418            let first = cursor.first().unwrap().unwrap();
1419            let mut walker = cursor.walk(Some(first.0)).unwrap();
1420            assert_eq!(Some(Ok((key1, value00))), walker.next());
1421            assert_eq!(Some(Ok((key1, value11))), walker.next());
1422            assert_eq!(Some(Ok((key2, value22))), walker.next());
1423        }
1424    }
1425
1426    #[test]
1427    fn dup_value_with_same_subkey() {
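        // The subkey is encoded as part of the stored value, so two entries with the same subkey
        // but different values can coexist under one key.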
1428        let env = create_test_db(DatabaseEnvKind::RW);
1429        let key1 = Address::new([0x11; 20]);
1430        let key2 = Address::new([0x22; 20]);
1431
1432        // PUT key1 (0,1)
1433        let value01 = StorageEntry { key: B256::with_last_byte(0), value: U256::from(1) };
1434        env.update(|tx| tx.put::<PlainStorageState>(key1, value01).expect(ERROR_PUT)).unwrap();
1435
1436        // PUT key1 (0,0)
1437        let value00 = StorageEntry::default();
1438        env.update(|tx| tx.put::<PlainStorageState>(key1, value00).expect(ERROR_PUT)).unwrap();
1439
1440        // PUT key2 (2,2)
1441        let value22 = StorageEntry { key: B256::with_last_byte(2), value: U256::from(2) };
1442        env.update(|tx| tx.put::<PlainStorageState>(key2, value22).expect(ERROR_PUT)).unwrap();
1443
1444        // Iterate with walk
1445        {
1446            let tx = env.tx().expect(ERROR_INIT_TX);
1447            let mut cursor = tx.cursor_dup_read::<PlainStorageState>().unwrap();
1448            let first = cursor.first().unwrap().unwrap();
1449            let mut walker = cursor.walk(Some(first.0)).unwrap();
1450
1451            // NOTE: Both entries under key1 are present, even though they share the same subkey
1452            assert_eq!(Some(Ok((key1, value00))), walker.next());
1453            assert_eq!(Some(Ok((key1, value01))), walker.next());
1454            assert_eq!(Some(Ok((key2, value22))), walker.next());
1455        }
1456
1457        // seek_by_key_subkey
1458        {
1459            let tx = env.tx().expect(ERROR_INIT_TX);
1460            let mut cursor = tx.cursor_dup_read::<PlainStorageState>().unwrap();
1461
1462            // NOTE: There are two values with the same subkey, but only the first one is returned
1463            assert_eq!(Ok(Some(value00)), cursor.seek_by_key_subkey(key1, value00.key));
1464            // The queried subkey (2) is greater than any subkey under key1, so `None` is returned
1465            assert_eq!(Ok(None), cursor.seek_by_key_subkey(key1, value22.key));
1466        }
1467    }
1468
1469    #[test]
1470    fn db_sharded_key() {
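        // Account history is sharded: each shard is keyed by `(address, highest_block_number)`,
        // and the most recent shard uses `u64::MAX` as its block number.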
1471        let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);
1472        let real_key = address!("0xa2c122be93b0074270ebee7f6b7292c7deb45047");
1473
1474        let shards = 5;
1475        for i in 1..=shards {
1476            let key = ShardedKey::new(real_key, if i == shards { u64::MAX } else { i * 100 });
1477            let list = IntegerList::new_pre_sorted([i * 100u64]);
1478
1479            db.update(|tx| tx.put::<AccountsHistory>(key.clone(), list.clone()).expect(ERROR_PUT))
1480                .unwrap();
1481        }
1482
1483        // Seek a value with a non-existing key.
1484        {
1485            let tx = db.tx().expect(ERROR_INIT_TX);
1486            let mut cursor = tx.cursor_read::<AccountsHistory>().unwrap();
1487
1488            // The walker seeks the first key greater than or equal to the query. Since the
1489            // database contains `Address | 100` and `Address | 200`, querying `Address | 150`
1490            // returns `Address | 200`.
1491            let mut walker = cursor.walk(Some(ShardedKey::new(real_key, 150))).unwrap();
1492            let (key, list) = walker
1493                .next()
1494                .expect("element should exist.")
1495                .expect("should be able to retrieve it.");
1496
1497            assert_eq!(ShardedKey::new(real_key, 200), key);
1498            let list200 = IntegerList::new_pre_sorted([200u64]);
1499            assert_eq!(list200, list);
1500        }
1501        // Seek greatest index
1502        {
1503            let tx = db.tx().expect(ERROR_INIT_TX);
1504            let mut cursor = tx.cursor_read::<AccountsHistory>().unwrap();
1505
1506            // Seek the shard with the MAX index, then step back with `prev` to get the last
1507            // shard with a finite index (`Address | 400`).
1508            let _unknown = cursor.seek_exact(ShardedKey::new(real_key, u64::MAX)).unwrap();
1509            let (key, list) = cursor
1510                .prev()
1511                .expect("element should exist.")
1512                .expect("should be able to retrieve it.");
1513
1514            assert_eq!(ShardedKey::new(real_key, 400), key);
1515            let list400 = IntegerList::new_pre_sorted([400u64]);
1516            assert_eq!(list400, list);
1517        }
1518    }
1519}