1use crate::{
2 database::Database,
3 environment::Environment,
4 error::{mdbx_result, Result},
5 flags::{DatabaseFlags, WriteFlags},
6 txn_manager::{TxnManagerMessage, TxnPtr},
7 Cursor, Error, Stat, TableObject,
8};
9use ffi::{MDBX_txn_flags_t, MDBX_TXN_RDONLY, MDBX_TXN_READWRITE};
10use parking_lot::{Mutex, MutexGuard};
11use std::{
12 ffi::{c_uint, c_void},
13 fmt::{self, Debug},
14 mem::size_of,
15 ptr, slice,
16 sync::{atomic::AtomicBool, mpsc::sync_channel, Arc},
17 time::Duration,
18};
19
20#[cfg(feature = "read-tx-timeouts")]
21use ffi::mdbx_txn_renew;
22
23mod private {
24 use super::*;
25
26 pub trait Sealed {}
27
28 impl Sealed for RO {}
29 impl Sealed for RW {}
30}
31
32pub trait TransactionKind: private::Sealed + Send + Sync + Debug + 'static {
33 #[doc(hidden)]
34 const OPEN_FLAGS: MDBX_txn_flags_t;
35
36 #[doc(hidden)]
38 const IS_READ_ONLY: bool;
39}
40
41#[derive(Debug)]
42#[non_exhaustive]
43pub struct RO;
44
45#[derive(Debug)]
46#[non_exhaustive]
47pub struct RW;
48
49impl TransactionKind for RO {
50 const OPEN_FLAGS: MDBX_txn_flags_t = MDBX_TXN_RDONLY;
51 const IS_READ_ONLY: bool = true;
52}
53impl TransactionKind for RW {
54 const OPEN_FLAGS: MDBX_txn_flags_t = MDBX_TXN_READWRITE;
55 const IS_READ_ONLY: bool = false;
56}
57
58pub struct Transaction<K>
62where
63 K: TransactionKind,
64{
65 inner: Arc<TransactionInner<K>>,
66}
67
68impl<K> Transaction<K>
69where
70 K: TransactionKind,
71{
72 pub(crate) fn new(env: Environment) -> Result<Self> {
73 let mut txn: *mut ffi::MDBX_txn = ptr::null_mut();
74 unsafe {
75 mdbx_result(ffi::mdbx_txn_begin_ex(
76 env.env_ptr(),
77 ptr::null_mut(),
78 K::OPEN_FLAGS,
79 &mut txn,
80 ptr::null_mut(),
81 ))?;
82 Ok(Self::new_from_ptr(env, txn))
83 }
84 }
85
86 pub(crate) fn new_from_ptr(env: Environment, txn_ptr: *mut ffi::MDBX_txn) -> Self {
87 let txn = TransactionPtr::new(txn_ptr);
88
89 #[cfg(feature = "read-tx-timeouts")]
90 if K::IS_READ_ONLY {
91 env.txn_manager().add_active_read_transaction(txn_ptr, txn.clone())
92 }
93
94 let inner = TransactionInner {
95 txn,
96 committed: AtomicBool::new(false),
97 env,
98 _marker: Default::default(),
99 };
100
101 Self { inner: Arc::new(inner) }
102 }
103
104 #[inline]
109 pub fn txn_execute<F, T>(&self, f: F) -> Result<T>
110 where
111 F: FnOnce(*mut ffi::MDBX_txn) -> T,
112 {
113 self.inner.txn_execute(f)
114 }
115
116 #[inline]
121 pub(crate) fn txn_execute_renew_on_timeout<F, T>(&self, f: F) -> Result<T>
122 where
123 F: FnOnce(*mut ffi::MDBX_txn) -> T,
124 {
125 self.inner.txn_execute_renew_on_timeout(f)
126 }
127
128 #[doc(hidden)]
130 #[cfg(test)]
131 pub fn txn(&self) -> *mut ffi::MDBX_txn {
132 self.inner.txn.txn
133 }
134
135 pub fn env(&self) -> &Environment {
137 &self.inner.env
138 }
139
140 pub fn id(&self) -> Result<u64> {
142 self.txn_execute(|txn| unsafe { ffi::mdbx_txn_id(txn) })
143 }
144
145 pub fn get<Key>(&self, dbi: ffi::MDBX_dbi, key: &[u8]) -> Result<Option<Key>>
154 where
155 Key: TableObject,
156 {
157 let key_val: ffi::MDBX_val =
158 ffi::MDBX_val { iov_len: key.len(), iov_base: key.as_ptr() as *mut c_void };
159 let mut data_val: ffi::MDBX_val = ffi::MDBX_val { iov_len: 0, iov_base: ptr::null_mut() };
160
161 self.txn_execute(|txn| unsafe {
162 match ffi::mdbx_get(txn, dbi, &key_val, &mut data_val) {
163 ffi::MDBX_SUCCESS => Key::decode_val::<K>(txn, data_val).map(Some),
164 ffi::MDBX_NOTFOUND => Ok(None),
165 err_code => Err(Error::from_err_code(err_code)),
166 }
167 })?
168 }
169
170 pub fn commit(self) -> Result<(bool, CommitLatency)> {
174 let result = self.txn_execute(|txn| {
175 if K::IS_READ_ONLY {
176 #[cfg(feature = "read-tx-timeouts")]
177 self.env().txn_manager().remove_active_read_transaction(txn);
178
179 let mut latency = CommitLatency::new();
180 mdbx_result(unsafe { ffi::mdbx_txn_commit_ex(txn, latency.mdb_commit_latency()) })
181 .map(|v| (v, latency))
182 } else {
183 let (sender, rx) = sync_channel(0);
184 self.env()
185 .txn_manager()
186 .send_message(TxnManagerMessage::Commit { tx: TxnPtr(txn), sender });
187 rx.recv().unwrap()
188 }
189 })?;
190
191 self.inner.set_committed();
192 result
193 }
194
195 pub fn open_db(&self, name: Option<&str>) -> Result<Database> {
207 Database::new(self, name, 0)
208 }
209
210 pub fn db_flags(&self, db: &Database) -> Result<DatabaseFlags> {
212 let mut flags: c_uint = 0;
213 unsafe {
214 self.txn_execute(|txn| {
215 mdbx_result(ffi::mdbx_dbi_flags_ex(txn, db.dbi(), &mut flags, ptr::null_mut()))
216 })??;
217 }
218
219 #[cfg_attr(not(windows), allow(clippy::useless_conversion))]
221 Ok(DatabaseFlags::from_bits_truncate(flags.try_into().unwrap()))
222 }
223
224 pub fn db_stat(&self, db: &Database) -> Result<Stat> {
226 self.db_stat_with_dbi(db.dbi())
227 }
228
229 pub fn db_stat_with_dbi(&self, dbi: ffi::MDBX_dbi) -> Result<Stat> {
231 unsafe {
232 let mut stat = Stat::new();
233 self.txn_execute(|txn| {
234 mdbx_result(ffi::mdbx_dbi_stat(txn, dbi, stat.mdb_stat(), size_of::<Stat>()))
235 })??;
236 Ok(stat)
237 }
238 }
239
240 pub fn cursor(&self, db: &Database) -> Result<Cursor<K>> {
242 Cursor::new(self.clone(), db.dbi())
243 }
244
245 pub fn cursor_with_dbi(&self, dbi: ffi::MDBX_dbi) -> Result<Cursor<K>> {
247 Cursor::new(self.clone(), dbi)
248 }
249
250 #[cfg(feature = "read-tx-timeouts")]
252 pub fn disable_timeout(&self) {
253 if K::IS_READ_ONLY {
254 self.env().txn_manager().remove_active_read_transaction(self.inner.txn.txn);
255 }
256 }
257}
258
259impl<K> Clone for Transaction<K>
260where
261 K: TransactionKind,
262{
263 fn clone(&self) -> Self {
264 Self { inner: Arc::clone(&self.inner) }
265 }
266}
267
268impl<K> fmt::Debug for Transaction<K>
269where
270 K: TransactionKind,
271{
272 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
273 f.debug_struct("RoTransaction").finish_non_exhaustive()
274 }
275}
276
277struct TransactionInner<K>
279where
280 K: TransactionKind,
281{
282 txn: TransactionPtr,
284 committed: AtomicBool,
286 env: Environment,
287 _marker: std::marker::PhantomData<fn(K)>,
288}
289
290impl<K> TransactionInner<K>
291where
292 K: TransactionKind,
293{
294 fn set_committed(&self) {
296 self.committed.store(true, std::sync::atomic::Ordering::SeqCst);
297 }
298
299 fn has_committed(&self) -> bool {
300 self.committed.load(std::sync::atomic::Ordering::SeqCst)
301 }
302
303 #[inline]
304 fn txn_execute<F, T>(&self, f: F) -> Result<T>
305 where
306 F: FnOnce(*mut ffi::MDBX_txn) -> T,
307 {
308 self.txn.txn_execute_fail_on_timeout(f)
309 }
310
311 #[inline]
312 fn txn_execute_renew_on_timeout<F, T>(&self, f: F) -> Result<T>
313 where
314 F: FnOnce(*mut ffi::MDBX_txn) -> T,
315 {
316 self.txn.txn_execute_renew_on_timeout(f)
317 }
318}
319
320impl<K> Drop for TransactionInner<K>
321where
322 K: TransactionKind,
323{
324 fn drop(&mut self) {
325 self.txn
328 .txn_execute_renew_on_timeout(|txn| {
329 if !self.has_committed() {
330 if K::IS_READ_ONLY {
331 #[cfg(feature = "read-tx-timeouts")]
332 self.env.txn_manager().remove_active_read_transaction(txn);
333
334 unsafe {
335 ffi::mdbx_txn_abort(txn);
336 }
337 } else {
338 let (sender, rx) = sync_channel(0);
339 self.env
340 .txn_manager()
341 .send_message(TxnManagerMessage::Abort { tx: TxnPtr(txn), sender });
342 rx.recv().unwrap().unwrap();
343 }
344 }
345 })
346 .unwrap();
347 }
348}
349
350impl Transaction<RW> {
351 fn open_db_with_flags(&self, name: Option<&str>, flags: DatabaseFlags) -> Result<Database> {
352 Database::new(self, name, flags.bits())
353 }
354
355 pub fn create_db(&self, name: Option<&str>, flags: DatabaseFlags) -> Result<Database> {
368 self.open_db_with_flags(name, flags | DatabaseFlags::CREATE)
369 }
370
371 pub fn put(
378 &self,
379 dbi: ffi::MDBX_dbi,
380 key: impl AsRef<[u8]>,
381 data: impl AsRef<[u8]>,
382 flags: WriteFlags,
383 ) -> Result<()> {
384 let key = key.as_ref();
385 let data = data.as_ref();
386 let key_val: ffi::MDBX_val =
387 ffi::MDBX_val { iov_len: key.len(), iov_base: key.as_ptr() as *mut c_void };
388 let mut data_val: ffi::MDBX_val =
389 ffi::MDBX_val { iov_len: data.len(), iov_base: data.as_ptr() as *mut c_void };
390 mdbx_result(self.txn_execute(|txn| unsafe {
391 ffi::mdbx_put(txn, dbi, &key_val, &mut data_val, flags.bits())
392 })?)?;
393
394 Ok(())
395 }
396
397 #[allow(clippy::mut_from_ref)]
401 pub fn reserve(
402 &self,
403 db: &Database,
404 key: impl AsRef<[u8]>,
405 len: usize,
406 flags: WriteFlags,
407 ) -> Result<&mut [u8]> {
408 let key = key.as_ref();
409 let key_val: ffi::MDBX_val =
410 ffi::MDBX_val { iov_len: key.len(), iov_base: key.as_ptr() as *mut c_void };
411 let mut data_val: ffi::MDBX_val =
412 ffi::MDBX_val { iov_len: len, iov_base: ptr::null_mut::<c_void>() };
413 unsafe {
414 mdbx_result(self.txn_execute(|txn| {
415 ffi::mdbx_put(
416 txn,
417 db.dbi(),
418 &key_val,
419 &mut data_val,
420 flags.bits() | ffi::MDBX_RESERVE,
421 )
422 })?)?;
423 Ok(slice::from_raw_parts_mut(data_val.iov_base as *mut u8, data_val.iov_len))
424 }
425 }
426
427 pub fn del(
437 &self,
438 dbi: ffi::MDBX_dbi,
439 key: impl AsRef<[u8]>,
440 data: Option<&[u8]>,
441 ) -> Result<bool> {
442 let key = key.as_ref();
443 let key_val: ffi::MDBX_val =
444 ffi::MDBX_val { iov_len: key.len(), iov_base: key.as_ptr() as *mut c_void };
445 let data_val: Option<ffi::MDBX_val> = data.map(|data| ffi::MDBX_val {
446 iov_len: data.len(),
447 iov_base: data.as_ptr() as *mut c_void,
448 });
449
450 mdbx_result({
451 self.txn_execute(|txn| {
452 if let Some(d) = data_val {
453 unsafe { ffi::mdbx_del(txn, dbi, &key_val, &d) }
454 } else {
455 unsafe { ffi::mdbx_del(txn, dbi, &key_val, ptr::null()) }
456 }
457 })?
458 })
459 .map(|_| true)
460 .or_else(|e| match e {
461 Error::NotFound => Ok(false),
462 other => Err(other),
463 })
464 }
465
466 pub fn clear_db(&self, dbi: ffi::MDBX_dbi) -> Result<()> {
468 mdbx_result(self.txn_execute(|txn| unsafe { ffi::mdbx_drop(txn, dbi, false) })?)?;
469
470 Ok(())
471 }
472
473 pub unsafe fn drop_db(&self, db: Database) -> Result<()> {
479 mdbx_result(self.txn_execute(|txn| unsafe { ffi::mdbx_drop(txn, db.dbi(), true) })?)?;
480
481 Ok(())
482 }
483}
484
485impl Transaction<RO> {
486 pub unsafe fn close_db(&self, db: Database) -> Result<()> {
492 mdbx_result(unsafe { ffi::mdbx_dbi_close(self.env().env_ptr(), db.dbi()) })?;
493
494 Ok(())
495 }
496}
497
498impl Transaction<RW> {
499 pub fn begin_nested_txn(&mut self) -> Result<Self> {
501 if self.inner.env.is_write_map() {
502 return Err(Error::NestedTransactionsUnsupportedWithWriteMap)
503 }
504 self.txn_execute(|txn| {
505 let (tx, rx) = sync_channel(0);
506 self.env().txn_manager().send_message(TxnManagerMessage::Begin {
507 parent: TxnPtr(txn),
508 flags: RW::OPEN_FLAGS,
509 sender: tx,
510 });
511
512 rx.recv().unwrap().map(|ptr| Self::new_from_ptr(self.env().clone(), ptr.0))
513 })?
514 }
515}
516
517#[derive(Debug, Clone)]
519pub(crate) struct TransactionPtr {
520 txn: *mut ffi::MDBX_txn,
521 #[cfg(feature = "read-tx-timeouts")]
522 timed_out: Arc<AtomicBool>,
523 lock: Arc<Mutex<()>>,
524}
525
526impl TransactionPtr {
527 fn new(txn: *mut ffi::MDBX_txn) -> Self {
528 Self {
529 txn,
530 #[cfg(feature = "read-tx-timeouts")]
531 timed_out: Arc::new(AtomicBool::new(false)),
532 lock: Arc::new(Mutex::new(())),
533 }
534 }
535
536 #[cfg(feature = "read-tx-timeouts")]
545 fn is_timed_out(&self) -> bool {
546 self.timed_out.load(std::sync::atomic::Ordering::SeqCst)
547 }
548
549 #[cfg(feature = "read-tx-timeouts")]
550 pub(crate) fn set_timed_out(&self) {
551 self.timed_out.store(true, std::sync::atomic::Ordering::SeqCst);
552 }
553
554 fn lock(&self) -> MutexGuard<'_, ()> {
557 if let Some(lock) = self.lock.try_lock() {
558 lock
559 } else {
560 tracing::trace!(
561 target: "libmdbx",
562 txn = %self.txn as usize,
563 backtrace = %std::backtrace::Backtrace::capture(),
564 "Transaction lock is already acquired, blocking...
565 To display the full backtrace, run with `RUST_BACKTRACE=full` env variable."
566 );
567 self.lock.lock()
568 }
569 }
570
571 #[inline]
575 pub(crate) fn txn_execute_fail_on_timeout<F, T>(&self, f: F) -> Result<T>
576 where
577 F: FnOnce(*mut ffi::MDBX_txn) -> T,
578 {
579 let _lck = self.lock();
580
581 #[cfg(feature = "read-tx-timeouts")]
585 if self.is_timed_out() {
586 return Err(Error::ReadTransactionTimeout)
587 }
588
589 Ok((f)(self.txn))
590 }
591
592 #[inline]
597 pub(crate) fn txn_execute_renew_on_timeout<F, T>(&self, f: F) -> Result<T>
598 where
599 F: FnOnce(*mut ffi::MDBX_txn) -> T,
600 {
601 let _lck = self.lock();
602
603 #[cfg(feature = "read-tx-timeouts")]
605 if self.is_timed_out() {
606 mdbx_result(unsafe { mdbx_txn_renew(self.txn) })?;
607 }
608
609 Ok((f)(self.txn))
610 }
611}
612
613#[derive(Debug)]
618#[repr(transparent)]
619pub struct CommitLatency(ffi::MDBX_commit_latency);
620
621impl CommitLatency {
622 pub(crate) const fn new() -> Self {
624 unsafe { Self(std::mem::zeroed()) }
625 }
626
627 pub(crate) const fn mdb_commit_latency(&mut self) -> *mut ffi::MDBX_commit_latency {
629 &mut self.0
630 }
631}
632
633impl CommitLatency {
634 #[inline]
637 pub const fn preparation(&self) -> Duration {
638 Self::time_to_duration(self.0.preparation)
639 }
640
641 #[inline]
643 pub const fn gc_wallclock(&self) -> Duration {
644 Self::time_to_duration(self.0.gc_wallclock)
645 }
646
647 #[inline]
649 pub const fn audit(&self) -> Duration {
650 Self::time_to_duration(self.0.audit)
651 }
652
653 #[inline]
656 pub const fn write(&self) -> Duration {
657 Self::time_to_duration(self.0.write)
658 }
659
660 #[inline]
663 pub const fn sync(&self) -> Duration {
664 Self::time_to_duration(self.0.sync)
665 }
666
667 #[inline]
669 pub const fn ending(&self) -> Duration {
670 Self::time_to_duration(self.0.ending)
671 }
672
673 #[inline]
675 pub const fn whole(&self) -> Duration {
676 Self::time_to_duration(self.0.whole)
677 }
678
679 #[inline]
681 pub const fn gc_cputime(&self) -> Duration {
682 Self::time_to_duration(self.0.gc_cputime)
683 }
684
685 #[inline]
686 const fn time_to_duration(time: u32) -> Duration {
687 Duration::from_nanos(time as u64 * (1_000_000_000 / 65_536))
688 }
689}
690
691unsafe impl Send for TransactionPtr {}
693
694unsafe impl Sync for TransactionPtr {}
696
697#[cfg(test)]
698mod tests {
699 use super::*;
700
701 const fn assert_send_sync<T: Send + Sync>() {}
702
703 #[expect(dead_code)]
704 const fn test_txn_send_sync() {
705 assert_send_sync::<Transaction<RO>>();
706 assert_send_sync::<Transaction<RW>>();
707 }
708}