1use crate::{
2 database::Database,
3 error::{mdbx_result, Error, Result},
4 flags::EnvironmentFlags,
5 transaction::{RO, RW},
6 txn_manager::{TxnManager, TxnManagerMessage, TxnPtr},
7 txn_pool::ReadTxnPool,
8 Mode, SyncMode, Transaction, TransactionKind,
9};
10use byteorder::{ByteOrder, NativeEndian};
11use mem::size_of;
12use std::{
13 ffi::CString,
14 fmt::{self, Debug},
15 mem,
16 ops::{Bound, RangeBounds},
17 path::Path,
18 ptr,
19 sync::{mpsc::sync_channel, Arc},
20 thread::sleep,
21 time::Duration,
22};
23use tracing::warn;
24
25#[cfg(feature = "read-tx-timeouts")]
27const DEFAULT_MAX_READ_TRANSACTION_DURATION: Duration = Duration::from_secs(5 * 60);
28
29#[derive(Clone)]
34pub struct Environment {
35 inner: Arc<EnvironmentInner>,
36}
37
38impl Environment {
39 pub fn builder() -> EnvironmentBuilder {
41 EnvironmentBuilder {
42 flags: EnvironmentFlags::default(),
43 max_readers: None,
44 max_dbs: None,
45 sync_bytes: None,
46 sync_period: None,
47 rp_augment_limit: None,
48 loose_limit: None,
49 dp_reserve_limit: None,
50 txn_dp_limit: None,
51 spill_max_denominator: None,
52 spill_min_denominator: None,
53 geometry: None,
54 log_level: None,
55 kind: Default::default(),
56 handle_slow_readers: None,
57 #[cfg(feature = "read-tx-timeouts")]
58 max_read_transaction_duration: None,
59 }
60 }
61
62 #[inline]
64 pub fn is_write_map(&self) -> bool {
65 self.inner.env_kind.is_write_map()
66 }
67
68 #[inline]
70 pub fn env_kind(&self) -> EnvironmentKind {
71 self.inner.env_kind
72 }
73
74 #[inline]
76 pub fn is_read_write(&self) -> Result<bool> {
77 Ok(!self.is_read_only()?)
78 }
79
80 #[inline]
82 pub fn is_read_only(&self) -> Result<bool> {
83 Ok(matches!(self.info()?.mode(), Mode::ReadOnly))
84 }
85
86 #[inline]
88 pub(crate) fn txn_manager(&self) -> &TxnManager {
89 &self.inner.txn_manager
90 }
91
92 #[cfg(feature = "read-tx-timeouts")]
94 pub fn timed_out_not_aborted_transactions(&self) -> usize {
95 self.inner.txn_manager.timed_out_not_aborted_read_transactions().unwrap_or(0)
96 }
97
98 #[inline]
103 pub fn begin_ro_txn(&self) -> Result<Transaction<RO>> {
104 if let Some(txn_ptr) = self.inner.ro_txn_pool.pop() {
105 return Ok(Transaction::new_from_ptr(self.clone(), txn_ptr));
106 }
107 Transaction::new(self.clone())
108 }
109
110 #[inline]
112 pub(crate) fn ro_txn_pool(&self) -> &ReadTxnPool {
113 &self.inner.ro_txn_pool
114 }
115
116 pub fn begin_rw_txn(&self) -> Result<Transaction<RW>> {
119 let mut warned = false;
120 let txn = loop {
121 let (tx, rx) = sync_channel(0);
122 self.txn_manager().send_message(TxnManagerMessage::Begin {
123 parent: TxnPtr(ptr::null_mut()),
124 flags: RW::OPEN_FLAGS,
125 sender: tx,
126 });
127 let res = rx.recv().unwrap();
128 if matches!(&res, Err(Error::Busy)) {
129 if !warned {
130 warned = true;
131 warn!(target: "libmdbx", "Process stalled, awaiting read-write transaction lock.");
132 }
133 sleep(Duration::from_millis(250));
134 continue
135 }
136
137 break res
138 }?;
139 Ok(Transaction::new_from_ptr(self.clone(), txn.0))
140 }
141
142 #[inline]
147 pub(crate) fn env_ptr(&self) -> *mut ffi::MDBX_env {
148 self.inner.env
149 }
150
151 #[inline]
157 #[doc(hidden)]
158 pub fn with_raw_env_ptr<F, T>(&self, f: F) -> T
159 where
160 F: FnOnce(*mut ffi::MDBX_env) -> T,
161 {
162 f(self.env_ptr())
163 }
164
165 pub fn sync(&self, force: bool) -> Result<bool> {
167 mdbx_result(unsafe { ffi::mdbx_env_sync_ex(self.env_ptr(), force, false) })
168 }
169
170 pub fn stat(&self) -> Result<Stat> {
172 unsafe {
173 let mut stat = Stat::new();
174 mdbx_result(ffi::mdbx_env_stat_ex(
175 self.env_ptr(),
176 ptr::null(),
177 stat.mdbx_stat(),
178 size_of::<Stat>(),
179 ))?;
180 Ok(stat)
181 }
182 }
183
184 pub fn info(&self) -> Result<Info> {
186 unsafe {
187 let mut info = Info(mem::zeroed());
188 mdbx_result(ffi::mdbx_env_info_ex(
189 self.env_ptr(),
190 ptr::null(),
191 &mut info.0,
192 size_of::<Info>(),
193 ))?;
194 Ok(info)
195 }
196 }
197
198 pub fn freelist(&self) -> Result<usize> {
224 let mut freelist: usize = 0;
225 let txn = self.begin_ro_txn()?;
226 let db = Database::freelist_db();
227 let cursor = txn.cursor(db.dbi())?;
228
229 for result in cursor.iter_slices() {
230 let (_key, value) = result?;
231 if value.len() < size_of::<u32>() {
232 return Err(Error::Corrupted)
233 }
234 let s = &value[..size_of::<u32>()];
235 freelist += NativeEndian::read_u32(s) as usize;
236 }
237
238 Ok(freelist)
239 }
240}
241
242struct EnvironmentInner {
247 env: *mut ffi::MDBX_env,
251 env_kind: EnvironmentKind,
253 txn_manager: TxnManager,
255 ro_txn_pool: ReadTxnPool,
257}
258
259impl Drop for EnvironmentInner {
260 fn drop(&mut self) {
261 self.ro_txn_pool.drain();
263
264 unsafe {
266 ffi::mdbx_env_close_ex(self.env, false);
267 }
268 }
269}
270
271unsafe impl Send for EnvironmentInner {}
274unsafe impl Sync for EnvironmentInner {}
275
276#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
280pub enum EnvironmentKind {
281 #[default]
283 Default,
284 WriteMap,
295}
296
297impl EnvironmentKind {
298 #[inline]
300 pub const fn is_write_map(&self) -> bool {
301 matches!(self, Self::WriteMap)
302 }
303
304 pub(crate) const fn extra_flags(&self) -> ffi::MDBX_env_flags_t {
306 match self {
307 Self::Default => ffi::MDBX_ENV_DEFAULTS,
308 Self::WriteMap => ffi::MDBX_WRITEMAP,
309 }
310 }
311}
312
313#[derive(Copy, Clone, Debug)]
314pub(crate) struct EnvPtr(pub(crate) *mut ffi::MDBX_env);
315unsafe impl Send for EnvPtr {}
316unsafe impl Sync for EnvPtr {}
317
318#[derive(Debug)]
322#[repr(transparent)]
323pub struct Stat(ffi::MDBX_stat);
324
325impl Stat {
326 pub(crate) const fn new() -> Self {
328 unsafe { Self(mem::zeroed()) }
329 }
330
331 pub(crate) const fn mdbx_stat(&mut self) -> *mut ffi::MDBX_stat {
333 &mut self.0
334 }
335}
336
337impl Stat {
338 #[inline]
340 pub const fn page_size(&self) -> u32 {
341 self.0.ms_psize
342 }
343
344 #[inline]
346 pub const fn depth(&self) -> u32 {
347 self.0.ms_depth
348 }
349
350 #[inline]
352 pub const fn branch_pages(&self) -> usize {
353 self.0.ms_branch_pages as usize
354 }
355
356 #[inline]
358 pub const fn leaf_pages(&self) -> usize {
359 self.0.ms_leaf_pages as usize
360 }
361
362 #[inline]
364 pub const fn overflow_pages(&self) -> usize {
365 self.0.ms_overflow_pages as usize
366 }
367
368 #[inline]
370 pub const fn entries(&self) -> usize {
371 self.0.ms_entries as usize
372 }
373}
374
375#[derive(Debug)]
376#[repr(transparent)]
377pub struct GeometryInfo(ffi::MDBX_envinfo__bindgen_ty_1);
378
379impl GeometryInfo {
380 pub const fn min(&self) -> u64 {
381 self.0.lower
382 }
383}
384
385#[derive(Debug)]
389#[repr(transparent)]
390pub struct Info(ffi::MDBX_envinfo);
391
392impl Info {
393 pub const fn geometry(&self) -> GeometryInfo {
394 GeometryInfo(self.0.mi_geo)
395 }
396
397 #[inline]
399 pub const fn map_size(&self) -> usize {
400 self.0.mi_mapsize as usize
401 }
402
403 #[inline]
405 pub const fn last_pgno(&self) -> usize {
406 self.0.mi_last_pgno as usize
407 }
408
409 #[inline]
411 pub const fn last_txnid(&self) -> usize {
412 self.0.mi_recent_txnid as usize
413 }
414
415 #[inline]
417 pub const fn max_readers(&self) -> usize {
418 self.0.mi_maxreaders as usize
419 }
420
421 #[inline]
423 pub const fn num_readers(&self) -> usize {
424 self.0.mi_numreaders as usize
425 }
426
427 #[inline]
429 pub const fn latter_reader_txnid(&self) -> u64 {
430 self.0.mi_latter_reader_txnid
431 }
432
433 #[inline]
435 pub const fn page_ops(&self) -> PageOps {
436 PageOps {
437 newly: self.0.mi_pgop_stat.newly,
438 cow: self.0.mi_pgop_stat.cow,
439 clone: self.0.mi_pgop_stat.clone,
440 split: self.0.mi_pgop_stat.split,
441 merge: self.0.mi_pgop_stat.merge,
442 spill: self.0.mi_pgop_stat.spill,
443 unspill: self.0.mi_pgop_stat.unspill,
444 wops: self.0.mi_pgop_stat.wops,
445 prefault: self.0.mi_pgop_stat.prefault,
446 mincore: self.0.mi_pgop_stat.mincore,
447 msync: self.0.mi_pgop_stat.msync,
448 fsync: self.0.mi_pgop_stat.fsync,
449 }
450 }
451
452 #[inline]
454 pub const fn mode(&self) -> Mode {
455 let mode = self.0.mi_mode as ffi::MDBX_env_flags_t;
456 if (mode & ffi::MDBX_RDONLY) != 0 {
457 Mode::ReadOnly
458 } else if (mode & ffi::MDBX_UTTERLY_NOSYNC) != 0 {
459 Mode::ReadWrite { sync_mode: SyncMode::UtterlyNoSync }
460 } else if (mode & ffi::MDBX_NOMETASYNC) != 0 {
461 Mode::ReadWrite { sync_mode: SyncMode::NoMetaSync }
462 } else if (mode & ffi::MDBX_SAFE_NOSYNC) != 0 {
463 Mode::ReadWrite { sync_mode: SyncMode::SafeNoSync }
464 } else {
465 Mode::ReadWrite { sync_mode: SyncMode::Durable }
466 }
467 }
468}
469
470impl fmt::Debug for Environment {
471 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
472 f.debug_struct("Environment").field("kind", &self.inner.env_kind).finish_non_exhaustive()
473 }
474}
475
476#[derive(Clone, Debug, PartialEq, Eq)]
481pub enum PageSize {
482 MinimalAcceptable,
483 Set(usize),
484}
485
486#[derive(Clone, Debug, PartialEq, Eq)]
488pub struct PageOps {
489 pub newly: u64,
491 pub cow: u64,
493 pub clone: u64,
495 pub split: u64,
497 pub merge: u64,
499 pub spill: u64,
501 pub unspill: u64,
503 pub wops: u64,
505 pub msync: u64,
507 pub fsync: u64,
509 pub prefault: u64,
511 pub mincore: u64,
513}
514
515#[derive(Clone, Debug, PartialEq, Eq)]
517pub struct Geometry<R> {
518 pub size: Option<R>,
520 pub growth_step: Option<isize>,
521 pub shrink_threshold: Option<isize>,
522 pub page_size: Option<PageSize>,
523}
524
525impl<R> Default for Geometry<R> {
526 fn default() -> Self {
527 Self { size: None, growth_step: None, shrink_threshold: None, page_size: None }
528 }
529}
530
531pub type HandleSlowReadersCallback = extern "C" fn(
575 env: *const ffi::MDBX_env,
576 txn: *const ffi::MDBX_txn,
577 pid: ffi::mdbx_pid_t,
578 tid: ffi::mdbx_tid_t,
579 laggard: u64,
580 gap: std::ffi::c_uint,
581 space: usize,
582 retry: std::ffi::c_int,
583) -> HandleSlowReadersReturnCode;
584
585#[derive(Debug)]
586#[repr(i32)]
587pub enum HandleSlowReadersReturnCode {
588 Error = -2,
590 ProceedWithoutKillingReader = -1,
593 Success = 0,
598 ClearReaderSlot = 1,
602 ReaderProcessTerminated = 2,
605}
606
607#[derive(Debug, Clone)]
609pub struct EnvironmentBuilder {
610 flags: EnvironmentFlags,
611 max_readers: Option<u64>,
612 max_dbs: Option<u64>,
613 sync_bytes: Option<u64>,
614 sync_period: Option<u64>,
615 rp_augment_limit: Option<u64>,
616 loose_limit: Option<u64>,
617 dp_reserve_limit: Option<u64>,
618 txn_dp_limit: Option<u64>,
619 spill_max_denominator: Option<u64>,
620 spill_min_denominator: Option<u64>,
621 geometry: Option<Geometry<(Option<usize>, Option<usize>)>>,
622 log_level: Option<ffi::MDBX_log_level_t>,
623 kind: EnvironmentKind,
624 handle_slow_readers: Option<HandleSlowReadersCallback>,
625 #[cfg(feature = "read-tx-timeouts")]
626 max_read_transaction_duration: Option<read_transactions::MaxReadTransactionDuration>,
629}
630
631impl EnvironmentBuilder {
632 pub fn open(&self, path: &Path) -> Result<Environment> {
636 self.open_with_permissions(path, 0o644)
637 }
638
639 pub fn open_with_permissions(
643 &self,
644 path: &Path,
645 mode: ffi::mdbx_mode_t,
646 ) -> Result<Environment> {
647 let mut env: *mut ffi::MDBX_env = ptr::null_mut();
648 unsafe {
649 if let Some(log_level) = self.log_level {
650 ffi::mdbx_setup_debug(log_level, ffi::MDBX_DBG_DONTCHANGE, None);
653 }
654
655 mdbx_result(ffi::mdbx_env_create(&mut env))?;
656
657 if let Err(e) = (|| {
658 if let Some(geometry) = &self.geometry {
659 let mut min_size = -1;
660 let mut max_size = -1;
661
662 if let Some(size) = geometry.size {
663 if let Some(size) = size.0 {
664 min_size = size as isize;
665 }
666
667 if let Some(size) = size.1 {
668 max_size = size as isize;
669 }
670 }
671
672 mdbx_result(ffi::mdbx_env_set_geometry(
673 env,
674 min_size,
675 -1,
676 max_size,
677 geometry.growth_step.unwrap_or(-1),
678 geometry.shrink_threshold.unwrap_or(-1),
679 match geometry.page_size {
680 None => -1,
681 Some(PageSize::MinimalAcceptable) => 0,
682 Some(PageSize::Set(size)) => size as isize,
683 },
684 ))?;
685 }
686 for (opt, v) in [
687 (ffi::MDBX_opt_max_db, self.max_dbs),
688 (ffi::MDBX_opt_rp_augment_limit, self.rp_augment_limit),
689 (ffi::MDBX_opt_loose_limit, self.loose_limit),
690 (ffi::MDBX_opt_dp_reserve_limit, self.dp_reserve_limit),
691 (ffi::MDBX_opt_txn_dp_limit, self.txn_dp_limit),
692 (ffi::MDBX_opt_spill_max_denominator, self.spill_max_denominator),
693 (ffi::MDBX_opt_spill_min_denominator, self.spill_min_denominator),
694 ] {
695 if let Some(v) = v {
696 mdbx_result(ffi::mdbx_env_set_option(env, opt, v))?;
697 }
698 }
699
700 if let Some(max_readers) = self.max_readers {
702 mdbx_result(ffi::mdbx_env_set_option(
703 env,
704 ffi::MDBX_opt_max_readers,
705 max_readers,
706 ))?;
707 }
708
709 if let Some(handle_slow_readers) = self.handle_slow_readers {
710 mdbx_result(ffi::mdbx_env_set_hsr(
711 env,
712 convert_hsr_fn(Some(handle_slow_readers)),
713 ))?;
714 }
715
716 #[cfg(unix)]
717 fn path_to_bytes<P: AsRef<Path>>(path: P) -> Vec<u8> {
718 use std::os::unix::ffi::OsStrExt;
719 path.as_ref().as_os_str().as_bytes().to_vec()
720 }
721
722 #[cfg(windows)]
723 fn path_to_bytes<P: AsRef<Path>>(path: P) -> Vec<u8> {
724 path.as_ref().to_string_lossy().to_string().into_bytes()
728 }
729
730 let path = match CString::new(path_to_bytes(path)) {
731 Ok(path) => path,
732 Err(_) => return Err(Error::Invalid),
733 };
734 mdbx_result(ffi::mdbx_env_open(
735 env,
736 path.as_ptr(),
737 self.flags.make_flags() | self.kind.extra_flags(),
738 mode,
739 ))?;
740
741 for (opt, v) in [
742 (ffi::MDBX_opt_sync_bytes, self.sync_bytes),
743 (ffi::MDBX_opt_sync_period, self.sync_period),
744 ] {
745 if let Some(v) = v {
746 mdbx_result(ffi::mdbx_env_set_option(env, opt, v))?;
747 }
748 }
749
750 Ok(())
751 })() {
752 ffi::mdbx_env_close_ex(env, false);
753
754 return Err(e)
755 }
756 }
757
758 let env_ptr = EnvPtr(env);
759
760 #[cfg(not(feature = "read-tx-timeouts"))]
761 let txn_manager = TxnManager::new(env_ptr);
762
763 #[cfg(feature = "read-tx-timeouts")]
764 let txn_manager = {
765 if let crate::MaxReadTransactionDuration::Set(duration) = self
766 .max_read_transaction_duration
767 .unwrap_or(read_transactions::MaxReadTransactionDuration::Set(
768 DEFAULT_MAX_READ_TRANSACTION_DURATION,
769 ))
770 {
771 TxnManager::new_with_max_read_transaction_duration(env_ptr, duration)
772 } else {
773 TxnManager::new(env_ptr)
774 }
775 };
776
777 let env = EnvironmentInner {
778 env,
779 txn_manager,
780 env_kind: self.kind,
781 ro_txn_pool: ReadTxnPool::new(),
782 };
783
784 Ok(Environment { inner: Arc::new(env) })
785 }
786
787 pub const fn set_kind(&mut self, kind: EnvironmentKind) -> &mut Self {
789 self.kind = kind;
790 self
791 }
792
793 pub const fn write_map(&mut self) -> &mut Self {
797 self.set_kind(EnvironmentKind::WriteMap)
798 }
799
800 pub const fn set_flags(&mut self, flags: EnvironmentFlags) -> &mut Self {
802 self.flags = flags;
803 self
804 }
805
806 pub const fn set_max_readers(&mut self, max_readers: u64) -> &mut Self {
812 self.max_readers = Some(max_readers);
813 self
814 }
815
816 pub const fn set_max_dbs(&mut self, v: usize) -> &mut Self {
826 self.max_dbs = Some(v as u64);
827 self
828 }
829
830 pub const fn set_sync_bytes(&mut self, v: usize) -> &mut Self {
833 self.sync_bytes = Some(v as u64);
834 self
835 }
836
837 pub fn set_sync_period(&mut self, v: Duration) -> &mut Self {
840 let as_mdbx_units = (v.as_secs_f64() * 65536f64) as u64;
842 self.sync_period = Some(as_mdbx_units);
843 self
844 }
845
846 pub const fn set_rp_augment_limit(&mut self, v: u64) -> &mut Self {
847 self.rp_augment_limit = Some(v);
848 self
849 }
850
851 pub const fn set_loose_limit(&mut self, v: u64) -> &mut Self {
852 self.loose_limit = Some(v);
853 self
854 }
855
856 pub const fn set_dp_reserve_limit(&mut self, v: u64) -> &mut Self {
857 self.dp_reserve_limit = Some(v);
858 self
859 }
860
861 pub const fn set_txn_dp_limit(&mut self, v: u64) -> &mut Self {
862 self.txn_dp_limit = Some(v);
863 self
864 }
865
866 pub fn set_spill_max_denominator(&mut self, v: u8) -> &mut Self {
867 self.spill_max_denominator = Some(v.into());
868 self
869 }
870
871 pub fn set_spill_min_denominator(&mut self, v: u8) -> &mut Self {
872 self.spill_min_denominator = Some(v.into());
873 self
874 }
875
876 pub fn set_geometry<R: RangeBounds<usize>>(&mut self, geometry: Geometry<R>) -> &mut Self {
879 let convert_bound = |bound: Bound<&usize>| match bound {
880 Bound::Included(v) | Bound::Excluded(v) => Some(*v),
881 _ => None,
882 };
883 self.geometry = Some(Geometry {
884 size: geometry.size.map(|range| {
885 (convert_bound(range.start_bound()), convert_bound(range.end_bound()))
886 }),
887 growth_step: geometry.growth_step,
888 shrink_threshold: geometry.shrink_threshold,
889 page_size: geometry.page_size,
890 });
891 self
892 }
893
894 pub const fn set_log_level(&mut self, log_level: ffi::MDBX_log_level_t) -> &mut Self {
895 self.log_level = Some(log_level);
896 self
897 }
898
899 pub fn set_handle_slow_readers(&mut self, hsr: HandleSlowReadersCallback) -> &mut Self {
902 self.handle_slow_readers = Some(hsr);
903 self
904 }
905}
906
907#[cfg(feature = "read-tx-timeouts")]
908pub(crate) mod read_transactions {
909 use crate::EnvironmentBuilder;
910 use std::time::Duration;
911
912 #[derive(Debug, Clone, Copy)]
914 #[cfg(feature = "read-tx-timeouts")]
915 pub enum MaxReadTransactionDuration {
916 Unbounded,
918 Set(Duration),
920 }
921
922 #[cfg(feature = "read-tx-timeouts")]
923 impl MaxReadTransactionDuration {
924 pub const fn as_duration(&self) -> Option<Duration> {
925 match self {
926 Self::Unbounded => None,
927 Self::Set(duration) => Some(*duration),
928 }
929 }
930 }
931
932 impl EnvironmentBuilder {
933 pub const fn set_max_read_transaction_duration(
935 &mut self,
936 max_read_transaction_duration: MaxReadTransactionDuration,
937 ) -> &mut Self {
938 self.max_read_transaction_duration = Some(max_read_transaction_duration);
939 self
940 }
941 }
942}
943
944fn convert_hsr_fn(callback: Option<HandleSlowReadersCallback>) -> ffi::MDBX_hsr_func {
946 unsafe { std::mem::transmute(callback) }
947}
948
949#[cfg(test)]
950mod tests {
951 use crate::{Environment, Error, Geometry, HandleSlowReadersReturnCode, PageSize, WriteFlags};
952 use std::{
953 ops::RangeInclusive,
954 sync::atomic::{AtomicBool, Ordering},
955 };
956
957 #[test]
958 fn test_handle_slow_readers_callback() {
959 static CALLED: AtomicBool = AtomicBool::new(false);
960
961 extern "C" fn handle_slow_readers(
962 _env: *const ffi::MDBX_env,
963 _txn: *const ffi::MDBX_txn,
964 _pid: ffi::mdbx_pid_t,
965 _tid: ffi::mdbx_tid_t,
966 _laggard: u64,
967 _gap: std::ffi::c_uint,
968 _space: usize,
969 _retry: std::ffi::c_int,
970 ) -> HandleSlowReadersReturnCode {
971 CALLED.store(true, Ordering::Relaxed);
972 HandleSlowReadersReturnCode::ProceedWithoutKillingReader
973 }
974
975 let tempdir = tempfile::tempdir().unwrap();
976 let env = Environment::builder()
977 .set_geometry(Geometry::<RangeInclusive<usize>> {
978 size: Some(0..=1024 * 1024), page_size: Some(PageSize::MinimalAcceptable), ..Default::default()
981 })
982 .set_handle_slow_readers(handle_slow_readers)
983 .open(tempdir.path())
984 .unwrap();
985
986 {
988 let tx = env.begin_rw_txn().unwrap();
989 let db = tx.open_db(None).unwrap();
990 for i in 0usize..1_000 {
991 tx.put(db.dbi(), i.to_le_bytes(), b"0", WriteFlags::empty()).unwrap()
992 }
993 tx.commit().unwrap();
994 }
995
996 let _tx_ro = env.begin_ro_txn().unwrap();
998
999 {
1001 let tx = env.begin_rw_txn().unwrap();
1002 let db = tx.open_db(None).unwrap();
1003 for i in 0usize..1_000 {
1004 tx.put(db.dbi(), i.to_le_bytes(), b"1", WriteFlags::empty()).unwrap();
1005 }
1006 tx.commit().unwrap();
1007 }
1008
1009 {
1012 let tx = env.begin_rw_txn().unwrap();
1013 let db = tx.open_db(None).unwrap();
1014 for i in 1_000usize..1_000_000 {
1015 match tx.put(db.dbi(), i.to_le_bytes(), b"0", WriteFlags::empty()) {
1016 Ok(_) => {}
1017 Err(Error::MapFull) => break,
1018 result @ Err(_) => result.unwrap(),
1019 }
1020 }
1021 let _ = tx.commit();
1025 }
1026
1027 assert!(CALLED.load(Ordering::Relaxed));
1029 }
1030}