Skip to main content

reth_libmdbx/
environment.rs

1use crate::{
2    database::Database,
3    error::{mdbx_result, Error, Result},
4    flags::EnvironmentFlags,
5    transaction::{RO, RW},
6    txn_manager::{TxnManager, TxnManagerMessage, TxnPtr},
7    txn_pool::ReadTxnPool,
8    Mode, SyncMode, Transaction, TransactionKind,
9};
10use byteorder::{ByteOrder, NativeEndian};
11use mem::size_of;
12use std::{
13    ffi::CString,
14    fmt::{self, Debug},
15    mem,
16    ops::{Bound, RangeBounds},
17    path::Path,
18    ptr,
19    sync::{mpsc::sync_channel, Arc},
20    thread::sleep,
21    time::Duration,
22};
23use tracing::warn;
24
/// The default maximum duration of a read transaction (five minutes).
///
/// [`EnvironmentBuilder::open_with_permissions`] falls back to this value when no explicit
/// `max_read_transaction_duration` was configured on the builder.
#[cfg(feature = "read-tx-timeouts")]
const DEFAULT_MAX_READ_TRANSACTION_DURATION: Duration = Duration::from_secs(5 * 60);
28
/// An environment supports multiple databases, all residing in the same shared-memory map.
///
/// Accessing the environment is thread-safe.
/// The environment will be closed when the last instance of this type is dropped.
///
/// Cloning only clones the inner [`Arc`], so all clones refer to the same open environment.
#[derive(Clone)]
pub struct Environment {
    /// Shared state: the raw environment pointer, the environment kind, the transaction manager
    /// and the pool of reusable read-only transaction handles.
    inner: Arc<EnvironmentInner>,
}
37
38impl Environment {
39    /// Creates a new builder for specifying options for opening an MDBX environment.
40    pub fn builder() -> EnvironmentBuilder {
41        EnvironmentBuilder {
42            flags: EnvironmentFlags::default(),
43            max_readers: None,
44            max_dbs: None,
45            sync_bytes: None,
46            sync_period: None,
47            rp_augment_limit: None,
48            loose_limit: None,
49            dp_reserve_limit: None,
50            txn_dp_limit: None,
51            spill_max_denominator: None,
52            spill_min_denominator: None,
53            geometry: None,
54            log_level: None,
55            kind: Default::default(),
56            handle_slow_readers: None,
57            #[cfg(feature = "read-tx-timeouts")]
58            max_read_transaction_duration: None,
59        }
60    }
61
62    /// Returns true if the environment was opened as WRITEMAP.
63    #[inline]
64    pub fn is_write_map(&self) -> bool {
65        self.inner.env_kind.is_write_map()
66    }
67
68    /// Returns the kind of the environment.
69    #[inline]
70    pub fn env_kind(&self) -> EnvironmentKind {
71        self.inner.env_kind
72    }
73
74    /// Returns true if the environment was opened in [`crate::Mode::ReadWrite`] mode.
75    #[inline]
76    pub fn is_read_write(&self) -> Result<bool> {
77        Ok(!self.is_read_only()?)
78    }
79
80    /// Returns true if the environment was opened in [`crate::Mode::ReadOnly`] mode.
81    #[inline]
82    pub fn is_read_only(&self) -> Result<bool> {
83        Ok(matches!(self.info()?.mode(), Mode::ReadOnly))
84    }
85
86    /// Returns the transaction manager.
87    #[inline]
88    pub(crate) fn txn_manager(&self) -> &TxnManager {
89        &self.inner.txn_manager
90    }
91
92    /// Returns the number of timed out transactions that were not aborted by the user yet.
93    #[cfg(feature = "read-tx-timeouts")]
94    pub fn timed_out_not_aborted_transactions(&self) -> usize {
95        self.inner.txn_manager.timed_out_not_aborted_read_transactions().unwrap_or(0)
96    }
97
98    /// Create a read-only transaction for use with the environment.
99    ///
100    /// Reuses a previously-reset transaction handle from the internal pool when available,
101    /// avoiding the `lck_rdt_lock` mutex in MDBX's `mvcc_bind_slot` on each new transaction.
102    #[inline]
103    pub fn begin_ro_txn(&self) -> Result<Transaction<RO>> {
104        if let Some(txn_ptr) = self.inner.ro_txn_pool.pop() {
105            return Ok(Transaction::new_from_ptr(self.clone(), txn_ptr));
106        }
107        Transaction::new(self.clone())
108    }
109
110    /// Returns the read transaction pool.
111    #[inline]
112    pub(crate) fn ro_txn_pool(&self) -> &ReadTxnPool {
113        &self.inner.ro_txn_pool
114    }
115
116    /// Create a read-write transaction for use with the environment. This method will block while
117    /// there are any other read-write transactions open on the environment.
118    pub fn begin_rw_txn(&self) -> Result<Transaction<RW>> {
119        let mut warned = false;
120        let txn = loop {
121            let (tx, rx) = sync_channel(0);
122            self.txn_manager().send_message(TxnManagerMessage::Begin {
123                parent: TxnPtr(ptr::null_mut()),
124                flags: RW::OPEN_FLAGS,
125                sender: tx,
126            });
127            let res = rx.recv().unwrap();
128            if matches!(&res, Err(Error::Busy)) {
129                if !warned {
130                    warned = true;
131                    warn!(target: "libmdbx", "Process stalled, awaiting read-write transaction lock.");
132                }
133                sleep(Duration::from_millis(250));
134                continue
135            }
136
137            break res
138        }?;
139        Ok(Transaction::new_from_ptr(self.clone(), txn.0))
140    }
141
142    /// Returns a raw pointer to the underlying MDBX environment.
143    ///
144    /// The caller **must** ensure that the pointer is never dereferenced after the environment has
145    /// been dropped.
146    #[inline]
147    pub(crate) fn env_ptr(&self) -> *mut ffi::MDBX_env {
148        self.inner.env
149    }
150
151    /// Executes the given closure once
152    ///
153    /// This is only intended to be used when accessing mdbx ffi functions directly is required.
154    ///
155    /// The caller **must** ensure that the pointer is only used within the closure.
156    #[inline]
157    #[doc(hidden)]
158    pub fn with_raw_env_ptr<F, T>(&self, f: F) -> T
159    where
160        F: FnOnce(*mut ffi::MDBX_env) -> T,
161    {
162        f(self.env_ptr())
163    }
164
165    /// Flush the environment data buffers to disk.
166    pub fn sync(&self, force: bool) -> Result<bool> {
167        mdbx_result(unsafe { ffi::mdbx_env_sync_ex(self.env_ptr(), force, false) })
168    }
169
170    /// Retrieves statistics about this environment.
171    pub fn stat(&self) -> Result<Stat> {
172        unsafe {
173            let mut stat = Stat::new();
174            mdbx_result(ffi::mdbx_env_stat_ex(
175                self.env_ptr(),
176                ptr::null(),
177                stat.mdbx_stat(),
178                size_of::<Stat>(),
179            ))?;
180            Ok(stat)
181        }
182    }
183
184    /// Retrieves info about this environment.
185    pub fn info(&self) -> Result<Info> {
186        unsafe {
187            let mut info = Info(mem::zeroed());
188            mdbx_result(ffi::mdbx_env_info_ex(
189                self.env_ptr(),
190                ptr::null(),
191                &mut info.0,
192                size_of::<Info>(),
193            ))?;
194            Ok(info)
195        }
196    }
197
198    /// Retrieves the total number of pages on the freelist.
199    ///
200    /// Along with [`Environment::info()`], this can be used to calculate the exact number
201    /// of used pages as well as free pages in this environment.
202    ///
203    /// ```
204    /// # use reth_libmdbx::Environment;
205    /// let dir = tempfile::tempdir().unwrap();
206    /// let env = Environment::builder().open(dir.path()).unwrap();
207    /// let info = env.info().unwrap();
208    /// let stat = env.stat().unwrap();
209    /// let freelist = env.freelist().unwrap();
210    /// let last_pgno = info.last_pgno() + 1; // pgno is 0 based.
211    /// let total_pgs = info.map_size() / stat.page_size() as usize;
212    /// let pgs_in_use = last_pgno - freelist;
213    /// let pgs_free = total_pgs - pgs_in_use;
214    /// ```
215    ///
216    /// Note:
217    ///
218    /// * MDBX stores all the freelists in the designated database 0 in each environment, and the
219    ///   freelist count is stored at the beginning of the value as `uint32_t` in the native byte
220    ///   order.
221    ///
222    /// * It will create a read transaction to traverse the freelist database.
223    pub fn freelist(&self) -> Result<usize> {
224        let mut freelist: usize = 0;
225        let txn = self.begin_ro_txn()?;
226        let db = Database::freelist_db();
227        let cursor = txn.cursor(db.dbi())?;
228
229        for result in cursor.iter_slices() {
230            let (_key, value) = result?;
231            if value.len() < size_of::<u32>() {
232                return Err(Error::Corrupted)
233            }
234            let s = &value[..size_of::<u32>()];
235            freelist += NativeEndian::read_u32(s) as usize;
236        }
237
238        Ok(freelist)
239    }
240}
241
/// Container type for Environment internals.
///
/// This holds the raw pointer to the MDBX environment, the transaction manager and the pool of
/// reusable read-only transaction handles.
/// The env is opened via [`mdbx_env_create`](ffi::mdbx_env_create) and closed when this type drops.
struct EnvironmentInner {
    /// The raw pointer to the MDBX environment.
    ///
    /// Accessing the environment is thread-safe as long as this type exists.
    env: *mut ffi::MDBX_env,
    /// The kind the environment was opened with, i.e. whether it was opened as WRITEMAP.
    env_kind: EnvironmentKind,
    /// Transaction manager
    txn_manager: TxnManager,
    /// Pool of reset read-only transaction handles for reuse.
    ro_txn_pool: ReadTxnPool,
}
258
259impl Drop for EnvironmentInner {
260    fn drop(&mut self) {
261        // Abort all pooled read transactions before closing the environment.
262        self.ro_txn_pool.drain();
263
264        // Close open mdbx environment on drop
265        unsafe {
266            ffi::mdbx_env_close_ex(self.env, false);
267        }
268    }
269}
270
// SAFETY: internal type, only used inside [Environment]. Accessing the environment pointer is
// thread-safe, so the raw `*mut ffi::MDBX_env` field does not prevent moving the value to another
// thread.
// NOTE(review): this also relies on `TxnManager` and `ReadTxnPool` being safe to send/share —
// confirm against their definitions.
unsafe impl Send for EnvironmentInner {}
// SAFETY: see the note on the `Send` impl above; the same reasoning is applied to shared access.
unsafe impl Sync for EnvironmentInner {}
275
/// Determines how data is mapped into memory.
///
/// It only takes effect when the environment is opened: the kind is translated into extra open
/// flags that are OR-ed into the flags passed to `mdbx_env_open`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum EnvironmentKind {
    /// Open the environment in default mode, without WRITEMAP.
    #[default]
    Default,
    /// Open the environment as mdbx-WRITEMAP.
    /// Use a writable memory map unless the environment is opened as `MDBX_RDONLY`
    /// ([`crate::Mode::ReadOnly`]).
    ///
    /// All data will be mapped into memory in the read-write mode [`crate::Mode::ReadWrite`]. This
    /// offers a significant performance benefit, since the data will be modified directly in
    /// mapped memory and then flushed to disk by a single system call, without any memory
    /// management or copying.
    ///
    /// This mode is incompatible with nested transactions.
    WriteMap,
}
296
297impl EnvironmentKind {
298    /// Returns true if the environment was opened as WRITEMAP.
299    #[inline]
300    pub const fn is_write_map(&self) -> bool {
301        matches!(self, Self::WriteMap)
302    }
303
304    /// Additional flags required when opening the environment.
305    pub(crate) const fn extra_flags(&self) -> ffi::MDBX_env_flags_t {
306        match self {
307            Self::Default => ffi::MDBX_ENV_DEFAULTS,
308            Self::WriteMap => ffi::MDBX_WRITEMAP,
309        }
310    }
311}
312
/// Copyable wrapper around the raw MDBX environment pointer.
///
/// It exists so the pointer can be handed to code that requires `Send`/`Sync` values; it is what
/// the environment builder passes to the transaction manager when an environment is opened.
#[derive(Copy, Clone, Debug)]
pub(crate) struct EnvPtr(pub(crate) *mut ffi::MDBX_env);
// SAFETY: NOTE(review): presumably sound because the MDBX environment handle may be used from any
// thread while the owning environment is alive — confirm that holders never outlive it.
unsafe impl Send for EnvPtr {}
// SAFETY: see the note on the `Send` impl above.
unsafe impl Sync for EnvPtr {}
317
/// Environment statistics.
///
/// Contains information about the size and layout of an MDBX environment or database.
///
/// This is a `repr(transparent)` wrapper around `ffi::MDBX_stat`, so a pointer to the inner value
/// can be handed directly to the MDBX stat functions.
#[derive(Debug)]
#[repr(transparent)]
pub struct Stat(ffi::MDBX_stat);
324
325impl Stat {
326    /// Create a new Stat with zero'd inner struct `ffi::MDBX_stat`.
327    pub(crate) const fn new() -> Self {
328        unsafe { Self(mem::zeroed()) }
329    }
330
331    /// Returns a mut pointer to `ffi::MDBX_stat`.
332    pub(crate) const fn mdbx_stat(&mut self) -> *mut ffi::MDBX_stat {
333        &mut self.0
334    }
335}
336
337impl Stat {
338    /// Size of a database page. This is the same for all databases in the environment.
339    #[inline]
340    pub const fn page_size(&self) -> u32 {
341        self.0.ms_psize
342    }
343
344    /// Depth (height) of the B-tree.
345    #[inline]
346    pub const fn depth(&self) -> u32 {
347        self.0.ms_depth
348    }
349
350    /// Number of internal (non-leaf) pages.
351    #[inline]
352    pub const fn branch_pages(&self) -> usize {
353        self.0.ms_branch_pages as usize
354    }
355
356    /// Number of leaf pages.
357    #[inline]
358    pub const fn leaf_pages(&self) -> usize {
359        self.0.ms_leaf_pages as usize
360    }
361
362    /// Number of overflow pages.
363    #[inline]
364    pub const fn overflow_pages(&self) -> usize {
365        self.0.ms_overflow_pages as usize
366    }
367
368    /// Number of data items.
369    #[inline]
370    pub const fn entries(&self) -> usize {
371        self.0.ms_entries as usize
372    }
373}
374
/// Geometry section of the environment information.
///
/// A `repr(transparent)` wrapper around the geometry sub-struct of `ffi::MDBX_envinfo` (the type
/// of its `mi_geo` field); obtained through [`Info::geometry`].
#[derive(Debug)]
#[repr(transparent)]
pub struct GeometryInfo(ffi::MDBX_envinfo__bindgen_ty_1);
378
379impl GeometryInfo {
380    pub const fn min(&self) -> u64 {
381        self.0.lower
382    }
383}
384
/// Environment information.
///
/// Contains environment information about the map size, readers, last txn id etc.
///
/// This is a `repr(transparent)` wrapper around `ffi::MDBX_envinfo`; it is filled in by
/// [`Environment::info`].
#[derive(Debug)]
#[repr(transparent)]
pub struct Info(ffi::MDBX_envinfo);
391
392impl Info {
393    pub const fn geometry(&self) -> GeometryInfo {
394        GeometryInfo(self.0.mi_geo)
395    }
396
397    /// Size of memory map.
398    #[inline]
399    pub const fn map_size(&self) -> usize {
400        self.0.mi_mapsize as usize
401    }
402
403    /// Last used page number
404    #[inline]
405    pub const fn last_pgno(&self) -> usize {
406        self.0.mi_last_pgno as usize
407    }
408
409    /// Last transaction ID
410    #[inline]
411    pub const fn last_txnid(&self) -> usize {
412        self.0.mi_recent_txnid as usize
413    }
414
415    /// Max reader slots in the environment
416    #[inline]
417    pub const fn max_readers(&self) -> usize {
418        self.0.mi_maxreaders as usize
419    }
420
421    /// Max reader slots used in the environment
422    #[inline]
423    pub const fn num_readers(&self) -> usize {
424        self.0.mi_numreaders as usize
425    }
426
427    /// Transaction ID of the oldest active reader.
428    #[inline]
429    pub const fn latter_reader_txnid(&self) -> u64 {
430        self.0.mi_latter_reader_txnid
431    }
432
433    /// Return the internal page ops metrics
434    #[inline]
435    pub const fn page_ops(&self) -> PageOps {
436        PageOps {
437            newly: self.0.mi_pgop_stat.newly,
438            cow: self.0.mi_pgop_stat.cow,
439            clone: self.0.mi_pgop_stat.clone,
440            split: self.0.mi_pgop_stat.split,
441            merge: self.0.mi_pgop_stat.merge,
442            spill: self.0.mi_pgop_stat.spill,
443            unspill: self.0.mi_pgop_stat.unspill,
444            wops: self.0.mi_pgop_stat.wops,
445            prefault: self.0.mi_pgop_stat.prefault,
446            mincore: self.0.mi_pgop_stat.mincore,
447            msync: self.0.mi_pgop_stat.msync,
448            fsync: self.0.mi_pgop_stat.fsync,
449        }
450    }
451
452    /// Return the mode of the database
453    #[inline]
454    pub const fn mode(&self) -> Mode {
455        let mode = self.0.mi_mode as ffi::MDBX_env_flags_t;
456        if (mode & ffi::MDBX_RDONLY) != 0 {
457            Mode::ReadOnly
458        } else if (mode & ffi::MDBX_UTTERLY_NOSYNC) != 0 {
459            Mode::ReadWrite { sync_mode: SyncMode::UtterlyNoSync }
460        } else if (mode & ffi::MDBX_NOMETASYNC) != 0 {
461            Mode::ReadWrite { sync_mode: SyncMode::NoMetaSync }
462        } else if (mode & ffi::MDBX_SAFE_NOSYNC) != 0 {
463            Mode::ReadWrite { sync_mode: SyncMode::SafeNoSync }
464        } else {
465            Mode::ReadWrite { sync_mode: SyncMode::Durable }
466        }
467    }
468}
469
470impl fmt::Debug for Environment {
471    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
472        f.debug_struct("Environment").field("kind", &self.inner.env_kind).finish_non_exhaustive()
473    }
474}
475
476///////////////////////////////////////////////////////////////////////////////////////////////////
477// Environment Builder
478///////////////////////////////////////////////////////////////////////////////////////////////////
479
/// Page size selection for [`Geometry::page_size`].
///
/// When the environment is opened the value is translated into the `pagesize` argument of
/// `mdbx_env_set_geometry`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum PageSize {
    /// Passed to MDBX as `0`.
    // NOTE(review): presumably asks MDBX for the smallest page size it accepts — confirm
    // against the libmdbx `mdbx_env_set_geometry` documentation.
    MinimalAcceptable,
    /// Use the given page size; the value is passed to MDBX as-is (cast to `isize`).
    Set(usize),
}
485
/// Statistics of page operations across all (running, completed and aborted) transactions.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct PageOps {
    /// Number of new pages added
    pub newly: u64,
    /// Number of pages copied for update
    pub cow: u64,
    /// Number of parent's dirty pages cloned for nested transactions
    pub clone: u64,
    /// Page splits
    pub split: u64,
    /// Page merges
    pub merge: u64,
    /// Number of spilled dirty pages
    pub spill: u64,
    /// Number of unspilled/reloaded pages
    pub unspill: u64,
    /// Number of explicit write operations (not pages) to disk
    pub wops: u64,
    /// Number of explicit msync/flush-to-disk operations
    pub msync: u64,
    /// Number of explicit fsync/flush-to-disk operations
    pub fsync: u64,
    /// Number of prefault write operations
    pub prefault: u64,
    /// Number of `mincore()` calls
    pub mincore: u64,
}
514
/// Represents the geometry settings for the database environment.
///
/// Every field is optional. When the environment is opened, unset values are forwarded to
/// `mdbx_env_set_geometry` as `-1`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Geometry<R> {
    /// The size range in bytes.
    pub size: Option<R>,
    /// Forwarded as the `growth_step` argument of `mdbx_env_set_geometry`.
    pub growth_step: Option<isize>,
    /// Forwarded as the `shrink_threshold` argument of `mdbx_env_set_geometry`.
    pub shrink_threshold: Option<isize>,
    /// The page size to use, see [`PageSize`].
    pub page_size: Option<PageSize>,
}
524
525impl<R> Default for Geometry<R> {
526    fn default() -> Self {
527        Self { size: None, growth_step: None, shrink_threshold: None, page_size: None }
528    }
529}
530
/// Handle-Slow-Readers callback function to resolve database full/overflow issues caused by
/// reader(s) that prevent old data from being recycled.
///
/// Read transactions prevent reuse of pages freed by newer write transactions, thus the database
/// can grow quickly. This callback will be called when there is not enough space in the database
/// (i.e. before increasing the database size or before `MDBX_MAP_FULL` error) and thus can be
/// used to resolve issues with "long-lived" read transactions.
///
/// Depending on the arguments and needs, your implementation may wait,
/// terminate a process or thread that is performing a long read, or perform
/// some other action. In doing so it is important that the returned code always
/// corresponds to the performed action.
///
/// # Arguments
///
/// * `env` – The environment handle the callback is invoked for.
/// * `txn` – The transaction handle MDBX passes to the callback (presumably the write transaction
///   that ran out of space; confirm against the libmdbx `MDBX_hsr_func` documentation).
/// * `pid` – The process id of the reader process.
/// * `tid` – The thread id of the reader thread.
/// * `laggard` – The oldest read transaction number on which the writer is stalled.
/// * `gap` – The lag from the last committed txn.
/// * `space` – The space that actually becomes available for reuse after this reader finishes. The
///   callback function can take this value into account to evaluate the impact that a
///   long-running transaction has.
/// * `retry` – A retry number starting from 0. If the callback has returned 0 at least once, then
///   at the end of the current handling loop the callback function will be called additionally
///   with a negative `retry` value to notify about the end of the loop. The callback function can
///   use this fact to implement timeout reset logic while waiting for readers.
///
/// # Returns
///
/// A return code that determines the further actions for MDBX and must match the action which
/// was executed by the callback:
/// * `-2` or less – An error condition and the reader was not killed.
/// * `-1` – The callback was unable to solve the problem and agreed on `MDBX_MAP_FULL` error; MDBX
///   should increase the database size or return `MDBX_MAP_FULL` error.
/// * `0` – The callback solved the problem or just waited for a while, libmdbx should rescan the
///   reader lock table and retry. This also includes a situation when the corresponding
///   transaction terminated in the normal way by `mdbx_txn_abort()` or `mdbx_txn_reset()`, and may
///   be restarted. I.e. the reader slot does not need to be cleaned from the transaction.
/// * `1` – Transaction aborted asynchronously and the reader slot should be cleared immediately,
///   i.e. the read transaction will not continue but neither `mdbx_txn_abort()` nor
///   `mdbx_txn_reset()` will be called later.
/// * `2` or greater – The reader process was terminated or killed, and MDBX should entirely reset
///   reader registration.
pub type HandleSlowReadersCallback = extern "C" fn(
    env: *const ffi::MDBX_env,
    txn: *const ffi::MDBX_txn,
    pid: ffi::mdbx_pid_t,
    tid: ffi::mdbx_tid_t,
    laggard: u64,
    gap: std::ffi::c_uint,
    space: usize,
    retry: std::ffi::c_int,
) -> HandleSlowReadersReturnCode;
584
/// Return code of a [`HandleSlowReadersCallback`], telling MDBX which action the callback took.
///
/// The enum is `repr(i32)`, so each variant has exactly the integer value MDBX expects from the
/// callback. It is a plain fieldless code, hence `Copy` and comparable.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(i32)]
pub enum HandleSlowReadersReturnCode {
    /// An error condition and the reader was not killed.
    Error = -2,
    /// The callback was unable to solve the problem and agreed on `MDBX_MAP_FULL` error;
    /// MDBX should increase the database size or return `MDBX_MAP_FULL` error.
    ProceedWithoutKillingReader = -1,
    /// The callback solved the problem or just waited for a while, libmdbx should rescan the
    /// reader lock table and retry. This also includes a situation when the corresponding
    /// transaction terminated in the normal way by `mdbx_txn_abort()` or `mdbx_txn_reset()`, and
    /// may be restarted. I.e. the reader slot does not need to be cleaned from the transaction.
    Success = 0,
    /// Transaction aborted asynchronously and the reader slot should be cleared immediately, i.e.
    /// the read transaction will not continue but neither `mdbx_txn_abort()` nor
    /// `mdbx_txn_reset()` will be called later.
    ClearReaderSlot = 1,
    /// The reader process was terminated or killed, and MDBX should entirely reset reader
    /// registration.
    ReaderProcessTerminated = 2,
}
606
/// Options for opening or creating an environment.
///
/// Obtained from [`Environment::builder`]; every `Option` field that is `None` is simply not
/// applied when the environment is opened.
#[derive(Debug, Clone)]
pub struct EnvironmentBuilder {
    /// Flags passed to `mdbx_env_open`, OR-ed with the extra flags implied by `kind`.
    flags: EnvironmentFlags,
    /// Value for `MDBX_opt_max_readers`.
    max_readers: Option<u64>,
    /// Value for `MDBX_opt_max_db`.
    max_dbs: Option<u64>,
    /// Value for `MDBX_opt_sync_bytes`; applied after the environment has been opened.
    sync_bytes: Option<u64>,
    /// Value for `MDBX_opt_sync_period`, in units of 1/65536 of a second; applied after the
    /// environment has been opened.
    sync_period: Option<u64>,
    /// Value for `MDBX_opt_rp_augment_limit`.
    rp_augment_limit: Option<u64>,
    /// Value for `MDBX_opt_loose_limit`.
    loose_limit: Option<u64>,
    /// Value for `MDBX_opt_dp_reserve_limit`.
    dp_reserve_limit: Option<u64>,
    /// Value for `MDBX_opt_txn_dp_limit`.
    txn_dp_limit: Option<u64>,
    /// Value for `MDBX_opt_spill_max_denominator`.
    spill_max_denominator: Option<u64>,
    /// Value for `MDBX_opt_spill_min_denominator`.
    spill_min_denominator: Option<u64>,
    /// Geometry settings; `size` holds the optional (lower, upper) bounds in bytes.
    geometry: Option<Geometry<(Option<usize>, Option<usize>)>>,
    /// Log level installed through `mdbx_setup_debug` before the environment is created.
    log_level: Option<ffi::MDBX_log_level_t>,
    /// How the environment is mapped into memory, see [`EnvironmentKind`].
    kind: EnvironmentKind,
    /// Handle-Slow-Readers callback registered through `mdbx_env_set_hsr`.
    handle_slow_readers: Option<HandleSlowReadersCallback>,
    #[cfg(feature = "read-tx-timeouts")]
    /// The maximum duration of a read transaction. If [None], but the `read-tx-timeouts` feature
    /// is enabled, the default value of [`DEFAULT_MAX_READ_TRANSACTION_DURATION`] is used.
    max_read_transaction_duration: Option<read_transactions::MaxReadTransactionDuration>,
}
630
631impl EnvironmentBuilder {
632    /// Open an environment.
633    ///
634    /// Database files will be opened with 644 permissions.
635    pub fn open(&self, path: &Path) -> Result<Environment> {
636        self.open_with_permissions(path, 0o644)
637    }
638
    /// Open an environment with the provided UNIX permissions.
    ///
    /// The path may not contain the null character.
    ///
    /// The steps are performed in this order:
    /// 1. install the configured log level, if any, via `mdbx_setup_debug`;
    /// 2. create the environment handle with `mdbx_env_create`;
    /// 3. apply geometry, the pre-open options, the reader limit and the slow-readers callback;
    /// 4. open the environment at `path` with the builder's flags plus the flags of its kind;
    /// 5. apply the sync options (`sync_bytes`, `sync_period`), which are set after opening;
    /// 6. create the transaction manager and wrap everything into an [`Environment`].
    ///
    /// # Errors
    ///
    /// Returns [`Error::Invalid`] if `path` contains a null byte, and otherwise the error of the
    /// first MDBX call that fails. If any of steps 3–5 fails, the freshly created environment
    /// handle is closed before the error is returned.
    pub fn open_with_permissions(
        &self,
        path: &Path,
        mode: ffi::mdbx_mode_t,
    ) -> Result<Environment> {
        let mut env: *mut ffi::MDBX_env = ptr::null_mut();
        unsafe {
            if let Some(log_level) = self.log_level {
                // Returns the previous debug_flags in the 0-15 bits and log_level in the
                // 16-31 bits, no need to use `mdbx_result`.
                ffi::mdbx_setup_debug(log_level, ffi::MDBX_DBG_DONTCHANGE, None);
            }

            mdbx_result(ffi::mdbx_env_create(&mut env))?;

            // Everything that can fail after the handle exists runs inside this closure, so a
            // single error path below can close the handle again.
            if let Err(e) = (|| {
                if let Some(geometry) = &self.geometry {
                    // `-1` is passed to MDBX for every bound that was not configured.
                    let mut min_size = -1;
                    let mut max_size = -1;

                    if let Some(size) = geometry.size {
                        if let Some(size) = size.0 {
                            min_size = size as isize;
                        }

                        if let Some(size) = size.1 {
                            max_size = size as isize;
                        }
                    }

                    // The third argument (between the lower and upper bound) is always `-1`; the
                    // builder offers no setting for it.
                    mdbx_result(ffi::mdbx_env_set_geometry(
                        env,
                        min_size,
                        -1,
                        max_size,
                        geometry.growth_step.unwrap_or(-1),
                        geometry.shrink_threshold.unwrap_or(-1),
                        match geometry.page_size {
                            None => -1,
                            Some(PageSize::MinimalAcceptable) => 0,
                            Some(PageSize::Set(size)) => size as isize,
                        },
                    ))?;
                }
                // Options that are applied before the environment is opened.
                for (opt, v) in [
                    (ffi::MDBX_opt_max_db, self.max_dbs),
                    (ffi::MDBX_opt_rp_augment_limit, self.rp_augment_limit),
                    (ffi::MDBX_opt_loose_limit, self.loose_limit),
                    (ffi::MDBX_opt_dp_reserve_limit, self.dp_reserve_limit),
                    (ffi::MDBX_opt_txn_dp_limit, self.txn_dp_limit),
                    (ffi::MDBX_opt_spill_max_denominator, self.spill_max_denominator),
                    (ffi::MDBX_opt_spill_min_denominator, self.spill_min_denominator),
                ] {
                    if let Some(v) = v {
                        mdbx_result(ffi::mdbx_env_set_option(env, opt, v))?;
                    }
                }

                // set max readers if specified
                if let Some(max_readers) = self.max_readers {
                    mdbx_result(ffi::mdbx_env_set_option(
                        env,
                        ffi::MDBX_opt_max_readers,
                        max_readers,
                    ))?;
                }

                if let Some(handle_slow_readers) = self.handle_slow_readers {
                    mdbx_result(ffi::mdbx_env_set_hsr(
                        env,
                        convert_hsr_fn(Some(handle_slow_readers)),
                    ))?;
                }

                // On unix the path bytes are used verbatim.
                #[cfg(unix)]
                fn path_to_bytes<P: AsRef<Path>>(path: P) -> Vec<u8> {
                    use std::os::unix::ffi::OsStrExt;
                    path.as_ref().as_os_str().as_bytes().to_vec()
                }

                // On Windows the path goes through a lossy UTF-8 conversion.
                #[cfg(windows)]
                fn path_to_bytes<P: AsRef<Path>>(path: P) -> Vec<u8> {
                    // On Windows, could use std::os::windows::ffi::OsStrExt to encode_wide(),
                    // but we end up with a Vec<u16> instead of a Vec<u8>, so that doesn't
                    // really help.
                    path.as_ref().to_string_lossy().to_string().into_bytes()
                }

                // `CString::new` fails on an interior null byte, reported as `Error::Invalid`.
                let path = match CString::new(path_to_bytes(path)) {
                    Ok(path) => path,
                    Err(_) => return Err(Error::Invalid),
                };
                mdbx_result(ffi::mdbx_env_open(
                    env,
                    path.as_ptr(),
                    self.flags.make_flags() | self.kind.extra_flags(),
                    mode,
                ))?;

                // The sync options are applied only after the environment has been opened.
                for (opt, v) in [
                    (ffi::MDBX_opt_sync_bytes, self.sync_bytes),
                    (ffi::MDBX_opt_sync_period, self.sync_period),
                ] {
                    if let Some(v) = v {
                        mdbx_result(ffi::mdbx_env_set_option(env, opt, v))?;
                    }
                }

                Ok(())
            })() {
                // Configuration or opening failed: release the handle before reporting the error.
                ffi::mdbx_env_close_ex(env, false);

                return Err(e)
            }
        }

        let env_ptr = EnvPtr(env);

        #[cfg(not(feature = "read-tx-timeouts"))]
        let txn_manager = TxnManager::new(env_ptr);

        // With read timeouts enabled, an unset duration falls back to the default; only a `Set`
        // duration creates a manager with a maximum read transaction duration.
        #[cfg(feature = "read-tx-timeouts")]
        let txn_manager = {
            if let crate::MaxReadTransactionDuration::Set(duration) = self
                .max_read_transaction_duration
                .unwrap_or(read_transactions::MaxReadTransactionDuration::Set(
                    DEFAULT_MAX_READ_TRANSACTION_DURATION,
                ))
            {
                TxnManager::new_with_max_read_transaction_duration(env_ptr, duration)
            } else {
                TxnManager::new(env_ptr)
            }
        };

        let env = EnvironmentInner {
            env,
            txn_manager,
            env_kind: self.kind,
            ro_txn_pool: ReadTxnPool::new(),
        };

        Ok(Environment { inner: Arc::new(env) })
    }
786
787    /// Configures how this environment will be opened.
788    pub const fn set_kind(&mut self, kind: EnvironmentKind) -> &mut Self {
789        self.kind = kind;
790        self
791    }
792
793    /// Opens the environment with mdbx WRITEMAP
794    ///
795    /// See also [`EnvironmentKind`]
796    pub const fn write_map(&mut self) -> &mut Self {
797        self.set_kind(EnvironmentKind::WriteMap)
798    }
799
800    /// Sets the provided options in the environment.
801    pub const fn set_flags(&mut self, flags: EnvironmentFlags) -> &mut Self {
802        self.flags = flags;
803        self
804    }
805
806    /// Sets the maximum number of threads or reader slots for the environment.
807    ///
808    /// This defines the number of slots in the lock table that is used to track readers in the
809    /// environment. The default is 126. Starting a read-only transaction normally ties a lock
810    /// table slot to the [Transaction] object until it or the [Environment] object is destroyed.
811    pub const fn set_max_readers(&mut self, max_readers: u64) -> &mut Self {
812        self.max_readers = Some(max_readers);
813        self
814    }
815
816    /// Sets the maximum number of named databases for the environment.
817    ///
818    /// This function is only needed if multiple databases will be used in the
819    /// environment. Simpler applications that use the environment as a single
820    /// unnamed database can ignore this option.
821    ///
822    /// Currently a moderate number of slots are cheap but a huge number gets
823    /// expensive: 7-120 words per transaction, and every [`Transaction::open_db()`]
824    /// does a linear search of the opened slots.
825    pub const fn set_max_dbs(&mut self, v: usize) -> &mut Self {
826        self.max_dbs = Some(v as u64);
827        self
828    }
829
830    /// Sets the interprocess/shared threshold to force flush the data buffers to disk, if
831    /// [`SyncMode::SafeNoSync`] is used.
832    pub const fn set_sync_bytes(&mut self, v: usize) -> &mut Self {
833        self.sync_bytes = Some(v as u64);
834        self
835    }
836
837    /// Sets the interprocess/shared relative period since the last unsteady commit to force flush
838    /// the data buffers to disk, if [`SyncMode::SafeNoSync`] is used.
839    pub fn set_sync_period(&mut self, v: Duration) -> &mut Self {
840        // For this option, mdbx uses units of 1/65536 of a second.
841        let as_mdbx_units = (v.as_secs_f64() * 65536f64) as u64;
842        self.sync_period = Some(as_mdbx_units);
843        self
844    }
845
846    pub const fn set_rp_augment_limit(&mut self, v: u64) -> &mut Self {
847        self.rp_augment_limit = Some(v);
848        self
849    }
850
851    pub const fn set_loose_limit(&mut self, v: u64) -> &mut Self {
852        self.loose_limit = Some(v);
853        self
854    }
855
856    pub const fn set_dp_reserve_limit(&mut self, v: u64) -> &mut Self {
857        self.dp_reserve_limit = Some(v);
858        self
859    }
860
861    pub const fn set_txn_dp_limit(&mut self, v: u64) -> &mut Self {
862        self.txn_dp_limit = Some(v);
863        self
864    }
865
866    pub fn set_spill_max_denominator(&mut self, v: u8) -> &mut Self {
867        self.spill_max_denominator = Some(v.into());
868        self
869    }
870
871    pub fn set_spill_min_denominator(&mut self, v: u8) -> &mut Self {
872        self.spill_min_denominator = Some(v.into());
873        self
874    }
875
876    /// Set all size-related parameters of environment, including page size and the min/max size of
877    /// the memory map.
878    pub fn set_geometry<R: RangeBounds<usize>>(&mut self, geometry: Geometry<R>) -> &mut Self {
879        let convert_bound = |bound: Bound<&usize>| match bound {
880            Bound::Included(v) | Bound::Excluded(v) => Some(*v),
881            _ => None,
882        };
883        self.geometry = Some(Geometry {
884            size: geometry.size.map(|range| {
885                (convert_bound(range.start_bound()), convert_bound(range.end_bound()))
886            }),
887            growth_step: geometry.growth_step,
888            shrink_threshold: geometry.shrink_threshold,
889            page_size: geometry.page_size,
890        });
891        self
892    }
893
894    pub const fn set_log_level(&mut self, log_level: ffi::MDBX_log_level_t) -> &mut Self {
895        self.log_level = Some(log_level);
896        self
897    }
898
899    /// Set the Handle-Slow-Readers callback. See [`HandleSlowReadersCallback`] for more
900    /// information.
901    pub fn set_handle_slow_readers(&mut self, hsr: HandleSlowReadersCallback) -> &mut Self {
902        self.handle_slow_readers = Some(hsr);
903        self
904    }
905}
906
#[cfg(feature = "read-tx-timeouts")]
pub(crate) mod read_transactions {
    use crate::EnvironmentBuilder;
    use std::time::Duration;

    /// Upper bound on how long a read transaction may stay open.
    #[derive(Debug, Clone, Copy)]
    pub enum MaxReadTransactionDuration {
        /// Read transactions may stay open indefinitely.
        Unbounded,
        /// Read transactions are limited to the wrapped [`Duration`].
        Set(Duration),
    }

    impl MaxReadTransactionDuration {
        /// Returns the configured limit, or `None` when the duration is
        /// [`Unbounded`](Self::Unbounded).
        pub const fn as_duration(&self) -> Option<Duration> {
            if let Self::Set(limit) = self {
                Some(*limit)
            } else {
                None
            }
        }
    }

    impl EnvironmentBuilder {
        /// Sets the longest time a read-only transaction is allowed to remain open.
        pub const fn set_max_read_transaction_duration(
            &mut self,
            max_read_transaction_duration: MaxReadTransactionDuration,
        ) -> &mut Self {
            self.max_read_transaction_duration = Some(max_read_transaction_duration);
            self
        }
    }
}
943
944/// Converts a [`HandleSlowReadersCallback`] to the actual FFI function pointer.
945fn convert_hsr_fn(callback: Option<HandleSlowReadersCallback>) -> ffi::MDBX_hsr_func {
946    unsafe { std::mem::transmute(callback) }
947}
948
#[cfg(test)]
mod tests {
    use crate::{Environment, Error, Geometry, HandleSlowReadersReturnCode, PageSize, WriteFlags};
    use std::{
        ops::RangeInclusive,
        sync::atomic::{AtomicBool, Ordering},
    };

    /// Checks that a registered Handle-Slow-Readers callback is invoked once a writer fills the
    /// map while a read transaction is still open on an older snapshot.
    #[test]
    fn test_handle_slow_readers_callback() {
        // Flipped by the callback so the test can observe that it ran.
        static CALLED: AtomicBool = AtomicBool::new(false);

        extern "C" fn handle_slow_readers(
            _env: *const ffi::MDBX_env,
            _txn: *const ffi::MDBX_txn,
            _pid: ffi::mdbx_pid_t,
            _tid: ffi::mdbx_tid_t,
            _laggard: u64,
            _gap: std::ffi::c_uint,
            _space: usize,
            _retry: std::ffi::c_int,
        ) -> HandleSlowReadersReturnCode {
            CALLED.store(true, Ordering::Relaxed);
            HandleSlowReadersReturnCode::ProceedWithoutKillingReader
        }

        let tempdir = tempfile::tempdir().unwrap();

        let geometry = Geometry::<RangeInclusive<usize>> {
            // Max 1MB, so we can hit the limit
            size: Some(0..=1024 * 1024),
            // To create as many pages as possible
            page_size: Some(PageSize::MinimalAcceptable),
            ..Default::default()
        };
        let env = Environment::builder()
            .set_geometry(geometry)
            .set_handle_slow_readers(handle_slow_readers)
            .open(tempdir.path())
            .unwrap();

        // Writes `value` under each of the first 1 000 keys in one committed RW transaction.
        let write_first_thousand = |value: &[u8; 1]| {
            let tx = env.begin_rw_txn().unwrap();
            let db = tx.open_db(None).unwrap();
            for i in 0usize..1_000 {
                tx.put(db.dbi(), i.to_le_bytes(), value, WriteFlags::empty()).unwrap();
            }
            tx.commit().unwrap();
        };

        // Seed the database so the read transaction has a snapshot to lock on.
        write_first_thousand(b"0");

        // Open a read transaction and keep it alive for the rest of the test.
        let _tx_ro = env.begin_ro_txn().unwrap();

        // Overwrite the seeded data so the reader is left on the previous snapshot.
        write_first_thousand(b"1");

        // Keep inserting until the DB size limit is hit, at which point MDBX tries to kick
        // long-lived readers and delete their snapshots.
        {
            let tx = env.begin_rw_txn().unwrap();
            let db = tx.open_db(None).unwrap();
            for i in 1_000usize..1_000_000 {
                let outcome = tx.put(db.dbi(), i.to_le_bytes(), b"0", WriteFlags::empty());
                if matches!(outcome, Err(Error::MapFull)) {
                    break;
                }
                // Any other error is unexpected and fails the test.
                outcome.unwrap();
            }
            // After MapFull the transaction may be in an error state, so the commit is allowed
            // to fail; only the callback invocation matters for this test.
            let _ = tx.commit();
        }

        // Expect the HSR to be called
        assert!(CALLED.load(Ordering::Relaxed));
    }
}