reth_db/implementation/mdbx/
mod.rs

1//! Module that interacts with MDBX.
2
3use crate::{
4    lockfile::StorageLock,
5    metrics::DatabaseEnvMetrics,
6    tables::{self, Tables},
7    utils::default_page_size,
8    DatabaseError, TableSet,
9};
10use eyre::Context;
11use metrics::{gauge, Label};
12use reth_db_api::{
13    cursor::{DbCursorRO, DbCursorRW},
14    database::Database,
15    database_metrics::DatabaseMetrics,
16    models::ClientVersion,
17    transaction::{DbTx, DbTxMut},
18};
19use reth_libmdbx::{
20    ffi, DatabaseFlags, Environment, EnvironmentFlags, Geometry, HandleSlowReadersReturnCode,
21    MaxReadTransactionDuration, Mode, PageSize, SyncMode, RO, RW,
22};
23use reth_storage_errors::db::LogLevel;
24use reth_tracing::tracing::error;
25use std::{
26    collections::HashMap,
27    ops::{Deref, Range},
28    path::Path,
29    sync::Arc,
30    time::{SystemTime, UNIX_EPOCH},
31};
32use tx::Tx;
33
34pub mod cursor;
35pub mod tx;
36
37mod utils;
38
/// Number of bytes in one kilobyte (1024 bytes).
pub const KILOBYTE: usize = 1024;
/// Number of bytes in one megabyte (1024 kilobytes).
pub const MEGABYTE: usize = 1024 * KILOBYTE;
/// Number of bytes in one gigabyte (1024 megabytes).
pub const GIGABYTE: usize = 1024 * MEGABYTE;
/// Number of bytes in one terabyte (1024 gigabytes).
pub const TERABYTE: usize = 1024 * GIGABYTE;

/// Reader limit applied when the caller does not configure one.
///
/// MDBX allows up to 32767 readers (`MDBX_READERS_LIMIT`); the default stays slightly below that.
const DEFAULT_MAX_READERS: u64 = 32_000;

/// Amount of space a read-only transaction may occupy before a warning is emitted.
/// See [`reth_libmdbx::EnvironmentBuilder::set_handle_slow_readers`] for more information.
const MAX_SAFE_READER_SPACE: usize = GIGABYTE * 10;
54
/// Access mode requested when opening an MDBX environment.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DatabaseEnvKind {
    /// The environment is opened for reading only.
    RO,
    /// The environment is opened for both reading and writing.
    RW,
}

impl DatabaseEnvKind {
    /// Reports whether this kind is [`Self::RW`], i.e. whether writes are allowed.
    pub const fn is_rw(&self) -> bool {
        // Spelled out exhaustively so that adding a variant forces a decision here.
        match self {
            Self::RW => true,
            Self::RO => false,
        }
    }
}
70
/// Arguments for database initialization.
///
/// All fields are private; they are populated by [`DatabaseArguments::new`] and adjusted through
/// the `with_*` builder methods.
#[derive(Clone, Debug)]
pub struct DatabaseArguments {
    /// Client version that accesses the database.
    client_version: ClientVersion,
    /// Database geometry settings.
    geometry: Geometry<Range<usize>>,
    /// Database log level. If [`None`], the default value is used.
    log_level: Option<LogLevel>,
    /// Maximum duration of a read transaction. If [`None`], the default value is used.
    max_read_transaction_duration: Option<MaxReadTransactionDuration>,
    /// Open environment in exclusive/monopolistic mode. If [`None`], the default value is used.
    ///
    /// This can be used as a replacement for `MDB_NOLOCK`, which isn't supported by MDBX. In this
    /// way, you can get the minimal overhead, but with the correct multi-process and multi-thread
    /// locking.
    ///
    /// If `true` = open environment in exclusive/monopolistic mode or return `MDBX_BUSY` if
    /// environment already used by other process. The main feature of the exclusive mode is the
    /// ability to open the environment placed on a network share.
    ///
    /// If `false` = open environment in cooperative mode, i.e. for multi-process
    /// access/interaction/cooperation. The main requirements of the cooperative mode are:
    /// - Data files MUST be placed in the LOCAL file system, but NOT on a network share.
    /// - Environment MUST be opened only by LOCAL processes, but NOT over a network.
    /// - OS kernel (i.e. file system and memory mapping implementation) and all processes that
    ///   open the given environment MUST be running in the physically single RAM with
    ///   cache-coherency. The only exception for cache-consistency requirement is Linux on MIPS
    ///   architecture (but this case has not been tested for a long time).
    ///
    /// This flag only takes effect when the environment is opened and can't be changed
    /// afterwards.
    exclusive: Option<bool>,
    /// MDBX allows up to 32767 readers (`MDBX_READERS_LIMIT`). This argument configures the
    /// maximum number of readers. If [`None`], `DEFAULT_MAX_READERS` is used.
    max_readers: Option<u64>,
    /// Defines the synchronization strategy used by the MDBX database when writing data to disk.
    ///
    /// This determines how aggressively MDBX ensures data durability versus prioritizing
    /// performance. The available modes are:
    ///
    /// - [`SyncMode::Durable`]: Ensures all transactions are fully flushed to disk before they are
    ///   considered committed. This provides the highest level of durability and crash safety
    ///   but may have a performance cost.
    /// - [`SyncMode::SafeNoSync`]: Skips certain fsync operations to improve write performance.
    ///   This mode still maintains database integrity but may lose the most recent transactions if
    ///   the system crashes unexpectedly.
    ///
    /// Choose `Durable` if consistency and crash safety are critical (e.g., production
    /// environments). Choose `SafeNoSync` if performance is more important and occasional data
    /// loss is acceptable (e.g., testing or ephemeral data).
    sync_mode: SyncMode,
}
123
124impl Default for DatabaseArguments {
125    fn default() -> Self {
126        Self::new(ClientVersion::default())
127    }
128}
129
130impl DatabaseArguments {
131    /// Create new database arguments with given client version.
132    pub fn new(client_version: ClientVersion) -> Self {
133        Self {
134            client_version,
135            geometry: Geometry {
136                size: Some(0..(8 * TERABYTE)),
137                growth_step: Some(4 * GIGABYTE as isize),
138                shrink_threshold: Some(0),
139                page_size: Some(PageSize::Set(default_page_size())),
140            },
141            log_level: None,
142            max_read_transaction_duration: None,
143            exclusive: None,
144            max_readers: None,
145            sync_mode: SyncMode::Durable,
146        }
147    }
148
149    /// Sets the upper size limit of the db environment, the maximum database size in bytes.
150    pub const fn with_geometry_max_size(mut self, max_size: Option<usize>) -> Self {
151        if let Some(max_size) = max_size {
152            self.geometry.size = Some(0..max_size);
153        }
154        self
155    }
156
157    /// Sets the database sync mode.
158    pub const fn with_sync_mode(mut self, sync_mode: Option<SyncMode>) -> Self {
159        if let Some(sync_mode) = sync_mode {
160            self.sync_mode = sync_mode;
161        }
162
163        self
164    }
165
166    /// Configures the database growth step in bytes.
167    pub const fn with_growth_step(mut self, growth_step: Option<usize>) -> Self {
168        if let Some(growth_step) = growth_step {
169            self.geometry.growth_step = Some(growth_step as isize);
170        }
171        self
172    }
173
174    /// Set the log level.
175    pub const fn with_log_level(mut self, log_level: Option<LogLevel>) -> Self {
176        self.log_level = log_level;
177        self
178    }
179
180    /// Set the maximum duration of a read transaction.
181    pub const fn max_read_transaction_duration(
182        &mut self,
183        max_read_transaction_duration: Option<MaxReadTransactionDuration>,
184    ) {
185        self.max_read_transaction_duration = max_read_transaction_duration;
186    }
187
188    /// Set the maximum duration of a read transaction.
189    pub const fn with_max_read_transaction_duration(
190        mut self,
191        max_read_transaction_duration: Option<MaxReadTransactionDuration>,
192    ) -> Self {
193        self.max_read_transaction_duration(max_read_transaction_duration);
194        self
195    }
196
197    /// Set the mdbx exclusive flag.
198    pub const fn with_exclusive(mut self, exclusive: Option<bool>) -> Self {
199        self.exclusive = exclusive;
200        self
201    }
202
203    /// Set `max_readers` flag.
204    pub const fn with_max_readers(mut self, max_readers: Option<u64>) -> Self {
205        self.max_readers = max_readers;
206        self
207    }
208
209    /// Returns the client version if any.
210    pub const fn client_version(&self) -> &ClientVersion {
211        &self.client_version
212    }
213}
214
/// Wrapper for the libmdbx environment: [Environment]
///
/// Besides the environment itself it carries the table handles collected by the
/// `create_*tables*` methods, optional metrics, and (for read-write environments) the storage
/// lock acquired in [`DatabaseEnv::open`].
#[derive(Debug)]
pub struct DatabaseEnv {
    /// Libmdbx-sys environment.
    inner: Environment,
    /// Opened DBIs for reuse, keyed by table name.
    /// Important: Do not manually close these DBIs, like via `mdbx_dbi_close`.
    /// More generally, do not dynamically create, re-open, or drop tables at
    /// runtime. It's better to perform table creation and migration only once
    /// at startup.
    dbis: Arc<HashMap<&'static str, ffi::MDBX_dbi>>,
    /// Cache for metric handles. If `None`, metrics are not recorded.
    metrics: Option<Arc<DatabaseEnvMetrics>>,
    /// Write lock for when dealing with a read-write environment; `None` for read-only
    /// environments. Held only so that it lives as long as the environment.
    _lock_file: Option<StorageLock>,
}
231
232impl Database for DatabaseEnv {
233    type TX = tx::Tx<RO>;
234    type TXMut = tx::Tx<RW>;
235
236    fn tx(&self) -> Result<Self::TX, DatabaseError> {
237        Tx::new(
238            self.inner.begin_ro_txn().map_err(|e| DatabaseError::InitTx(e.into()))?,
239            self.dbis.clone(),
240            self.metrics.clone(),
241        )
242        .map_err(|e| DatabaseError::InitTx(e.into()))
243    }
244
245    fn tx_mut(&self) -> Result<Self::TXMut, DatabaseError> {
246        Tx::new(
247            self.inner.begin_rw_txn().map_err(|e| DatabaseError::InitTx(e.into()))?,
248            self.dbis.clone(),
249            self.metrics.clone(),
250        )
251        .map_err(|e| DatabaseError::InitTx(e.into()))
252    }
253}
254
255impl DatabaseMetrics for DatabaseEnv {
256    fn report_metrics(&self) {
257        for (name, value, labels) in self.gauge_metrics() {
258            gauge!(name, labels).set(value);
259        }
260    }
261
262    fn gauge_metrics(&self) -> Vec<(&'static str, f64, Vec<Label>)> {
263        let mut metrics = Vec::new();
264
265        let _ = self
266            .view(|tx| {
267                for table in Tables::ALL.iter().map(Tables::name) {
268                    let table_db = tx.inner.open_db(Some(table)).wrap_err("Could not open db.")?;
269
270                    let stats = tx
271                        .inner
272                        .db_stat(&table_db)
273                        .wrap_err(format!("Could not find table: {table}"))?;
274
275                    let page_size = stats.page_size() as usize;
276                    let leaf_pages = stats.leaf_pages();
277                    let branch_pages = stats.branch_pages();
278                    let overflow_pages = stats.overflow_pages();
279                    let num_pages = leaf_pages + branch_pages + overflow_pages;
280                    let table_size = page_size * num_pages;
281                    let entries = stats.entries();
282
283                    metrics.push((
284                        "db.table_size",
285                        table_size as f64,
286                        vec![Label::new("table", table)],
287                    ));
288                    metrics.push((
289                        "db.table_pages",
290                        leaf_pages as f64,
291                        vec![Label::new("table", table), Label::new("type", "leaf")],
292                    ));
293                    metrics.push((
294                        "db.table_pages",
295                        branch_pages as f64,
296                        vec![Label::new("table", table), Label::new("type", "branch")],
297                    ));
298                    metrics.push((
299                        "db.table_pages",
300                        overflow_pages as f64,
301                        vec![Label::new("table", table), Label::new("type", "overflow")],
302                    ));
303                    metrics.push((
304                        "db.table_entries",
305                        entries as f64,
306                        vec![Label::new("table", table)],
307                    ));
308                }
309
310                Ok::<(), eyre::Report>(())
311            })
312            .map_err(|error| error!(%error, "Failed to read db table stats"));
313
314        if let Ok(freelist) =
315            self.freelist().map_err(|error| error!(%error, "Failed to read db.freelist"))
316        {
317            metrics.push(("db.freelist", freelist as f64, vec![]));
318        }
319
320        if let Ok(stat) = self.stat().map_err(|error| error!(%error, "Failed to read db.stat")) {
321            metrics.push(("db.page_size", stat.page_size() as f64, vec![]));
322        }
323
324        metrics.push((
325            "db.timed_out_not_aborted_transactions",
326            self.timed_out_not_aborted_transactions() as f64,
327            vec![],
328        ));
329
330        metrics
331    }
332}
333
334impl DatabaseEnv {
    /// Opens the database at the specified path with the given [`DatabaseEnvKind`].
    ///
    /// It does not create the tables, for that call [`DatabaseEnv::create_tables`].
    ///
    /// The returned environment has an empty table-handle cache and metrics disabled.
    ///
    /// # Errors
    ///
    /// - [`DatabaseError::Other`] if `kind` is read-write and the storage lock for `path` cannot
    ///   be acquired.
    /// - [`DatabaseError::LogLevelUnavailable`] if a log level above [`LogLevel::Notice`] is
    ///   requested in a build without `debug_assertions`.
    /// - [`DatabaseError::Open`] if the MDBX environment itself fails to open.
    pub fn open(
        path: &Path,
        kind: DatabaseEnvKind,
        args: DatabaseArguments,
    ) -> Result<Self, DatabaseError> {
        // Only read-write environments take the storage lock; read-only ones carry `None`.
        let _lock_file = if kind.is_rw() {
            StorageLock::try_acquire(path)
                .map_err(|err| DatabaseError::Other(err.to_string()))?
                .into()
        } else {
            None
        };

        let mut inner_env = Environment::builder();

        let mode = match kind {
            DatabaseEnvKind::RO => Mode::ReadOnly,
            DatabaseEnvKind::RW => {
                // enable writemap mode in RW mode
                inner_env.write_map();
                Mode::ReadWrite { sync_mode: args.sync_mode }
            }
        };

        // Note: We set max dbs to 256 here to allow for custom tables. This needs to be set on
        // environment creation.
        debug_assert!(Tables::ALL.len() <= 256, "number of tables exceed max dbs");
        inner_env.set_max_dbs(256);
        inner_env.set_geometry(args.geometry);

        // Returns `true` if `id` is this process' id. On unix the parent process' id is
        // accepted as well.
        fn is_current_process(id: u32) -> bool {
            #[cfg(unix)]
            {
                id == std::os::unix::process::parent_id() || id == std::process::id()
            }

            #[cfg(not(unix))]
            {
                id == std::process::id()
            }
        }

        // Callback registered via `set_handle_slow_readers`. It never asks MDBX to kill the
        // reader; it only emits a warning once the reported `space` exceeds
        // `MAX_SAFE_READER_SPACE`.
        extern "C" fn handle_slow_readers(
            _env: *const ffi::MDBX_env,
            _txn: *const ffi::MDBX_txn,
            process_id: ffi::mdbx_pid_t,
            thread_id: ffi::mdbx_tid_t,
            read_txn_id: u64,
            gap: std::ffi::c_uint,
            space: usize,
            retry: std::ffi::c_int,
        ) -> HandleSlowReadersReturnCode {
            if space > MAX_SAFE_READER_SPACE {
                let message = if is_current_process(process_id as u32) {
                    "Current process has a long-lived database transaction that grows the database file."
                } else {
                    "External process has a long-lived database transaction that grows the database file. \
                     Use shorter-lived read transactions or shut down the node."
                };
                reth_tracing::tracing::warn!(
                    target: "storage::db::mdbx",
                    ?process_id,
                    ?thread_id,
                    ?read_txn_id,
                    ?gap,
                    ?space,
                    ?retry,
                    "{message}"
                )
            }

            reth_libmdbx::HandleSlowReadersReturnCode::ProceedWithoutKillingReader
        }
        inner_env.set_handle_slow_readers(handle_slow_readers);

        inner_env.set_flags(EnvironmentFlags {
            mode,
            // We disable readahead because it improves performance for linear scans, but
            // worsens it for random access (which is our access pattern outside of sync)
            no_rdahead: true,
            coalesce: true,
            // An unset `exclusive` argument falls back to `false`.
            exclusive: args.exclusive.unwrap_or_default(),
            ..Default::default()
        });
        // Configure more readers
        inner_env.set_max_readers(args.max_readers.unwrap_or(DEFAULT_MAX_READERS));
        // This parameter sets the maximum size of the "reclaimed list", and the unit of measurement
        // is "pages". Reclaimed list is the list of freed pages that's populated during the
        // lifetime of DB transaction, and through which MDBX searches when it needs to insert new
        // record with overflow pages. The flow is roughly the following:
        // 0. We need to insert a record that requires N number of overflow pages (in consecutive
        //    sequence inside the DB file).
        // 1. Get some pages from the freelist, put them into the reclaimed list.
        // 2. Search through the reclaimed list for the sequence of size N.
        // 3. a. If found, return the sequence.
        // 3. b. If not found, repeat steps 1-3. If the reclaimed list size is larger than
        //    the `rp augment limit`, stop the search and allocate new pages at the end of the file:
        //    https://github.com/paradigmxyz/reth/blob/2a4c78759178f66e30c8976ec5d243b53102fc9a/crates/storage/libmdbx-rs/mdbx-sys/libmdbx/mdbx.c#L11479-L11480.
        //
        // Basically, this parameter controls for how long do we search through the freelist before
        // trying to allocate new pages. Smaller value will make MDBX to fallback to
        // allocation faster, higher value will force MDBX to search through the freelist
        // longer until the sequence of pages is found.
        //
        // The default value of this parameter is set depending on the DB size. The bigger the
        // database, the larger is `rp augment limit`.
        // https://github.com/paradigmxyz/reth/blob/2a4c78759178f66e30c8976ec5d243b53102fc9a/crates/storage/libmdbx-rs/mdbx-sys/libmdbx/mdbx.c#L10018-L10024.
        //
        // Previously, MDBX set this value as `256 * 1024` constant. Let's fallback to this,
        // because we want to prioritize freelist lookup speed over database growth.
        // https://github.com/paradigmxyz/reth/blob/fa2b9b685ed9787636d962f4366caf34a9186e66/crates/storage/libmdbx-rs/mdbx-sys/libmdbx/mdbx.c#L16017.
        inner_env.set_rp_augment_limit(256 * 1024);

        if let Some(log_level) = args.log_level {
            // Levels higher than [LogLevel::Notice] require libmdbx built with `MDBX_DEBUG` option.
            let is_log_level_available = if cfg!(debug_assertions) {
                true
            } else {
                matches!(
                    log_level,
                    LogLevel::Fatal | LogLevel::Error | LogLevel::Warn | LogLevel::Notice
                )
            };
            if is_log_level_available {
                // Translate the log level into the numeric value passed to the builder.
                inner_env.set_log_level(match log_level {
                    LogLevel::Fatal => 0,
                    LogLevel::Error => 1,
                    LogLevel::Warn => 2,
                    LogLevel::Notice => 3,
                    LogLevel::Verbose => 4,
                    LogLevel::Debug => 5,
                    LogLevel::Trace => 6,
                    LogLevel::Extra => 7,
                });
            } else {
                return Err(DatabaseError::LogLevelUnavailable(log_level))
            }
        }

        if let Some(max_read_transaction_duration) = args.max_read_transaction_duration {
            inner_env.set_max_read_transaction_duration(max_read_transaction_duration);
        }

        // No table handles are tracked and no metrics are recorded until the caller opts in via
        // the `create_*tables*` methods and `with_metrics`.
        let env = Self {
            inner: inner_env.open(path).map_err(|e| DatabaseError::Open(e.into()))?,
            dbis: Arc::default(),
            metrics: None,
            _lock_file,
        };

        Ok(env)
    }
490
491    /// Enables metrics on the database.
492    pub fn with_metrics(mut self) -> Self {
493        self.metrics = Some(DatabaseEnvMetrics::new().into());
494        self
495    }
496
    /// Creates all the tables defined in [`Tables`], if necessary.
    ///
    /// This keeps track of the created table handles and stores them for better efficiency.
    /// It is a shorthand for [`Self::create_and_track_tables_for`] with the [`Tables`] set.
    pub fn create_tables(&mut self) -> Result<(), DatabaseError> {
        self.create_and_track_tables_for::<Tables>()
    }
503
504    /// Creates all the tables defined in the given [`TableSet`], if necessary.
505    ///
506    /// This keeps tracks of the created table handles and stores them for better efficiency.
507    pub fn create_and_track_tables_for<TS: TableSet>(&mut self) -> Result<(), DatabaseError> {
508        let handles = self._create_tables::<TS>()?;
509        // Note: This is okay because self has mutable access here and `DatabaseEnv` must be Arc'ed
510        // before it can be shared.
511        let dbis = Arc::make_mut(&mut self.dbis);
512        dbis.extend(handles);
513
514        Ok(())
515    }
516
517    /// Creates all the tables defined in [`Tables`], if necessary.
518    ///
519    /// If this type is unique the created handle for the tables will be updated.
520    ///
521    /// This is recommended to be called during initialization to create and track additional tables
522    /// after the default [`Self::create_tables`] are created.
523    pub fn create_tables_for<TS: TableSet>(self: &mut Arc<Self>) -> Result<(), DatabaseError> {
524        let handles = self._create_tables::<TS>()?;
525        if let Some(db) = Arc::get_mut(self) {
526            // Note: The db is unique and the dbis as well, and they can also be cloned.
527            let dbis = Arc::make_mut(&mut db.dbis);
528            dbis.extend(handles);
529        }
530        Ok(())
531    }
532
533    /// Creates the tables and returns the identifiers of the tables.
534    fn _create_tables<TS: TableSet>(
535        &self,
536    ) -> Result<Vec<(&'static str, ffi::MDBX_dbi)>, DatabaseError> {
537        let mut handles = Vec::new();
538        let tx = self.inner.begin_rw_txn().map_err(|e| DatabaseError::InitTx(e.into()))?;
539
540        for table in TS::tables() {
541            let flags =
542                if table.is_dupsort() { DatabaseFlags::DUP_SORT } else { DatabaseFlags::default() };
543
544            let db = tx
545                .create_db(Some(table.name()), flags)
546                .map_err(|e| DatabaseError::CreateTable(e.into()))?;
547            handles.push((table.name(), db.dbi()));
548        }
549
550        tx.commit().map_err(|e| DatabaseError::Commit(e.into()))?;
551        Ok(handles)
552    }
553
554    /// Records version that accesses the database with write privileges.
555    pub fn record_client_version(&self, version: ClientVersion) -> Result<(), DatabaseError> {
556        if version.is_empty() {
557            return Ok(())
558        }
559
560        let tx = self.tx_mut()?;
561        let mut version_cursor = tx.cursor_write::<tables::VersionHistory>()?;
562
563        let last_version = version_cursor.last()?.map(|(_, v)| v);
564        if Some(&version) != last_version.as_ref() {
565            version_cursor.upsert(
566                SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs(),
567                &version,
568            )?;
569            tx.commit()?;
570        }
571
572        Ok(())
573    }
574}
575
/// Lets a [`DatabaseEnv`] be used wherever a reference to the wrapped [`Environment`] is expected,
/// exposing the environment's methods directly on the wrapper.
impl Deref for DatabaseEnv {
    type Target = Environment;

    /// Returns a reference to the wrapped libmdbx environment.
    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}
583
584#[cfg(test)]
585mod tests {
586    use super::*;
587    use crate::{
588        tables::{
589            AccountsHistory, CanonicalHeaders, Headers, PlainAccountState, PlainStorageState,
590        },
591        test_utils::*,
592        AccountChangeSets,
593    };
594    use alloy_consensus::Header;
595    use alloy_primitives::{address, Address, B256, U256};
596    use reth_db_api::{
597        cursor::{DbDupCursorRO, DbDupCursorRW, ReverseWalker, Walker},
598        models::{AccountBeforeTx, IntegerList, ShardedKey},
599        table::{Encode, Table},
600    };
601    use reth_libmdbx::Error;
602    use reth_primitives_traits::{Account, StorageEntry};
603    use reth_storage_errors::db::{DatabaseWriteError, DatabaseWriteOperation};
604    use std::str::FromStr;
605    use tempfile::TempDir;
606
607    /// Create database for testing
608    fn create_test_db(kind: DatabaseEnvKind) -> Arc<DatabaseEnv> {
609        Arc::new(create_test_db_with_path(
610            kind,
611            &tempfile::TempDir::new().expect(ERROR_TEMPDIR).keep(),
612        ))
613    }
614
615    /// Create database for testing with specified path
616    fn create_test_db_with_path(kind: DatabaseEnvKind, path: &Path) -> DatabaseEnv {
617        let mut env =
618            DatabaseEnv::open(path, kind, DatabaseArguments::new(ClientVersion::default()))
619                .expect(ERROR_DB_CREATION);
620        env.create_tables().expect(ERROR_TABLE_CREATION);
621        env
622    }
623
    // Panic messages passed to `expect` by the tests in this module, one per kind of operation.
    const ERROR_DB_CREATION: &str = "Not able to create the mdbx file.";
    const ERROR_PUT: &str = "Not able to insert value into table.";
    const ERROR_APPEND: &str = "Not able to append the value to the table.";
    const ERROR_UPSERT: &str = "Not able to upsert the value to the table.";
    const ERROR_GET: &str = "Not able to get value from table.";
    const ERROR_DEL: &str = "Not able to delete from table.";
    const ERROR_COMMIT: &str = "Not able to commit transaction.";
    const ERROR_RETURN_VALUE: &str = "Mismatching result.";
    const ERROR_INIT_TX: &str = "Failed to create a MDBX transaction.";
    const ERROR_ETH_ADDRESS: &str = "Invalid address.";
634
    /// Smoke test: opening a read-write database and creating its tables must not panic.
    #[test]
    fn db_creation() {
        create_test_db(DatabaseEnvKind::RW);
    }
639
640    #[test]
641    fn db_manual_put_get() {
642        let env = create_test_db(DatabaseEnvKind::RW);
643
644        let value = Header::default();
645        let key = 1u64;
646
647        // PUT
648        let tx = env.tx_mut().expect(ERROR_INIT_TX);
649        tx.put::<Headers>(key, value.clone()).expect(ERROR_PUT);
650        tx.commit().expect(ERROR_COMMIT);
651
652        // GET
653        let tx = env.tx().expect(ERROR_INIT_TX);
654        let result = tx.get::<Headers>(key).expect(ERROR_GET);
655        assert_eq!(result.expect(ERROR_RETURN_VALUE), value);
656        tx.commit().expect(ERROR_COMMIT);
657    }
658
    /// Deleting the first duplicate through a walker leaves the second duplicate visible both to
    /// that walker and to a new cursor opened on the same transaction.
    #[test]
    fn db_dup_cursor_delete_first() {
        let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);
        let tx = db.tx_mut().expect(ERROR_INIT_TX);

        let mut dup_cursor = tx.cursor_dup_write::<PlainStorageState>().unwrap();

        // Two entries with the same storage key but different values, stored under one address.
        let entry_0 = StorageEntry { key: B256::with_last_byte(1), value: U256::from(0) };
        let entry_1 = StorageEntry { key: B256::with_last_byte(1), value: U256::from(1) };

        dup_cursor.upsert(Address::with_last_byte(1), &entry_0).expect(ERROR_UPSERT);
        dup_cursor.upsert(Address::with_last_byte(1), &entry_1).expect(ERROR_UPSERT);

        // Both entries are present before the deletion.
        assert_eq!(
            dup_cursor.walk(None).unwrap().collect::<Result<Vec<_>, _>>(),
            Ok(vec![(Address::with_last_byte(1), entry_0), (Address::with_last_byte(1), entry_1),])
        );

        // Delete the entry the fresh walker is positioned on (expected to be entry_0).
        let mut walker = dup_cursor.walk(None).unwrap();
        walker.delete_current().expect(ERROR_DEL);

        assert_eq!(walker.next(), Some(Ok((Address::with_last_byte(1), entry_1))));

        // Check the tx view - it correctly holds entry_1
        assert_eq!(
            tx.cursor_dup_read::<PlainStorageState>()
                .unwrap()
                .walk(None)
                .unwrap()
                .collect::<Result<Vec<_>, _>>(),
            Ok(vec![
                (Address::with_last_byte(1), entry_1), // This is ok - we removed entry_0
            ])
        );

        // Check the remainder of walker
        assert_eq!(walker.next(), None);
    }
697
698    #[test]
699    fn db_cursor_walk() {
700        let env = create_test_db(DatabaseEnvKind::RW);
701
702        let value = Header::default();
703        let key = 1u64;
704
705        // PUT
706        let tx = env.tx_mut().expect(ERROR_INIT_TX);
707        tx.put::<Headers>(key, value.clone()).expect(ERROR_PUT);
708        tx.commit().expect(ERROR_COMMIT);
709
710        // Cursor
711        let tx = env.tx().expect(ERROR_INIT_TX);
712        let mut cursor = tx.cursor_read::<Headers>().unwrap();
713
714        let first = cursor.first().unwrap();
715        assert!(first.is_some(), "First should be our put");
716
717        // Walk
718        let walk = cursor.walk(Some(key)).unwrap();
719        let first = walk.into_iter().next().unwrap().unwrap();
720        assert_eq!(first.1, value, "First next should be put value");
721    }
722
    /// `walk_range` yields exactly the keys inside the requested range for every kind of range
    /// bound, and keeps returning `None` once it is exhausted.
    #[test]
    fn db_cursor_walk_range() {
        let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);

        // PUT (0, 0), (1, 0), (2, 0), (3, 0)
        let tx = db.tx_mut().expect(ERROR_INIT_TX);
        vec![0, 1, 2, 3]
            .into_iter()
            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
            .expect(ERROR_PUT);
        tx.commit().expect(ERROR_COMMIT);

        let tx = db.tx().expect(ERROR_INIT_TX);
        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();

        // [1, 3)
        let mut walker = cursor.walk_range(1..3).unwrap();
        assert_eq!(walker.next(), Some(Ok((1, B256::ZERO))));
        assert_eq!(walker.next(), Some(Ok((2, B256::ZERO))));
        assert_eq!(walker.next(), None);
        // next() returns None after walker is done
        assert_eq!(walker.next(), None);

        // [1, 2]
        let mut walker = cursor.walk_range(1..=2).unwrap();
        assert_eq!(walker.next(), Some(Ok((1, B256::ZERO))));
        assert_eq!(walker.next(), Some(Ok((2, B256::ZERO))));
        // next() returns None after walker is done
        assert_eq!(walker.next(), None);

        // [1, ∞)
        let mut walker = cursor.walk_range(1..).unwrap();
        assert_eq!(walker.next(), Some(Ok((1, B256::ZERO))));
        assert_eq!(walker.next(), Some(Ok((2, B256::ZERO))));
        assert_eq!(walker.next(), Some(Ok((3, B256::ZERO))));
        // next() returns None after walker is done
        assert_eq!(walker.next(), None);

        // [2, 4)
        let mut walker = cursor.walk_range(2..4).unwrap();
        assert_eq!(walker.next(), Some(Ok((2, B256::ZERO))));
        assert_eq!(walker.next(), Some(Ok((3, B256::ZERO))));
        assert_eq!(walker.next(), None);
        // next() returns None after walker is done
        assert_eq!(walker.next(), None);

        // (-∞, 3)
        let mut walker = cursor.walk_range(..3).unwrap();
        assert_eq!(walker.next(), Some(Ok((0, B256::ZERO))));
        assert_eq!(walker.next(), Some(Ok((1, B256::ZERO))));
        assert_eq!(walker.next(), Some(Ok((2, B256::ZERO))));
        // next() returns None after walker is done
        assert_eq!(walker.next(), None);

        // (-∞, ∞)
        let mut walker = cursor.walk_range(..).unwrap();
        assert_eq!(walker.next(), Some(Ok((0, B256::ZERO))));
        assert_eq!(walker.next(), Some(Ok((1, B256::ZERO))));
        assert_eq!(walker.next(), Some(Ok((2, B256::ZERO))));
        assert_eq!(walker.next(), Some(Ok((3, B256::ZERO))));
        // next() returns None after walker is done
        assert_eq!(walker.next(), None);
    }
786
/// Walking a key range over a dup-sorted table yields every duplicate stored under the keys
/// inside the range and nothing stored under keys outside of it.
#[test]
fn db_cursor_walk_range_on_dup_table() {
    let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);

    let addresses = [Address::ZERO, Address::with_last_byte(1), Address::with_last_byte(2)];
    let before_tx = |address| AccountBeforeTx { address, info: None };

    let tx = db.tx_mut().expect(ERROR_INIT_TX);
    // Keys 0 and 1 each receive one entry per address, written in address order.
    for block in 0..=1 {
        for address in addresses {
            tx.put::<AccountChangeSets>(block, before_tx(address)).expect(ERROR_PUT);
        }
    }
    // Key 2 receives a single entry; the `0..=1` walk below must not return it.
    tx.put::<AccountChangeSets>(2, before_tx(addresses[0])).expect(ERROR_PUT);
    tx.commit().expect(ERROR_COMMIT);

    let tx = db.tx().expect(ERROR_INIT_TX);
    let mut cursor = tx.cursor_read::<AccountChangeSets>().unwrap();

    // The unbounded walk sees all seven entries.
    let all_entries = cursor.walk_range(..).unwrap().collect::<Result<Vec<_>, _>>().unwrap();
    assert_eq!(all_entries.len(), 7);

    // The bounded walk sees the six entries of keys 0 and 1, then ends.
    let mut walker = cursor.walk_range(0..=1).unwrap();
    for block in 0..=1 {
        for address in addresses {
            assert_eq!(walker.next(), Some(Ok((block, before_tx(address)))));
        }
    }
    assert_eq!(walker.next(), None);
}
827
/// Reversed and empty ranges produce a walker that yields nothing.
#[expect(clippy::reversed_empty_ranges)]
#[test]
fn db_cursor_walk_range_invalid() {
    let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);

    // Seed keys 0 through 3, each mapped to the zero hash.
    let tx = db.tx_mut().expect(ERROR_INIT_TX);
    for key in [0, 1, 2, 3] {
        tx.put::<CanonicalHeaders>(key, B256::ZERO).expect(ERROR_PUT);
    }
    tx.commit().expect(ERROR_COMMIT);

    let tx = db.tx().expect(ERROR_INIT_TX);
    let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();

    // Half-open range whose start lies past its end.
    let mut reversed = cursor.walk_range(3..1).unwrap();
    assert_eq!(reversed.next(), None);

    // Inclusive range whose start lies past its end (and past every stored key).
    let mut reversed_inclusive = cursor.walk_range(15..=2).unwrap();
    assert_eq!(reversed_inclusive.next(), None);

    // Half-open range with equal bounds contains no key at all.
    let mut empty = cursor.walk_range(1..1).unwrap();
    assert_eq!(empty.next(), None);
}
856
/// A `Walker` started without a key visits every entry in ascending order, and converting the
/// exhausted walker with `rev` visits them again in descending order.
#[test]
fn db_walker() {
    let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);

    // Seed keys 0, 1 and 3, each mapped to the zero hash.
    let tx = db.tx_mut().expect(ERROR_INIT_TX);
    for key in [0, 1, 3] {
        tx.put::<CanonicalHeaders>(key, B256::ZERO).expect(ERROR_PUT);
    }
    tx.commit().expect(ERROR_COMMIT);

    let tx = db.tx().expect(ERROR_INIT_TX);
    let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();

    // Forward pass.
    let mut walker = Walker::new(&mut cursor, None);
    for key in [0, 1, 3] {
        assert_eq!(walker.next(), Some(Ok((key, B256::ZERO))));
    }
    assert_eq!(walker.next(), None);

    // Backward pass over the same cursor.
    let mut reverse_walker = walker.rev();
    for key in [3, 1, 0] {
        assert_eq!(reverse_walker.next(), Some(Ok((key, B256::ZERO))));
    }
    assert_eq!(reverse_walker.next(), None);
}
886
/// A `ReverseWalker` started without a key visits every entry in descending order, and
/// converting the exhausted walker with `forward` visits them again in ascending order.
#[test]
fn db_reverse_walker() {
    let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);

    // Seed keys 0, 1 and 3, each mapped to the zero hash.
    let tx = db.tx_mut().expect(ERROR_INIT_TX);
    for key in [0, 1, 3] {
        tx.put::<CanonicalHeaders>(key, B256::ZERO).expect(ERROR_PUT);
    }
    tx.commit().expect(ERROR_COMMIT);

    let tx = db.tx().expect(ERROR_INIT_TX);
    let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();

    // Backward pass.
    let mut reverse_walker = ReverseWalker::new(&mut cursor, None);
    for key in [3, 1, 0] {
        assert_eq!(reverse_walker.next(), Some(Ok((key, B256::ZERO))));
    }
    assert_eq!(reverse_walker.next(), None);

    // Forward pass over the same cursor.
    let mut walker = reverse_walker.forward();
    for key in [0, 1, 3] {
        assert_eq!(walker.next(), Some(Ok((key, B256::ZERO))));
    }
    assert_eq!(walker.next(), None);
}
916
/// `walk_back` yields entries in descending key order from the position selected by its
/// optional start key.
#[test]
fn db_walk_back() {
    let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);

    // Seed keys 0, 1 and 3, each mapped to the zero hash.
    let tx = db.tx_mut().expect(ERROR_INIT_TX);
    for key in [0, 1, 3] {
        tx.put::<CanonicalHeaders>(key, B256::ZERO).expect(ERROR_PUT);
    }
    tx.commit().expect(ERROR_COMMIT);

    let tx = db.tx().expect(ERROR_INIT_TX);
    let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();

    // Each case pairs a start key with the keys the walker is expected to yield, in order.
    let cases = [
        // Start key that exists: the walk begins on it.
        (Some(1), vec![1, 0]),
        // Start key missing between stored keys: the walk begins on 3.
        (Some(2), vec![3, 1, 0]),
        // Start key past the last stored key: the walk begins on 3.
        (Some(4), vec![3, 1, 0]),
        // No start key: the walk begins on 3.
        (None, vec![3, 1, 0]),
    ];

    for (start_key, expected_keys) in cases {
        let mut reverse_walker = cursor.walk_back(start_key).unwrap();
        for key in expected_keys {
            assert_eq!(reverse_walker.next(), Some(Ok((key, B256::ZERO))));
        }
        assert_eq!(reverse_walker.next(), None);
    }
}
955
/// `seek_exact` on a key that is not stored returns `None`, and `current` still reports no
/// entry afterwards.
#[test]
fn db_cursor_seek_exact_or_previous_key() {
    let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);

    // Seed keys 0, 1 and 3; key 2 is deliberately left out.
    let tx = db.tx_mut().expect(ERROR_INIT_TX);
    for key in [0, 1, 3] {
        tx.put::<CanonicalHeaders>(key, B256::ZERO).expect(ERROR_PUT);
    }
    tx.commit().expect(ERROR_COMMIT);

    let absent_key = 2;
    let tx = db.tx().expect(ERROR_INIT_TX);
    let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();

    // A freshly opened cursor has no current entry.
    assert_eq!(cursor.current(), Ok(None));

    // The exact seek misses, and the cursor still has no current entry.
    assert_eq!(cursor.seek_exact(absent_key).unwrap(), None);
    assert_eq!(cursor.current(), Ok(None));
}
979
/// `insert` stores a missing key and leaves the cursor on it; inserting the same key again
/// fails with `KeyExist` and leaves the cursor where it was.
#[test]
fn db_cursor_insert() {
    let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);

    // Seed every key from 0 to 5 except 2.
    let seed_tx = db.tx_mut().expect(ERROR_INIT_TX);
    for key in [0, 1, 3, 4, 5] {
        seed_tx.put::<CanonicalHeaders>(key, B256::ZERO).expect(ERROR_PUT);
    }
    seed_tx.commit().expect(ERROR_COMMIT);

    let key_to_insert = 2;
    let tx = db.tx_mut().expect(ERROR_INIT_TX);
    let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();

    // The first insertion succeeds and positions the cursor on the new entry.
    assert_eq!(cursor.insert(key_to_insert, &B256::ZERO), Ok(()));
    assert_eq!(cursor.current(), Ok(Some((key_to_insert, B256::ZERO))));

    // The second insertion of the same key is rejected.
    let second_attempt = cursor.insert(key_to_insert, &B256::ZERO);
    let key_exists = DatabaseWriteError {
        info: Error::KeyExist.into(),
        operation: DatabaseWriteOperation::CursorInsert,
        table_name: CanonicalHeaders::NAME,
        key: key_to_insert.encode().into(),
    };
    assert_eq!(second_attempt, Err(key_exists.into()));
    assert_eq!(cursor.current(), Ok(Some((key_to_insert, B256::ZERO))));

    tx.commit().expect(ERROR_COMMIT);

    // After the commit the table holds the seeded keys plus the inserted one, in order.
    let tx = db.tx().expect(ERROR_INIT_TX);
    let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
    let stored_keys: Vec<_> = cursor.walk(None).unwrap().map(|entry| entry.unwrap().0).collect();
    assert_eq!(stored_keys, vec![0, 1, 2, 3, 4, 5]);
    tx.commit().expect(ERROR_COMMIT);
}
1022
/// On a dup-sorted table, `insert` accepts the first entry for a key and rejects a second
/// entry under the same key, even when its subkey differs.
#[test]
fn db_cursor_insert_dup() {
    let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);
    let tx = db.tx_mut().expect(ERROR_INIT_TX);
    let mut dup_cursor = tx.cursor_dup_write::<PlainStorageState>().unwrap();

    let address = Address::random();
    let first_entry = StorageEntry { key: B256::random(), value: U256::ZERO };
    let second_entry = StorageEntry { key: B256::random(), value: U256::ZERO };

    // Nothing is stored under the address yet, so the insert goes through.
    assert!(dup_cursor.insert(address, &first_entry).is_ok());

    // The address is now present, so a further insert under it fails.
    assert!(dup_cursor.insert(address, &second_entry).is_err());
}
1040
/// `delete_current` removes the entry under the cursor; calling it again after a seek that
/// found nothing fails with `NoData` and leaves the other entries untouched.
#[test]
fn db_cursor_delete_current_non_existent() {
    let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);
    let tx = db.tx_mut().expect(ERROR_INIT_TX);

    let [key1, key2, key3] = [1, 2, 3].map(Address::with_last_byte);
    let mut cursor = tx.cursor_write::<PlainAccountState>().unwrap();

    for key in [key1, key2, key3] {
        assert!(cursor.insert(key, &Account::default()).is_ok());
    }

    // Position on key2, delete it, and confirm it can no longer be found.
    cursor.seek_exact(key2).unwrap();
    assert_eq!(cursor.delete_current(), Ok(()));
    assert_eq!(cursor.seek_exact(key2), Ok(None));

    // A second seek still misses, and deleting after that miss is an error.
    assert_eq!(cursor.seek_exact(key2), Ok(None));
    let no_data = DatabaseError::Delete(reth_libmdbx::Error::NoData.into());
    assert_eq!(cursor.delete_current(), Err(no_data));

    // The neighbouring entries survive both delete calls.
    for key in [key1, key3] {
        assert_eq!(cursor.seek_exact(key), Ok(Some((key, Account::default()))));
    }
}
1071
/// `insert` stores keys that sort before the cursor's current position and moves the cursor
/// onto each newly inserted entry.
#[test]
fn db_cursor_insert_wherever_cursor_is() {
    let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);

    // Seed 0 and the odd keys up to 9, leaving the even keys 2 to 8 as gaps.
    let seed_tx = db.tx_mut().expect(ERROR_INIT_TX);
    for key in [0, 1, 3, 5, 7, 9] {
        seed_tx.put::<CanonicalHeaders>(key, B256::ZERO).expect(ERROR_PUT);
    }
    seed_tx.commit().expect(ERROR_COMMIT);

    let tx = db.tx_mut().expect(ERROR_INIT_TX);
    let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();

    // Park the cursor on the last entry before inserting anything.
    cursor.last().unwrap();
    assert_eq!(cursor.current(), Ok(Some((9, B256::ZERO))));

    // Fill the gaps; each one lies before the cursor's starting position.
    for gap in [2, 4, 6, 8] {
        assert_eq!(cursor.insert(gap, &B256::ZERO), Ok(()));
        assert_eq!(cursor.current(), Ok(Some((gap, B256::ZERO))));
    }
    tx.commit().expect(ERROR_COMMIT);

    // After the commit every key from 0 to 9 is present, in order.
    let tx = db.tx().expect(ERROR_INIT_TX);
    let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
    let stored_keys: Vec<_> = cursor.walk(None).unwrap().map(|entry| entry.unwrap().0).collect();
    assert_eq!(stored_keys, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
    tx.commit().expect(ERROR_COMMIT);
}
1104
/// `append` accepts a key that is greater than every key already stored.
#[test]
fn db_cursor_append() {
    let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);

    // Seed keys 0 through 4.
    let seed_tx = db.tx_mut().expect(ERROR_INIT_TX);
    for key in [0, 1, 2, 3, 4] {
        seed_tx.put::<CanonicalHeaders>(key, B256::ZERO).expect(ERROR_PUT);
    }
    seed_tx.commit().expect(ERROR_COMMIT);

    // Append the next key after the seeded ones.
    let key_to_append = 5;
    let tx = db.tx_mut().expect(ERROR_INIT_TX);
    let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
    assert_eq!(cursor.append(key_to_append, &B256::ZERO), Ok(()));
    tx.commit().expect(ERROR_COMMIT);

    // After the commit the appended key follows the seeded ones.
    let tx = db.tx().expect(ERROR_INIT_TX);
    let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
    let stored_keys: Vec<_> = cursor.walk(None).unwrap().map(|entry| entry.unwrap().0).collect();
    assert_eq!(stored_keys, vec![0, 1, 2, 3, 4, 5]);
    tx.commit().expect(ERROR_COMMIT);
}
1131
/// `append` rejects a key that sorts before the last stored key with `KeyMismatch`, and the
/// table is left unchanged.
#[test]
fn db_cursor_append_failure() {
    let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);

    // Seed every key from 0 to 5 except 2.
    let seed_tx = db.tx_mut().expect(ERROR_INIT_TX);
    for key in [0, 1, 3, 4, 5] {
        seed_tx.put::<CanonicalHeaders>(key, B256::ZERO).expect(ERROR_PUT);
    }
    seed_tx.commit().expect(ERROR_COMMIT);

    // Key 2 is missing from the table but is smaller than the last key, so it cannot be appended.
    let key_to_append = 2;
    let tx = db.tx_mut().expect(ERROR_INIT_TX);
    let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
    let outcome = cursor.append(key_to_append, &B256::ZERO);
    let key_mismatch = DatabaseWriteError {
        info: Error::KeyMismatch.into(),
        operation: DatabaseWriteOperation::CursorAppend,
        table_name: CanonicalHeaders::NAME,
        key: key_to_append.encode().into(),
    };
    assert_eq!(outcome, Err(key_mismatch.into()));

    // After the failed append the cursor sits on the last entry of the table.
    assert_eq!(cursor.current(), Ok(Some((5, B256::ZERO))));
    tx.commit().expect(ERROR_COMMIT);

    // After the commit the table still holds exactly the seeded keys.
    let tx = db.tx().expect(ERROR_INIT_TX);
    let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
    let stored_keys: Vec<_> = cursor.walk(None).unwrap().map(|entry| entry.unwrap().0).collect();
    assert_eq!(stored_keys, vec![0, 1, 3, 4, 5]);
    tx.commit().expect(ERROR_COMMIT);
}
1168
/// On a plain table `upsert` replaces the value stored under a key; on a dup-sorted table a
/// second `upsert` with the same subkey adds a further duplicate next to the first.
#[test]
fn db_cursor_upsert() {
    let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);
    let tx = db.tx_mut().expect(ERROR_INIT_TX);

    let mut cursor = tx.cursor_write::<PlainAccountState>().unwrap();
    let key = Address::random();

    // Each upsert under the same key must be what an exact seek returns afterwards.
    let account_versions = [
        Account::default(),
        Account { nonce: 1, ..Default::default() },
        Account { nonce: 2, ..Default::default() },
    ];
    for account in account_versions {
        cursor.upsert(key, &account).expect(ERROR_UPSERT);
        assert_eq!(cursor.seek_exact(key), Ok(Some((key, account))));
    }

    let mut dup_cursor = tx.cursor_dup_write::<PlainStorageState>().unwrap();
    let subkey = B256::random();

    // The first entry for the subkey is found by a key/subkey seek.
    let entry1 = StorageEntry { key: subkey, value: U256::from(1) };
    dup_cursor.upsert(key, &entry1).expect(ERROR_UPSERT);
    assert_eq!(dup_cursor.seek_by_key_subkey(key, subkey), Ok(Some(entry1)));

    // A second entry with the same subkey does not replace the first: the seek still lands on
    // `entry1`, and `entry2` is the next duplicate after it.
    let entry2 = StorageEntry { key: subkey, value: U256::from(2) };
    dup_cursor.upsert(key, &entry2).expect(ERROR_UPSERT);
    assert_eq!(dup_cursor.seek_by_key_subkey(key, subkey), Ok(Some(entry1)));
    assert_eq!(dup_cursor.next_dup_val(), Ok(Some(entry2)));
}
1203
/// On a dup-sorted table, `append_dup` with an out-of-order duplicate and `append` with a
/// smaller key both fail with `KeyMismatch`, while `append` under the existing key succeeds.
#[test]
fn db_cursor_dupsort_append() {
    let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);

    let transition_id = 2;
    let change_for =
        |last_byte| AccountBeforeTx { address: Address::with_last_byte(last_byte), info: None };

    // Seed one key with duplicates for address bytes 0, 1, 3, 4 and 5, leaving out 2.
    let seed_tx = db.tx_mut().expect(ERROR_INIT_TX);
    let mut seed_cursor = seed_tx.cursor_write::<AccountChangeSets>().unwrap();
    for last_byte in [0, 1, 3, 4, 5] {
        seed_cursor.append(transition_id, &change_for(last_byte)).expect(ERROR_APPEND);
    }
    seed_tx.commit().expect(ERROR_COMMIT);

    let subkey_to_append = 2;
    let tx = db.tx_mut().expect(ERROR_INIT_TX);
    let mut cursor = tx.cursor_write::<AccountChangeSets>().unwrap();

    // `append_dup` with the missing address byte 2, which sorts before the last duplicate.
    let dup_outcome = cursor.append_dup(transition_id, change_for(subkey_to_append));
    let dup_mismatch = DatabaseWriteError {
        info: Error::KeyMismatch.into(),
        operation: DatabaseWriteOperation::CursorAppendDup,
        table_name: AccountChangeSets::NAME,
        key: transition_id.encode().into(),
    };
    assert_eq!(dup_outcome, Err(dup_mismatch.into()));

    // `append` under a key smaller than the stored one.
    let smaller_key = transition_id - 1;
    let smaller_key_outcome = cursor.append(smaller_key, &change_for(subkey_to_append));
    let key_mismatch = DatabaseWriteError {
        info: Error::KeyMismatch.into(),
        operation: DatabaseWriteOperation::CursorAppend,
        table_name: AccountChangeSets::NAME,
        key: smaller_key.encode().into(),
    };
    assert_eq!(smaller_key_outcome, Err(key_mismatch.into()));

    // `append` under the stored key is accepted.
    assert_eq!(cursor.append(transition_id, &change_for(subkey_to_append)), Ok(()));
}
1261
/// Writes an account through `update`, reopens the same path read-only, and reads the account
/// back through `view`.
#[test]
fn db_closure_put_get() {
    let path = TempDir::new().expect(ERROR_TEMPDIR).keep();

    let key = Address::from_str("0xa2c122be93b0074270ebee7f6b7292c7deb45047")
        .expect(ERROR_ETH_ADDRESS);
    let value = Account {
        // Numerically equal to `u64::MAX`.
        nonce: 18_446_744_073_709_551_615,
        bytecode_hash: Some(B256::random()),
        balance: U256::MAX,
    };

    // Write through a read-write environment; the closure's return value is passed through.
    let writer_env = create_test_db_with_path(DatabaseEnvKind::RW, &path);
    let returned = writer_env.update(|tx| {
        tx.put::<PlainAccountState>(key, value).expect(ERROR_PUT);
        200
    });
    assert_eq!(returned.expect(ERROR_RETURN_VALUE), 200);
    // Release the read-write environment before the path is opened again.
    drop(writer_env);

    let reader_env = DatabaseEnv::open(
        &path,
        DatabaseEnvKind::RO,
        DatabaseArguments::new(ClientVersion::default()),
    )
    .expect(ERROR_DB_CREATION);

    // Read the account back through the read-only environment.
    let stored = reader_env
        .view(|tx| tx.get::<PlainAccountState>(key).expect(ERROR_GET))
        .expect(ERROR_GET);

    assert_eq!(stored, Some(value));
}
1298
/// Duplicates written out of order under one key are read back sorted, and `walk_dup` can
/// start from a given subkey.
#[test]
fn db_dup_sort() {
    let env = create_test_db(DatabaseEnvKind::RW);
    let key = Address::from_str("0xa2c122be93b0074270ebee7f6b7292c7deb45047")
        .expect(ERROR_ETH_ADDRESS);

    let value00 = StorageEntry::default();
    let value11 = StorageEntry { key: B256::with_last_byte(1), value: U256::from(1) };
    let value22 = StorageEntry { key: B256::with_last_byte(2), value: U256::from(2) };

    // Write (0,0), then (2,2), then (1,1), each in its own transaction.
    for entry in [value00, value22, value11] {
        env.update(|tx| tx.put::<PlainStorageState>(key, entry).expect(ERROR_PUT)).unwrap();
    }

    // Reading the duplicates with a cursor returns them in sorted order, not write order.
    {
        let tx = env.tx().expect(ERROR_INIT_TX);
        let mut cursor = tx.cursor_dup_read::<PlainStorageState>().unwrap();

        for expected in [value00, value11, value22] {
            assert_eq!(Some(expected), cursor.next_dup_val().unwrap());
        }
    }

    // Walking from the exact subkey of (1,1) yields that entry first.
    {
        let tx = env.tx().expect(ERROR_INIT_TX);
        let mut cursor = tx.cursor_dup_read::<PlainStorageState>().unwrap();
        let mut walker = cursor.walk_dup(Some(key), Some(B256::with_last_byte(1))).unwrap();

        let first = walker
            .next()
            .expect("element should exist.")
            .expect("should be able to retrieve it.");
        assert_eq!((key, value11), first);
    }
}
1342
/// `walk_dup` started on a key that is not stored yields nothing.
#[test]
fn db_walk_dup_with_not_existing_key() {
    let env = create_test_db(DatabaseEnvKind::RW);
    let key = Address::from_str("0xa2c122be93b0074270ebee7f6b7292c7deb45047")
        .expect(ERROR_ETH_ADDRESS);

    let value00 = StorageEntry::default();
    let value11 = StorageEntry { key: B256::with_last_byte(1), value: U256::from(1) };
    let value22 = StorageEntry { key: B256::with_last_byte(2), value: U256::from(2) };

    // Write (0,0), then (2,2), then (1,1), each in its own transaction.
    for entry in [value00, value22, value11] {
        env.update(|tx| tx.put::<PlainStorageState>(key, entry).expect(ERROR_PUT)).unwrap();
    }

    // The zero address was never written, so the walker ends immediately.
    {
        let tx = env.tx().expect(ERROR_INIT_TX);
        let mut cursor = tx.cursor_dup_read::<PlainStorageState>().unwrap();
        let not_existing_key = Address::ZERO;
        let mut walker = cursor.walk_dup(Some(not_existing_key), None).unwrap();
        assert_eq!(walker.next(), None);
    }
}
1370
/// `walk_dup` stays within the duplicates of one key, whereas `walk` continues into the
/// next key.
#[test]
fn db_iterate_over_all_dup_values() {
    let env = create_test_db(DatabaseEnvKind::RW);
    let key1 = Address::from_str("0x1111111111111111111111111111111111111111")
        .expect(ERROR_ETH_ADDRESS);
    let key2 = Address::from_str("0x2222222222222222222222222222222222222222")
        .expect(ERROR_ETH_ADDRESS);

    let value00 = StorageEntry::default();
    let value11 = StorageEntry { key: B256::with_last_byte(1), value: U256::from(1) };
    let value22 = StorageEntry { key: B256::with_last_byte(2), value: U256::from(2) };

    // key1 receives (0,0) and (1,1); key2 receives (2,2). Each write is its own transaction.
    for (key, entry) in [(key1, value00), (key1, value11), (key2, value22)] {
        env.update(|tx| tx.put::<PlainStorageState>(key, entry).expect(ERROR_PUT)).unwrap();
    }

    // `walk_dup` without a start key yields the duplicates of key1 and then ends; the entry
    // stored under key2 is not returned.
    {
        let tx = env.tx().expect(ERROR_INIT_TX);
        let mut cursor = tx.cursor_dup_read::<PlainStorageState>().unwrap();
        let mut walker = cursor.walk_dup(None, None).unwrap();

        assert_eq!(Some(Ok((key1, value00))), walker.next());
        assert_eq!(Some(Ok((key1, value11))), walker.next());
        assert_eq!(None, walker.next());
    }

    // `walk` from the first key yields key1's duplicates and then moves on to key2.
    {
        let tx = env.tx().expect(ERROR_INIT_TX);
        let mut cursor = tx.cursor_dup_read::<PlainStorageState>().unwrap();
        let (first_key, _) = cursor.first().unwrap().unwrap();
        let mut walker = cursor.walk(Some(first_key)).unwrap();

        for expected in [(key1, value00), (key1, value11), (key2, value22)] {
            assert_eq!(Some(Ok(expected)), walker.next());
        }
    }
}
1416
/// Two entries sharing a subkey under one key are both stored; `walk` returns both, while
/// `seek_by_key_subkey` returns only the first of them.
#[test]
fn dup_value_with_same_subkey() {
    let env = create_test_db(DatabaseEnvKind::RW);
    let key1 = Address::new([0x11; 20]);
    let key2 = Address::new([0x22; 20]);

    let value01 = StorageEntry { key: B256::with_last_byte(0), value: U256::from(1) };
    let value00 = StorageEntry::default();
    let value22 = StorageEntry { key: B256::with_last_byte(2), value: U256::from(2) };

    // key1 receives (0,1) and then (0,0); key2 receives (2,2). Each write is its own transaction.
    for (key, entry) in [(key1, value01), (key1, value00), (key2, value22)] {
        env.update(|tx| tx.put::<PlainStorageState>(key, entry).expect(ERROR_PUT)).unwrap();
    }

    // `walk` returns both key1 entries, (0,0) before (0,1), followed by key2's entry.
    {
        let tx = env.tx().expect(ERROR_INIT_TX);
        let mut cursor = tx.cursor_dup_read::<PlainStorageState>().unwrap();
        let (first_key, _) = cursor.first().unwrap().unwrap();
        let mut walker = cursor.walk(Some(first_key)).unwrap();

        for expected in [(key1, value00), (key1, value01), (key2, value22)] {
            assert_eq!(Some(Ok(expected)), walker.next());
        }
    }

    // `seek_by_key_subkey` behaviour on the same data.
    {
        let tx = env.tx().expect(ERROR_INIT_TX);
        let mut cursor = tx.cursor_dup_read::<PlainStorageState>().unwrap();

        // Two entries share the subkey, but only the first one is returned.
        assert_eq!(Ok(Some(value00)), cursor.seek_by_key_subkey(key1, value00.key));
        // A subkey greater than any stored under key1 finds nothing.
        assert_eq!(Ok(None), cursor.seek_by_key_subkey(key1, value22.key));
    }
}
1459
/// Stores five history shards for one address and checks how a cursor seeks between them.
#[test]
fn db_sharded_key() {
    let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);
    let real_key = address!("0xa2c122be93b0074270ebee7f6b7292c7deb45047");

    // Shards 1 to 4 are keyed by `i * 100`; the last shard is keyed by `u64::MAX`. Every shard
    // holds the single value `i * 100`.
    let shards = 5;
    for i in 1..=shards {
        let key = ShardedKey::new(real_key, if i == shards { u64::MAX } else { i * 100 });
        let list = IntegerList::new_pre_sorted([i * 100u64]);

        // Report a failed write with the same message the other tests in this module use.
        db.update(|tx| tx.put::<AccountsHistory>(key.clone(), list.clone()).expect(ERROR_PUT))
            .unwrap();
    }

    // Seek value with non existing key.
    {
        let tx = db.tx().expect(ERROR_INIT_TX);
        let mut cursor = tx.cursor_read::<AccountsHistory>().unwrap();

        // It will seek the one greater or equal to the query. Since we have `Address | 100`,
        // `Address | 200` in the database and we're querying `Address | 150` it will return us
        // `Address | 200`.
        let mut walker = cursor.walk(Some(ShardedKey::new(real_key, 150))).unwrap();
        let (key, list) = walker
            .next()
            .expect("element should exist.")
            .expect("should be able to retrieve it.");

        assert_eq!(ShardedKey::new(real_key, 200), key);
        let list200 = IntegerList::new_pre_sorted([200u64]);
        assert_eq!(list200, list);
    }
    // Seek greatest index
    {
        let tx = db.tx().expect(ERROR_INIT_TX);
        let mut cursor = tx.cursor_read::<AccountsHistory>().unwrap();

        // Position the cursor on the shard keyed by `u64::MAX`, then step back with `prev` to
        // reach the shard with the largest explicit index, `Address | 400`.
        let _unknown = cursor.seek_exact(ShardedKey::new(real_key, u64::MAX)).unwrap();
        let (key, list) = cursor
            .prev()
            .expect("element should exist.")
            .expect("should be able to retrieve it.");

        assert_eq!(ShardedKey::new(real_key, 400), key);
        let list400 = IntegerList::new_pre_sorted([400u64]);
        assert_eq!(list400, list);
    }
}
1510}