1use crate::{
2 database::Database,
3 environment::Environment,
4 error::{mdbx_result, Result},
5 flags::{DatabaseFlags, WriteFlags},
6 txn_manager::{TxnManagerMessage, TxnPtr},
7 Cursor, Error, Stat, TableObject,
8};
9use ffi::{MDBX_txn_flags_t, MDBX_TXN_RDONLY, MDBX_TXN_READWRITE};
10use indexmap::IndexSet;
11use parking_lot::{Mutex, MutexGuard};
12use std::{
13 ffi::{c_uint, c_void},
14 fmt::{self, Debug},
15 mem::size_of,
16 ptr, slice,
17 sync::{atomic::AtomicBool, mpsc::sync_channel, Arc},
18 time::Duration,
19};
20
21#[cfg(feature = "read-tx-timeouts")]
22use ffi::mdbx_txn_renew;
23
24mod private {
25 use super::*;
26
27 pub trait Sealed {}
28
29 impl Sealed for RO {}
30 impl Sealed for RW {}
31}
32
33pub trait TransactionKind: private::Sealed + Send + Sync + Debug + 'static {
34 #[doc(hidden)]
35 const OPEN_FLAGS: MDBX_txn_flags_t;
36
37 #[doc(hidden)]
39 const IS_READ_ONLY: bool;
40}
41
42#[derive(Debug)]
43#[non_exhaustive]
44pub struct RO;
45
46#[derive(Debug)]
47#[non_exhaustive]
48pub struct RW;
49
50impl TransactionKind for RO {
51 const OPEN_FLAGS: MDBX_txn_flags_t = MDBX_TXN_RDONLY;
52 const IS_READ_ONLY: bool = true;
53}
54impl TransactionKind for RW {
55 const OPEN_FLAGS: MDBX_txn_flags_t = MDBX_TXN_READWRITE;
56 const IS_READ_ONLY: bool = false;
57}
58
59pub struct Transaction<K>
63where
64 K: TransactionKind,
65{
66 inner: Arc<TransactionInner<K>>,
67}
68
69impl<K> Transaction<K>
70where
71 K: TransactionKind,
72{
73 pub(crate) fn new(env: Environment) -> Result<Self> {
74 let mut txn: *mut ffi::MDBX_txn = ptr::null_mut();
75 unsafe {
76 mdbx_result(ffi::mdbx_txn_begin_ex(
77 env.env_ptr(),
78 ptr::null_mut(),
79 K::OPEN_FLAGS,
80 &mut txn,
81 ptr::null_mut(),
82 ))?;
83 Ok(Self::new_from_ptr(env, txn))
84 }
85 }
86
87 pub(crate) fn new_from_ptr(env: Environment, txn_ptr: *mut ffi::MDBX_txn) -> Self {
88 let txn = TransactionPtr::new(txn_ptr);
89
90 #[cfg(feature = "read-tx-timeouts")]
91 if K::IS_READ_ONLY {
92 env.txn_manager().add_active_read_transaction(txn_ptr, txn.clone())
93 }
94
95 let inner = TransactionInner {
96 txn,
97 primed_dbis: Mutex::new(IndexSet::new()),
98 committed: AtomicBool::new(false),
99 env,
100 _marker: Default::default(),
101 };
102
103 Self { inner: Arc::new(inner) }
104 }
105
106 #[inline]
111 pub fn txn_execute<F, T>(&self, f: F) -> Result<T>
112 where
113 F: FnOnce(*mut ffi::MDBX_txn) -> T,
114 {
115 self.inner.txn_execute(f)
116 }
117
118 #[inline]
123 pub(crate) fn txn_execute_renew_on_timeout<F, T>(&self, f: F) -> Result<T>
124 where
125 F: FnOnce(*mut ffi::MDBX_txn) -> T,
126 {
127 self.inner.txn_execute_renew_on_timeout(f)
128 }
129
130 #[doc(hidden)]
132 #[cfg(test)]
133 pub fn txn(&self) -> *mut ffi::MDBX_txn {
134 self.inner.txn.txn
135 }
136
137 pub fn env(&self) -> &Environment {
139 &self.inner.env
140 }
141
142 pub fn id(&self) -> Result<u64> {
144 self.txn_execute(|txn| unsafe { ffi::mdbx_txn_id(txn) })
145 }
146
147 pub fn get<Key>(&self, dbi: ffi::MDBX_dbi, key: &[u8]) -> Result<Option<Key>>
156 where
157 Key: TableObject,
158 {
159 let key_val: ffi::MDBX_val =
160 ffi::MDBX_val { iov_len: key.len(), iov_base: key.as_ptr() as *mut c_void };
161 let mut data_val: ffi::MDBX_val = ffi::MDBX_val { iov_len: 0, iov_base: ptr::null_mut() };
162
163 self.txn_execute(|txn| unsafe {
164 match ffi::mdbx_get(txn, dbi, &key_val, &mut data_val) {
165 ffi::MDBX_SUCCESS => Key::decode_val::<K>(txn, data_val).map(Some),
166 ffi::MDBX_NOTFOUND => Ok(None),
167 err_code => Err(Error::from_err_code(err_code)),
168 }
169 })?
170 }
171
172 pub fn commit(self) -> Result<(bool, CommitLatency)> {
176 self.commit_and_rebind_open_dbs().map(|v| (v.0, v.1))
177 }
178
179 pub fn prime_for_permaopen(&self, db: Database) {
180 self.inner.primed_dbis.lock().insert(db.dbi());
181 }
182
183 pub fn commit_and_rebind_open_dbs(self) -> Result<(bool, CommitLatency, Vec<Database>)> {
185 let result = {
186 let result = self.txn_execute(|txn| {
187 if K::IS_READ_ONLY {
188 #[cfg(feature = "read-tx-timeouts")]
189 self.env().txn_manager().remove_active_read_transaction(txn);
190
191 let mut latency = CommitLatency::new();
192 mdbx_result(unsafe {
193 ffi::mdbx_txn_commit_ex(txn, latency.mdb_commit_latency())
194 })
195 .map(|v| (v, latency))
196 } else {
197 let (sender, rx) = sync_channel(0);
198 self.env()
199 .txn_manager()
200 .send_message(TxnManagerMessage::Commit { tx: TxnPtr(txn), sender });
201 rx.recv().unwrap()
202 }
203 })?;
204
205 self.inner.set_committed();
206 result
207 };
208 result.map(|(v, latency)| {
209 (
210 v,
211 latency,
212 self.inner
213 .primed_dbis
214 .lock()
215 .iter()
216 .map(|&dbi| Database::new_from_ptr(dbi, self.env().clone()))
217 .collect(),
218 )
219 })
220 }
221
222 pub fn open_db(&self, name: Option<&str>) -> Result<Database> {
234 Database::new(self, name, 0)
235 }
236
237 pub fn db_flags(&self, db: &Database) -> Result<DatabaseFlags> {
239 let mut flags: c_uint = 0;
240 unsafe {
241 self.txn_execute(|txn| {
242 mdbx_result(ffi::mdbx_dbi_flags_ex(txn, db.dbi(), &mut flags, ptr::null_mut()))
243 })??;
244 }
245
246 #[cfg_attr(not(windows), allow(clippy::useless_conversion))]
248 Ok(DatabaseFlags::from_bits_truncate(flags.try_into().unwrap()))
249 }
250
251 pub fn db_stat(&self, db: &Database) -> Result<Stat> {
253 self.db_stat_with_dbi(db.dbi())
254 }
255
256 pub fn db_stat_with_dbi(&self, dbi: ffi::MDBX_dbi) -> Result<Stat> {
258 unsafe {
259 let mut stat = Stat::new();
260 self.txn_execute(|txn| {
261 mdbx_result(ffi::mdbx_dbi_stat(txn, dbi, stat.mdb_stat(), size_of::<Stat>()))
262 })??;
263 Ok(stat)
264 }
265 }
266
267 pub fn cursor(&self, db: &Database) -> Result<Cursor<K>> {
269 Cursor::new(self.clone(), db.dbi())
270 }
271
272 pub fn cursor_with_dbi(&self, dbi: ffi::MDBX_dbi) -> Result<Cursor<K>> {
274 Cursor::new(self.clone(), dbi)
275 }
276
277 #[cfg(feature = "read-tx-timeouts")]
279 pub fn disable_timeout(&self) {
280 if K::IS_READ_ONLY {
281 self.env().txn_manager().remove_active_read_transaction(self.inner.txn.txn);
282 }
283 }
284}
285
286impl<K> Clone for Transaction<K>
287where
288 K: TransactionKind,
289{
290 fn clone(&self) -> Self {
291 Self { inner: Arc::clone(&self.inner) }
292 }
293}
294
295impl<K> fmt::Debug for Transaction<K>
296where
297 K: TransactionKind,
298{
299 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
300 f.debug_struct("RoTransaction").finish_non_exhaustive()
301 }
302}
303
304struct TransactionInner<K>
306where
307 K: TransactionKind,
308{
309 txn: TransactionPtr,
311 primed_dbis: Mutex<IndexSet<ffi::MDBX_dbi>>,
313 committed: AtomicBool,
315 env: Environment,
316 _marker: std::marker::PhantomData<fn(K)>,
317}
318
319impl<K> TransactionInner<K>
320where
321 K: TransactionKind,
322{
323 fn set_committed(&self) {
325 self.committed.store(true, std::sync::atomic::Ordering::SeqCst);
326 }
327
328 fn has_committed(&self) -> bool {
329 self.committed.load(std::sync::atomic::Ordering::SeqCst)
330 }
331
332 #[inline]
333 fn txn_execute<F, T>(&self, f: F) -> Result<T>
334 where
335 F: FnOnce(*mut ffi::MDBX_txn) -> T,
336 {
337 self.txn.txn_execute_fail_on_timeout(f)
338 }
339
340 #[inline]
341 fn txn_execute_renew_on_timeout<F, T>(&self, f: F) -> Result<T>
342 where
343 F: FnOnce(*mut ffi::MDBX_txn) -> T,
344 {
345 self.txn.txn_execute_renew_on_timeout(f)
346 }
347}
348
349impl<K> Drop for TransactionInner<K>
350where
351 K: TransactionKind,
352{
353 fn drop(&mut self) {
354 self.txn
357 .txn_execute_renew_on_timeout(|txn| {
358 if !self.has_committed() {
359 if K::IS_READ_ONLY {
360 #[cfg(feature = "read-tx-timeouts")]
361 self.env.txn_manager().remove_active_read_transaction(txn);
362
363 unsafe {
364 ffi::mdbx_txn_abort(txn);
365 }
366 } else {
367 let (sender, rx) = sync_channel(0);
368 self.env
369 .txn_manager()
370 .send_message(TxnManagerMessage::Abort { tx: TxnPtr(txn), sender });
371 rx.recv().unwrap().unwrap();
372 }
373 }
374 })
375 .unwrap();
376 }
377}
378
379impl Transaction<RW> {
380 fn open_db_with_flags(&self, name: Option<&str>, flags: DatabaseFlags) -> Result<Database> {
381 Database::new(self, name, flags.bits())
382 }
383
384 pub fn create_db(&self, name: Option<&str>, flags: DatabaseFlags) -> Result<Database> {
397 self.open_db_with_flags(name, flags | DatabaseFlags::CREATE)
398 }
399
400 pub fn put(
407 &self,
408 dbi: ffi::MDBX_dbi,
409 key: impl AsRef<[u8]>,
410 data: impl AsRef<[u8]>,
411 flags: WriteFlags,
412 ) -> Result<()> {
413 let key = key.as_ref();
414 let data = data.as_ref();
415 let key_val: ffi::MDBX_val =
416 ffi::MDBX_val { iov_len: key.len(), iov_base: key.as_ptr() as *mut c_void };
417 let mut data_val: ffi::MDBX_val =
418 ffi::MDBX_val { iov_len: data.len(), iov_base: data.as_ptr() as *mut c_void };
419 mdbx_result(self.txn_execute(|txn| unsafe {
420 ffi::mdbx_put(txn, dbi, &key_val, &mut data_val, flags.bits())
421 })?)?;
422
423 Ok(())
424 }
425
426 pub fn reserve(
430 &self,
431 db: &Database,
432 key: impl AsRef<[u8]>,
433 len: usize,
434 flags: WriteFlags,
435 ) -> Result<&mut [u8]> {
436 let key = key.as_ref();
437 let key_val: ffi::MDBX_val =
438 ffi::MDBX_val { iov_len: key.len(), iov_base: key.as_ptr() as *mut c_void };
439 let mut data_val: ffi::MDBX_val =
440 ffi::MDBX_val { iov_len: len, iov_base: ptr::null_mut::<c_void>() };
441 unsafe {
442 mdbx_result(self.txn_execute(|txn| {
443 ffi::mdbx_put(
444 txn,
445 db.dbi(),
446 &key_val,
447 &mut data_val,
448 flags.bits() | ffi::MDBX_RESERVE,
449 )
450 })?)?;
451 Ok(slice::from_raw_parts_mut(data_val.iov_base as *mut u8, data_val.iov_len))
452 }
453 }
454
455 pub fn del(
465 &self,
466 dbi: ffi::MDBX_dbi,
467 key: impl AsRef<[u8]>,
468 data: Option<&[u8]>,
469 ) -> Result<bool> {
470 let key = key.as_ref();
471 let key_val: ffi::MDBX_val =
472 ffi::MDBX_val { iov_len: key.len(), iov_base: key.as_ptr() as *mut c_void };
473 let data_val: Option<ffi::MDBX_val> = data.map(|data| ffi::MDBX_val {
474 iov_len: data.len(),
475 iov_base: data.as_ptr() as *mut c_void,
476 });
477
478 mdbx_result({
479 self.txn_execute(|txn| {
480 if let Some(d) = data_val {
481 unsafe { ffi::mdbx_del(txn, dbi, &key_val, &d) }
482 } else {
483 unsafe { ffi::mdbx_del(txn, dbi, &key_val, ptr::null()) }
484 }
485 })?
486 })
487 .map(|_| true)
488 .or_else(|e| match e {
489 Error::NotFound => Ok(false),
490 other => Err(other),
491 })
492 }
493
494 pub fn clear_db(&self, dbi: ffi::MDBX_dbi) -> Result<()> {
496 mdbx_result(self.txn_execute(|txn| unsafe { ffi::mdbx_drop(txn, dbi, false) })?)?;
497
498 Ok(())
499 }
500
501 pub unsafe fn drop_db(&self, db: Database) -> Result<()> {
507 mdbx_result(self.txn_execute(|txn| ffi::mdbx_drop(txn, db.dbi(), true))?)?;
508
509 Ok(())
510 }
511}
512
513impl Transaction<RO> {
514 pub unsafe fn close_db(&self, db: Database) -> Result<()> {
520 mdbx_result(ffi::mdbx_dbi_close(self.env().env_ptr(), db.dbi()))?;
521
522 Ok(())
523 }
524}
525
526impl Transaction<RW> {
527 pub fn begin_nested_txn(&mut self) -> Result<Self> {
529 if self.inner.env.is_write_map() {
530 return Err(Error::NestedTransactionsUnsupportedWithWriteMap)
531 }
532 self.txn_execute(|txn| {
533 let (tx, rx) = sync_channel(0);
534 self.env().txn_manager().send_message(TxnManagerMessage::Begin {
535 parent: TxnPtr(txn),
536 flags: RW::OPEN_FLAGS,
537 sender: tx,
538 });
539
540 rx.recv().unwrap().map(|ptr| Self::new_from_ptr(self.env().clone(), ptr.0))
541 })?
542 }
543}
544
545#[derive(Debug, Clone)]
547pub(crate) struct TransactionPtr {
548 txn: *mut ffi::MDBX_txn,
549 #[cfg(feature = "read-tx-timeouts")]
550 timed_out: Arc<AtomicBool>,
551 lock: Arc<Mutex<()>>,
552}
553
554impl TransactionPtr {
555 fn new(txn: *mut ffi::MDBX_txn) -> Self {
556 Self {
557 txn,
558 #[cfg(feature = "read-tx-timeouts")]
559 timed_out: Arc::new(AtomicBool::new(false)),
560 lock: Arc::new(Mutex::new(())),
561 }
562 }
563
564 #[cfg(feature = "read-tx-timeouts")]
573 fn is_timed_out(&self) -> bool {
574 self.timed_out.load(std::sync::atomic::Ordering::SeqCst)
575 }
576
577 #[cfg(feature = "read-tx-timeouts")]
578 pub(crate) fn set_timed_out(&self) {
579 self.timed_out.store(true, std::sync::atomic::Ordering::SeqCst);
580 }
581
582 fn lock(&self) -> MutexGuard<'_, ()> {
585 if let Some(lock) = self.lock.try_lock() {
586 lock
587 } else {
588 tracing::trace!(
589 target: "libmdbx",
590 txn = %self.txn as usize,
591 backtrace = %std::backtrace::Backtrace::capture(),
592 "Transaction lock is already acquired, blocking...
593 To display the full backtrace, run with `RUST_BACKTRACE=full` env variable."
594 );
595 self.lock.lock()
596 }
597 }
598
599 #[inline]
603 pub(crate) fn txn_execute_fail_on_timeout<F, T>(&self, f: F) -> Result<T>
604 where
605 F: FnOnce(*mut ffi::MDBX_txn) -> T,
606 {
607 let _lck = self.lock();
608
609 #[cfg(feature = "read-tx-timeouts")]
613 if self.is_timed_out() {
614 return Err(Error::ReadTransactionTimeout)
615 }
616
617 Ok((f)(self.txn))
618 }
619
620 #[inline]
625 pub(crate) fn txn_execute_renew_on_timeout<F, T>(&self, f: F) -> Result<T>
626 where
627 F: FnOnce(*mut ffi::MDBX_txn) -> T,
628 {
629 let _lck = self.lock();
630
631 #[cfg(feature = "read-tx-timeouts")]
633 if self.is_timed_out() {
634 mdbx_result(unsafe { mdbx_txn_renew(self.txn) })?;
635 }
636
637 Ok((f)(self.txn))
638 }
639}
640
641#[derive(Debug)]
646#[repr(transparent)]
647pub struct CommitLatency(ffi::MDBX_commit_latency);
648
649impl CommitLatency {
650 pub(crate) const fn new() -> Self {
652 unsafe { Self(std::mem::zeroed()) }
653 }
654
655 pub(crate) fn mdb_commit_latency(&mut self) -> *mut ffi::MDBX_commit_latency {
657 &mut self.0
658 }
659}
660
661impl CommitLatency {
662 #[inline]
665 pub const fn preparation(&self) -> Duration {
666 Self::time_to_duration(self.0.preparation)
667 }
668
669 #[inline]
671 pub const fn gc_wallclock(&self) -> Duration {
672 Self::time_to_duration(self.0.gc_wallclock)
673 }
674
675 #[inline]
677 pub const fn audit(&self) -> Duration {
678 Self::time_to_duration(self.0.audit)
679 }
680
681 #[inline]
684 pub const fn write(&self) -> Duration {
685 Self::time_to_duration(self.0.write)
686 }
687
688 #[inline]
691 pub const fn sync(&self) -> Duration {
692 Self::time_to_duration(self.0.sync)
693 }
694
695 #[inline]
697 pub const fn ending(&self) -> Duration {
698 Self::time_to_duration(self.0.ending)
699 }
700
701 #[inline]
703 pub const fn whole(&self) -> Duration {
704 Self::time_to_duration(self.0.whole)
705 }
706
707 #[inline]
709 pub const fn gc_cputime(&self) -> Duration {
710 Self::time_to_duration(self.0.gc_cputime)
711 }
712
713 #[inline]
714 const fn time_to_duration(time: u32) -> Duration {
715 Duration::from_nanos(time as u64 * (1_000_000_000 / 65_536))
716 }
717}
718
719unsafe impl Send for TransactionPtr {}
721
722unsafe impl Sync for TransactionPtr {}
724
725#[cfg(test)]
726mod tests {
727 use super::*;
728
729 const fn assert_send_sync<T: Send + Sync>() {}
730
731 #[allow(dead_code)]
732 const fn test_txn_send_sync() {
733 assert_send_sync::<Transaction<RO>>();
734 assert_send_sync::<Transaction<RW>>();
735 }
736}