Skip to main content

reth_libmdbx/
environment.rs

1use crate::{
2    database::Database,
3    error::{mdbx_result, Error, Result},
4    flags::EnvironmentFlags,
5    transaction::{RO, RW},
6    txn_manager::{TxnManager, TxnManagerMessage, TxnPtr},
7    Mode, SyncMode, Transaction, TransactionKind,
8};
9use byteorder::{ByteOrder, NativeEndian};
10use mem::size_of;
11use std::{
12    ffi::CString,
13    fmt::{self, Debug},
14    mem,
15    ops::{Bound, RangeBounds},
16    path::Path,
17    ptr,
18    sync::{mpsc::sync_channel, Arc},
19    thread::sleep,
20    time::Duration,
21};
22use tracing::warn;
23
/// Upper bound applied to read transactions when no explicit limit is configured: five
/// minutes (300 seconds).
#[cfg(feature = "read-tx-timeouts")]
const DEFAULT_MAX_READ_TRANSACTION_DURATION: Duration = Duration::from_secs(300);
27
28/// An environment supports multiple databases, all residing in the same shared-memory map.
29///
30/// Accessing the environment is thread-safe.
31/// The environment will be closed when the last instance of this type is dropped.
32#[derive(Clone)]
33pub struct Environment {
34    inner: Arc<EnvironmentInner>,
35}
36
37impl Environment {
38    /// Creates a new builder for specifying options for opening an MDBX environment.
39    pub fn builder() -> EnvironmentBuilder {
40        EnvironmentBuilder {
41            flags: EnvironmentFlags::default(),
42            max_readers: None,
43            max_dbs: None,
44            sync_bytes: None,
45            sync_period: None,
46            rp_augment_limit: None,
47            loose_limit: None,
48            dp_reserve_limit: None,
49            txn_dp_limit: None,
50            spill_max_denominator: None,
51            spill_min_denominator: None,
52            geometry: None,
53            log_level: None,
54            kind: Default::default(),
55            handle_slow_readers: None,
56            #[cfg(feature = "read-tx-timeouts")]
57            max_read_transaction_duration: None,
58        }
59    }
60
61    /// Returns true if the environment was opened as WRITEMAP.
62    #[inline]
63    pub fn is_write_map(&self) -> bool {
64        self.inner.env_kind.is_write_map()
65    }
66
67    /// Returns the kind of the environment.
68    #[inline]
69    pub fn env_kind(&self) -> EnvironmentKind {
70        self.inner.env_kind
71    }
72
73    /// Returns true if the environment was opened in [`crate::Mode::ReadWrite`] mode.
74    #[inline]
75    pub fn is_read_write(&self) -> Result<bool> {
76        Ok(!self.is_read_only()?)
77    }
78
79    /// Returns true if the environment was opened in [`crate::Mode::ReadOnly`] mode.
80    #[inline]
81    pub fn is_read_only(&self) -> Result<bool> {
82        Ok(matches!(self.info()?.mode(), Mode::ReadOnly))
83    }
84
85    /// Returns the transaction manager.
86    #[inline]
87    pub(crate) fn txn_manager(&self) -> &TxnManager {
88        &self.inner.txn_manager
89    }
90
91    /// Returns the number of timed out transactions that were not aborted by the user yet.
92    #[cfg(feature = "read-tx-timeouts")]
93    pub fn timed_out_not_aborted_transactions(&self) -> usize {
94        self.inner.txn_manager.timed_out_not_aborted_read_transactions().unwrap_or(0)
95    }
96
97    /// Create a read-only transaction for use with the environment.
98    #[inline]
99    pub fn begin_ro_txn(&self) -> Result<Transaction<RO>> {
100        Transaction::new(self.clone())
101    }
102
103    /// Create a read-write transaction for use with the environment. This method will block while
104    /// there are any other read-write transactions open on the environment.
105    pub fn begin_rw_txn(&self) -> Result<Transaction<RW>> {
106        let mut warned = false;
107        let txn = loop {
108            let (tx, rx) = sync_channel(0);
109            self.txn_manager().send_message(TxnManagerMessage::Begin {
110                parent: TxnPtr(ptr::null_mut()),
111                flags: RW::OPEN_FLAGS,
112                sender: tx,
113            });
114            let res = rx.recv().unwrap();
115            if matches!(&res, Err(Error::Busy)) {
116                if !warned {
117                    warned = true;
118                    warn!(target: "libmdbx", "Process stalled, awaiting read-write transaction lock.");
119                }
120                sleep(Duration::from_millis(250));
121                continue
122            }
123
124            break res
125        }?;
126        Ok(Transaction::new_from_ptr(self.clone(), txn.0))
127    }
128
129    /// Returns a raw pointer to the underlying MDBX environment.
130    ///
131    /// The caller **must** ensure that the pointer is never dereferenced after the environment has
132    /// been dropped.
133    #[inline]
134    pub(crate) fn env_ptr(&self) -> *mut ffi::MDBX_env {
135        self.inner.env
136    }
137
138    /// Executes the given closure once
139    ///
140    /// This is only intended to be used when accessing mdbx ffi functions directly is required.
141    ///
142    /// The caller **must** ensure that the pointer is only used within the closure.
143    #[inline]
144    #[doc(hidden)]
145    pub fn with_raw_env_ptr<F, T>(&self, f: F) -> T
146    where
147        F: FnOnce(*mut ffi::MDBX_env) -> T,
148    {
149        f(self.env_ptr())
150    }
151
152    /// Flush the environment data buffers to disk.
153    pub fn sync(&self, force: bool) -> Result<bool> {
154        mdbx_result(unsafe { ffi::mdbx_env_sync_ex(self.env_ptr(), force, false) })
155    }
156
157    /// Retrieves statistics about this environment.
158    pub fn stat(&self) -> Result<Stat> {
159        unsafe {
160            let mut stat = Stat::new();
161            mdbx_result(ffi::mdbx_env_stat_ex(
162                self.env_ptr(),
163                ptr::null(),
164                stat.mdb_stat(),
165                size_of::<Stat>(),
166            ))?;
167            Ok(stat)
168        }
169    }
170
171    /// Retrieves info about this environment.
172    pub fn info(&self) -> Result<Info> {
173        unsafe {
174            let mut info = Info(mem::zeroed());
175            mdbx_result(ffi::mdbx_env_info_ex(
176                self.env_ptr(),
177                ptr::null(),
178                &mut info.0,
179                size_of::<Info>(),
180            ))?;
181            Ok(info)
182        }
183    }
184
185    /// Retrieves the total number of pages on the freelist.
186    ///
187    /// Along with [`Environment::info()`], this can be used to calculate the exact number
188    /// of used pages as well as free pages in this environment.
189    ///
190    /// ```
191    /// # use reth_libmdbx::Environment;
192    /// let dir = tempfile::tempdir().unwrap();
193    /// let env = Environment::builder().open(dir.path()).unwrap();
194    /// let info = env.info().unwrap();
195    /// let stat = env.stat().unwrap();
196    /// let freelist = env.freelist().unwrap();
197    /// let last_pgno = info.last_pgno() + 1; // pgno is 0 based.
198    /// let total_pgs = info.map_size() / stat.page_size() as usize;
199    /// let pgs_in_use = last_pgno - freelist;
200    /// let pgs_free = total_pgs - pgs_in_use;
201    /// ```
202    ///
203    /// Note:
204    ///
205    /// * MDBX stores all the freelists in the designated database 0 in each environment, and the
206    ///   freelist count is stored at the beginning of the value as `uint32_t` in the native byte
207    ///   order.
208    ///
209    /// * It will create a read transaction to traverse the freelist database.
210    pub fn freelist(&self) -> Result<usize> {
211        let mut freelist: usize = 0;
212        let txn = self.begin_ro_txn()?;
213        let db = Database::freelist_db();
214        let cursor = txn.cursor(db.dbi())?;
215
216        for result in cursor.iter_slices() {
217            let (_key, value) = result?;
218            if value.len() < size_of::<u32>() {
219                return Err(Error::Corrupted)
220            }
221            let s = &value[..size_of::<u32>()];
222            freelist += NativeEndian::read_u32(s) as usize;
223        }
224
225        Ok(freelist)
226    }
227}
228
229/// Container type for Environment internals.
230///
231/// This holds the raw pointer to the MDBX environment and the transaction manager.
232/// The env is opened via [`mdbx_env_create`](ffi::mdbx_env_create) and closed when this type drops.
233struct EnvironmentInner {
234    /// The raw pointer to the MDBX environment.
235    ///
236    /// Accessing the environment is thread-safe as long as long as this type exists.
237    env: *mut ffi::MDBX_env,
238    /// Whether the environment was opened as WRITEMAP.
239    env_kind: EnvironmentKind,
240    /// Transaction manager
241    txn_manager: TxnManager,
242}
243
244impl Drop for EnvironmentInner {
245    fn drop(&mut self) {
246        // Close open mdbx environment on drop
247        unsafe {
248            ffi::mdbx_env_close_ex(self.env, false);
249        }
250    }
251}
252
253// SAFETY: internal type, only used inside [Environment]. Accessing the environment pointer is
254// thread-safe
255unsafe impl Send for EnvironmentInner {}
256unsafe impl Sync for EnvironmentInner {}
257
/// How the environment's data is mapped into memory.
///
/// The choice only matters at the moment the environment is opened.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum EnvironmentKind {
    /// Regular mode: the environment is opened without WRITEMAP.
    #[default]
    Default,
    /// Writeable-memory-map mode (mdbx WRITEMAP).
    ///
    /// Applies unless the environment is opened read-only (`MDBX_RDONLY`, i.e.
    /// [`crate::Mode::ReadOnly`]).
    ///
    /// In [`crate::Mode::ReadWrite`], all data is mapped read-write and modified directly inside
    /// the mapping. It is then flushed to disk with a single system call. Skipping the extra
    /// memory management and copying gives a significant performance benefit.
    ///
    /// Nested transactions cannot be used in this mode.
    WriteMap,
}
278
279impl EnvironmentKind {
280    /// Returns true if the environment was opened as WRITEMAP.
281    #[inline]
282    pub const fn is_write_map(&self) -> bool {
283        matches!(self, Self::WriteMap)
284    }
285
286    /// Additional flags required when opening the environment.
287    pub(crate) const fn extra_flags(&self) -> ffi::MDBX_env_flags_t {
288        match self {
289            Self::Default => ffi::MDBX_ENV_DEFAULTS,
290            Self::WriteMap => ffi::MDBX_WRITEMAP,
291        }
292    }
293}
294
295#[derive(Copy, Clone, Debug)]
296pub(crate) struct EnvPtr(pub(crate) *mut ffi::MDBX_env);
297unsafe impl Send for EnvPtr {}
298unsafe impl Sync for EnvPtr {}
299
300/// Environment statistics.
301///
302/// Contains information about the size and layout of an MDBX environment or database.
303#[derive(Debug)]
304#[repr(transparent)]
305pub struct Stat(ffi::MDBX_stat);
306
307impl Stat {
308    /// Create a new Stat with zero'd inner struct `ffi::MDB_stat`.
309    pub(crate) const fn new() -> Self {
310        unsafe { Self(mem::zeroed()) }
311    }
312
313    /// Returns a mut pointer to `ffi::MDB_stat`.
314    pub(crate) const fn mdb_stat(&mut self) -> *mut ffi::MDBX_stat {
315        &mut self.0
316    }
317}
318
319impl Stat {
320    /// Size of a database page. This is the same for all databases in the environment.
321    #[inline]
322    pub const fn page_size(&self) -> u32 {
323        self.0.ms_psize
324    }
325
326    /// Depth (height) of the B-tree.
327    #[inline]
328    pub const fn depth(&self) -> u32 {
329        self.0.ms_depth
330    }
331
332    /// Number of internal (non-leaf) pages.
333    #[inline]
334    pub const fn branch_pages(&self) -> usize {
335        self.0.ms_branch_pages as usize
336    }
337
338    /// Number of leaf pages.
339    #[inline]
340    pub const fn leaf_pages(&self) -> usize {
341        self.0.ms_leaf_pages as usize
342    }
343
344    /// Number of overflow pages.
345    #[inline]
346    pub const fn overflow_pages(&self) -> usize {
347        self.0.ms_overflow_pages as usize
348    }
349
350    /// Number of data items.
351    #[inline]
352    pub const fn entries(&self) -> usize {
353        self.0.ms_entries as usize
354    }
355}
356
357#[derive(Debug)]
358#[repr(transparent)]
359pub struct GeometryInfo(ffi::MDBX_envinfo__bindgen_ty_1);
360
361impl GeometryInfo {
362    pub const fn min(&self) -> u64 {
363        self.0.lower
364    }
365}
366
367/// Environment information.
368///
369/// Contains environment information about the map size, readers, last txn id etc.
370#[derive(Debug)]
371#[repr(transparent)]
372pub struct Info(ffi::MDBX_envinfo);
373
374impl Info {
375    pub const fn geometry(&self) -> GeometryInfo {
376        GeometryInfo(self.0.mi_geo)
377    }
378
379    /// Size of memory map.
380    #[inline]
381    pub const fn map_size(&self) -> usize {
382        self.0.mi_mapsize as usize
383    }
384
385    /// Last used page number
386    #[inline]
387    pub const fn last_pgno(&self) -> usize {
388        self.0.mi_last_pgno as usize
389    }
390
391    /// Last transaction ID
392    #[inline]
393    pub const fn last_txnid(&self) -> usize {
394        self.0.mi_recent_txnid as usize
395    }
396
397    /// Max reader slots in the environment
398    #[inline]
399    pub const fn max_readers(&self) -> usize {
400        self.0.mi_maxreaders as usize
401    }
402
403    /// Max reader slots used in the environment
404    #[inline]
405    pub const fn num_readers(&self) -> usize {
406        self.0.mi_numreaders as usize
407    }
408
409    /// Return the internal page ops metrics
410    #[inline]
411    pub const fn page_ops(&self) -> PageOps {
412        PageOps {
413            newly: self.0.mi_pgop_stat.newly,
414            cow: self.0.mi_pgop_stat.cow,
415            clone: self.0.mi_pgop_stat.clone,
416            split: self.0.mi_pgop_stat.split,
417            merge: self.0.mi_pgop_stat.merge,
418            spill: self.0.mi_pgop_stat.spill,
419            unspill: self.0.mi_pgop_stat.unspill,
420            wops: self.0.mi_pgop_stat.wops,
421            prefault: self.0.mi_pgop_stat.prefault,
422            mincore: self.0.mi_pgop_stat.mincore,
423            msync: self.0.mi_pgop_stat.msync,
424            fsync: self.0.mi_pgop_stat.fsync,
425        }
426    }
427
428    /// Return the mode of the database
429    #[inline]
430    pub const fn mode(&self) -> Mode {
431        let mode = self.0.mi_mode as ffi::MDBX_env_flags_t;
432        if (mode & ffi::MDBX_RDONLY) != 0 {
433            Mode::ReadOnly
434        } else if (mode & ffi::MDBX_UTTERLY_NOSYNC) != 0 {
435            Mode::ReadWrite { sync_mode: SyncMode::UtterlyNoSync }
436        } else if (mode & ffi::MDBX_NOMETASYNC) != 0 {
437            Mode::ReadWrite { sync_mode: SyncMode::NoMetaSync }
438        } else if (mode & ffi::MDBX_SAFE_NOSYNC) != 0 {
439            Mode::ReadWrite { sync_mode: SyncMode::SafeNoSync }
440        } else {
441            Mode::ReadWrite { sync_mode: SyncMode::Durable }
442        }
443    }
444}
445
446impl fmt::Debug for Environment {
447    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
448        f.debug_struct("Environment").field("kind", &self.inner.env_kind).finish_non_exhaustive()
449    }
450}
451
452///////////////////////////////////////////////////////////////////////////////////////////////////
453// Environment Builder
454///////////////////////////////////////////////////////////////////////////////////////////////////
455
/// Page size to request when configuring the environment geometry (see [`Geometry`]).
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum PageSize {
    /// Let MDBX use the minimal acceptable page size (passed to `mdbx_env_set_geometry` as
    /// `0`).
    MinimalAcceptable,
    /// Use exactly this page size (passed to `mdbx_env_set_geometry`).
    Set(usize),
}
461
/// Page-operation statistics accumulated over all running, completed and aborted
/// transactions.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct PageOps {
    /// Number of new pages added.
    pub newly: u64,
    /// Number of pages copied for update.
    pub cow: u64,
    /// Number of the parent's dirty pages cloned for nested transactions.
    pub clone: u64,
    /// Number of page splits.
    pub split: u64,
    /// Number of page merges.
    pub merge: u64,
    /// Number of dirty pages spilled.
    pub spill: u64,
    /// Number of pages unspilled (reloaded).
    pub unspill: u64,
    /// Number of explicit write operations to disk (counted in operations, not pages).
    pub wops: u64,
    /// Number of explicit msync (flush-to-disk) operations.
    pub msync: u64,
    /// Number of explicit fsync (flush-to-disk) operations.
    pub fsync: u64,
    /// Number of prefault write operations.
    pub prefault: u64,
    /// Number of `mincore()` calls.
    pub mincore: u64,
}
490
491/// Represents the geometry settings for the database environment
492#[derive(Clone, Debug, PartialEq, Eq)]
493pub struct Geometry<R> {
494    /// The size range in bytes.
495    pub size: Option<R>,
496    pub growth_step: Option<isize>,
497    pub shrink_threshold: Option<isize>,
498    pub page_size: Option<PageSize>,
499}
500
501impl<R> Default for Geometry<R> {
502    fn default() -> Self {
503        Self { size: None, growth_step: None, shrink_threshold: None, page_size: None }
504    }
505}
506
507/// Handle-Slow-Readers callback function to resolve database full/overflow issue due to a reader(s)
508/// which prevents the old data from being recycled.
509///
510/// Read transactions prevent reuse of pages freed by newer write transactions, thus the database
511/// can grow quickly. This callback will be called when there is not enough space in the database
512/// (i.e. before increasing the database size or before `MDBX_MAP_FULL` error) and thus can be
513/// used to resolve issues with a "long-lived" read transactions.
514///
515/// Depending on the arguments and needs, your implementation may wait,
516/// terminate a process or thread that is performing a long read, or perform
517/// some other action. In doing so it is important that the returned code always
518/// corresponds to the performed action.
519///
520/// # Arguments
521///
522/// * `process_id` – A process id of the reader process.
523/// * `thread_id` – A thread id of the reader thread.
524/// * `read_txn_id` – An oldest read transaction number on which stalled.
525/// * `gap` – A lag from the last committed txn.
526/// * `space` – A space that actually become available for reuse after this reader finished. The
527///   callback function can take this value into account to evaluate the impact that a long-running
528///   transaction has.
529/// * `retry` – A retry number starting from 0. If callback has returned 0 at least once, then at
530///   end of current handling loop the callback function will be called additionally with negative
531///   `retry` value to notify about the end of loop. The callback function can use this fact to
532///   implement timeout reset logic while waiting for a readers.
533///
534/// # Returns
535///
536/// A return code that determines the further actions for MDBX and must match the action which
537/// was executed by the callback:
538/// * `-2` or less – An error condition and the reader was not killed.
539/// * `-1` – The callback was unable to solve the problem and agreed on `MDBX_MAP_FULL` error; MDBX
540///   should increase the database size or return `MDBX_MAP_FULL` error.
541/// * `0` – The callback solved the problem or just waited for a while, libmdbx should rescan the
542///   reader lock table and retry. This also includes a situation when corresponding transaction
543///   terminated in normal way by `mdbx_txn_abort()` or `mdbx_txn_reset()`, and may be restarted.
544///   I.e. reader slot isn't needed to be cleaned from transaction.
545/// * `1` – Transaction aborted asynchronous and reader slot should be cleared immediately, i.e.
546///   read transaction will not continue but `mdbx_txn_abort()` nor `mdbx_txn_reset()` will be
547///   called later.
548/// * `2` or greater – The reader process was terminated or killed, and MDBX should entirely reset
549///   reader registration.
550pub type HandleSlowReadersCallback = extern "C" fn(
551    env: *const ffi::MDBX_env,
552    txn: *const ffi::MDBX_txn,
553    pid: ffi::mdbx_pid_t,
554    tid: ffi::mdbx_tid_t,
555    laggard: u64,
556    gap: std::ffi::c_uint,
557    space: usize,
558    retry: std::ffi::c_int,
559) -> HandleSlowReadersReturnCode;
560
/// Return value of a [`HandleSlowReadersCallback`], telling MDBX how to proceed.
///
/// The discriminants are the raw `i32` codes passed back to MDBX.
#[derive(Debug)]
#[repr(i32)]
pub enum HandleSlowReadersReturnCode {
    /// An error condition; the reader was not killed.
    Error = -2,
    /// The callback was unable to solve the problem and agreed to an `MDBX_MAP_FULL` error.
    /// MDBX should increase the database size or return `MDBX_MAP_FULL`.
    ProceedWithoutKillingReader = -1,
    /// The callback solved the problem or just waited for a while, so libmdbx should rescan the
    /// reader lock table and retry. This also covers the case where the corresponding
    /// transaction ended normally via `mdbx_txn_abort()` or `mdbx_txn_reset()` and may be
    /// restarted, i.e. the reader slot does not need to be cleared.
    Success = 0,
    /// The transaction was aborted asynchronously and its reader slot should be cleared
    /// immediately, i.e. the read transaction will not continue but `mdbx_txn_abort()` nor
    /// `mdbx_txn_reset()` will be called later.
    ClearReaderSlot = 1,
    /// The reader process was terminated or killed, and MDBX should entirely reset the reader
    /// registration.
    ReaderProcessTerminated = 2,
}
582
583/// Options for opening or creating an environment.
584#[derive(Debug, Clone)]
585pub struct EnvironmentBuilder {
586    flags: EnvironmentFlags,
587    max_readers: Option<u64>,
588    max_dbs: Option<u64>,
589    sync_bytes: Option<u64>,
590    sync_period: Option<u64>,
591    rp_augment_limit: Option<u64>,
592    loose_limit: Option<u64>,
593    dp_reserve_limit: Option<u64>,
594    txn_dp_limit: Option<u64>,
595    spill_max_denominator: Option<u64>,
596    spill_min_denominator: Option<u64>,
597    geometry: Option<Geometry<(Option<usize>, Option<usize>)>>,
598    log_level: Option<ffi::MDBX_log_level_t>,
599    kind: EnvironmentKind,
600    handle_slow_readers: Option<HandleSlowReadersCallback>,
601    #[cfg(feature = "read-tx-timeouts")]
602    /// The maximum duration of a read transaction. If [None], but the `read-tx-timeout` feature is
603    /// enabled, the default value of [`DEFAULT_MAX_READ_TRANSACTION_DURATION`] is used.
604    max_read_transaction_duration: Option<read_transactions::MaxReadTransactionDuration>,
605}
606
607impl EnvironmentBuilder {
608    /// Open an environment.
609    ///
610    /// Database files will be opened with 644 permissions.
611    pub fn open(&self, path: &Path) -> Result<Environment> {
612        self.open_with_permissions(path, 0o644)
613    }
614
615    /// Open an environment with the provided UNIX permissions.
616    ///
617    /// The path may not contain the null character.
618    pub fn open_with_permissions(
619        &self,
620        path: &Path,
621        mode: ffi::mdbx_mode_t,
622    ) -> Result<Environment> {
623        let mut env: *mut ffi::MDBX_env = ptr::null_mut();
624        unsafe {
625            if let Some(log_level) = self.log_level {
626                // Returns the previously debug_flags in the 0-15 bits and log_level in the
627                // 16-31 bits, no need to use `mdbx_result`.
628                ffi::mdbx_setup_debug(log_level, ffi::MDBX_DBG_DONTCHANGE, None);
629            }
630
631            mdbx_result(ffi::mdbx_env_create(&mut env))?;
632
633            if let Err(e) = (|| {
634                if let Some(geometry) = &self.geometry {
635                    let mut min_size = -1;
636                    let mut max_size = -1;
637
638                    if let Some(size) = geometry.size {
639                        if let Some(size) = size.0 {
640                            min_size = size as isize;
641                        }
642
643                        if let Some(size) = size.1 {
644                            max_size = size as isize;
645                        }
646                    }
647
648                    mdbx_result(ffi::mdbx_env_set_geometry(
649                        env,
650                        min_size,
651                        -1,
652                        max_size,
653                        geometry.growth_step.unwrap_or(-1),
654                        geometry.shrink_threshold.unwrap_or(-1),
655                        match geometry.page_size {
656                            None => -1,
657                            Some(PageSize::MinimalAcceptable) => 0,
658                            Some(PageSize::Set(size)) => size as isize,
659                        },
660                    ))?;
661                }
662                for (opt, v) in [
663                    (ffi::MDBX_opt_max_db, self.max_dbs),
664                    (ffi::MDBX_opt_rp_augment_limit, self.rp_augment_limit),
665                    (ffi::MDBX_opt_loose_limit, self.loose_limit),
666                    (ffi::MDBX_opt_dp_reserve_limit, self.dp_reserve_limit),
667                    (ffi::MDBX_opt_txn_dp_limit, self.txn_dp_limit),
668                    (ffi::MDBX_opt_spill_max_denominator, self.spill_max_denominator),
669                    (ffi::MDBX_opt_spill_min_denominator, self.spill_min_denominator),
670                ] {
671                    if let Some(v) = v {
672                        mdbx_result(ffi::mdbx_env_set_option(env, opt, v))?;
673                    }
674                }
675
676                // set max readers if specified
677                if let Some(max_readers) = self.max_readers {
678                    mdbx_result(ffi::mdbx_env_set_option(
679                        env,
680                        ffi::MDBX_opt_max_readers,
681                        max_readers,
682                    ))?;
683                }
684
685                if let Some(handle_slow_readers) = self.handle_slow_readers {
686                    mdbx_result(ffi::mdbx_env_set_hsr(
687                        env,
688                        convert_hsr_fn(Some(handle_slow_readers)),
689                    ))?;
690                }
691
692                #[cfg(unix)]
693                fn path_to_bytes<P: AsRef<Path>>(path: P) -> Vec<u8> {
694                    use std::os::unix::ffi::OsStrExt;
695                    path.as_ref().as_os_str().as_bytes().to_vec()
696                }
697
698                #[cfg(windows)]
699                fn path_to_bytes<P: AsRef<Path>>(path: P) -> Vec<u8> {
700                    // On Windows, could use std::os::windows::ffi::OsStrExt to encode_wide(),
701                    // but we end up with a Vec<u16> instead of a Vec<u8>, so that doesn't
702                    // really help.
703                    path.as_ref().to_string_lossy().to_string().into_bytes()
704                }
705
706                let path = match CString::new(path_to_bytes(path)) {
707                    Ok(path) => path,
708                    Err(_) => return Err(Error::Invalid),
709                };
710                mdbx_result(ffi::mdbx_env_open(
711                    env,
712                    path.as_ptr(),
713                    self.flags.make_flags() | self.kind.extra_flags(),
714                    mode,
715                ))?;
716
717                for (opt, v) in [
718                    (ffi::MDBX_opt_sync_bytes, self.sync_bytes),
719                    (ffi::MDBX_opt_sync_period, self.sync_period),
720                ] {
721                    if let Some(v) = v {
722                        mdbx_result(ffi::mdbx_env_set_option(env, opt, v))?;
723                    }
724                }
725
726                Ok(())
727            })() {
728                ffi::mdbx_env_close_ex(env, false);
729
730                return Err(e)
731            }
732        }
733
734        let env_ptr = EnvPtr(env);
735
736        #[cfg(not(feature = "read-tx-timeouts"))]
737        let txn_manager = TxnManager::new(env_ptr);
738
739        #[cfg(feature = "read-tx-timeouts")]
740        let txn_manager = {
741            if let crate::MaxReadTransactionDuration::Set(duration) = self
742                .max_read_transaction_duration
743                .unwrap_or(read_transactions::MaxReadTransactionDuration::Set(
744                    DEFAULT_MAX_READ_TRANSACTION_DURATION,
745                ))
746            {
747                TxnManager::new_with_max_read_transaction_duration(env_ptr, duration)
748            } else {
749                TxnManager::new(env_ptr)
750            }
751        };
752
753        let env = EnvironmentInner { env, txn_manager, env_kind: self.kind };
754
755        Ok(Environment { inner: Arc::new(env) })
756    }
757
758    /// Configures how this environment will be opened.
759    pub const fn set_kind(&mut self, kind: EnvironmentKind) -> &mut Self {
760        self.kind = kind;
761        self
762    }
763
764    /// Opens the environment with mdbx WRITEMAP
765    ///
766    /// See also [`EnvironmentKind`]
767    pub const fn write_map(&mut self) -> &mut Self {
768        self.set_kind(EnvironmentKind::WriteMap)
769    }
770
771    /// Sets the provided options in the environment.
772    pub const fn set_flags(&mut self, flags: EnvironmentFlags) -> &mut Self {
773        self.flags = flags;
774        self
775    }
776
777    /// Sets the maximum number of threads or reader slots for the environment.
778    ///
779    /// This defines the number of slots in the lock table that is used to track readers in the
780    /// environment. The default is 126. Starting a read-only transaction normally ties a lock
781    /// table slot to the [Transaction] object until it or the [Environment] object is destroyed.
782    pub const fn set_max_readers(&mut self, max_readers: u64) -> &mut Self {
783        self.max_readers = Some(max_readers);
784        self
785    }
786
787    /// Sets the maximum number of named databases for the environment.
788    ///
789    /// This function is only needed if multiple databases will be used in the
790    /// environment. Simpler applications that use the environment as a single
791    /// unnamed database can ignore this option.
792    ///
793    /// Currently a moderate number of slots are cheap but a huge number gets
794    /// expensive: 7-120 words per transaction, and every [`Transaction::open_db()`]
795    /// does a linear search of the opened slots.
796    pub const fn set_max_dbs(&mut self, v: usize) -> &mut Self {
797        self.max_dbs = Some(v as u64);
798        self
799    }
800
801    /// Sets the interprocess/shared threshold to force flush the data buffers to disk, if
802    /// [`SyncMode::SafeNoSync`] is used.
803    pub const fn set_sync_bytes(&mut self, v: usize) -> &mut Self {
804        self.sync_bytes = Some(v as u64);
805        self
806    }
807
808    /// Sets the interprocess/shared relative period since the last unsteady commit to force flush
809    /// the data buffers to disk, if [`SyncMode::SafeNoSync`] is used.
810    pub fn set_sync_period(&mut self, v: Duration) -> &mut Self {
811        // For this option, mdbx uses units of 1/65536 of a second.
812        let as_mdbx_units = (v.as_secs_f64() * 65536f64) as u64;
813        self.sync_period = Some(as_mdbx_units);
814        self
815    }
816
817    pub const fn set_rp_augment_limit(&mut self, v: u64) -> &mut Self {
818        self.rp_augment_limit = Some(v);
819        self
820    }
821
822    pub const fn set_loose_limit(&mut self, v: u64) -> &mut Self {
823        self.loose_limit = Some(v);
824        self
825    }
826
827    pub const fn set_dp_reserve_limit(&mut self, v: u64) -> &mut Self {
828        self.dp_reserve_limit = Some(v);
829        self
830    }
831
832    pub const fn set_txn_dp_limit(&mut self, v: u64) -> &mut Self {
833        self.txn_dp_limit = Some(v);
834        self
835    }
836
837    pub fn set_spill_max_denominator(&mut self, v: u8) -> &mut Self {
838        self.spill_max_denominator = Some(v.into());
839        self
840    }
841
842    pub fn set_spill_min_denominator(&mut self, v: u8) -> &mut Self {
843        self.spill_min_denominator = Some(v.into());
844        self
845    }
846
847    /// Set all size-related parameters of environment, including page size and the min/max size of
848    /// the memory map.
849    pub fn set_geometry<R: RangeBounds<usize>>(&mut self, geometry: Geometry<R>) -> &mut Self {
850        let convert_bound = |bound: Bound<&usize>| match bound {
851            Bound::Included(v) | Bound::Excluded(v) => Some(*v),
852            _ => None,
853        };
854        self.geometry = Some(Geometry {
855            size: geometry.size.map(|range| {
856                (convert_bound(range.start_bound()), convert_bound(range.end_bound()))
857            }),
858            growth_step: geometry.growth_step,
859            shrink_threshold: geometry.shrink_threshold,
860            page_size: geometry.page_size,
861        });
862        self
863    }
864
865    pub const fn set_log_level(&mut self, log_level: ffi::MDBX_log_level_t) -> &mut Self {
866        self.log_level = Some(log_level);
867        self
868    }
869
870    /// Set the Handle-Slow-Readers callback. See [`HandleSlowReadersCallback`] for more
871    /// information.
872    pub fn set_handle_slow_readers(&mut self, hsr: HandleSlowReadersCallback) -> &mut Self {
873        self.handle_slow_readers = Some(hsr);
874        self
875    }
876}
877
#[cfg(feature = "read-tx-timeouts")]
pub(crate) mod read_transactions {
    use crate::EnvironmentBuilder;
    use std::time::Duration;

    // Everything in this module is already gated on `read-tx-timeouts` by the module-level
    // `cfg` above, so individual items need no `cfg` attribute of their own.

    /// The maximum duration of a read transaction.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub enum MaxReadTransactionDuration {
        /// The maximum duration of a read transaction is unbounded.
        Unbounded,
        /// The maximum duration of a read transaction is set to the given duration.
        Set(Duration),
    }

    impl MaxReadTransactionDuration {
        /// Returns the configured limit, or `None` for
        /// [`Unbounded`](MaxReadTransactionDuration::Unbounded).
        pub const fn as_duration(&self) -> Option<Duration> {
            match self {
                Self::Unbounded => None,
                Self::Set(duration) => Some(*duration),
            }
        }
    }

    impl EnvironmentBuilder {
        /// Set the maximum time a read-only transaction can be open.
        pub const fn set_max_read_transaction_duration(
            &mut self,
            max_read_transaction_duration: MaxReadTransactionDuration,
        ) -> &mut Self {
            self.max_read_transaction_duration = Some(max_read_transaction_duration);
            self
        }
    }
}
914
915/// Converts a [`HandleSlowReadersCallback`] to the actual FFI function pointer.
916fn convert_hsr_fn(callback: Option<HandleSlowReadersCallback>) -> ffi::MDBX_hsr_func {
917    unsafe { std::mem::transmute(callback) }
918}
919
#[cfg(test)]
mod tests {
    use crate::{Environment, Error, Geometry, HandleSlowReadersReturnCode, PageSize, WriteFlags};
    use std::{
        ops::RangeInclusive,
        sync::atomic::{AtomicBool, Ordering},
    };

    /// Checks that the configured Handle-Slow-Readers callback gets invoked once a write
    /// transaction exhausts the map while an older read transaction still pins a snapshot.
    #[test]
    fn test_handle_slow_readers_callback() {
        // Flipped by the callback and inspected at the end of the test.
        static HSR_INVOKED: AtomicBool = AtomicBool::new(false);

        extern "C" fn handle_slow_readers(
            _env: *const ffi::MDBX_env,
            _txn: *const ffi::MDBX_txn,
            _pid: ffi::mdbx_pid_t,
            _tid: ffi::mdbx_tid_t,
            _laggard: u64,
            _gap: std::ffi::c_uint,
            _space: usize,
            _retry: std::ffi::c_int,
        ) -> HandleSlowReadersReturnCode {
            HSR_INVOKED.store(true, Ordering::Relaxed);
            HandleSlowReadersReturnCode::ProceedWithoutKillingReader
        }

        // A small map (at most 1MB) so the limit is reachable, with the smallest acceptable page
        // size so the data spreads over as many pages as possible.
        let geometry = Geometry::<RangeInclusive<usize>> {
            size: Some(0..=1024 * 1024),
            page_size: Some(PageSize::MinimalAcceptable),
            ..Default::default()
        };
        let tempdir = tempfile::tempdir().unwrap();
        let env = Environment::builder()
            .set_geometry(geometry)
            .set_handle_slow_readers(handle_slow_readers)
            .open(tempdir.path())
            .unwrap();

        // Commits one write transaction that stores `value` under every key in `0..1_000`.
        let write_all_keys = |value: &[u8]| {
            let txn = env.begin_rw_txn().unwrap();
            let db = txn.open_db(None).unwrap();
            for key in 0usize..1_000 {
                txn.put(db.dbi(), key.to_le_bytes(), value, WriteFlags::empty()).unwrap();
            }
            txn.commit().unwrap();
        };

        // Seed the database so the read transaction below has a snapshot to hold on to.
        write_all_keys(b"0");

        // A long-lived reader that pins the current snapshot.
        let _reader = env.begin_ro_txn().unwrap();

        // Overwrite every key; the reader keeps observing the previous snapshot.
        write_all_keys(b"1");

        // Keep inserting until the map is full, which makes MDBX try to kick lagging readers
        // and reclaim their snapshots.
        {
            let txn = env.begin_rw_txn().unwrap();
            let db = txn.open_db(None).unwrap();
            for key in 1_000usize..1_000_000 {
                match txn.put(db.dbi(), key.to_le_bytes(), b"0", WriteFlags::empty()) {
                    Ok(()) => {}
                    Err(Error::MapFull) => break,
                    Err(other) => panic!("unexpected error while filling the map: {other:?}"),
                }
            }
            // After `MapFull` the transaction may be in an error state, so the commit result is
            // deliberately ignored; this test only cares whether the callback ran.
            let _ = txn.commit();
        }

        assert!(HSR_INVOKED.load(Ordering::Relaxed), "handle-slow-readers callback was not invoked");
    }
}