reth_db/implementation/mdbx/
mod.rs

1//! Module that interacts with MDBX.
2
3use crate::{
4    lockfile::StorageLock,
5    metrics::DatabaseEnvMetrics,
6    tables::{self, Tables},
7    utils::default_page_size,
8    DatabaseError, TableSet,
9};
10use eyre::Context;
11use metrics::{gauge, Label};
12use reth_db_api::{
13    cursor::{DbCursorRO, DbCursorRW},
14    database::Database,
15    database_metrics::DatabaseMetrics,
16    models::ClientVersion,
17    transaction::{DbTx, DbTxMut},
18};
19use reth_libmdbx::{
20    ffi, DatabaseFlags, Environment, EnvironmentFlags, Geometry, HandleSlowReadersReturnCode,
21    MaxReadTransactionDuration, Mode, PageSize, SyncMode, RO, RW,
22};
23use reth_storage_errors::db::LogLevel;
24use reth_tracing::tracing::error;
25use std::{
26    ops::{Deref, Range},
27    path::Path,
28    sync::Arc,
29    time::{SystemTime, UNIX_EPOCH},
30};
31use tx::Tx;
32
33pub mod cursor;
34pub mod tx;
35
36mod utils;
37
/// Number of bytes in 1 KB.
pub const KILOBYTE: usize = 1 << 10;
/// Number of bytes in 1 MB.
pub const MEGABYTE: usize = 1024 * KILOBYTE;
/// Number of bytes in 1 GB.
pub const GIGABYTE: usize = 1024 * MEGABYTE;
/// Number of bytes in 1 TB.
pub const TERABYTE: usize = 1024 * GIGABYTE;

/// Default reader limit. MDBX itself allows up to 32767 readers (`MDBX_READERS_LIMIT`); the
/// default is kept slightly below that.
const DEFAULT_MAX_READERS: u64 = 32_000;

/// Amount of space a read-only transaction may occupy before a warning is emitted.
/// See [`reth_libmdbx::EnvironmentBuilder::set_handle_slow_readers`] for more information.
const MAX_SAFE_READER_SPACE: usize = GIGABYTE * 10;
53
/// Access mode used when opening an MDBX environment: read-only or read-write.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DatabaseEnvKind {
    /// The environment is opened read-only.
    RO,
    /// The environment is opened read-write.
    RW,
}

impl DatabaseEnvKind {
    /// Reports whether this kind is [`DatabaseEnvKind::RW`].
    pub const fn is_rw(&self) -> bool {
        match self {
            Self::RW => true,
            Self::RO => false,
        }
    }
}
69
/// Arguments for database initialization.
///
/// Construct with [`DatabaseArguments::new`] (or [`Default`]) and adjust through the `with_*`
/// builder methods before passing the value to [`DatabaseEnv::open`].
#[derive(Clone, Debug)]
pub struct DatabaseArguments {
    /// Client version that accesses the database.
    client_version: ClientVersion,
    /// Database geometry settings.
    geometry: Geometry<Range<usize>>,
    /// Database log level. If [None], the default value is used.
    log_level: Option<LogLevel>,
    /// Maximum duration of a read transaction. If [None], the default value is used.
    max_read_transaction_duration: Option<MaxReadTransactionDuration>,
    /// Open environment in exclusive/monopolistic mode. If [None], the flag defaults to `false`
    /// (cooperative mode).
    ///
    /// This can be used as a replacement for `MDB_NOLOCK`, which is not supported by MDBX. In
    /// this way, you can get the minimal overhead, but with the correct multi-process and
    /// multi-thread locking.
    ///
    /// If `true` = open environment in exclusive/monopolistic mode or return `MDBX_BUSY` if
    /// environment already used by other process. The main feature of the exclusive mode is the
    /// ability to open the environment placed on a network share.
    ///
    /// If `false` = open environment in cooperative mode, i.e. for multi-process
    /// access/interaction/cooperation. The main requirements of the cooperative mode are:
    /// - Data files MUST be placed in the LOCAL file system, but NOT on a network share.
    /// - Environment MUST be opened only by LOCAL processes, but NOT over a network.
    /// - OS kernel (i.e. file system and memory mapping implementation) and all processes that
    ///   open the given environment MUST be running in the physically single RAM with
    ///   cache-coherency. The only exception to the cache-coherency requirement is Linux on MIPS
    ///   architecture, but this case has not been tested for a long time.
    ///
    /// This flag takes effect only when the environment is opened and cannot be changed
    /// afterwards.
    exclusive: Option<bool>,
    /// MDBX allows up to 32767 readers (`MDBX_READERS_LIMIT`). This arg configures the maximum
    /// number of readers. If [None], `DEFAULT_MAX_READERS` is used.
    max_readers: Option<u64>,
}
106
107impl Default for DatabaseArguments {
108    fn default() -> Self {
109        Self::new(ClientVersion::default())
110    }
111}
112
113impl DatabaseArguments {
114    /// Create new database arguments with given client version.
115    pub fn new(client_version: ClientVersion) -> Self {
116        Self {
117            client_version,
118            geometry: Geometry {
119                size: Some(0..(4 * TERABYTE)),
120                growth_step: Some(4 * GIGABYTE as isize),
121                shrink_threshold: Some(0),
122                page_size: Some(PageSize::Set(default_page_size())),
123            },
124            log_level: None,
125            max_read_transaction_duration: None,
126            exclusive: None,
127            max_readers: None,
128        }
129    }
130
131    /// Sets the upper size limit of the db environment, the maximum database size in bytes.
132    pub const fn with_geometry_max_size(mut self, max_size: Option<usize>) -> Self {
133        if let Some(max_size) = max_size {
134            self.geometry.size = Some(0..max_size);
135        }
136        self
137    }
138
139    /// Configures the database growth step in bytes.
140    pub const fn with_growth_step(mut self, growth_step: Option<usize>) -> Self {
141        if let Some(growth_step) = growth_step {
142            self.geometry.growth_step = Some(growth_step as isize);
143        }
144        self
145    }
146
147    /// Set the log level.
148    pub const fn with_log_level(mut self, log_level: Option<LogLevel>) -> Self {
149        self.log_level = log_level;
150        self
151    }
152
153    /// Set the maximum duration of a read transaction.
154    pub const fn max_read_transaction_duration(
155        &mut self,
156        max_read_transaction_duration: Option<MaxReadTransactionDuration>,
157    ) {
158        self.max_read_transaction_duration = max_read_transaction_duration;
159    }
160
161    /// Set the maximum duration of a read transaction.
162    pub const fn with_max_read_transaction_duration(
163        mut self,
164        max_read_transaction_duration: Option<MaxReadTransactionDuration>,
165    ) -> Self {
166        self.max_read_transaction_duration(max_read_transaction_duration);
167        self
168    }
169
170    /// Set the mdbx exclusive flag.
171    pub const fn with_exclusive(mut self, exclusive: Option<bool>) -> Self {
172        self.exclusive = exclusive;
173        self
174    }
175
176    /// Set `max_readers` flag.
177    pub const fn with_max_readers(mut self, max_readers: Option<u64>) -> Self {
178        self.max_readers = max_readers;
179        self
180    }
181
182    /// Returns the client version if any.
183    pub const fn client_version(&self) -> &ClientVersion {
184        &self.client_version
185    }
186}
187
/// Wrapper for the libmdbx environment: [Environment]
///
/// Besides the environment itself it carries the optional metric handles and, for read-write
/// environments, the storage lock acquired in [`DatabaseEnv::open`].
#[derive(Debug)]
pub struct DatabaseEnv {
    /// Libmdbx-sys environment.
    inner: Environment,
    /// Cache for metric handles. If `None`, metrics are not recorded.
    ///
    /// Starts as `None` in [`DatabaseEnv::open`]; enabled through [`DatabaseEnv::with_metrics`].
    metrics: Option<Arc<DatabaseEnvMetrics>>,
    /// Write lock for when dealing with a read-write environment.
    ///
    /// `None` for read-only environments. The field is never read; it is only stored so the lock
    /// stays alive as long as this value does.
    _lock_file: Option<StorageLock>,
}
198
199impl Database for DatabaseEnv {
200    type TX = tx::Tx<RO>;
201    type TXMut = tx::Tx<RW>;
202
203    fn tx(&self) -> Result<Self::TX, DatabaseError> {
204        Tx::new_with_metrics(
205            self.inner.begin_ro_txn().map_err(|e| DatabaseError::InitTx(e.into()))?,
206            self.metrics.clone(),
207        )
208        .map_err(|e| DatabaseError::InitTx(e.into()))
209    }
210
211    fn tx_mut(&self) -> Result<Self::TXMut, DatabaseError> {
212        Tx::new_with_metrics(
213            self.inner.begin_rw_txn().map_err(|e| DatabaseError::InitTx(e.into()))?,
214            self.metrics.clone(),
215        )
216        .map_err(|e| DatabaseError::InitTx(e.into()))
217    }
218}
219
220impl DatabaseMetrics for DatabaseEnv {
221    fn report_metrics(&self) {
222        for (name, value, labels) in self.gauge_metrics() {
223            gauge!(name, labels).set(value);
224        }
225    }
226
227    fn gauge_metrics(&self) -> Vec<(&'static str, f64, Vec<Label>)> {
228        let mut metrics = Vec::new();
229
230        let _ = self
231            .view(|tx| {
232                for table in Tables::ALL.iter().map(Tables::name) {
233                    let table_db = tx.inner.open_db(Some(table)).wrap_err("Could not open db.")?;
234
235                    let stats = tx
236                        .inner
237                        .db_stat(&table_db)
238                        .wrap_err(format!("Could not find table: {table}"))?;
239
240                    let page_size = stats.page_size() as usize;
241                    let leaf_pages = stats.leaf_pages();
242                    let branch_pages = stats.branch_pages();
243                    let overflow_pages = stats.overflow_pages();
244                    let num_pages = leaf_pages + branch_pages + overflow_pages;
245                    let table_size = page_size * num_pages;
246                    let entries = stats.entries();
247
248                    metrics.push((
249                        "db.table_size",
250                        table_size as f64,
251                        vec![Label::new("table", table)],
252                    ));
253                    metrics.push((
254                        "db.table_pages",
255                        leaf_pages as f64,
256                        vec![Label::new("table", table), Label::new("type", "leaf")],
257                    ));
258                    metrics.push((
259                        "db.table_pages",
260                        branch_pages as f64,
261                        vec![Label::new("table", table), Label::new("type", "branch")],
262                    ));
263                    metrics.push((
264                        "db.table_pages",
265                        overflow_pages as f64,
266                        vec![Label::new("table", table), Label::new("type", "overflow")],
267                    ));
268                    metrics.push((
269                        "db.table_entries",
270                        entries as f64,
271                        vec![Label::new("table", table)],
272                    ));
273                }
274
275                Ok::<(), eyre::Report>(())
276            })
277            .map_err(|error| error!(%error, "Failed to read db table stats"));
278
279        if let Ok(freelist) =
280            self.freelist().map_err(|error| error!(%error, "Failed to read db.freelist"))
281        {
282            metrics.push(("db.freelist", freelist as f64, vec![]));
283        }
284
285        if let Ok(stat) = self.stat().map_err(|error| error!(%error, "Failed to read db.stat")) {
286            metrics.push(("db.page_size", stat.page_size() as f64, vec![]));
287        }
288
289        metrics.push((
290            "db.timed_out_not_aborted_transactions",
291            self.timed_out_not_aborted_transactions() as f64,
292            vec![],
293        ));
294
295        metrics
296    }
297}
298
299impl DatabaseEnv {
300    /// Opens the database at the specified path with the given `EnvKind`.
301    ///
302    /// It does not create the tables, for that call [`DatabaseEnv::create_tables`].
303    pub fn open(
304        path: &Path,
305        kind: DatabaseEnvKind,
306        args: DatabaseArguments,
307    ) -> Result<Self, DatabaseError> {
308        let _lock_file = if kind.is_rw() {
309            StorageLock::try_acquire(path)
310                .map_err(|err| DatabaseError::Other(err.to_string()))?
311                .into()
312        } else {
313            None
314        };
315
316        let mut inner_env = Environment::builder();
317
318        let mode = match kind {
319            DatabaseEnvKind::RO => Mode::ReadOnly,
320            DatabaseEnvKind::RW => {
321                // enable writemap mode in RW mode
322                inner_env.write_map();
323                Mode::ReadWrite { sync_mode: SyncMode::Durable }
324            }
325        };
326
327        // Note: We set max dbs to 256 here to allow for custom tables. This needs to be set on
328        // environment creation.
329        debug_assert!(Tables::ALL.len() <= 256, "number of tables exceed max dbs");
330        inner_env.set_max_dbs(256);
331        inner_env.set_geometry(args.geometry);
332
333        fn is_current_process(id: u32) -> bool {
334            #[cfg(unix)]
335            {
336                id == std::os::unix::process::parent_id() || id == std::process::id()
337            }
338
339            #[cfg(not(unix))]
340            {
341                id == std::process::id()
342            }
343        }
344
345        extern "C" fn handle_slow_readers(
346            _env: *const ffi::MDBX_env,
347            _txn: *const ffi::MDBX_txn,
348            process_id: ffi::mdbx_pid_t,
349            thread_id: ffi::mdbx_tid_t,
350            read_txn_id: u64,
351            gap: std::ffi::c_uint,
352            space: usize,
353            retry: std::ffi::c_int,
354        ) -> HandleSlowReadersReturnCode {
355            if space > MAX_SAFE_READER_SPACE {
356                let message = if is_current_process(process_id as u32) {
357                    "Current process has a long-lived database transaction that grows the database file."
358                } else {
359                    "External process has a long-lived database transaction that grows the database file. \
360                     Use shorter-lived read transactions or shut down the node."
361                };
362                reth_tracing::tracing::warn!(
363                    target: "storage::db::mdbx",
364                    ?process_id,
365                    ?thread_id,
366                    ?read_txn_id,
367                    ?gap,
368                    ?space,
369                    ?retry,
370                    "{message}"
371                )
372            }
373
374            reth_libmdbx::HandleSlowReadersReturnCode::ProceedWithoutKillingReader
375        }
376        inner_env.set_handle_slow_readers(handle_slow_readers);
377
378        inner_env.set_flags(EnvironmentFlags {
379            mode,
380            // We disable readahead because it improves performance for linear scans, but
381            // worsens it for random access (which is our access pattern outside of sync)
382            no_rdahead: true,
383            coalesce: true,
384            exclusive: args.exclusive.unwrap_or_default(),
385            ..Default::default()
386        });
387        // Configure more readers
388        inner_env.set_max_readers(args.max_readers.unwrap_or(DEFAULT_MAX_READERS));
389        // This parameter sets the maximum size of the "reclaimed list", and the unit of measurement
390        // is "pages". Reclaimed list is the list of freed pages that's populated during the
391        // lifetime of DB transaction, and through which MDBX searches when it needs to insert new
392        // record with overflow pages. The flow is roughly the following:
393        // 0. We need to insert a record that requires N number of overflow pages (in consecutive
394        //    sequence inside the DB file).
395        // 1. Get some pages from the freelist, put them into the reclaimed list.
396        // 2. Search through the reclaimed list for the sequence of size N.
397        // 3. a. If found, return the sequence.
398        // 3. b. If not found, repeat steps 1-3. If the reclaimed list size is larger than
399        //    the `rp augment limit`, stop the search and allocate new pages at the end of the file:
400        //    https://github.com/paradigmxyz/reth/blob/2a4c78759178f66e30c8976ec5d243b53102fc9a/crates/storage/libmdbx-rs/mdbx-sys/libmdbx/mdbx.c#L11479-L11480.
401        //
402        // Basically, this parameter controls for how long do we search through the freelist before
403        // trying to allocate new pages. Smaller value will make MDBX to fallback to
404        // allocation faster, higher value will force MDBX to search through the freelist
405        // longer until the sequence of pages is found.
406        //
407        // The default value of this parameter is set depending on the DB size. The bigger the
408        // database, the larger is `rp augment limit`.
409        // https://github.com/paradigmxyz/reth/blob/2a4c78759178f66e30c8976ec5d243b53102fc9a/crates/storage/libmdbx-rs/mdbx-sys/libmdbx/mdbx.c#L10018-L10024.
410        //
411        // Previously, MDBX set this value as `256 * 1024` constant. Let's fallback to this,
412        // because we want to prioritize freelist lookup speed over database growth.
413        // https://github.com/paradigmxyz/reth/blob/fa2b9b685ed9787636d962f4366caf34a9186e66/crates/storage/libmdbx-rs/mdbx-sys/libmdbx/mdbx.c#L16017.
414        inner_env.set_rp_augment_limit(256 * 1024);
415
416        if let Some(log_level) = args.log_level {
417            // Levels higher than [LogLevel::Notice] require libmdbx built with `MDBX_DEBUG` option.
418            let is_log_level_available = if cfg!(debug_assertions) {
419                true
420            } else {
421                matches!(
422                    log_level,
423                    LogLevel::Fatal | LogLevel::Error | LogLevel::Warn | LogLevel::Notice
424                )
425            };
426            if is_log_level_available {
427                inner_env.set_log_level(match log_level {
428                    LogLevel::Fatal => 0,
429                    LogLevel::Error => 1,
430                    LogLevel::Warn => 2,
431                    LogLevel::Notice => 3,
432                    LogLevel::Verbose => 4,
433                    LogLevel::Debug => 5,
434                    LogLevel::Trace => 6,
435                    LogLevel::Extra => 7,
436                });
437            } else {
438                return Err(DatabaseError::LogLevelUnavailable(log_level))
439            }
440        }
441
442        if let Some(max_read_transaction_duration) = args.max_read_transaction_duration {
443            inner_env.set_max_read_transaction_duration(max_read_transaction_duration);
444        }
445
446        let env = Self {
447            inner: inner_env.open(path).map_err(|e| DatabaseError::Open(e.into()))?,
448            metrics: None,
449            _lock_file,
450        };
451
452        Ok(env)
453    }
454
455    /// Enables metrics on the database.
456    pub fn with_metrics(mut self) -> Self {
457        self.metrics = Some(DatabaseEnvMetrics::new().into());
458        self
459    }
460
461    /// Creates all the tables defined in [`Tables`], if necessary.
462    pub fn create_tables(&self) -> Result<(), DatabaseError> {
463        self.create_tables_for::<Tables>()
464    }
465
466    /// Creates all the tables defined in the given [`TableSet`], if necessary.
467    pub fn create_tables_for<TS: TableSet>(&self) -> Result<(), DatabaseError> {
468        let tx = self.inner.begin_rw_txn().map_err(|e| DatabaseError::InitTx(e.into()))?;
469
470        for table in TS::tables() {
471            let flags =
472                if table.is_dupsort() { DatabaseFlags::DUP_SORT } else { DatabaseFlags::default() };
473
474            tx.create_db(Some(table.name()), flags)
475                .map_err(|e| DatabaseError::CreateTable(e.into()))?;
476        }
477
478        tx.commit().map_err(|e| DatabaseError::Commit(e.into()))?;
479
480        Ok(())
481    }
482
483    /// Records version that accesses the database with write privileges.
484    pub fn record_client_version(&self, version: ClientVersion) -> Result<(), DatabaseError> {
485        if version.is_empty() {
486            return Ok(())
487        }
488
489        let tx = self.tx_mut()?;
490        let mut version_cursor = tx.cursor_write::<tables::VersionHistory>()?;
491
492        let last_version = version_cursor.last()?.map(|(_, v)| v);
493        if Some(&version) != last_version.as_ref() {
494            version_cursor.upsert(
495                SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs(),
496                &version,
497            )?;
498            tx.commit()?;
499        }
500
501        Ok(())
502    }
503}
504
505impl Deref for DatabaseEnv {
506    type Target = Environment;
507
508    fn deref(&self) -> &Self::Target {
509        &self.inner
510    }
511}
512
513#[cfg(test)]
514mod tests {
515    use super::*;
516    use crate::{
517        tables::{
518            AccountsHistory, CanonicalHeaders, Headers, PlainAccountState, PlainStorageState,
519        },
520        test_utils::*,
521        AccountChangeSets,
522    };
523    use alloy_consensus::Header;
524    use alloy_primitives::{address, Address, B256, U256};
525    use reth_db_api::{
526        cursor::{DbDupCursorRO, DbDupCursorRW, ReverseWalker, Walker},
527        models::{AccountBeforeTx, IntegerList, ShardedKey},
528        table::{Encode, Table},
529    };
530    use reth_libmdbx::Error;
531    use reth_primitives_traits::{Account, StorageEntry};
532    use reth_storage_errors::db::{DatabaseWriteError, DatabaseWriteOperation};
533    use std::str::FromStr;
534    use tempfile::TempDir;
535
536    /// Create database for testing
537    fn create_test_db(kind: DatabaseEnvKind) -> Arc<DatabaseEnv> {
538        Arc::new(create_test_db_with_path(
539            kind,
540            &tempfile::TempDir::new().expect(ERROR_TEMPDIR).keep(),
541        ))
542    }
543
544    /// Create database for testing with specified path
545    fn create_test_db_with_path(kind: DatabaseEnvKind, path: &Path) -> DatabaseEnv {
546        let env = DatabaseEnv::open(path, kind, DatabaseArguments::new(ClientVersion::default()))
547            .expect(ERROR_DB_CREATION);
548        env.create_tables().expect(ERROR_TABLE_CREATION);
549        env
550    }
551
    // Failure messages shared by the tests in this module.
    const ERROR_DB_CREATION: &str = "Not able to create the mdbx file.";
    const ERROR_PUT: &str = "Not able to insert value into table.";
    const ERROR_APPEND: &str = "Not able to append the value to the table.";
    const ERROR_UPSERT: &str = "Not able to upsert the value to the table.";
    const ERROR_GET: &str = "Not able to get value from table.";
    const ERROR_DEL: &str = "Not able to delete from table.";
    const ERROR_COMMIT: &str = "Not able to commit transaction.";
    const ERROR_RETURN_VALUE: &str = "Mismatching result.";
    const ERROR_INIT_TX: &str = "Failed to create a MDBX transaction.";
    const ERROR_ETH_ADDRESS: &str = "Invalid address.";
562
563    #[test]
564    fn db_creation() {
565        create_test_db(DatabaseEnvKind::RW);
566    }
567
568    #[test]
569    fn db_manual_put_get() {
570        let env = create_test_db(DatabaseEnvKind::RW);
571
572        let value = Header::default();
573        let key = 1u64;
574
575        // PUT
576        let tx = env.tx_mut().expect(ERROR_INIT_TX);
577        tx.put::<Headers>(key, value.clone()).expect(ERROR_PUT);
578        tx.commit().expect(ERROR_COMMIT);
579
580        // GET
581        let tx = env.tx().expect(ERROR_INIT_TX);
582        let result = tx.get::<Headers>(key).expect(ERROR_GET);
583        assert_eq!(result.expect(ERROR_RETURN_VALUE), value);
584        tx.commit().expect(ERROR_COMMIT);
585    }
586
587    #[test]
588    fn db_dup_cursor_delete_first() {
589        let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);
590        let tx = db.tx_mut().expect(ERROR_INIT_TX);
591
592        let mut dup_cursor = tx.cursor_dup_write::<PlainStorageState>().unwrap();
593
594        let entry_0 = StorageEntry { key: B256::with_last_byte(1), value: U256::from(0) };
595        let entry_1 = StorageEntry { key: B256::with_last_byte(1), value: U256::from(1) };
596
597        dup_cursor.upsert(Address::with_last_byte(1), &entry_0).expect(ERROR_UPSERT);
598        dup_cursor.upsert(Address::with_last_byte(1), &entry_1).expect(ERROR_UPSERT);
599
600        assert_eq!(
601            dup_cursor.walk(None).unwrap().collect::<Result<Vec<_>, _>>(),
602            Ok(vec![(Address::with_last_byte(1), entry_0), (Address::with_last_byte(1), entry_1),])
603        );
604
605        let mut walker = dup_cursor.walk(None).unwrap();
606        walker.delete_current().expect(ERROR_DEL);
607
608        assert_eq!(walker.next(), Some(Ok((Address::with_last_byte(1), entry_1))));
609
610        // Check the tx view - it correctly holds entry_1
611        assert_eq!(
612            tx.cursor_dup_read::<PlainStorageState>()
613                .unwrap()
614                .walk(None)
615                .unwrap()
616                .collect::<Result<Vec<_>, _>>(),
617            Ok(vec![
618                (Address::with_last_byte(1), entry_1), // This is ok - we removed entry_0
619            ])
620        );
621
622        // Check the remainder of walker
623        assert_eq!(walker.next(), None);
624    }
625
626    #[test]
627    fn db_cursor_walk() {
628        let env = create_test_db(DatabaseEnvKind::RW);
629
630        let value = Header::default();
631        let key = 1u64;
632
633        // PUT
634        let tx = env.tx_mut().expect(ERROR_INIT_TX);
635        tx.put::<Headers>(key, value.clone()).expect(ERROR_PUT);
636        tx.commit().expect(ERROR_COMMIT);
637
638        // Cursor
639        let tx = env.tx().expect(ERROR_INIT_TX);
640        let mut cursor = tx.cursor_read::<Headers>().unwrap();
641
642        let first = cursor.first().unwrap();
643        assert!(first.is_some(), "First should be our put");
644
645        // Walk
646        let walk = cursor.walk(Some(key)).unwrap();
647        let first = walk.into_iter().next().unwrap().unwrap();
648        assert_eq!(first.1, value, "First next should be put value");
649    }
650
651    #[test]
652    fn db_cursor_walk_range() {
653        let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);
654
655        // PUT (0, 0), (1, 0), (2, 0), (3, 0)
656        let tx = db.tx_mut().expect(ERROR_INIT_TX);
657        vec![0, 1, 2, 3]
658            .into_iter()
659            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
660            .expect(ERROR_PUT);
661        tx.commit().expect(ERROR_COMMIT);
662
663        let tx = db.tx().expect(ERROR_INIT_TX);
664        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
665
666        // [1, 3)
667        let mut walker = cursor.walk_range(1..3).unwrap();
668        assert_eq!(walker.next(), Some(Ok((1, B256::ZERO))));
669        assert_eq!(walker.next(), Some(Ok((2, B256::ZERO))));
670        assert_eq!(walker.next(), None);
671        // next() returns None after walker is done
672        assert_eq!(walker.next(), None);
673
674        // [1, 2]
675        let mut walker = cursor.walk_range(1..=2).unwrap();
676        assert_eq!(walker.next(), Some(Ok((1, B256::ZERO))));
677        assert_eq!(walker.next(), Some(Ok((2, B256::ZERO))));
678        // next() returns None after walker is done
679        assert_eq!(walker.next(), None);
680
681        // [1, ∞)
682        let mut walker = cursor.walk_range(1..).unwrap();
683        assert_eq!(walker.next(), Some(Ok((1, B256::ZERO))));
684        assert_eq!(walker.next(), Some(Ok((2, B256::ZERO))));
685        assert_eq!(walker.next(), Some(Ok((3, B256::ZERO))));
686        // next() returns None after walker is done
687        assert_eq!(walker.next(), None);
688
689        // [2, 4)
690        let mut walker = cursor.walk_range(2..4).unwrap();
691        assert_eq!(walker.next(), Some(Ok((2, B256::ZERO))));
692        assert_eq!(walker.next(), Some(Ok((3, B256::ZERO))));
693        assert_eq!(walker.next(), None);
694        // next() returns None after walker is done
695        assert_eq!(walker.next(), None);
696
697        // (∞, 3)
698        let mut walker = cursor.walk_range(..3).unwrap();
699        assert_eq!(walker.next(), Some(Ok((0, B256::ZERO))));
700        assert_eq!(walker.next(), Some(Ok((1, B256::ZERO))));
701        assert_eq!(walker.next(), Some(Ok((2, B256::ZERO))));
702        // next() returns None after walker is done
703        assert_eq!(walker.next(), None);
704
705        // (∞, ∞)
706        let mut walker = cursor.walk_range(..).unwrap();
707        assert_eq!(walker.next(), Some(Ok((0, B256::ZERO))));
708        assert_eq!(walker.next(), Some(Ok((1, B256::ZERO))));
709        assert_eq!(walker.next(), Some(Ok((2, B256::ZERO))));
710        assert_eq!(walker.next(), Some(Ok((3, B256::ZERO))));
711        // next() returns None after walker is done
712        assert_eq!(walker.next(), None);
713    }
714
715    #[test]
716    fn db_cursor_walk_range_on_dup_table() {
717        let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);
718
719        let address0 = Address::ZERO;
720        let address1 = Address::with_last_byte(1);
721        let address2 = Address::with_last_byte(2);
722
723        let tx = db.tx_mut().expect(ERROR_INIT_TX);
724        tx.put::<AccountChangeSets>(0, AccountBeforeTx { address: address0, info: None })
725            .expect(ERROR_PUT);
726        tx.put::<AccountChangeSets>(0, AccountBeforeTx { address: address1, info: None })
727            .expect(ERROR_PUT);
728        tx.put::<AccountChangeSets>(0, AccountBeforeTx { address: address2, info: None })
729            .expect(ERROR_PUT);
730        tx.put::<AccountChangeSets>(1, AccountBeforeTx { address: address0, info: None })
731            .expect(ERROR_PUT);
732        tx.put::<AccountChangeSets>(1, AccountBeforeTx { address: address1, info: None })
733            .expect(ERROR_PUT);
734        tx.put::<AccountChangeSets>(1, AccountBeforeTx { address: address2, info: None })
735            .expect(ERROR_PUT);
736        tx.put::<AccountChangeSets>(2, AccountBeforeTx { address: address0, info: None }) // <- should not be returned by the walker
737            .expect(ERROR_PUT);
738        tx.commit().expect(ERROR_COMMIT);
739
740        let tx = db.tx().expect(ERROR_INIT_TX);
741        let mut cursor = tx.cursor_read::<AccountChangeSets>().unwrap();
742
743        let entries = cursor.walk_range(..).unwrap().collect::<Result<Vec<_>, _>>().unwrap();
744        assert_eq!(entries.len(), 7);
745
746        let mut walker = cursor.walk_range(0..=1).unwrap();
747        assert_eq!(walker.next(), Some(Ok((0, AccountBeforeTx { address: address0, info: None }))));
748        assert_eq!(walker.next(), Some(Ok((0, AccountBeforeTx { address: address1, info: None }))));
749        assert_eq!(walker.next(), Some(Ok((0, AccountBeforeTx { address: address2, info: None }))));
750        assert_eq!(walker.next(), Some(Ok((1, AccountBeforeTx { address: address0, info: None }))));
751        assert_eq!(walker.next(), Some(Ok((1, AccountBeforeTx { address: address1, info: None }))));
752        assert_eq!(walker.next(), Some(Ok((1, AccountBeforeTx { address: address2, info: None }))));
753        assert_eq!(walker.next(), None);
754    }
755
756    #[expect(clippy::reversed_empty_ranges)]
757    #[test]
758    fn db_cursor_walk_range_invalid() {
759        let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);
760
761        // PUT (0, 0), (1, 0), (2, 0), (3, 0)
762        let tx = db.tx_mut().expect(ERROR_INIT_TX);
763        vec![0, 1, 2, 3]
764            .into_iter()
765            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
766            .expect(ERROR_PUT);
767        tx.commit().expect(ERROR_COMMIT);
768
769        let tx = db.tx().expect(ERROR_INIT_TX);
770        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
771
772        // start bound greater than end bound
773        let mut res = cursor.walk_range(3..1).unwrap();
774        assert_eq!(res.next(), None);
775
776        // start bound greater than end bound
777        let mut res = cursor.walk_range(15..=2).unwrap();
778        assert_eq!(res.next(), None);
779
780        // returning nothing
781        let mut walker = cursor.walk_range(1..1).unwrap();
782        assert_eq!(walker.next(), None);
783    }
784
785    #[test]
786    fn db_walker() {
787        let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);
788
789        // PUT (0, 0), (1, 0), (3, 0)
790        let tx = db.tx_mut().expect(ERROR_INIT_TX);
791        vec![0, 1, 3]
792            .into_iter()
793            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
794            .expect(ERROR_PUT);
795        tx.commit().expect(ERROR_COMMIT);
796
797        let tx = db.tx().expect(ERROR_INIT_TX);
798        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
799
800        let mut walker = Walker::new(&mut cursor, None);
801
802        assert_eq!(walker.next(), Some(Ok((0, B256::ZERO))));
803        assert_eq!(walker.next(), Some(Ok((1, B256::ZERO))));
804        assert_eq!(walker.next(), Some(Ok((3, B256::ZERO))));
805        assert_eq!(walker.next(), None);
806
807        // transform to ReverseWalker
808        let mut reverse_walker = walker.rev();
809        assert_eq!(reverse_walker.next(), Some(Ok((3, B256::ZERO))));
810        assert_eq!(reverse_walker.next(), Some(Ok((1, B256::ZERO))));
811        assert_eq!(reverse_walker.next(), Some(Ok((0, B256::ZERO))));
812        assert_eq!(reverse_walker.next(), None);
813    }
814
815    #[test]
816    fn db_reverse_walker() {
817        let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);
818
819        // PUT (0, 0), (1, 0), (3, 0)
820        let tx = db.tx_mut().expect(ERROR_INIT_TX);
821        vec![0, 1, 3]
822            .into_iter()
823            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
824            .expect(ERROR_PUT);
825        tx.commit().expect(ERROR_COMMIT);
826
827        let tx = db.tx().expect(ERROR_INIT_TX);
828        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
829
830        let mut reverse_walker = ReverseWalker::new(&mut cursor, None);
831
832        assert_eq!(reverse_walker.next(), Some(Ok((3, B256::ZERO))));
833        assert_eq!(reverse_walker.next(), Some(Ok((1, B256::ZERO))));
834        assert_eq!(reverse_walker.next(), Some(Ok((0, B256::ZERO))));
835        assert_eq!(reverse_walker.next(), None);
836
837        // transform to Walker
838        let mut walker = reverse_walker.forward();
839        assert_eq!(walker.next(), Some(Ok((0, B256::ZERO))));
840        assert_eq!(walker.next(), Some(Ok((1, B256::ZERO))));
841        assert_eq!(walker.next(), Some(Ok((3, B256::ZERO))));
842        assert_eq!(walker.next(), None);
843    }
844
845    #[test]
846    fn db_walk_back() {
847        let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);
848
849        // PUT (0, 0), (1, 0), (3, 0)
850        let tx = db.tx_mut().expect(ERROR_INIT_TX);
851        vec![0, 1, 3]
852            .into_iter()
853            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
854            .expect(ERROR_PUT);
855        tx.commit().expect(ERROR_COMMIT);
856
857        let tx = db.tx().expect(ERROR_INIT_TX);
858        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
859
860        let mut reverse_walker = cursor.walk_back(Some(1)).unwrap();
861        assert_eq!(reverse_walker.next(), Some(Ok((1, B256::ZERO))));
862        assert_eq!(reverse_walker.next(), Some(Ok((0, B256::ZERO))));
863        assert_eq!(reverse_walker.next(), None);
864
865        let mut reverse_walker = cursor.walk_back(Some(2)).unwrap();
866        assert_eq!(reverse_walker.next(), Some(Ok((3, B256::ZERO))));
867        assert_eq!(reverse_walker.next(), Some(Ok((1, B256::ZERO))));
868        assert_eq!(reverse_walker.next(), Some(Ok((0, B256::ZERO))));
869        assert_eq!(reverse_walker.next(), None);
870
871        let mut reverse_walker = cursor.walk_back(Some(4)).unwrap();
872        assert_eq!(reverse_walker.next(), Some(Ok((3, B256::ZERO))));
873        assert_eq!(reverse_walker.next(), Some(Ok((1, B256::ZERO))));
874        assert_eq!(reverse_walker.next(), Some(Ok((0, B256::ZERO))));
875        assert_eq!(reverse_walker.next(), None);
876
877        let mut reverse_walker = cursor.walk_back(None).unwrap();
878        assert_eq!(reverse_walker.next(), Some(Ok((3, B256::ZERO))));
879        assert_eq!(reverse_walker.next(), Some(Ok((1, B256::ZERO))));
880        assert_eq!(reverse_walker.next(), Some(Ok((0, B256::ZERO))));
881        assert_eq!(reverse_walker.next(), None);
882    }
883
884    #[test]
885    fn db_cursor_seek_exact_or_previous_key() {
886        let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);
887
888        // PUT
889        let tx = db.tx_mut().expect(ERROR_INIT_TX);
890        vec![0, 1, 3]
891            .into_iter()
892            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
893            .expect(ERROR_PUT);
894        tx.commit().expect(ERROR_COMMIT);
895
896        // Cursor
897        let missing_key = 2;
898        let tx = db.tx().expect(ERROR_INIT_TX);
899        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
900        assert_eq!(cursor.current(), Ok(None));
901
902        // Seek exact
903        let exact = cursor.seek_exact(missing_key).unwrap();
904        assert_eq!(exact, None);
905        assert_eq!(cursor.current(), Ok(None));
906    }
907
908    #[test]
909    fn db_cursor_insert() {
910        let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);
911
912        // PUT
913        let tx = db.tx_mut().expect(ERROR_INIT_TX);
914        vec![0, 1, 3, 4, 5]
915            .into_iter()
916            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
917            .expect(ERROR_PUT);
918        tx.commit().expect(ERROR_COMMIT);
919
920        let key_to_insert = 2;
921        let tx = db.tx_mut().expect(ERROR_INIT_TX);
922        let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
923
924        // INSERT
925        assert_eq!(cursor.insert(key_to_insert, &B256::ZERO), Ok(()));
926        assert_eq!(cursor.current(), Ok(Some((key_to_insert, B256::ZERO))));
927
928        // INSERT (failure)
929        assert_eq!(
930            cursor.insert(key_to_insert, &B256::ZERO),
931            Err(DatabaseWriteError {
932                info: Error::KeyExist.into(),
933                operation: DatabaseWriteOperation::CursorInsert,
934                table_name: CanonicalHeaders::NAME,
935                key: key_to_insert.encode().into(),
936            }
937            .into())
938        );
939        assert_eq!(cursor.current(), Ok(Some((key_to_insert, B256::ZERO))));
940
941        tx.commit().expect(ERROR_COMMIT);
942
943        // Confirm the result
944        let tx = db.tx().expect(ERROR_INIT_TX);
945        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
946        let res = cursor.walk(None).unwrap().map(|res| res.unwrap().0).collect::<Vec<_>>();
947        assert_eq!(res, vec![0, 1, 2, 3, 4, 5]);
948        tx.commit().expect(ERROR_COMMIT);
949    }
950
951    #[test]
952    fn db_cursor_insert_dup() {
953        let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);
954        let tx = db.tx_mut().expect(ERROR_INIT_TX);
955
956        let mut dup_cursor = tx.cursor_dup_write::<PlainStorageState>().unwrap();
957        let key = Address::random();
958        let subkey1 = B256::random();
959        let subkey2 = B256::random();
960
961        let entry1 = StorageEntry { key: subkey1, value: U256::ZERO };
962        assert!(dup_cursor.insert(key, &entry1).is_ok());
963
964        // Can't insert
965        let entry2 = StorageEntry { key: subkey2, value: U256::ZERO };
966        assert!(dup_cursor.insert(key, &entry2).is_err());
967    }
968
969    #[test]
970    fn db_cursor_delete_current_non_existent() {
971        let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);
972        let tx = db.tx_mut().expect(ERROR_INIT_TX);
973
974        let key1 = Address::with_last_byte(1);
975        let key2 = Address::with_last_byte(2);
976        let key3 = Address::with_last_byte(3);
977        let mut cursor = tx.cursor_write::<PlainAccountState>().unwrap();
978
979        assert!(cursor.insert(key1, &Account::default()).is_ok());
980        assert!(cursor.insert(key2, &Account::default()).is_ok());
981        assert!(cursor.insert(key3, &Account::default()).is_ok());
982
983        // Seek & delete key2
984        cursor.seek_exact(key2).unwrap();
985        assert_eq!(cursor.delete_current(), Ok(()));
986        assert_eq!(cursor.seek_exact(key2), Ok(None));
987
988        // Seek & delete key2 again
989        assert_eq!(cursor.seek_exact(key2), Ok(None));
990        assert_eq!(
991            cursor.delete_current(),
992            Err(DatabaseError::Delete(reth_libmdbx::Error::NoData.into()))
993        );
994        // Assert that key1 is still there
995        assert_eq!(cursor.seek_exact(key1), Ok(Some((key1, Account::default()))));
996        // Assert that key3 is still there
997        assert_eq!(cursor.seek_exact(key3), Ok(Some((key3, Account::default()))));
998    }
999
1000    #[test]
1001    fn db_cursor_insert_wherever_cursor_is() {
1002        let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);
1003        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1004
1005        // PUT
1006        vec![0, 1, 3, 5, 7, 9]
1007            .into_iter()
1008            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
1009            .expect(ERROR_PUT);
1010        tx.commit().expect(ERROR_COMMIT);
1011
1012        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1013        let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
1014
1015        // INSERT (cursor starts at last)
1016        cursor.last().unwrap();
1017        assert_eq!(cursor.current(), Ok(Some((9, B256::ZERO))));
1018
1019        for pos in (2..=8).step_by(2) {
1020            assert_eq!(cursor.insert(pos, &B256::ZERO), Ok(()));
1021            assert_eq!(cursor.current(), Ok(Some((pos, B256::ZERO))));
1022        }
1023        tx.commit().expect(ERROR_COMMIT);
1024
1025        // Confirm the result
1026        let tx = db.tx().expect(ERROR_INIT_TX);
1027        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
1028        let res = cursor.walk(None).unwrap().map(|res| res.unwrap().0).collect::<Vec<_>>();
1029        assert_eq!(res, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
1030        tx.commit().expect(ERROR_COMMIT);
1031    }
1032
1033    #[test]
1034    fn db_cursor_append() {
1035        let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);
1036
1037        // PUT
1038        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1039        vec![0, 1, 2, 3, 4]
1040            .into_iter()
1041            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
1042            .expect(ERROR_PUT);
1043        tx.commit().expect(ERROR_COMMIT);
1044
1045        // APPEND
1046        let key_to_append = 5;
1047        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1048        let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
1049        assert_eq!(cursor.append(key_to_append, &B256::ZERO), Ok(()));
1050        tx.commit().expect(ERROR_COMMIT);
1051
1052        // Confirm the result
1053        let tx = db.tx().expect(ERROR_INIT_TX);
1054        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
1055        let res = cursor.walk(None).unwrap().map(|res| res.unwrap().0).collect::<Vec<_>>();
1056        assert_eq!(res, vec![0, 1, 2, 3, 4, 5]);
1057        tx.commit().expect(ERROR_COMMIT);
1058    }
1059
1060    #[test]
1061    fn db_cursor_append_failure() {
1062        let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);
1063
1064        // PUT
1065        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1066        vec![0, 1, 3, 4, 5]
1067            .into_iter()
1068            .try_for_each(|key| tx.put::<CanonicalHeaders>(key, B256::ZERO))
1069            .expect(ERROR_PUT);
1070        tx.commit().expect(ERROR_COMMIT);
1071
1072        // APPEND
1073        let key_to_append = 2;
1074        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1075        let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
1076        assert_eq!(
1077            cursor.append(key_to_append, &B256::ZERO),
1078            Err(DatabaseWriteError {
1079                info: Error::KeyMismatch.into(),
1080                operation: DatabaseWriteOperation::CursorAppend,
1081                table_name: CanonicalHeaders::NAME,
1082                key: key_to_append.encode().into(),
1083            }
1084            .into())
1085        );
1086        assert_eq!(cursor.current(), Ok(Some((5, B256::ZERO)))); // the end of table
1087        tx.commit().expect(ERROR_COMMIT);
1088
1089        // Confirm the result
1090        let tx = db.tx().expect(ERROR_INIT_TX);
1091        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
1092        let res = cursor.walk(None).unwrap().map(|res| res.unwrap().0).collect::<Vec<_>>();
1093        assert_eq!(res, vec![0, 1, 3, 4, 5]);
1094        tx.commit().expect(ERROR_COMMIT);
1095    }
1096
1097    #[test]
1098    fn db_cursor_upsert() {
1099        let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);
1100        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1101
1102        let mut cursor = tx.cursor_write::<PlainAccountState>().unwrap();
1103        let key = Address::random();
1104
1105        let account = Account::default();
1106        cursor.upsert(key, &account).expect(ERROR_UPSERT);
1107        assert_eq!(cursor.seek_exact(key), Ok(Some((key, account))));
1108
1109        let account = Account { nonce: 1, ..Default::default() };
1110        cursor.upsert(key, &account).expect(ERROR_UPSERT);
1111        assert_eq!(cursor.seek_exact(key), Ok(Some((key, account))));
1112
1113        let account = Account { nonce: 2, ..Default::default() };
1114        cursor.upsert(key, &account).expect(ERROR_UPSERT);
1115        assert_eq!(cursor.seek_exact(key), Ok(Some((key, account))));
1116
1117        let mut dup_cursor = tx.cursor_dup_write::<PlainStorageState>().unwrap();
1118        let subkey = B256::random();
1119
1120        let value = U256::from(1);
1121        let entry1 = StorageEntry { key: subkey, value };
1122        dup_cursor.upsert(key, &entry1).expect(ERROR_UPSERT);
1123        assert_eq!(dup_cursor.seek_by_key_subkey(key, subkey), Ok(Some(entry1)));
1124
1125        let value = U256::from(2);
1126        let entry2 = StorageEntry { key: subkey, value };
1127        dup_cursor.upsert(key, &entry2).expect(ERROR_UPSERT);
1128        assert_eq!(dup_cursor.seek_by_key_subkey(key, subkey), Ok(Some(entry1)));
1129        assert_eq!(dup_cursor.next_dup_val(), Ok(Some(entry2)));
1130    }
1131
1132    #[test]
1133    fn db_cursor_dupsort_append() {
1134        let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);
1135
1136        let transition_id = 2;
1137
1138        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1139        let mut cursor = tx.cursor_write::<AccountChangeSets>().unwrap();
1140        vec![0, 1, 3, 4, 5]
1141            .into_iter()
1142            .try_for_each(|val| {
1143                cursor.append(
1144                    transition_id,
1145                    &AccountBeforeTx { address: Address::with_last_byte(val), info: None },
1146                )
1147            })
1148            .expect(ERROR_APPEND);
1149        tx.commit().expect(ERROR_COMMIT);
1150
1151        // APPEND DUP & APPEND
1152        let subkey_to_append = 2;
1153        let tx = db.tx_mut().expect(ERROR_INIT_TX);
1154        let mut cursor = tx.cursor_write::<AccountChangeSets>().unwrap();
1155        assert_eq!(
1156            cursor.append_dup(
1157                transition_id,
1158                AccountBeforeTx { address: Address::with_last_byte(subkey_to_append), info: None }
1159            ),
1160            Err(DatabaseWriteError {
1161                info: Error::KeyMismatch.into(),
1162                operation: DatabaseWriteOperation::CursorAppendDup,
1163                table_name: AccountChangeSets::NAME,
1164                key: transition_id.encode().into(),
1165            }
1166            .into())
1167        );
1168        assert_eq!(
1169            cursor.append(
1170                transition_id - 1,
1171                &AccountBeforeTx { address: Address::with_last_byte(subkey_to_append), info: None }
1172            ),
1173            Err(DatabaseWriteError {
1174                info: Error::KeyMismatch.into(),
1175                operation: DatabaseWriteOperation::CursorAppend,
1176                table_name: AccountChangeSets::NAME,
1177                key: (transition_id - 1).encode().into(),
1178            }
1179            .into())
1180        );
1181        assert_eq!(
1182            cursor.append(
1183                transition_id,
1184                &AccountBeforeTx { address: Address::with_last_byte(subkey_to_append), info: None }
1185            ),
1186            Ok(())
1187        );
1188    }
1189
1190    #[test]
1191    fn db_closure_put_get() {
1192        let path = TempDir::new().expect(ERROR_TEMPDIR).keep();
1193
1194        let value = Account {
1195            nonce: 18446744073709551615,
1196            bytecode_hash: Some(B256::random()),
1197            balance: U256::MAX,
1198        };
1199        let key = Address::from_str("0xa2c122be93b0074270ebee7f6b7292c7deb45047")
1200            .expect(ERROR_ETH_ADDRESS);
1201
1202        {
1203            let env = create_test_db_with_path(DatabaseEnvKind::RW, &path);
1204
1205            // PUT
1206            let result = env.update(|tx| {
1207                tx.put::<PlainAccountState>(key, value).expect(ERROR_PUT);
1208                200
1209            });
1210            assert_eq!(result.expect(ERROR_RETURN_VALUE), 200);
1211        }
1212
1213        let env = DatabaseEnv::open(
1214            &path,
1215            DatabaseEnvKind::RO,
1216            DatabaseArguments::new(ClientVersion::default()),
1217        )
1218        .expect(ERROR_DB_CREATION);
1219
1220        // GET
1221        let result =
1222            env.view(|tx| tx.get::<PlainAccountState>(key).expect(ERROR_GET)).expect(ERROR_GET);
1223
1224        assert_eq!(result, Some(value))
1225    }
1226
1227    #[test]
1228    fn db_dup_sort() {
1229        let env = create_test_db(DatabaseEnvKind::RW);
1230        let key = Address::from_str("0xa2c122be93b0074270ebee7f6b7292c7deb45047")
1231            .expect(ERROR_ETH_ADDRESS);
1232
1233        // PUT (0,0)
1234        let value00 = StorageEntry::default();
1235        env.update(|tx| tx.put::<PlainStorageState>(key, value00).expect(ERROR_PUT)).unwrap();
1236
1237        // PUT (2,2)
1238        let value22 = StorageEntry { key: B256::with_last_byte(2), value: U256::from(2) };
1239        env.update(|tx| tx.put::<PlainStorageState>(key, value22).expect(ERROR_PUT)).unwrap();
1240
1241        // PUT (1,1)
1242        let value11 = StorageEntry { key: B256::with_last_byte(1), value: U256::from(1) };
1243        env.update(|tx| tx.put::<PlainStorageState>(key, value11).expect(ERROR_PUT)).unwrap();
1244
1245        // Iterate with cursor
1246        {
1247            let tx = env.tx().expect(ERROR_INIT_TX);
1248            let mut cursor = tx.cursor_dup_read::<PlainStorageState>().unwrap();
1249
1250            // Notice that value11 and value22 have been ordered in the DB.
1251            assert_eq!(Some(value00), cursor.next_dup_val().unwrap());
1252            assert_eq!(Some(value11), cursor.next_dup_val().unwrap());
1253            assert_eq!(Some(value22), cursor.next_dup_val().unwrap());
1254        }
1255
1256        // Seek value with exact subkey
1257        {
1258            let tx = env.tx().expect(ERROR_INIT_TX);
1259            let mut cursor = tx.cursor_dup_read::<PlainStorageState>().unwrap();
1260            let mut walker = cursor.walk_dup(Some(key), Some(B256::with_last_byte(1))).unwrap();
1261            assert_eq!(
1262                (key, value11),
1263                walker
1264                    .next()
1265                    .expect("element should exist.")
1266                    .expect("should be able to retrieve it.")
1267            );
1268        }
1269    }
1270
1271    #[test]
1272    fn db_walk_dup_with_not_existing_key() {
1273        let env = create_test_db(DatabaseEnvKind::RW);
1274        let key = Address::from_str("0xa2c122be93b0074270ebee7f6b7292c7deb45047")
1275            .expect(ERROR_ETH_ADDRESS);
1276
1277        // PUT (0,0)
1278        let value00 = StorageEntry::default();
1279        env.update(|tx| tx.put::<PlainStorageState>(key, value00).expect(ERROR_PUT)).unwrap();
1280
1281        // PUT (2,2)
1282        let value22 = StorageEntry { key: B256::with_last_byte(2), value: U256::from(2) };
1283        env.update(|tx| tx.put::<PlainStorageState>(key, value22).expect(ERROR_PUT)).unwrap();
1284
1285        // PUT (1,1)
1286        let value11 = StorageEntry { key: B256::with_last_byte(1), value: U256::from(1) };
1287        env.update(|tx| tx.put::<PlainStorageState>(key, value11).expect(ERROR_PUT)).unwrap();
1288
1289        // Try to walk_dup with not existing key should immediately return None
1290        {
1291            let tx = env.tx().expect(ERROR_INIT_TX);
1292            let mut cursor = tx.cursor_dup_read::<PlainStorageState>().unwrap();
1293            let not_existing_key = Address::ZERO;
1294            let mut walker = cursor.walk_dup(Some(not_existing_key), None).unwrap();
1295            assert_eq!(walker.next(), None);
1296        }
1297    }
1298
1299    #[test]
1300    fn db_iterate_over_all_dup_values() {
1301        let env = create_test_db(DatabaseEnvKind::RW);
1302        let key1 = Address::from_str("0x1111111111111111111111111111111111111111")
1303            .expect(ERROR_ETH_ADDRESS);
1304        let key2 = Address::from_str("0x2222222222222222222222222222222222222222")
1305            .expect(ERROR_ETH_ADDRESS);
1306
1307        // PUT key1 (0,0)
1308        let value00 = StorageEntry::default();
1309        env.update(|tx| tx.put::<PlainStorageState>(key1, value00).expect(ERROR_PUT)).unwrap();
1310
1311        // PUT key1 (1,1)
1312        let value11 = StorageEntry { key: B256::with_last_byte(1), value: U256::from(1) };
1313        env.update(|tx| tx.put::<PlainStorageState>(key1, value11).expect(ERROR_PUT)).unwrap();
1314
1315        // PUT key2 (2,2)
1316        let value22 = StorageEntry { key: B256::with_last_byte(2), value: U256::from(2) };
1317        env.update(|tx| tx.put::<PlainStorageState>(key2, value22).expect(ERROR_PUT)).unwrap();
1318
1319        // Iterate with walk_dup
1320        {
1321            let tx = env.tx().expect(ERROR_INIT_TX);
1322            let mut cursor = tx.cursor_dup_read::<PlainStorageState>().unwrap();
1323            let mut walker = cursor.walk_dup(None, None).unwrap();
1324
1325            // Notice that value11 and value22 have been ordered in the DB.
1326            assert_eq!(Some(Ok((key1, value00))), walker.next());
1327            assert_eq!(Some(Ok((key1, value11))), walker.next());
1328            // NOTE: Dup cursor does NOT iterates on all values but only on duplicated values of the
1329            // same key. assert_eq!(Ok(Some(value22.clone())), walker.next());
1330            assert_eq!(None, walker.next());
1331        }
1332
1333        // Iterate by using `walk`
1334        {
1335            let tx = env.tx().expect(ERROR_INIT_TX);
1336            let mut cursor = tx.cursor_dup_read::<PlainStorageState>().unwrap();
1337            let first = cursor.first().unwrap().unwrap();
1338            let mut walker = cursor.walk(Some(first.0)).unwrap();
1339            assert_eq!(Some(Ok((key1, value00))), walker.next());
1340            assert_eq!(Some(Ok((key1, value11))), walker.next());
1341            assert_eq!(Some(Ok((key2, value22))), walker.next());
1342        }
1343    }
1344
1345    #[test]
1346    fn dup_value_with_same_subkey() {
1347        let env = create_test_db(DatabaseEnvKind::RW);
1348        let key1 = Address::new([0x11; 20]);
1349        let key2 = Address::new([0x22; 20]);
1350
1351        // PUT key1 (0,1)
1352        let value01 = StorageEntry { key: B256::with_last_byte(0), value: U256::from(1) };
1353        env.update(|tx| tx.put::<PlainStorageState>(key1, value01).expect(ERROR_PUT)).unwrap();
1354
1355        // PUT key1 (0,0)
1356        let value00 = StorageEntry::default();
1357        env.update(|tx| tx.put::<PlainStorageState>(key1, value00).expect(ERROR_PUT)).unwrap();
1358
1359        // PUT key2 (2,2)
1360        let value22 = StorageEntry { key: B256::with_last_byte(2), value: U256::from(2) };
1361        env.update(|tx| tx.put::<PlainStorageState>(key2, value22).expect(ERROR_PUT)).unwrap();
1362
1363        // Iterate with walk
1364        {
1365            let tx = env.tx().expect(ERROR_INIT_TX);
1366            let mut cursor = tx.cursor_dup_read::<PlainStorageState>().unwrap();
1367            let first = cursor.first().unwrap().unwrap();
1368            let mut walker = cursor.walk(Some(first.0)).unwrap();
1369
1370            // NOTE: Both values are present
1371            assert_eq!(Some(Ok((key1, value00))), walker.next());
1372            assert_eq!(Some(Ok((key1, value01))), walker.next());
1373            assert_eq!(Some(Ok((key2, value22))), walker.next());
1374        }
1375
1376        // seek_by_key_subkey
1377        {
1378            let tx = env.tx().expect(ERROR_INIT_TX);
1379            let mut cursor = tx.cursor_dup_read::<PlainStorageState>().unwrap();
1380
1381            // NOTE: There are two values with same SubKey but only first one is shown
1382            assert_eq!(Ok(Some(value00)), cursor.seek_by_key_subkey(key1, value00.key));
1383            // key1 but value is greater than the one in the DB
1384            assert_eq!(Ok(None), cursor.seek_by_key_subkey(key1, value22.key));
1385        }
1386    }
1387
1388    #[test]
1389    fn db_sharded_key() {
1390        let db: Arc<DatabaseEnv> = create_test_db(DatabaseEnvKind::RW);
1391        let real_key = address!("0xa2c122be93b0074270ebee7f6b7292c7deb45047");
1392
1393        let shards = 5;
1394        for i in 1..=shards {
1395            let key = ShardedKey::new(real_key, if i == shards { u64::MAX } else { i * 100 });
1396            let list = IntegerList::new_pre_sorted([i * 100u64]);
1397
1398            db.update(|tx| tx.put::<AccountsHistory>(key.clone(), list.clone()).expect(""))
1399                .unwrap();
1400        }
1401
1402        // Seek value with non existing key.
1403        {
1404            let tx = db.tx().expect(ERROR_INIT_TX);
1405            let mut cursor = tx.cursor_read::<AccountsHistory>().unwrap();
1406
1407            // It will seek the one greater or equal to the query. Since we have `Address | 100`,
1408            // `Address | 200` in the database and we're querying `Address | 150` it will return us
1409            // `Address | 200`.
1410            let mut walker = cursor.walk(Some(ShardedKey::new(real_key, 150))).unwrap();
1411            let (key, list) = walker
1412                .next()
1413                .expect("element should exist.")
1414                .expect("should be able to retrieve it.");
1415
1416            assert_eq!(ShardedKey::new(real_key, 200), key);
1417            let list200 = IntegerList::new_pre_sorted([200u64]);
1418            assert_eq!(list200, list);
1419        }
1420        // Seek greatest index
1421        {
1422            let tx = db.tx().expect(ERROR_INIT_TX);
1423            let mut cursor = tx.cursor_read::<AccountsHistory>().unwrap();
1424
1425            // It will seek the MAX value of transition index and try to use prev to get first
1426            // biggers.
1427            let _unknown = cursor.seek_exact(ShardedKey::new(real_key, u64::MAX)).unwrap();
1428            let (key, list) = cursor
1429                .prev()
1430                .expect("element should exist.")
1431                .expect("should be able to retrieve it.");
1432
1433            assert_eq!(ShardedKey::new(real_key, 400), key);
1434            let list400 = IntegerList::new_pre_sorted([400u64]);
1435            assert_eq!(list400, list);
1436        }
1437    }
1438}