reth_db/implementation/mdbx/mod.rs

1//! Module that interacts with MDBX.
2
3use crate::{
4    lockfile::StorageLock,
5    metrics::DatabaseEnvMetrics,
6    tables::{self, Tables},
7    utils::default_page_size,
8    DatabaseError, TableSet,
9};
10use eyre::Context;
11use metrics::{gauge, Label};
12use reth_db_api::{
13    cursor::{DbCursorRO, DbCursorRW},
14    database::Database,
15    database_metrics::DatabaseMetrics,
16    models::ClientVersion,
17    transaction::{DbTx, DbTxMut},
18};
19use reth_libmdbx::{
20    ffi, DatabaseFlags, Environment, EnvironmentFlags, Geometry, HandleSlowReadersReturnCode,
21    MaxReadTransactionDuration, Mode, PageSize, SyncMode, RO, RW,
22};
23use reth_storage_errors::db::LogLevel;
24use reth_tracing::tracing::error;
25use std::{
26    collections::HashMap,
27    ops::{Deref, Range},
28    path::Path,
29    sync::Arc,
30    time::{SystemTime, UNIX_EPOCH},
31};
32use tx::Tx;
33
34pub mod cursor;
35pub mod tx;
36
37mod utils;
38
39/// 1 KB in bytes
40pub const KILOBYTE: usize = 1024;
41/// 1 MB in bytes
42pub const MEGABYTE: usize = KILOBYTE * 1024;
43/// 1 GB in bytes
44pub const GIGABYTE: usize = MEGABYTE * 1024;
45/// 1 TB in bytes
46pub const TERABYTE: usize = GIGABYTE * 1024;
47
48/// MDBX allows up to 32767 readers (`MDBX_READERS_LIMIT`), but we limit it to slightly below that
49const DEFAULT_MAX_READERS: u64 = 32_000;
50
51/// Maximum space that a read-only transaction can occupy before the warning is emitted.
52/// See [`reth_libmdbx::EnvironmentBuilder::set_handle_slow_readers`] for more information.
53const MAX_SAFE_READER_SPACE: usize = 10 * GIGABYTE;
54
55/// Access kind used when opening an MDBX environment: read-only (RO) or read-write (RW).
56#[derive(Clone, Copy, Debug, Eq, PartialEq)]
57pub enum DatabaseEnvKind {
58    /// Read-only MDBX environment.
59    RO,
60    /// Read-write MDBX environment.
61    RW,
62}
63
64impl DatabaseEnvKind {
65    /// Returns `true` if the environment is read-write.
66    pub const fn is_rw(&self) -> bool {
67        matches!(self, Self::RW)
68    }
69}
70
71/// Arguments for database initialization.
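///
/// # Example
///
/// A minimal sketch of how the builder-style setters below are meant to be chained. The values
/// are illustrative only and the snippet is not compiled as a doctest.
///
/// ```ignore
/// use reth_db_api::models::ClientVersion;
///
/// let args = DatabaseArguments::new(ClientVersion::default())
///     // Cap the database size at 4 TB (illustrative value).
///     .with_geometry_max_size(Some(4 * TERABYTE))
///     // Grow the data file in 4 GB increments (illustrative value).
///     .with_growth_step(Some(4 * GIGABYTE))
///     // Open in cooperative (non-exclusive) mode.
///     .with_exclusive(Some(false));
/// ```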
72#[derive(Clone, Debug)]
73pub struct DatabaseArguments {
74    /// Client version that accesses the database.
75    client_version: ClientVersion,
76    /// Database geometry settings.
77    geometry: Geometry<Range<usize>>,
78    /// Database log level. If [None], the default value is used.
79    log_level: Option<LogLevel>,
80    /// Maximum duration of a read transaction. If [None], the default value is used.
81    max_read_transaction_duration: Option<MaxReadTransactionDuration>,
82    /// Open environment in exclusive/monopolistic mode. If [None], the default value is used.
83    ///
84    /// This can be used as a replacement for `MDB_NOLOCK`, which is not supported by MDBX. This
85    /// way, you get minimal overhead while keeping correct multi-process and multi-thread
86    /// locking.
87    ///
88    /// If `true`, the environment is opened in exclusive/monopolistic mode, or `MDBX_BUSY` is
89    /// returned if the environment is already used by another process. The main feature of the
90    /// exclusive mode is the ability to open an environment placed on a network share.
91    ///
92    /// If `false`, the environment is opened in cooperative mode, i.e. for multi-process
93    /// access/interaction/cooperation. The main requirements of the cooperative mode are:
94    /// - Data files MUST be placed in the LOCAL file system, but NOT on a network share.
95    /// - Environment MUST be opened only by LOCAL processes, but NOT over a network.
96    /// - The OS kernel (i.e. the file system and memory mapping implementation) and all processes
97    ///   that open the given environment MUST be running on physically shared RAM with cache
98    ///   coherency. The only exception to the cache-coherency requirement is Linux on the MIPS
99    ///   architecture, but this case has not been tested for a long time.
100    ///
101    /// This flag only takes effect when the environment is opened and can't be changed afterwards.
102    exclusive: Option<bool>,
103    /// MDBX allows up to 32767 readers (`MDBX_READERS_LIMIT`). This argument configures the
104    /// maximum number of readers.
105    max_readers: Option<u64>,
106}
107
108impl Default for DatabaseArguments {
109    fn default() -> Self {
110        Self::new(ClientVersion::default())
111    }
112}
113
114impl DatabaseArguments {
115    /// Create new database arguments with given client version.
116    pub fn new(client_version: ClientVersion) -> Self {
117        Self {
118            client_version,
119            geometry: Geometry {
120                size: Some(0..(8 * TERABYTE)),
121                growth_step: Some(4 * GIGABYTE as isize),
122                shrink_threshold: Some(0),
123                page_size: Some(PageSize::Set(default_page_size())),
124            },
125            log_level: None,
126            max_read_transaction_duration: None,
127            exclusive: None,
128            max_readers: None,
129        }
130    }
131
132    /// Sets the upper size limit of the db environment, the maximum database size in bytes.
133    pub const fn with_geometry_max_size(mut self, max_size: Option<usize>) -> Self {
134        if let Some(max_size) = max_size {
135            self.geometry.size = Some(0..max_size);
136        }
137        self
138    }
139
140    /// Configures the database growth step in bytes.
141    pub const fn with_growth_step(mut self, growth_step: Option<usize>) -> Self {
142        if let Some(growth_step) = growth_step {
143            self.geometry.growth_step = Some(growth_step as isize);
144        }
145        self
146    }
147
148    /// Set the log level.
149    pub const fn with_log_level(mut self, log_level: Option<LogLevel>) -> Self {
150        self.log_level = log_level;
151        self
152    }
153
154    /// Set the maximum duration of a read transaction.
155    pub const fn max_read_transaction_duration(
156        &mut self,
157        max_read_transaction_duration: Option<MaxReadTransactionDuration>,
158    ) {
159        self.max_read_transaction_duration = max_read_transaction_duration;
160    }
161
162    /// Set the maximum duration of a read transaction.
163    pub const fn with_max_read_transaction_duration(
164        mut self,
165        max_read_transaction_duration: Option<MaxReadTransactionDuration>,
166    ) -> Self {
167        self.max_read_transaction_duration(max_read_transaction_duration);
168        self
169    }
170
171    /// Set the mdbx exclusive flag.
172    pub const fn with_exclusive(mut self, exclusive: Option<bool>) -> Self {
173        self.exclusive = exclusive;
174        self
175    }
176
177    /// Set the `max_readers` flag.
178    pub const fn with_max_readers(mut self, max_readers: Option<u64>) -> Self {
179        self.max_readers = max_readers;
180        self
181    }
182
183    /// Returns the client version.
184    pub const fn client_version(&self) -> &ClientVersion {
185        &self.client_version
186    }
187}
188
189/// Wrapper for the libmdbx environment: [Environment]
190#[derive(Debug)]
191pub struct DatabaseEnv {
192    /// Libmdbx-sys environment.
193    inner: Environment,
194    /// Opened DBIs for reuse.
195    /// Important: Do not manually close these DBIs, e.g. via `mdbx_dbi_close`.
196    /// More generally, do not dynamically create, re-open, or drop tables at
197    /// runtime. It's better to perform table creation and migration only once
198    /// at startup.
199    dbis: Arc<HashMap<&'static str, ffi::MDBX_dbi>>,
200    /// Cache for metric handles. If `None`, metrics are not recorded.
201    metrics: Option<Arc<DatabaseEnvMetrics>>,
202    /// Write lock held while the environment is opened as read-write.
203    _lock_file: Option<StorageLock>,
204}
205
206impl Database for DatabaseEnv {
207    type TX = tx::Tx<RO>;
208    type TXMut = tx::Tx<RW>;
209
210    fn tx(&self) -> Result<Self::TX, DatabaseError> {
211        Tx::new(
212            self.inner.begin_ro_txn().map_err(|e| DatabaseError::InitTx(e.into()))?,
213            self.dbis.clone(),
214            self.metrics.clone(),
215        )
216        .map_err(|e| DatabaseError::InitTx(e.into()))
217    }
218
219    fn tx_mut(&self) -> Result<Self::TXMut, DatabaseError> {
220        Tx::new(
221            self.inner.begin_rw_txn().map_err(|e| DatabaseError::InitTx(e.into()))?,
222            self.dbis.clone(),
223            self.metrics.clone(),
224        )
225        .map_err(|e| DatabaseError::InitTx(e.into()))
226    }
227}
228
229impl DatabaseMetrics for DatabaseEnv {
230    fn report_metrics(&self) {
231        for (name, value, labels) in self.gauge_metrics() {
232            gauge!(name, labels).set(value);
233        }
234    }
235
236    fn gauge_metrics(&self) -> Vec<(&'static str, f64, Vec<Label>)> {
237        let mut metrics = Vec::new();
238
239        let _ = self
240            .view(|tx| {
241                for table in Tables::ALL.iter().map(Tables::name) {
242                    let table_db = tx.inner.open_db(Some(table)).wrap_err("Could not open db.")?;
243
244                    let stats = tx
245                        .inner
246                        .db_stat(&table_db)
247                        .wrap_err(format!("Could not find table: {table}"))?;
248
249                    let page_size = stats.page_size() as usize;
250                    let leaf_pages = stats.leaf_pages();
251                    let branch_pages = stats.branch_pages();
252                    let overflow_pages = stats.overflow_pages();
253                    let num_pages = leaf_pages + branch_pages + overflow_pages;
254                    let table_size = page_size * num_pages;
255                    let entries = stats.entries();
256
257                    metrics.push((
258                        "db.table_size",
259                        table_size as f64,
260                        vec![Label::new("table", table)],
261                    ));
262                    metrics.push((
263                        "db.table_pages",
264                        leaf_pages as f64,
265                        vec![Label::new("table", table), Label::new("type", "leaf")],
266                    ));
267                    metrics.push((
268                        "db.table_pages",
269                        branch_pages as f64,
270                        vec![Label::new("table", table), Label::new("type", "branch")],
271                    ));
272                    metrics.push((
273                        "db.table_pages",
274                        overflow_pages as f64,
275                        vec![Label::new("table", table), Label::new("type", "overflow")],
276                    ));
277                    metrics.push((
278                        "db.table_entries",
279                        entries as f64,
280                        vec![Label::new("table", table)],
281                    ));
282                }
283
284                Ok::<(), eyre::Report>(())
285            })
286            .map_err(|error| error!(%error, "Failed to read db table stats"));
287
288        if let Ok(freelist) =
289            self.freelist().map_err(|error| error!(%error, "Failed to read db.freelist"))
290        {
291            metrics.push(("db.freelist", freelist as f64, vec![]));
292        }
293
294        if let Ok(stat) = self.stat().map_err(|error| error!(%error, "Failed to read db.stat")) {
295            metrics.push(("db.page_size", stat.page_size() as f64, vec![]));
296        }
297
298        metrics.push((
299            "db.timed_out_not_aborted_transactions",
300            self.timed_out_not_aborted_transactions() as f64,
301            vec![],
302        ));
303
304        metrics
305    }
306}
307
308impl DatabaseEnv {
309    /// Opens the database at the specified path with the given `EnvKind`.
310    ///
311    /// It does not create the tables; for that, call [`DatabaseEnv::create_tables`].
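    ///
    /// # Example
    ///
    /// A minimal sketch (hypothetical path, not compiled as a doctest) of opening a read-write
    /// environment, creating the default tables, and starting a read-only transaction:
    ///
    /// ```ignore
    /// let mut env = DatabaseEnv::open(
    ///     Path::new("/tmp/reth-db"),
    ///     DatabaseEnvKind::RW,
    ///     DatabaseArguments::new(ClientVersion::default()),
    /// )?;
    /// env.create_tables()?;
    /// // `Database::tx` opens a read-only transaction; `Database::tx_mut` a read-write one.
    /// let tx = env.tx()?;
    /// tx.commit()?;
    /// ```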
312    pub fn open(
313        path: &Path,
314        kind: DatabaseEnvKind,
315        args: DatabaseArguments,
316    ) -> Result<Self, DatabaseError> {
317        let _lock_file = if kind.is_rw() {
318            StorageLock::try_acquire(path)
319                .map_err(|err| DatabaseError::Other(err.to_string()))?
320                .into()
321        } else {
322            None
323        };
324
325        let mut inner_env = Environment::builder();
326
327        let mode = match kind {
328            DatabaseEnvKind::RO => Mode::ReadOnly,
329            DatabaseEnvKind::RW => {
330                // enable writemap mode in RW mode
331                inner_env.write_map();
332                Mode::ReadWrite { sync_mode: SyncMode::Durable }
333            }
334        };
335
336        // Note: We set max dbs to 256 here to allow for custom tables. This needs to be set on
337        // environment creation.
338        debug_assert!(Tables::ALL.len() <= 256, "number of tables exceeds max dbs");
339        inner_env.set_max_dbs(256);
340        inner_env.set_geometry(args.geometry);
341
342        fn is_current_process(id: u32) -> bool {
343            #[cfg(unix)]
344            {
345                id == std::os::unix::process::parent_id() || id == std::process::id()
346            }
347
348            #[cfg(not(unix))]
349            {
350                id == std::process::id()
351            }
352        }
353
354        extern "C" fn handle_slow_readers(
355            _env: *const ffi::MDBX_env,
356            _txn: *const ffi::MDBX_txn,
357            process_id: ffi::mdbx_pid_t,
358            thread_id: ffi::mdbx_tid_t,
359            read_txn_id: u64,
360            gap: std::ffi::c_uint,
361            space: usize,
362            retry: std::ffi::c_int,
363        ) -> HandleSlowReadersReturnCode {
364            if space > MAX_SAFE_READER_SPACE {
365                let message = if is_current_process(process_id as u32) {
366                    "Current process has a long-lived database transaction that grows the database file."
367                } else {
368                    "External process has a long-lived database transaction that grows the database file. \
369                     Use shorter-lived read transactions or shut down the node."
370                };
371                reth_tracing::tracing::warn!(
372                    target: "storage::db::mdbx",
373                    ?process_id,
374                    ?thread_id,
375                    ?read_txn_id,
376                    ?gap,
377                    ?space,
378                    ?retry,
379                    "{message}"
380                )
381            }
382
383            reth_libmdbx::HandleSlowReadersReturnCode::ProceedWithoutKillingReader
384        }
385        inner_env.set_handle_slow_readers(handle_slow_readers);
386
387        inner_env.set_flags(EnvironmentFlags {
388            mode,
389            // We disable readahead because it improves performance for linear scans, but
390            // worsens it for random access (which is our access pattern outside of sync)
391            no_rdahead: true,
392            coalesce: true,
393            exclusive: args.exclusive.unwrap_or_default(),
394            ..Default::default()
395        });
396        // Configure the maximum number of readers
397        inner_env.set_max_readers(args.max_readers.unwrap_or(DEFAULT_MAX_READERS));
398        // This parameter sets the maximum size of the "reclaimed list", measured in pages. The
399        // reclaimed list is the list of freed pages that is populated during the lifetime of a DB
400        // transaction, and through which MDBX searches when it needs to insert a new record with
401        // overflow pages. The flow is roughly the following:
402        // 0. We need to insert a record that requires N number of overflow pages (in consecutive
403        //    sequence inside the DB file).
404        // 1. Get some pages from the freelist, put them into the reclaimed list.
405        // 2. Search through the reclaimed list for the sequence of size N.
406        // 3. a. If found, return the sequence.
407        // 3. b. If not found, repeat steps 1-3. If the reclaimed list size is larger than
408        //    the `rp augment limit`, stop the search and allocate new pages at the end of the file:
409        //    https://github.com/paradigmxyz/reth/blob/2a4c78759178f66e30c8976ec5d243b53102fc9a/crates/storage/libmdbx-rs/mdbx-sys/libmdbx/mdbx.c#L11479-L11480.
410        //
411        // Basically, this parameter controls how long we search through the freelist before trying
412        // to allocate new pages. A smaller value makes MDBX fall back to allocation sooner, a higher
413        // value forces MDBX to keep searching through the freelist until the sequence of pages is
414        // found.
415        //
416        // The default value of this parameter depends on the DB size: the bigger the database, the
417        // larger the `rp augment limit`.
418        // https://github.com/paradigmxyz/reth/blob/2a4c78759178f66e30c8976ec5d243b53102fc9a/crates/storage/libmdbx-rs/mdbx-sys/libmdbx/mdbx.c#L10018-L10024.
419        //
420        // Previously, MDBX set this value to the constant `256 * 1024`. We fall back to this value
421        // because we want to prioritize freelist lookup speed over database growth.
422        // https://github.com/paradigmxyz/reth/blob/fa2b9b685ed9787636d962f4366caf34a9186e66/crates/storage/libmdbx-rs/mdbx-sys/libmdbx/mdbx.c#L16017.
423        inner_env.set_rp_augment_limit(256 * 1024);
424
425        if let Some(log_level) = args.log_level {
426            // Levels higher than [LogLevel::Notice] require libmdbx to be built with the `MDBX_DEBUG` option.
427            let is_log_level_available = if cfg!(debug_assertions) {
428                true
429            } else {
430                matches!(
431                    log_level,
432                    LogLevel::Fatal | LogLevel::Error | LogLevel::Warn | LogLevel::Notice
433                )
434            };
435            if is_log_level_available {
436                inner_env.set_log_level(match log_level {
437                    LogLevel::Fatal => 0,
438                    LogLevel::Error => 1,
439                    LogLevel::Warn => 2,
440                    LogLevel::Notice => 3,
441                    LogLevel::Verbose => 4,
442                    LogLevel::Debug => 5,
443                    LogLevel::Trace => 6,
444                    LogLevel::Extra => 7,
445                });
446            } else {
447                return Err(DatabaseError::LogLevelUnavailable(log_level))
448            }
449        }
450
451        if let Some(max_read_transaction_duration) = args.max_read_transaction_duration {
452            inner_env.set_max_read_transaction_duration(max_read_transaction_duration);
453        }
454
455        let env = Self {
456            inner: inner_env.open(path).map_err(|e| DatabaseError::Open(e.into()))?,
457            dbis: Arc::default(),
458            metrics: None,
459            _lock_file,
460        };
461
462        Ok(env)
463    }
464
465    /// Enables metrics on the database.
466    pub fn with_metrics(mut self) -> Self {
467        self.metrics = Some(DatabaseEnvMetrics::new().into());
468        self
469    }
470
471    /// Creates all the tables defined in [`Tables`], if necessary.
472    ///
473    /// This keeps track of the created table handles and stores them for better efficiency.
474    pub fn create_tables(&mut self) -> Result<(), DatabaseError> {
475        self.create_and_track_tables_for::<Tables>()
476    }
477
478    /// Creates all the tables defined in the given [`TableSet`], if necessary.
479    ///
480    /// This keeps track of the created table handles and stores them for better efficiency.
481    pub fn create_and_track_tables_for<TS: TableSet>(&mut self) -> Result<(), DatabaseError> {
482        let handles = self._create_tables::<TS>()?;
483        // Note: This is okay because self has mutable access here and `DatabaseEnv` must be Arc'ed
484        // before it can be shared.
485        let dbis = Arc::make_mut(&mut self.dbis);
486        dbis.extend(handles);
487
488        Ok(())
489    }
490
491    /// Creates all the tables defined in the given [`TableSet`], if necessary.
492    ///
493    /// If this [`Arc`] is not shared, the created table handles are also tracked.
494    ///
495    /// This is recommended to be called during initialization to create and track additional
496    /// tables beyond the defaults created by [`Self::create_tables`].
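    ///
    /// # Example
    ///
    /// A minimal sketch, assuming a custom table set type `MyTables` (hypothetical name) that
    /// implements [`TableSet`] and is registered after the environment has been wrapped in an
    /// [`Arc`]; not compiled as a doctest:
    ///
    /// ```ignore
    /// let mut env = Arc::new(DatabaseEnv::open(path, DatabaseEnvKind::RW, args)?);
    /// env.create_tables_for::<MyTables>()?;
    /// ```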
497    pub fn create_tables_for<TS: TableSet>(self: &mut Arc<Self>) -> Result<(), DatabaseError> {
498        let handles = self._create_tables::<TS>()?;
499        if let Some(db) = Arc::get_mut(self) {
500            // Note: The env is uniquely held here; `Arc::make_mut` clones the dbis map only if it is shared.
501            let dbis = Arc::make_mut(&mut db.dbis);
502            dbis.extend(handles);
503        }
504        Ok(())
505    }
506
507    /// Creates the tables and returns the identifiers of the tables.
508    fn _create_tables<TS: TableSet>(
509        &self,
510    ) -> Result<Vec<(&'static str, ffi::MDBX_dbi)>, DatabaseError> {
511        let mut handles = Vec::new();
512        let tx = self.inner.begin_rw_txn().map_err(|e| DatabaseError::InitTx(e.into()))?;
513
514        for table in TS::tables() {
515            let flags =
516                if table.is_dupsort() { DatabaseFlags::DUP_SORT } else { DatabaseFlags::default() };
517
518            let db = tx
519                .create_db(Some(table.name()), flags)
520                .map_err(|e| DatabaseError::CreateTable(e.into()))?;
521            handles.push((table.name(), db.dbi()));
522        }
523
524        tx.commit().map_err(|e| DatabaseError::Commit(e.into()))?;
525        Ok(handles)
526    }
527
528    /// Records the client version that accesses the database with write privileges.
529    pub fn record_client_version(&self, version: ClientVersion) -> Result<(), DatabaseError> {
530        if version.is_empty() {
531            return Ok(())
532        }
533
534        let tx = self.tx_mut()?;
535        let mut version_cursor = tx.cursor_write::<tables::VersionHistory>()?;
536
537        let last_version = version_cursor.last()?.map(|(_, v)| v);
538        if Some(&version) != last_version.as_ref() {
539            version_cursor.upsert(
540                SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs(),
541                &version,
542            )?;
543            tx.commit()?;
544        }
545
546        Ok(())
547    }
548}
549
550impl Deref for DatabaseEnv {
551    type Target = Environment;
552
553    fn deref(&self) -> &Self::Target {
554        &self.inner
555    }
556}
557
558#[cfg(test)]
559mod tests {
560    use super::*;
561    use crate::{
562        tables::{
563            AccountsHistory, CanonicalHeaders, Headers, PlainAccountState, PlainStorageState,
564        },
565        test_utils::*,
566        AccountChangeSets,
567    };
568    use alloy_consensus::Header;
569    use alloy_primitives::{address, Address, B256, U256};
570    use reth_db_api::{
571        cursor::{DbDupCursorRO, DbDupCursorRW, ReverseWalker, Walker},
572        models::{AccountBeforeTx, IntegerList, ShardedKey},
573        table::{Encode, Table},
574    };
575    use reth_libmdbx::Error;
576    use reth_primitives_traits::{Account, StorageEntry};
577    use reth_storage_errors::db::{DatabaseWriteError, DatabaseWriteOperation};
578    use std::str::FromStr;
579    use tempfile::TempDir;
580
581    /// Create database for testing
582    fn create_test_db(kind: DatabaseEnvKind) -> Arc<DatabaseEnv> {
583        Arc::new(create_test_db_with_path(
584            kind,
585            &tempfile::TempDir::new().expect(ERROR_TEMPDIR).keep(),
586        ))
587    }
588
589    /// Create database for testing with specified path
590    fn create_test_db_with_path(kind: DatabaseEnvKind, path: &Path) -> DatabaseEnv {
591        let mut env =
592            DatabaseEnv::open(path, kind, DatabaseArguments::new(ClientVersion::default()))
593                .expect(ERROR_DB_CREATION);
594        env.create_tables().expect(ERROR_TABLE_CREATION);
595        env
596    }
597
598    const ERROR_DB_CREATION: &str = "Not able to create the mdbx file.";
599    const ERROR_PUT: &str = "Not able to insert value into table.";
600    const ERROR_APPEND: &str = "Not able to append the value to the table.";
601    const ERROR_UPSERT: &str = "Not able to upsert the value to the table.";
602    const ERROR_GET: &str = "Not able to get value from table.";
603    const ERROR_DEL: &str = "Not able to delete from table.";
604    const ERROR_COMMIT: &str = "Not able to commit transaction.";
605    const ERROR_RETURN_VALUE: &str = "Mismatching result.";
606    const ERROR_INIT_TX: &str = "Failed to create a MDBX transaction.";
607    const ERROR_ETH_ADDRESS: &str = "Invalid address.";
608
609    #[test]
610    fn db_creation() {
611        create_test_db(DatabaseEnvKind::RW);
612    }
613
614    #[test]
615    fn db_manual_put_get() {
616        let env = create_test_db(DatabaseEnvKind::RW);
617
618        let value = Header::default();
619        let key = 1u64;
620
621        // PUT
622        let tx = env.tx_mut().expect(ERROR_INIT_TX);
623        tx.put::<Headers>(key, value.clone()).expect(ERROR_PUT);
624        tx.commit().expect(ERROR_COMMIT);
625
626        // GET
627        let tx = env.tx().expect(ERROR_INIT_TX);
628        let result = tx.get::<Headers>(key).expect(ERROR_GET);
629        assert_eq!(result.expect(ERROR_RETURN_VALUE), value);
630        tx.commit().expect(ERROR_COMMIT);
631    }
632
633    #[test]
634    fn db_dup_cursor_delete_first() {
635        let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);
636        let tx = db.tx_mut().expect(ERROR_INIT_TX);
637
638        let mut dup_cursor = tx.cursor_dup_write::<PlainStorageState>().unwrap();
639
640        let entry_0 = StorageEntry { key: B256::with_last_byte(1), value: U256::from(0) };
641        let entry_1 = StorageEntry { key: B256::with_last_byte(1), value: U256::from(1) };
642
643        dup_cursor.upsert(Address::with_last_byte(1), &entry_0).expect(ERROR_UPSERT);
644        dup_cursor.upsert(Address::with_last_byte(1), &entry_1).expect(ERROR_UPSERT);
645
646        assert_eq!(
647            dup_cursor.walk(None).unwrap().collect::<Result<Vec<_>, _>>(),
648            Ok(vec![(Address::with_last_byte(1), entry_0), (Address::with_last_byte(1), entry_1),])
649        );
650
651        let mut walker = dup_cursor.walk(None).unwrap();
652        walker.delete_current().expect(ERROR_DEL);
653
654        assert_eq!(walker.next(), Some(Ok((Address::with_last_byte(1), entry_1))));
655
656        // Check the tx view - it correctly holds entry_1
657        assert_eq!(
658            tx.cursor_dup_read::<PlainStorageState>()
659                .unwrap()
660                .walk(None)
661                .unwrap()
662                .collect::<Result<Vec<_>, _>>(),
663            Ok(vec![
664                (Address::with_last_byte(1), entry_1), // This is ok - we removed entry_0
665            ])
666        );
667
668        // Check the remainder of walker
669        assert_eq!(walker.next(), None);
670    }
671
672    #[test]
673    fn db_cursor_walk() {
674        let env = create_test_db(DatabaseEnvKind::RW);
675
676        let value = Header::default();
677        let key = 1u64;
678
679        // PUT
680        let tx = env.tx_mut().expect(ERROR_INIT_TX);
681        tx.put::<Headers>(key, value.clone()).expect(ERROR_PUT);
682        tx.commit().expect(ERROR_COMMIT);
683
684        // Cursor
685        let tx = env.tx().expect(ERROR_INIT_TX);
686        let mut cursor = tx.cursor_read::<Headers>().unwrap();
687
688        let first = cursor.first().unwrap();
689        assert!(first.is_some(), "First should be our put");
690
691        // Walk
692        let walk = cursor.walk(Some(key)).unwrap();
693        let first = walk.into_iter().next().unwrap().unwrap();
694        assert_eq!(first.1, value, "First next should be put value");
695    }
696
697    #[test]
698    fn db_cursor_walk_range() {
699        let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);
700
701        // PUT (0, 0), (1, 0), (2, 0), (3, 0)
702        let tx = db.tx_mut().expect(ERROR_INIT_TX);
703        vec![0, 1, 2, 3]
704            .into_iter()
705            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
706            .expect(ERROR_PUT);
707        tx.commit().expect(ERROR_COMMIT);
708
709        let tx = db.tx().expect(ERROR_INIT_TX);
710        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
711
712        // [1, 3)
713        let mut walker = cursor.walk_range(1..3).unwrap();
714        assert_eq!(walker.next(), Some(Ok((1, B256::ZERO))));
715        assert_eq!(walker.next(), Some(Ok((2, B256::ZERO))));
716        assert_eq!(walker.next(), None);
717        // next() returns None after walker is done
718        assert_eq!(walker.next(), None);
719
720        // [1, 2]
721        let mut walker = cursor.walk_range(1..=2).unwrap();
722        assert_eq!(walker.next(), Some(Ok((1, B256::ZERO))));
723        assert_eq!(walker.next(), Some(Ok((2, B256::ZERO))));
724        // next() returns None after walker is done
725        assert_eq!(walker.next(), None);
726
727        // [1, ∞)
728        let mut walker = cursor.walk_range(1..).unwrap();
729        assert_eq!(walker.next(), Some(Ok((1, B256::ZERO))));
730        assert_eq!(walker.next(), Some(Ok((2, B256::ZERO))));
731        assert_eq!(walker.next(), Some(Ok((3, B256::ZERO))));
732        // next() returns None after walker is done
733        assert_eq!(walker.next(), None);
734
735        // [2, 4)
736        let mut walker = cursor.walk_range(2..4).unwrap();
737        assert_eq!(walker.next(), Some(Ok((2, B256::ZERO))));
738        assert_eq!(walker.next(), Some(Ok((3, B256::ZERO))));
739        assert_eq!(walker.next(), None);
740        // next() returns None after walker is done
741        assert_eq!(walker.next(), None);
742
743        // (∞, 3)
744        let mut walker = cursor.walk_range(..3).unwrap();
745        assert_eq!(walker.next(), Some(Ok((0, B256::ZERO))));
746        assert_eq!(walker.next(), Some(Ok((1, B256::ZERO))));
747        assert_eq!(walker.next(), Some(Ok((2, B256::ZERO))));
748        // next() returns None after walker is done
749        assert_eq!(walker.next(), None);
750
751        // (∞, ∞)
752        let mut walker = cursor.walk_range(..).unwrap();
753        assert_eq!(walker.next(), Some(Ok((0, B256::ZERO))));
754        assert_eq!(walker.next(), Some(Ok((1, B256::ZERO))));
755        assert_eq!(walker.next(), Some(Ok((2, B256::ZERO))));
756        assert_eq!(walker.next(), Some(Ok((3, B256::ZERO))));
757        // next() returns None after walker is done
758        assert_eq!(walker.next(), None);
759    }
760
761    #[test]
762    fn db_cursor_walk_range_on_dup_table() {
763        let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);
764
765        let address0 = Address::ZERO;
766        let address1 = Address::with_last_byte(1);
767        let address2 = Address::with_last_byte(2);
768
769        let tx = db.tx_mut().expect(ERROR_INIT_TX);
770        tx.put::<AccountChangeSets>(0, AccountBeforeTx { address: address0, info: None })
771            .expect(ERROR_PUT);
772        tx.put::<AccountChangeSets>(0, AccountBeforeTx { address: address1, info: None })
773            .expect(ERROR_PUT);
774        tx.put::<AccountChangeSets>(0, AccountBeforeTx { address: address2, info: None })
775            .expect(ERROR_PUT);
776        tx.put::<AccountChangeSets>(1, AccountBeforeTx { address: address0, info: None })
777            .expect(ERROR_PUT);
778        tx.put::<AccountChangeSets>(1, AccountBeforeTx { address: address1, info: None })
779            .expect(ERROR_PUT);
780        tx.put::<AccountChangeSets>(1, AccountBeforeTx { address: address2, info: None })
781            .expect(ERROR_PUT);
782        tx.put::<AccountChangeSets>(2, AccountBeforeTx { address: address0, info: None }) // <- should not be returned by the walker
783            .expect(ERROR_PUT);
784        tx.commit().expect(ERROR_COMMIT);
785
786        let tx = db.tx().expect(ERROR_INIT_TX);
787        let mut cursor = tx.cursor_read::<AccountChangeSets>().unwrap();
788
789        let entries = cursor.walk_range(..).unwrap().collect::<Result<Vec<_>, _>>().unwrap();
790        assert_eq!(entries.len(), 7);
791
792        let mut walker = cursor.walk_range(0..=1).unwrap();
793        assert_eq!(walker.next(), Some(Ok((0, AccountBeforeTx { address: address0, info: None }))));
794        assert_eq!(walker.next(), Some(Ok((0, AccountBeforeTx { address: address1, info: None }))));
795        assert_eq!(walker.next(), Some(Ok((0, AccountBeforeTx { address: address2, info: None }))));
796        assert_eq!(walker.next(), Some(Ok((1, AccountBeforeTx { address: address0, info: None }))));
797        assert_eq!(walker.next(), Some(Ok((1, AccountBeforeTx { address: address1, info: None }))));
798        assert_eq!(walker.next(), Some(Ok((1, AccountBeforeTx { address: address2, info: None }))));
799        assert_eq!(walker.next(), None);
800    }
801
802    #[expect(clippy::reversed_empty_ranges)]
803    #[test]
804    fn db_cursor_walk_range_invalid() {
805        let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);
806
807        // PUT (0, 0), (1, 0), (2, 0), (3, 0)
808        let tx = db.tx_mut().expect(ERROR_INIT_TX);
809        vec![0, 1, 2, 3]
810            .into_iter()
811            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
812            .expect(ERROR_PUT);
813        tx.commit().expect(ERROR_COMMIT);
814
815        let tx = db.tx().expect(ERROR_INIT_TX);
816        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
817
818        // start bound greater than end bound
819        let mut res = cursor.walk_range(3..1).unwrap();
820        assert_eq!(res.next(), None);
821
822        // start bound greater than end bound
823        let mut res = cursor.walk_range(15..=2).unwrap();
824        assert_eq!(res.next(), None);
825
826        // returning nothing
827        let mut walker = cursor.walk_range(1..1).unwrap();
828        assert_eq!(walker.next(), None);
829    }
830
831    #[test]
832    fn db_walker() {
833        let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);
834
835        // PUT (0, 0), (1, 0), (3, 0)
836        let tx = db.tx_mut().expect(ERROR_INIT_TX);
837        vec![0, 1, 3]
838            .into_iter()
839            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
840            .expect(ERROR_PUT);
841        tx.commit().expect(ERROR_COMMIT);
842
843        let tx = db.tx().expect(ERROR_INIT_TX);
844        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
845
846        let mut walker = Walker::new(&mut cursor, None);
847
848        assert_eq!(walker.next(), Some(Ok((0, B256::ZERO))));
849        assert_eq!(walker.next(), Some(Ok((1, B256::ZERO))));
850        assert_eq!(walker.next(), Some(Ok((3, B256::ZERO))));
851        assert_eq!(walker.next(), None);
852
853        // transform to ReverseWalker
854        let mut reverse_walker = walker.rev();
855        assert_eq!(reverse_walker.next(), Some(Ok((3, B256::ZERO))));
856        assert_eq!(reverse_walker.next(), Some(Ok((1, B256::ZERO))));
857        assert_eq!(reverse_walker.next(), Some(Ok((0, B256::ZERO))));
858        assert_eq!(reverse_walker.next(), None);
859    }
860
861    #[test]
862    fn db_reverse_walker() {
863        let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);
864
865        // PUT (0, 0), (1, 0), (3, 0)
866        let tx = db.tx_mut().expect(ERROR_INIT_TX);
867        vec![0, 1, 3]
868            .into_iter()
869            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
870            .expect(ERROR_PUT);
871        tx.commit().expect(ERROR_COMMIT);
872
873        let tx = db.tx().expect(ERROR_INIT_TX);
874        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
875
876        let mut reverse_walker = ReverseWalker::new(&mut cursor, None);
877
878        assert_eq!(reverse_walker.next(), Some(Ok((3, B256::ZERO))));
879        assert_eq!(reverse_walker.next(), Some(Ok((1, B256::ZERO))));
880        assert_eq!(reverse_walker.next(), Some(Ok((0, B256::ZERO))));
881        assert_eq!(reverse_walker.next(), None);
882
883        // transform to Walker
884        let mut walker = reverse_walker.forward();
885        assert_eq!(walker.next(), Some(Ok((0, B256::ZERO))));
886        assert_eq!(walker.next(), Some(Ok((1, B256::ZERO))));
887        assert_eq!(walker.next(), Some(Ok((3, B256::ZERO))));
888        assert_eq!(walker.next(), None);
889    }
890
891    #[test]
892    fn db_walk_back() {
893        let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);
894
895        // PUT (0, 0), (1, 0), (3, 0)
896        let tx = db.tx_mut().expect(ERROR_INIT_TX);
897        vec![0, 1, 3]
898            .into_iter()
899            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
900            .expect(ERROR_PUT);
901        tx.commit().expect(ERROR_COMMIT);
902
903        let tx = db.tx().expect(ERROR_INIT_TX);
904        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
905
906        let mut reverse_walker = cursor.walk_back(Some(1)).unwrap();
907        assert_eq!(reverse_walker.next(), Some(Ok((1, B256::ZERO))));
908        assert_eq!(reverse_walker.next(), Some(Ok((0, B256::ZERO))));
909        assert_eq!(reverse_walker.next(), None);
910
911        let mut reverse_walker = cursor.walk_back(Some(2)).unwrap();
912        assert_eq!(reverse_walker.next(), Some(Ok((3, B256::ZERO))));
913        assert_eq!(reverse_walker.next(), Some(Ok((1, B256::ZERO))));
914        assert_eq!(reverse_walker.next(), Some(Ok((0, B256::ZERO))));
915        assert_eq!(reverse_walker.next(), None);
916
917        let mut reverse_walker = cursor.walk_back(Some(4)).unwrap();
918        assert_eq!(reverse_walker.next(), Some(Ok((3, B256::ZERO))));
919        assert_eq!(reverse_walker.next(), Some(Ok((1, B256::ZERO))));
920        assert_eq!(reverse_walker.next(), Some(Ok((0, B256::ZERO))));
921        assert_eq!(reverse_walker.next(), None);
922
923        let mut reverse_walker = cursor.walk_back(None).unwrap();
924        assert_eq!(reverse_walker.next(), Some(Ok((3, B256::ZERO))));
925        assert_eq!(reverse_walker.next(), Some(Ok((1, B256::ZERO))));
926        assert_eq!(reverse_walker.next(), Some(Ok((0, B256::ZERO))));
927        assert_eq!(reverse_walker.next(), None);
928    }
929
930    #[test]
931    fn db_cursor_seek_exact_or_previous_key() {
932        let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);
933
934        // PUT
935        let tx = db.tx_mut().expect(ERROR_INIT_TX);
936        vec![0, 1, 3]
937            .into_iter()
938            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
939            .expect(ERROR_PUT);
940        tx.commit().expect(ERROR_COMMIT);
941
942        // Cursor
943        let missing_key = 2;
944        let tx = db.tx().expect(ERROR_INIT_TX);
945        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
946        assert_eq!(cursor.current(), Ok(None));
947
948        // Seek exact
949        let exact = cursor.seek_exact(missing_key).unwrap();
950        assert_eq!(exact, None);
951        assert_eq!(cursor.current(), Ok(None));
952    }
953
954    #[test]
955    fn db_cursor_insert() {
956        let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);
957
958        // PUT
959        let tx = db.tx_mut().expect(ERROR_INIT_TX);
960        vec![0, 1, 3, 4, 5]
961            .into_iter()
962            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
963            .expect(ERROR_PUT);
964        tx.commit().expect(ERROR_COMMIT);
965
966        let key_to_insert = 2;
967        let tx = db.tx_mut().expect(ERROR_INIT_TX);
968        let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
969
970        // INSERT
971        assert_eq!(cursor.insert(key_to_insert, &B256::ZERO), Ok(()));
972        assert_eq!(cursor.current(), Ok(Some((key_to_insert, B256::ZERO))));
973
974        // INSERT (failure)
975        assert_eq!(
976            cursor.insert(key_to_insert, &B256::ZERO),
977            Err(DatabaseWriteError {
978                info: Error::KeyExist.into(),
979                operation: DatabaseWriteOperation::CursorInsert,
980                table_name: CanonicalHeaders::NAME,
981                key: key_to_insert.encode().into(),
982            }
983            .into())
984        );
985        assert_eq!(cursor.current(), Ok(Some((key_to_insert, B256::ZERO))));
986
987        tx.commit().expect(ERROR_COMMIT);
988
989        // Confirm the result
990        let tx = db.tx().expect(ERROR_INIT_TX);
991        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
992        let res = cursor.walk(None).unwrap().map(|res| res.unwrap().0).collect::<Vec<_>>();
993        assert_eq!(res, vec![0, 1, 2, 3, 4, 5]);
994        tx.commit().expect(ERROR_COMMIT);
995    }
996
997    #[test]
998    fn db_cursor_insert_dup() {
999        let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);
1000        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1001
1002        let mut dup_cursor = tx.cursor_dup_write::<PlainStorageState>().unwrap();
1003        let key = Address::random();
1004        let subkey1 = B256::random();
1005        let subkey2 = B256::random();
1006
1007        let entry1 = StorageEntry { key: subkey1, value: U256::ZERO };
1008        assert!(dup_cursor.insert(key, &entry1).is_ok());
1009
1010        // Can't insert a second entry for the same key via `insert`
1011        let entry2 = StorageEntry { key: subkey2, value: U256::ZERO };
1012        assert!(dup_cursor.insert(key, &entry2).is_err());
1013    }
1014
1015    #[test]
1016    fn db_cursor_delete_current_non_existent() {
1017        let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);
1018        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1019
1020        let key1 = Address::with_last_byte(1);
1021        let key2 = Address::with_last_byte(2);
1022        let key3 = Address::with_last_byte(3);
1023        let mut cursor = tx.cursor_write::<PlainAccountState>().unwrap();
1024
1025        assert!(cursor.insert(key1, &Account::default()).is_ok());
1026        assert!(cursor.insert(key2, &Account::default()).is_ok());
1027        assert!(cursor.insert(key3, &Account::default()).is_ok());
1028
1029        // Seek & delete key2
1030        cursor.seek_exact(key2).unwrap();
1031        assert_eq!(cursor.delete_current(), Ok(()));
1032        assert_eq!(cursor.seek_exact(key2), Ok(None));
1033
1034        // Seek & delete key2 again
1035        assert_eq!(cursor.seek_exact(key2), Ok(None));
1036        assert_eq!(
1037            cursor.delete_current(),
1038            Err(DatabaseError::Delete(reth_libmdbx::Error::NoData.into()))
1039        );
1040        // Assert that key1 is still there
1041        assert_eq!(cursor.seek_exact(key1), Ok(Some((key1, Account::default()))));
1042        // Assert that key3 is still there
1043        assert_eq!(cursor.seek_exact(key3), Ok(Some((key3, Account::default()))));
1044    }
1045
1046    #[test]
1047    fn db_cursor_insert_wherever_cursor_is() {
1048        let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);
1049        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1050
1051        // PUT
1052        vec![0, 1, 3, 5, 7, 9]
1053            .into_iter()
1054            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
1055            .expect(ERROR_PUT);
1056        tx.commit().expect(ERROR_COMMIT);
1057
1058        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1059        let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
1060
1061        // INSERT (cursor starts at last)
1062        cursor.last().unwrap();
1063        assert_eq!(cursor.current(), Ok(Some((9, B256::ZERO))));
1064
1065        for pos in (2..=8).step_by(2) {
1066            assert_eq!(cursor.insert(pos, &B256::ZERO), Ok(()));
1067            assert_eq!(cursor.current(), Ok(Some((pos, B256::ZERO))));
1068        }
1069        tx.commit().expect(ERROR_COMMIT);
1070
1071        // Confirm the result
1072        let tx = db.tx().expect(ERROR_INIT_TX);
1073        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
1074        let res = cursor.walk(None).unwrap().map(|res| res.unwrap().0).collect::<Vec<_>>();
1075        assert_eq!(res, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
1076        tx.commit().expect(ERROR_COMMIT);
1077    }
1078
1079    #[test]
1080    fn db_cursor_append() {
1081        let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);
1082
1083        // PUT
1084        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1085        vec![0, 1, 2, 3, 4]
1086            .into_iter()
1087            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
1088            .expect(ERROR_PUT);
1089        tx.commit().expect(ERROR_COMMIT);
1090
1091        // APPEND
1092        let key_to_append = 5;
1093        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1094        let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
1095        assert_eq!(cursor.append(key_to_append, &B256::ZERO), Ok(()));
1096        tx.commit().expect(ERROR_COMMIT);
1097
1098        // Confirm the result
1099        let tx = db.tx().expect(ERROR_INIT_TX);
1100        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
1101        let res = cursor.walk(None).unwrap().map(|res| res.unwrap().0).collect::<Vec<_>>();
1102        assert_eq!(res, vec![0, 1, 2, 3, 4, 5]);
1103        tx.commit().expect(ERROR_COMMIT);
1104    }
1105
1106    #[test]
1107    fn db_cursor_append_failure() {
1108        let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);
1109
1110        // PUT
1111        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1112        vec![0, 1, 3, 4, 5]
1113            .into_iter()
1114            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
1115            .expect(ERROR_PUT);
1116        tx.commit().expect(ERROR_COMMIT);
1117
1118        // APPEND
1119        let key_to_append = 2;
1120        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1121        let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
1122        assert_eq!(
1123            cursor.append(key_to_append, &B256::ZERO),
1124            Err(DatabaseWriteError {
1125                info: Error::KeyMismatch.into(),
1126                operation: DatabaseWriteOperation::CursorAppend,
1127                table_name: CanonicalHeaders::NAME,
1128                key: key_to_append.encode().into(),
1129            }
1130            .into())
1131        );
1132        assert_eq!(cursor.current(), Ok(Some((5, B256::ZERO)))); // the end of table
1133        tx.commit().expect(ERROR_COMMIT);
1134
1135        // Confirm the result
1136        let tx = db.tx().expect(ERROR_INIT_TX);
1137        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
1138        let res = cursor.walk(None).unwrap().map(|res| res.unwrap().0).collect::<Vec<_>>();
1139        assert_eq!(res, vec![0, 1, 3, 4, 5]);
1140        tx.commit().expect(ERROR_COMMIT);
1141    }
1142
1143    #[test]
1144    fn db_cursor_upsert() {
1145        let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);
1146        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1147
1148        let mut cursor = tx.cursor_write::<PlainAccountState>().unwrap();
1149        let key = Address::random();
1150
1151        let account = Account::default();
1152        cursor.upsert(key, &account).expect(ERROR_UPSERT);
1153        assert_eq!(cursor.seek_exact(key), Ok(Some((key, account))));
1154
1155        let account = Account { nonce: 1, ..Default::default() };
1156        cursor.upsert(key, &account).expect(ERROR_UPSERT);
1157        assert_eq!(cursor.seek_exact(key), Ok(Some((key, account))));
1158
1159        let account = Account { nonce: 2, ..Default::default() };
1160        cursor.upsert(key, &account).expect(ERROR_UPSERT);
1161        assert_eq!(cursor.seek_exact(key), Ok(Some((key, account))));
1162
1163        let mut dup_cursor = tx.cursor_dup_write::<PlainStorageState>().unwrap();
1164        let subkey = B256::random();
1165
1166        let value = U256::from(1);
1167        let entry1 = StorageEntry { key: subkey, value };
1168        dup_cursor.upsert(key, &entry1).expect(ERROR_UPSERT);
1169        assert_eq!(dup_cursor.seek_by_key_subkey(key, subkey), Ok(Some(entry1)));
1170
1171        let value = U256::from(2);
1172        let entry2 = StorageEntry { key: subkey, value };
1173        dup_cursor.upsert(key, &entry2).expect(ERROR_UPSERT);
1174        assert_eq!(dup_cursor.seek_by_key_subkey(key, subkey), Ok(Some(entry1)));
1175        assert_eq!(dup_cursor.next_dup_val(), Ok(Some(entry2)));
1176    }
1177
1178    #[test]
1179    fn db_cursor_dupsort_append() {
1180        let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);
1181
1182        let transition_id = 2;
1183
1184        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1185        let mut cursor = tx.cursor_write::<AccountChangeSets>().unwrap();
1186        vec![0, 1, 3, 4, 5]
1187            .into_iter()
1188            .try_for_each(|val| {
1189                cursor.append(
1190                    transition_id,
1191                    &AccountBeforeTx { address: Address::with_last_byte(val), info: None },
1192                )
1193            })
1194            .expect(ERROR_APPEND);
1195        tx.commit().expect(ERROR_COMMIT);
1196
1197        // APPEND DUP & APPEND
1198        let subkey_to_append = 2;
1199        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1200        let mut cursor = tx.cursor_write::<AccountChangeSets>().unwrap();
1201        assert_eq!(
1202            cursor.append_dup(
1203                transition_id,
1204                AccountBeforeTx { address: Address::with_last_byte(subkey_to_append), info: None }
1205            ),
1206            Err(DatabaseWriteError {
1207                info: Error::KeyMismatch.into(),
1208                operation: DatabaseWriteOperation::CursorAppendDup,
1209                table_name: AccountChangeSets::NAME,
1210                key: transition_id.encode().into(),
1211            }
1212            .into())
1213        );
1214        assert_eq!(
1215            cursor.append(
1216                transition_id - 1,
1217                &AccountBeforeTx { address: Address::with_last_byte(subkey_to_append), info: None }
1218            ),
1219            Err(DatabaseWriteError {
1220                info: Error::KeyMismatch.into(),
1221                operation: DatabaseWriteOperation::CursorAppend,
1222                table_name: AccountChangeSets::NAME,
1223                key: (transition_id - 1).encode().into(),
1224            }
1225            .into())
1226        );
1227        assert_eq!(
1228            cursor.append(
1229                transition_id,
1230                &AccountBeforeTx { address: Address::with_last_byte(subkey_to_append), info: None }
1231            ),
1232            Ok(())
1233        );
1234    }
1235
1236    #[test]
1237    fn db_closure_put_get() {
1238        let path = TempDir::new().expect(ERROR_TEMPDIR).keep();
1239
1240        let value = Account {
1241            nonce: 18446744073709551615,
1242            bytecode_hash: Some(B256::random()),
1243            balance: U256::MAX,
1244        };
1245        let key = Address::from_str("0xa2c122be93b0074270ebee7f6b7292c7deb45047")
1246            .expect(ERROR_ETH_ADDRESS);
1247
1248        {
1249            let env = create_test_db_with_path(DatabaseEnvKind::RW, &path);
1250
1251            // PUT
1252            let result = env.update(|tx| {
1253                tx.put::<PlainAccountState>(key, value).expect(ERROR_PUT);
1254                200
1255            });
1256            assert_eq!(result.expect(ERROR_RETURN_VALUE), 200);
1257        }
1258
1259        let env = DatabaseEnv::open(
1260            &path,
1261            DatabaseEnvKind::RO,
1262            DatabaseArguments::new(ClientVersion::default()),
1263        )
1264        .expect(ERROR_DB_CREATION);
1265
1266        // GET
1267        let result =
1268            env.view(|tx| tx.get::<PlainAccountState>(key).expect(ERROR_GET)).expect(ERROR_GET);
1269
1270        assert_eq!(result, Some(value))
1271    }
1272
1273    #[test]
1274    fn db_dup_sort() {
1275        let env = create_test_db(DatabaseEnvKind::RW);
1276        let key = Address::from_str("0xa2c122be93b0074270ebee7f6b7292c7deb45047")
1277            .expect(ERROR_ETH_ADDRESS);
1278
1279        // PUT (0,0)
1280        let value00 = StorageEntry::default();
1281        env.update(|tx| tx.put::<PlainStorageState>(key, value00).expect(ERROR_PUT)).unwrap();
1282
1283        // PUT (2,2)
1284        let value22 = StorageEntry { key: B256::with_last_byte(2), value: U256::from(2) };
1285        env.update(|tx| tx.put::<PlainStorageState>(key, value22).expect(ERROR_PUT)).unwrap();
1286
1287        // PUT (1,1)
1288        let value11 = StorageEntry { key: B256::with_last_byte(1), value: U256::from(1) };
1289        env.update(|tx| tx.put::<PlainStorageState>(key, value11).expect(ERROR_PUT)).unwrap();
1290
1291        // Iterate with cursor
1292        {
1293            let tx = env.tx().expect(ERROR_INIT_TX);
1294            let mut cursor = tx.cursor_dup_read::<PlainStorageState>().unwrap();
1295
1296            // Notice that value11 and value22 have been ordered in the DB.
1297            assert_eq!(Some(value00), cursor.next_dup_val().unwrap());
1298            assert_eq!(Some(value11), cursor.next_dup_val().unwrap());
1299            assert_eq!(Some(value22), cursor.next_dup_val().unwrap());
1300        }
1301
1302        // Seek value with exact subkey
1303        {
1304            let tx = env.tx().expect(ERROR_INIT_TX);
1305            let mut cursor = tx.cursor_dup_read::<PlainStorageState>().unwrap();
1306            let mut walker = cursor.walk_dup(Some(key), Some(B256::with_last_byte(1))).unwrap();
1307            assert_eq!(
1308                (key, value11),
1309                walker
1310                    .next()
1311                    .expect("element should exist.")
1312                    .expect("should be able to retrieve it.")
1313            );
1314        }
1315    }
1316
1317    #[test]
1318    fn db_walk_dup_with_not_existing_key() {
1319        let env = create_test_db(DatabaseEnvKind::RW);
1320        let key = Address::from_str("0xa2c122be93b0074270ebee7f6b7292c7deb45047")
1321            .expect(ERROR_ETH_ADDRESS);
1322
1323        // PUT (0,0)
1324        let value00 = StorageEntry::default();
1325        env.update(|tx| tx.put::<PlainStorageState>(key, value00).expect(ERROR_PUT)).unwrap();
1326
1327        // PUT (2,2)
1328        let value22 = StorageEntry { key: B256::with_last_byte(2), value: U256::from(2) };
1329        env.update(|tx| tx.put::<PlainStorageState>(key, value22).expect(ERROR_PUT)).unwrap();
1330
1331        // PUT (1,1)
1332        let value11 = StorageEntry { key: B256::with_last_byte(1), value: U256::from(1) };
1333        env.update(|tx| tx.put::<PlainStorageState>(key, value11).expect(ERROR_PUT)).unwrap();
1334
1335        // walk_dup with a non-existing key should immediately return None
1336        {
1337            let tx = env.tx().expect(ERROR_INIT_TX);
1338            let mut cursor = tx.cursor_dup_read::<PlainStorageState>().unwrap();
1339            let not_existing_key = Address::ZERO;
1340            let mut walker = cursor.walk_dup(Some(not_existing_key), None).unwrap();
1341            assert_eq!(walker.next(), None);
1342        }
1343    }
1344
1345    #[test]
1346    fn db_iterate_over_all_dup_values() {
1347        let env = create_test_db(DatabaseEnvKind::RW);
1348        let key1 = Address::from_str("0x1111111111111111111111111111111111111111")
1349            .expect(ERROR_ETH_ADDRESS);
1350        let key2 = Address::from_str("0x2222222222222222222222222222222222222222")
1351            .expect(ERROR_ETH_ADDRESS);
1352
1353        // PUT key1 (0,0)
1354        let value00 = StorageEntry::default();
1355        env.update(|tx| tx.put::<PlainStorageState>(key1, value00).expect(ERROR_PUT)).unwrap();
1356
1357        // PUT key1 (1,1)
1358        let value11 = StorageEntry { key: B256::with_last_byte(1), value: U256::from(1) };
1359        env.update(|tx| tx.put::<PlainStorageState>(key1, value11).expect(ERROR_PUT)).unwrap();
1360
1361        // PUT key2 (2,2)
1362        let value22 = StorageEntry { key: B256::with_last_byte(2), value: U256::from(2) };
1363        env.update(|tx| tx.put::<PlainStorageState>(key2, value22).expect(ERROR_PUT)).unwrap();
1364
1365        // Iterate with walk_dup
1366        {
1367            let tx = env.tx().expect(ERROR_INIT_TX);
1368            let mut cursor = tx.cursor_dup_read::<PlainStorageState>().unwrap();
1369            let mut walker = cursor.walk_dup(None, None).unwrap();
1370
1371            // Notice that value11 and value22 have been ordered in the DB.
1372            assert_eq!(Some(Ok((key1, value00))), walker.next());
1373            assert_eq!(Some(Ok((key1, value11))), walker.next());
1374            // NOTE: The dup cursor does NOT iterate over all values, only over the duplicated values
1375            // of the same key. assert_eq!(Ok(Some(value22.clone())), walker.next());
1376            assert_eq!(None, walker.next());
1377        }
1378
1379        // Iterate by using `walk`
1380        {
1381            let tx = env.tx().expect(ERROR_INIT_TX);
1382            let mut cursor = tx.cursor_dup_read::<PlainStorageState>().unwrap();
1383            let first = cursor.first().unwrap().unwrap();
1384            let mut walker = cursor.walk(Some(first.0)).unwrap();
1385            assert_eq!(Some(Ok((key1, value00))), walker.next());
1386            assert_eq!(Some(Ok((key1, value11))), walker.next());
1387            assert_eq!(Some(Ok((key2, value22))), walker.next());
1388        }
1389    }
1390
1391    #[test]
1392    fn dup_value_with_same_subkey() {
1393        let env = create_test_db(DatabaseEnvKind::RW);
1394        let key1 = Address::new([0x11; 20]);
1395        let key2 = Address::new([0x22; 20]);
1396
1397        // PUT key1 (0,1)
1398        let value01 = StorageEntry { key: B256::with_last_byte(0), value: U256::from(1) };
1399        env.update(|tx| tx.put::<PlainStorageState>(key1, value01).expect(ERROR_PUT)).unwrap();
1400
1401        // PUT key1 (0,0)
1402        let value00 = StorageEntry::default();
1403        env.update(|tx| tx.put::<PlainStorageState>(key1, value00).expect(ERROR_PUT)).unwrap();
1404
1405        // PUT key2 (2,2)
1406        let value22 = StorageEntry { key: B256::with_last_byte(2), value: U256::from(2) };
1407        env.update(|tx| tx.put::<PlainStorageState>(key2, value22).expect(ERROR_PUT)).unwrap();
1408
1409        // Iterate with walk
1410        {
1411            let tx = env.tx().expect(ERROR_INIT_TX);
1412            let mut cursor = tx.cursor_dup_read::<PlainStorageState>().unwrap();
1413            let first = cursor.first().unwrap().unwrap();
1414            let mut walker = cursor.walk(Some(first.0)).unwrap();
1415
1416            // NOTE: Both values are present
1417            assert_eq!(Some(Ok((key1, value00))), walker.next());
1418            assert_eq!(Some(Ok((key1, value01))), walker.next());
1419            assert_eq!(Some(Ok((key2, value22))), walker.next());
1420        }
1421
1422        // seek_by_key_subkey
1423        {
1424            let tx = env.tx().expect(ERROR_INIT_TX);
1425            let mut cursor = tx.cursor_dup_read::<PlainStorageState>().unwrap();
1426
1427            // NOTE: There are two values with the same subkey, but only the first one is returned
1428            assert_eq!(Ok(Some(value00)), cursor.seek_by_key_subkey(key1, value00.key));
1429            // key1 exists, but the queried subkey is greater than the one stored in the DB
1430            assert_eq!(Ok(None), cursor.seek_by_key_subkey(key1, value22.key));
1431        }
1432    }
1433
1434    #[test]
1435    fn db_sharded_key() {
1436        let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);
1437        let real_key = address!("0xa2c122be93b0074270ebee7f6b7292c7deb45047");
1438
1439        let shards = 5;
1440        for i in 1..=shards {
1441            let key = ShardedKey::new(real_key, if i == shards { u64::MAX } else { i * 100 });
1442            let list = IntegerList::new_pre_sorted([i * 100u64]);
1443
1444            db.update(|tx| tx.put::<AccountsHistory>(key.clone(), list.clone()).expect(""))
1445                .unwrap();
1446        }
1447
1448        // Seek value with non existing key.
1449        {
1450            let tx = db.tx().expect(ERROR_INIT_TX);
1451            let mut cursor = tx.cursor_read::<AccountsHistory>().unwrap();
1452
1453            // It will seek the first entry greater than or equal to the query. Since we have
1454            // `Address | 100` and `Address | 200` in the database and we're querying `Address | 150`,
1455            // it will return `Address | 200`.
1456            let mut walker = cursor.walk(Some(ShardedKey::new(real_key, 150))).unwrap();
1457            let (key, list) = walker
1458                .next()
1459                .expect("element should exist.")
1460                .expect("should be able to retrieve it.");
1461
1462            assert_eq!(ShardedKey::new(real_key, 200), key);
1463            let list200 = IntegerList::new_pre_sorted([200u64]);
1464            assert_eq!(list200, list);
1465        }
1466        // Seek greatest index
1467        {
1468            let tx = db.tx().expect(ERROR_INIT_TX);
1469            let mut cursor = tx.cursor_read::<AccountsHistory>().unwrap();
1470
1471            // It will seek the MAX value of the transition index and then use `prev` to get the
1472            // largest one below it.
1473            let _unknown = cursor.seek_exact(ShardedKey::new(real_key, u64::MAX)).unwrap();
1474            let (key, list) = cursor
1475                .prev()
1476                .expect("element should exist.")
1477                .expect("should be able to retrieve it.");
1478
1479            assert_eq!(ShardedKey::new(real_key, 400), key);
1480            let list400 = IntegerList::new_pre_sorted([400u64]);
1481            assert_eq!(list400, list);
1482        }
1483    }
1484}