Skip to main content

reth_db/
lib.rs

1//! MDBX implementation for reth's database abstraction layer.
2//!
3//! This crate is an implementation of `reth-db-api` for MDBX, as well as a few other common
4//! database types.
5//!
6//! # Overview
7//!
8//! An overview of the current data model of reth can be found in the [`mod@tables`] module.
9
10#![doc(
11    html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
12    html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
13    issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
14)]
15#![cfg_attr(not(test), warn(unused_crate_dependencies))]
16#![cfg_attr(docsrs, feature(doc_cfg))]
17
18mod implementation;
19pub mod lockfile;
20#[cfg(feature = "mdbx")]
21mod metrics;
22pub mod static_file;
23#[cfg(feature = "mdbx")]
24mod utils;
25pub mod version;
26
27#[cfg(feature = "mdbx")]
28pub mod mdbx;
29
30pub use reth_storage_errors::db::{DatabaseError, DatabaseWriteOperation};
31#[cfg(feature = "mdbx")]
32pub use utils::is_database_empty;
33
34#[cfg(feature = "mdbx")]
35pub use mdbx::{create_db, init_db, open_db, open_db_read_only, DatabaseEnv, DatabaseEnvKind};
36
37pub use models::ClientVersion;
38pub use reth_db_api::*;
39
40/// Collection of database test utilities
41#[cfg(any(test, feature = "test-utils"))]
42pub mod test_utils {
43    use super::*;
44    use crate::mdbx::DatabaseArguments;
45    use parking_lot::RwLock;
46    use reth_db_api::{database::Database, database_metrics::DatabaseMetrics};
47    use reth_fs_util;
48    use std::{
49        fmt::Formatter,
50        path::{Path, PathBuf},
51        sync::Arc,
52    };
53    use tempfile::TempDir;
54
55    /// Error during database open
56    pub const ERROR_DB_OPEN: &str = "could not open the database file";
57    /// Error during database creation
58    pub const ERROR_DB_CREATION: &str = "could not create the database file";
59    /// Error during database creation
60    pub const ERROR_STATIC_FILES_CREATION: &str = "could not create the static file path";
61    /// Error during table creation
62    pub const ERROR_TABLE_CREATION: &str = "could not create tables in the database";
63    /// Error during tempdir creation
64    pub const ERROR_TEMPDIR: &str = "could not create a temporary directory";
65
66    /// A database will delete the db dir when dropped.
67    pub struct TempDatabase<DB> {
68        db: Option<DB>,
69        path: PathBuf,
70        /// Executed right before a database transaction is created.
71        pre_tx_hook: RwLock<Box<dyn Fn() + Send + Sync>>,
72        /// Executed right after a database transaction is created.
73        post_tx_hook: RwLock<Box<dyn Fn() + Send + Sync>>,
74    }
75
76    impl<DB: std::fmt::Debug> std::fmt::Debug for TempDatabase<DB> {
77        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
78            f.debug_struct("TempDatabase").field("db", &self.db).field("path", &self.path).finish()
79        }
80    }
81
82    impl<DB> Drop for TempDatabase<DB> {
83        fn drop(&mut self) {
84            if let Some(db) = self.db.take() {
85                drop(db);
86                let _ = reth_fs_util::remove_dir_all(&self.path);
87            }
88        }
89    }
90
91    impl<DB> TempDatabase<DB> {
92        /// Create new [`TempDatabase`] instance.
93        pub fn new(db: DB, path: PathBuf) -> Self {
94            Self {
95                db: Some(db),
96                path,
97                pre_tx_hook: RwLock::new(Box::new(|| ())),
98                post_tx_hook: RwLock::new(Box::new(|| ())),
99            }
100        }
101
102        /// Returns the reference to inner db.
103        pub const fn db(&self) -> &DB {
104            self.db.as_ref().unwrap()
105        }
106
107        /// Returns the path to the database.
108        pub fn path(&self) -> &Path {
109            &self.path
110        }
111
112        /// Convert temp database into inner.
113        pub fn into_inner_db(mut self) -> DB {
114            self.db.take().unwrap() // take out db to avoid clean path in drop fn
115        }
116
117        /// Sets [`TempDatabase`] new pre transaction creation hook.
118        pub fn set_pre_transaction_hook(&self, hook: Box<dyn Fn() + Send + Sync>) {
119            let mut db_hook = self.pre_tx_hook.write();
120            *db_hook = hook;
121        }
122
123        /// Sets [`TempDatabase`] new post transaction creation hook.
124        pub fn set_post_transaction_hook(&self, hook: Box<dyn Fn() + Send + Sync>) {
125            let mut db_hook = self.post_tx_hook.write();
126            *db_hook = hook;
127        }
128    }
129
130    impl<DB: Database> Database for TempDatabase<DB> {
131        type TX = <DB as Database>::TX;
132        type TXMut = <DB as Database>::TXMut;
133        fn tx(&self) -> Result<Self::TX, DatabaseError> {
134            self.pre_tx_hook.read()();
135            let tx = self.db().tx()?;
136            self.post_tx_hook.read()();
137            Ok(tx)
138        }
139
140        fn tx_mut(&self) -> Result<Self::TXMut, DatabaseError> {
141            self.db().tx_mut()
142        }
143
144        fn path(&self) -> std::path::PathBuf {
145            self.db().path()
146        }
147
148        fn oldest_reader_txnid(&self) -> Option<u64> {
149            self.db().oldest_reader_txnid()
150        }
151
152        fn last_txnid(&self) -> Option<u64> {
153            self.db().last_txnid()
154        }
155    }
156
157    impl<DB: DatabaseMetrics> DatabaseMetrics for TempDatabase<DB> {
158        fn report_metrics(&self) {
159            self.db().report_metrics()
160        }
161    }
162
163    /// Create `static_files` path for testing
164    #[track_caller]
165    pub fn create_test_static_files_dir() -> (TempDir, PathBuf) {
166        let temp_dir = TempDir::with_prefix("reth-test-static-").expect(ERROR_TEMPDIR);
167        let path = temp_dir.path().to_path_buf();
168        (temp_dir, path)
169    }
170
171    /// Create `rocksdb` path for testing
172    #[track_caller]
173    pub fn create_test_rocksdb_dir() -> (TempDir, PathBuf) {
174        let temp_dir = TempDir::with_prefix("reth-test-rocksdb-").expect(ERROR_TEMPDIR);
175        let path = temp_dir.path().to_path_buf();
176        (temp_dir, path)
177    }
178
179    /// Get a temporary directory path to use for the database
180    pub fn tempdir_path() -> PathBuf {
181        let builder = tempfile::Builder::new().prefix("reth-test-").rand_bytes(8).tempdir();
182        builder.expect(ERROR_TEMPDIR).keep()
183    }
184
185    /// Create read/write database for testing
186    #[track_caller]
187    pub fn create_test_rw_db() -> Arc<TempDatabase<DatabaseEnv>> {
188        let path = tempdir_path();
189        let emsg = format!("{ERROR_DB_CREATION}: {path:?}");
190
191        let db = init_db(&path, DatabaseArguments::test()).expect(&emsg);
192
193        Arc::new(TempDatabase::new(db, path))
194    }
195
196    /// Create read/write database for testing
197    #[track_caller]
198    pub fn create_test_rw_db_with_path<P: AsRef<Path>>(path: P) -> Arc<TempDatabase<DatabaseEnv>> {
199        let path = path.as_ref().to_path_buf();
200        let emsg = format!("{ERROR_DB_CREATION}: {path:?}");
201        let db = init_db(path.as_path(), DatabaseArguments::test()).expect(&emsg);
202        Arc::new(TempDatabase::new(db, path))
203    }
204
205    /// Create read/write database for testing within a data directory.
206    ///
207    /// The database is created at `datadir/db`, and `TempDatabase` will clean up the entire
208    /// `datadir` on drop.
209    #[track_caller]
210    pub fn create_test_rw_db_with_datadir<P: AsRef<Path>>(
211        datadir: P,
212    ) -> Arc<TempDatabase<DatabaseEnv>> {
213        let datadir = datadir.as_ref().to_path_buf();
214        let db_path = datadir.join("db");
215        let emsg = format!("{ERROR_DB_CREATION}: {db_path:?}");
216        let db = init_db(&db_path, DatabaseArguments::test()).expect(&emsg);
217        Arc::new(TempDatabase::new(db, datadir))
218    }
219
220    /// Create read only database for testing
221    #[track_caller]
222    pub fn create_test_ro_db() -> Arc<TempDatabase<DatabaseEnv>> {
223        let args = DatabaseArguments::test();
224
225        let path = tempdir_path();
226        let emsg = format!("{ERROR_DB_CREATION}: {path:?}");
227        {
228            init_db(path.as_path(), args.clone()).expect(&emsg);
229        }
230        let db = open_db_read_only(path.as_path(), args).expect(ERROR_DB_OPEN);
231        Arc::new(TempDatabase::new(db, path))
232    }
233
234    /// Enables MDBX legacy multi-open mode, allowing the same database to be opened
235    /// multiple times within a single process. This is needed for tests that simulate
236    /// concurrent primary + read-only secondary provider scenarios.
237    ///
238    /// Must be called before any MDBX environment is opened.
239    ///
240    /// # Safety
241    ///
242    /// This uses `MDBX_DBG_LEGACY_MULTIOPEN` which recovers POSIX file locks on close.
243    /// It may cause unexpected pauses and does not perfectly mirror multi-process behavior.
244    /// Use only in tests.
245    pub fn enable_legacy_multiopen() {
246        unsafe {
247            reth_libmdbx::ffi::mdbx_setup_debug(
248                reth_libmdbx::ffi::MDBX_LOG_DONTCHANGE,
249                reth_libmdbx::ffi::MDBX_DBG_LEGACY_MULTIOPEN as reth_libmdbx::ffi::MDBX_debug_flags,
250                None,
251            );
252        }
253    }
254}
255
256#[cfg(test)]
257mod tests {
258    use crate::{
259        init_db,
260        mdbx::DatabaseArguments,
261        open_db, tables,
262        version::{db_version_file_path, DatabaseVersionError},
263    };
264    use assert_matches::assert_matches;
265    use reth_db_api::{
266        cursor::DbCursorRO, database::Database, models::ClientVersion, transaction::DbTx,
267    };
268    use reth_libmdbx::MaxReadTransactionDuration;
269    use std::time::Duration;
270    use tempfile::tempdir;
271
272    #[test]
273    fn test_temp_database_cleanup() {
274        // Test that TempDatabase properly cleans up its directory when dropped
275        let temp_path = {
276            let db = crate::test_utils::create_test_rw_db();
277            let path = db.path();
278            assert!(path.exists(), "Database directory should exist while TempDatabase is alive");
279            path
280            // TempDatabase dropped here
281        };
282
283        // Verify the directory was cleaned up
284        assert!(
285            !temp_path.exists(),
286            "Database directory should be cleaned up after TempDatabase is dropped"
287        );
288    }
289
290    #[test]
291    fn db_version() {
292        let path = tempdir().unwrap();
293
294        let args = DatabaseArguments::new(ClientVersion::default())
295            .with_max_read_transaction_duration(Some(MaxReadTransactionDuration::Unbounded));
296
297        // Database is empty
298        {
299            let db = init_db(&path, args.clone());
300            assert_matches!(db, Ok(_));
301        }
302
303        // Database is not empty, current version is the same as in the file
304        {
305            let db = init_db(&path, args.clone());
306            assert_matches!(db, Ok(_));
307        }
308
309        // Database is not empty, version file is malformed
310        {
311            reth_fs_util::write(path.path().join(db_version_file_path(&path)), "invalid-version")
312                .unwrap();
313            let db = init_db(&path, args.clone());
314            assert!(db.is_err());
315            assert_matches!(
316                db.unwrap_err().downcast_ref::<DatabaseVersionError>(),
317                Some(DatabaseVersionError::MalformedFile)
318            )
319        }
320
321        // Database is not empty, version file contains not matching version
322        {
323            reth_fs_util::write(path.path().join(db_version_file_path(&path)), "0").unwrap();
324            let db = init_db(&path, args);
325            assert!(db.is_err());
326            assert_matches!(
327                db.unwrap_err().downcast_ref::<DatabaseVersionError>(),
328                Some(DatabaseVersionError::VersionMismatch { version: 0 })
329            )
330        }
331    }
332
333    #[test]
334    fn db_client_version() {
335        let path = tempdir().unwrap();
336
337        // Empty client version is not recorded
338        {
339            let db = init_db(&path, DatabaseArguments::new(ClientVersion::default())).unwrap();
340            let tx = db.tx().unwrap();
341            let mut cursor = tx.cursor_read::<tables::VersionHistory>().unwrap();
342            assert_matches!(cursor.first(), Ok(None));
343        }
344
345        // Client version is recorded
346        let first_version = ClientVersion { version: String::from("v1"), ..Default::default() };
347        {
348            let db = init_db(&path, DatabaseArguments::new(first_version.clone())).unwrap();
349            let tx = db.tx().unwrap();
350            let mut cursor = tx.cursor_read::<tables::VersionHistory>().unwrap();
351            assert_eq!(
352                cursor
353                    .walk_range(..)
354                    .unwrap()
355                    .map(|x| x.map(|(_, v)| v))
356                    .collect::<Result<Vec<_>, _>>()
357                    .unwrap(),
358                vec![first_version.clone()]
359            );
360        }
361
362        // Same client version is not duplicated.
363        {
364            let db = init_db(&path, DatabaseArguments::new(first_version.clone())).unwrap();
365            let tx = db.tx().unwrap();
366            let mut cursor = tx.cursor_read::<tables::VersionHistory>().unwrap();
367            assert_eq!(
368                cursor
369                    .walk_range(..)
370                    .unwrap()
371                    .map(|x| x.map(|(_, v)| v))
372                    .collect::<Result<Vec<_>, _>>()
373                    .unwrap(),
374                vec![first_version.clone()]
375            );
376        }
377
378        // Different client version is recorded
379        std::thread::sleep(Duration::from_secs(1));
380        let second_version = ClientVersion { version: String::from("v2"), ..Default::default() };
381        {
382            let db = init_db(&path, DatabaseArguments::new(second_version.clone())).unwrap();
383            let tx = db.tx().unwrap();
384            let mut cursor = tx.cursor_read::<tables::VersionHistory>().unwrap();
385            assert_eq!(
386                cursor
387                    .walk_range(..)
388                    .unwrap()
389                    .map(|x| x.map(|(_, v)| v))
390                    .collect::<Result<Vec<_>, _>>()
391                    .unwrap(),
392                vec![first_version.clone(), second_version.clone()]
393            );
394        }
395
396        // Different client version is recorded on db open.
397        std::thread::sleep(Duration::from_secs(1));
398        let third_version = ClientVersion { version: String::from("v3"), ..Default::default() };
399        {
400            let db = open_db(path.path(), DatabaseArguments::new(third_version.clone())).unwrap();
401            let tx = db.tx().unwrap();
402            let mut cursor = tx.cursor_read::<tables::VersionHistory>().unwrap();
403            assert_eq!(
404                cursor
405                    .walk_range(..)
406                    .unwrap()
407                    .map(|x| x.map(|(_, v)| v))
408                    .collect::<Result<Vec<_>, _>>()
409                    .unwrap(),
410                vec![first_version, second_version, third_version]
411            );
412        }
413    }
414}