1use crate::{
2 database::Database,
3 environment::Environment,
4 error::{mdbx_result, Result},
5 flags::{DatabaseFlags, WriteFlags},
6 txn_manager::{TxnManagerMessage, TxnPtr},
7 Cursor, Error, Stat, TableObject,
8};
9use ffi::{MDBX_txn_flags_t, MDBX_TXN_RDONLY, MDBX_TXN_READWRITE};
10use parking_lot::{Mutex, MutexGuard};
11use std::{
12 ffi::{c_uint, c_void},
13 fmt::{self, Debug},
14 mem::size_of,
15 ptr, slice,
16 sync::{atomic::AtomicBool, mpsc::sync_channel, Arc},
17 time::Duration,
18};
19
20#[cfg(feature = "read-tx-timeouts")]
21use ffi::mdbx_txn_renew;
22
/// Private module implementing the sealed-trait pattern for [`TransactionKind`].
///
/// `Sealed` is a supertrait of `TransactionKind` and is only implemented here, for `RO` and `RW`.
/// Because this module is not public, code outside of it cannot add further implementations.
mod private {
    use super::*;

    /// Marker supertrait that restricts which types may implement `TransactionKind`.
    pub trait Sealed {}

    impl Sealed for RO {}
    impl Sealed for RW {}
}
31
/// Marker trait describing the kind of a transaction; implemented in this file by `RO` and `RW`.
///
/// The `private::Sealed` supertrait lives in a private module, which keeps the set of
/// implementors closed.
pub trait TransactionKind: private::Sealed + Send + Sync + Debug + 'static {
    /// Flags handed to `mdbx_txn_begin_ex` when a transaction of this kind is opened.
    #[doc(hidden)]
    const OPEN_FLAGS: MDBX_txn_flags_t;

    /// `true` for the read-only kind, `false` for the read-write kind.
    #[doc(hidden)]
    const IS_READ_ONLY: bool;
}
40
/// Marker type selecting the read-only transaction kind (`Transaction<RO>`).
#[derive(Debug)]
#[non_exhaustive]
pub struct RO;
44
/// Marker type selecting the read-write transaction kind (`Transaction<RW>`).
#[derive(Debug)]
#[non_exhaustive]
pub struct RW;
48
/// Read-only kind: opened with `MDBX_TXN_RDONLY`.
impl TransactionKind for RO {
    const OPEN_FLAGS: MDBX_txn_flags_t = MDBX_TXN_RDONLY;
    const IS_READ_ONLY: bool = true;
}
/// Read-write kind: opened with `MDBX_TXN_READWRITE`.
impl TransactionKind for RW {
    const OPEN_FLAGS: MDBX_txn_flags_t = MDBX_TXN_READWRITE;
    const IS_READ_ONLY: bool = false;
}
57
/// An MDBX transaction whose kind (`RO` or `RW`) is selected by the type parameter `K`.
///
/// The handle only holds a reference-counted pointer to the shared transaction state, so cloning
/// it yields another handle to the same underlying transaction.
pub struct Transaction<K>
where
    K: TransactionKind,
{
    /// Shared state: raw transaction pointer, committed flag and owning environment.
    inner: Arc<TransactionInner<K>>,
}
67
68impl<K> Transaction<K>
69where
70 K: TransactionKind,
71{
72 pub(crate) fn new(env: Environment) -> Result<Self> {
73 let mut txn: *mut ffi::MDBX_txn = ptr::null_mut();
74 unsafe {
75 mdbx_result(ffi::mdbx_txn_begin_ex(
76 env.env_ptr(),
77 ptr::null_mut(),
78 K::OPEN_FLAGS,
79 &mut txn,
80 ptr::null_mut(),
81 ))?;
82 Ok(Self::new_from_ptr(env, txn))
83 }
84 }
85
86 pub(crate) fn new_from_ptr(env: Environment, txn_ptr: *mut ffi::MDBX_txn) -> Self {
87 let txn = TransactionPtr::new(txn_ptr);
88
89 #[cfg(feature = "read-tx-timeouts")]
90 if K::IS_READ_ONLY {
91 env.txn_manager().add_active_read_transaction(txn_ptr, txn.clone())
92 }
93
94 let inner = TransactionInner {
95 txn,
96 committed: AtomicBool::new(false),
97 env,
98 _marker: Default::default(),
99 };
100
101 Self { inner: Arc::new(inner) }
102 }
103
104 #[inline]
109 pub fn txn_execute<F, T>(&self, f: F) -> Result<T>
110 where
111 F: FnOnce(*mut ffi::MDBX_txn) -> T,
112 {
113 self.inner.txn_execute(f)
114 }
115
116 #[inline]
121 pub(crate) fn txn_execute_renew_on_timeout<F, T>(&self, f: F) -> Result<T>
122 where
123 F: FnOnce(*mut ffi::MDBX_txn) -> T,
124 {
125 self.inner.txn_execute_renew_on_timeout(f)
126 }
127
128 #[doc(hidden)]
130 #[cfg(test)]
131 pub fn txn(&self) -> *mut ffi::MDBX_txn {
132 self.inner.txn.txn
133 }
134
135 pub fn env(&self) -> &Environment {
137 &self.inner.env
138 }
139
140 pub fn id(&self) -> Result<u64> {
142 self.txn_execute(|txn| unsafe { ffi::mdbx_txn_id(txn) })
143 }
144
145 pub fn get<Key>(&self, dbi: ffi::MDBX_dbi, key: &[u8]) -> Result<Option<Key>>
154 where
155 Key: TableObject,
156 {
157 let key_val: ffi::MDBX_val =
158 ffi::MDBX_val { iov_len: key.len(), iov_base: key.as_ptr() as *mut c_void };
159 let mut data_val: ffi::MDBX_val = ffi::MDBX_val { iov_len: 0, iov_base: ptr::null_mut() };
160
161 self.txn_execute(|txn| unsafe {
162 match ffi::mdbx_get(txn, dbi, &key_val, &mut data_val) {
163 ffi::MDBX_SUCCESS => Key::decode_val::<K>(txn, data_val).map(Some),
164 ffi::MDBX_NOTFOUND => Ok(None),
165 err_code => Err(Error::from_err_code(err_code)),
166 }
167 })?
168 }
169
170 pub fn commit(self) -> Result<CommitLatency> {
174 match self.txn_execute(|txn| {
175 if K::IS_READ_ONLY {
176 #[cfg(feature = "read-tx-timeouts")]
177 self.env().txn_manager().remove_active_read_transaction(txn);
178
179 let mut latency = CommitLatency::new();
180 mdbx_result(unsafe { ffi::mdbx_txn_commit_ex(txn, latency.mdbx_commit_latency()) })
181 .map(|v| (v, latency))
182 } else {
183 let (sender, rx) = sync_channel(0);
184 self.env()
185 .txn_manager()
186 .send_message(TxnManagerMessage::Commit { tx: TxnPtr(txn), sender });
187 rx.recv().unwrap()
188 }
189 })? {
190 Ok((false, lat)) => {
192 self.inner.set_committed();
193 Ok(lat)
194 }
195 Ok((true, _)) => {
196 self.inner.set_committed();
200 Err(Error::BotchedTransaction)
201 }
202 Err(e) => Err(e),
203 }
204 }
205
206 pub fn open_db(&self, name: Option<&str>) -> Result<Database> {
218 Database::new(self, name, 0)
219 }
220
221 pub fn db_flags(&self, dbi: ffi::MDBX_dbi) -> Result<DatabaseFlags> {
223 let mut flags: c_uint = 0;
224 unsafe {
225 self.txn_execute(|txn| {
226 let mut _status: c_uint = 0;
230 mdbx_result(ffi::mdbx_dbi_flags_ex(txn, dbi, &mut flags, &mut _status))
231 })??;
232 }
233
234 #[cfg_attr(not(windows), allow(clippy::useless_conversion))]
236 Ok(DatabaseFlags::from_bits_truncate(flags.try_into().unwrap()))
237 }
238
239 pub fn db_stat(&self, dbi: ffi::MDBX_dbi) -> Result<Stat> {
241 self.db_stat_with_dbi(dbi)
242 }
243
244 pub fn db_stat_with_dbi(&self, dbi: ffi::MDBX_dbi) -> Result<Stat> {
246 unsafe {
247 let mut stat = Stat::new();
248 self.txn_execute(|txn| {
249 mdbx_result(ffi::mdbx_dbi_stat(txn, dbi, stat.mdbx_stat(), size_of::<Stat>()))
250 })??;
251 Ok(stat)
252 }
253 }
254
255 pub fn cursor(&self, dbi: ffi::MDBX_dbi) -> Result<Cursor<K>> {
257 Cursor::new(self.clone(), dbi)
258 }
259
260 pub fn cursor_with_dbi(&self, dbi: ffi::MDBX_dbi) -> Result<Cursor<K>> {
262 Cursor::new(self.clone(), dbi)
263 }
264
265 #[cfg(feature = "read-tx-timeouts")]
267 pub fn disable_timeout(&self) {
268 if K::IS_READ_ONLY {
269 self.env().txn_manager().remove_active_read_transaction(self.inner.txn.txn);
270 }
271 }
272}
273
274impl<K> Clone for Transaction<K>
275where
276 K: TransactionKind,
277{
278 fn clone(&self) -> Self {
279 Self { inner: Arc::clone(&self.inner) }
280 }
281}
282
283impl<K> fmt::Debug for Transaction<K>
284where
285 K: TransactionKind,
286{
287 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
288 f.debug_struct("RoTransaction").finish_non_exhaustive()
289 }
290}
291
/// State shared by every clone of a `Transaction` handle.
struct TransactionInner<K>
where
    K: TransactionKind,
{
    /// Raw transaction pointer together with its lock.
    txn: TransactionPtr,
    /// Set once the transaction has been finished through `commit`; checked on drop.
    committed: AtomicBool,
    /// Environment the transaction was opened in.
    env: Environment,
    /// Ties the struct to the kind `K` without storing a value of that type.
    _marker: std::marker::PhantomData<fn(K)>,
}
304
305impl<K> TransactionInner<K>
306where
307 K: TransactionKind,
308{
309 fn set_committed(&self) {
311 self.committed.store(true, std::sync::atomic::Ordering::SeqCst);
312 }
313
314 fn has_committed(&self) -> bool {
315 self.committed.load(std::sync::atomic::Ordering::SeqCst)
316 }
317
318 #[inline]
319 fn txn_execute<F, T>(&self, f: F) -> Result<T>
320 where
321 F: FnOnce(*mut ffi::MDBX_txn) -> T,
322 {
323 self.txn.txn_execute_fail_on_timeout(f)
324 }
325
326 #[inline]
327 fn txn_execute_renew_on_timeout<F, T>(&self, f: F) -> Result<T>
328 where
329 F: FnOnce(*mut ffi::MDBX_txn) -> T,
330 {
331 self.txn.txn_execute_renew_on_timeout(f)
332 }
333}
334
impl<K> Drop for TransactionInner<K>
where
    K: TransactionKind,
{
    /// Releases the underlying MDBX transaction unless it was already marked as committed.
    fn drop(&mut self) {
        // The closure runs while the `TransactionPtr` lock is held. With `read-tx-timeouts`, a
        // timed-out transaction is renewed before the closure is invoked. The returned `Result`
        // is discarded on purpose, so a failure here cannot turn into a panic inside `drop`.
        let _ = self.txn.txn_execute_renew_on_timeout(|txn| {
            // `commit` sets the flag once MDBX has finished the transaction; nothing to do then.
            if !self.has_committed() {
                if K::IS_READ_ONLY {
                    // Stop tracking this transaction in the manager's set of active readers.
                    #[cfg(feature = "read-tx-timeouts")]
                    self.env.txn_manager().remove_active_read_transaction(txn);

                    // Hand the raw handle over to the environment's read-only transaction pool.
                    // NOTE(review): presumably the pool resets/reuses or aborts the handle —
                    // confirm against `Environment::ro_txn_pool`.
                    self.env.ro_txn_pool().push(txn);
                } else {
                    // Read-write transactions are aborted by the transaction manager. The
                    // zero-capacity channel carries its reply back to this thread.
                    let (sender, rx) = sync_channel(0);
                    self.env
                        .txn_manager()
                        .send_message(TxnManagerMessage::Abort { tx: TxnPtr(txn), sender });
                    // Only an abort error reported by the manager is logged; a closed channel
                    // (`recv` failing) is silently ignored.
                    if let Ok(Err(e)) = rx.recv() {
                        tracing::error!(target: "libmdbx", %e, "failed to abort transaction in drop");
                    }
                }
            }
        });
    }
}
370
371impl Transaction<RW> {
372 fn open_db_with_flags(&self, name: Option<&str>, flags: DatabaseFlags) -> Result<Database> {
373 Database::new(self, name, flags.bits())
374 }
375
376 pub fn create_db(&self, name: Option<&str>, flags: DatabaseFlags) -> Result<Database> {
389 self.open_db_with_flags(name, flags | DatabaseFlags::CREATE)
390 }
391
392 pub fn put(
399 &self,
400 dbi: ffi::MDBX_dbi,
401 key: impl AsRef<[u8]>,
402 data: impl AsRef<[u8]>,
403 flags: WriteFlags,
404 ) -> Result<()> {
405 let key = key.as_ref();
406 let data = data.as_ref();
407 let key_val: ffi::MDBX_val =
408 ffi::MDBX_val { iov_len: key.len(), iov_base: key.as_ptr() as *mut c_void };
409 let mut data_val: ffi::MDBX_val =
410 ffi::MDBX_val { iov_len: data.len(), iov_base: data.as_ptr() as *mut c_void };
411 mdbx_result(self.txn_execute(|txn| unsafe {
412 ffi::mdbx_put(txn, dbi, &key_val, &mut data_val, flags.bits())
413 })?)?;
414
415 Ok(())
416 }
417
418 #[allow(clippy::mut_from_ref)]
430 pub unsafe fn reserve(
431 &self,
432 dbi: ffi::MDBX_dbi,
433 key: impl AsRef<[u8]>,
434 len: usize,
435 flags: WriteFlags,
436 ) -> Result<&mut [u8]> {
437 let key = key.as_ref();
438 let key_val: ffi::MDBX_val =
439 ffi::MDBX_val { iov_len: key.len(), iov_base: key.as_ptr() as *mut c_void };
440 let mut data_val: ffi::MDBX_val =
441 ffi::MDBX_val { iov_len: len, iov_base: ptr::null_mut::<c_void>() };
442 unsafe {
443 mdbx_result(self.txn_execute(|txn| {
444 ffi::mdbx_put(txn, dbi, &key_val, &mut data_val, flags.bits() | ffi::MDBX_RESERVE)
445 })?)?;
446 Ok(slice::from_raw_parts_mut(data_val.iov_base as *mut u8, data_val.iov_len))
447 }
448 }
449
450 pub fn del(
460 &self,
461 dbi: ffi::MDBX_dbi,
462 key: impl AsRef<[u8]>,
463 data: Option<&[u8]>,
464 ) -> Result<bool> {
465 let key = key.as_ref();
466 let key_val: ffi::MDBX_val =
467 ffi::MDBX_val { iov_len: key.len(), iov_base: key.as_ptr() as *mut c_void };
468 let data_val: Option<ffi::MDBX_val> = data.map(|data| ffi::MDBX_val {
469 iov_len: data.len(),
470 iov_base: data.as_ptr() as *mut c_void,
471 });
472
473 mdbx_result({
474 self.txn_execute(|txn| {
475 if let Some(d) = data_val {
476 unsafe { ffi::mdbx_del(txn, dbi, &key_val, &d) }
477 } else {
478 unsafe { ffi::mdbx_del(txn, dbi, &key_val, ptr::null()) }
479 }
480 })?
481 })
482 .map(|_| true)
483 .or_else(|e| match e {
484 Error::NotFound => Ok(false),
485 other => Err(other),
486 })
487 }
488
489 pub fn clear_db(&self, dbi: ffi::MDBX_dbi) -> Result<()> {
491 mdbx_result(self.txn_execute(|txn| unsafe { ffi::mdbx_drop(txn, dbi, false) })?)?;
492
493 Ok(())
494 }
495
496 pub unsafe fn drop_db(&self, dbi: ffi::MDBX_dbi) -> Result<()> {
502 mdbx_result(self.txn_execute(|txn| unsafe { ffi::mdbx_drop(txn, dbi, true) })?)?;
503
504 Ok(())
505 }
506}
507
508impl Transaction<RO> {
509 pub unsafe fn close_db(&self, dbi: ffi::MDBX_dbi) -> Result<()> {
515 mdbx_result(unsafe { ffi::mdbx_dbi_close(self.env().env_ptr(), dbi) })?;
516
517 Ok(())
518 }
519}
520
521impl Transaction<RW> {
522 pub fn begin_nested_txn(&mut self) -> Result<Self> {
524 if self.inner.env.is_write_map() {
525 return Err(Error::NestedTransactionsUnsupportedWithWriteMap)
526 }
527 self.txn_execute(|txn| {
528 let (tx, rx) = sync_channel(0);
529 self.env().txn_manager().send_message(TxnManagerMessage::Begin {
530 parent: TxnPtr(txn),
531 flags: RW::OPEN_FLAGS,
532 sender: tx,
533 });
534
535 rx.recv().unwrap().map(|ptr| Self::new_from_ptr(self.env().clone(), ptr.0))
536 })?
537 }
538}
539
/// Raw MDBX transaction pointer bundled with the state that guards its use.
///
/// Clones copy the raw pointer and share the same `lock` (and, with `read-tx-timeouts`, the same
/// `timed_out` flag) through `Arc`.
#[derive(Debug, Clone)]
pub(crate) struct TransactionPtr {
    /// Raw handle passed to the `ffi::mdbx_*` functions.
    txn: *mut ffi::MDBX_txn,
    /// Set through `set_timed_out`; checked before the pointer is handed to a closure.
    #[cfg(feature = "read-tx-timeouts")]
    timed_out: Arc<AtomicBool>,
    /// Held for the duration of every `txn_execute_*` call.
    lock: Arc<Mutex<()>>,
}
548
549impl TransactionPtr {
550 fn new(txn: *mut ffi::MDBX_txn) -> Self {
551 Self {
552 txn,
553 #[cfg(feature = "read-tx-timeouts")]
554 timed_out: Arc::new(AtomicBool::new(false)),
555 lock: Arc::new(Mutex::new(())),
556 }
557 }
558
559 #[cfg(feature = "read-tx-timeouts")]
568 fn is_timed_out(&self) -> bool {
569 self.timed_out.load(std::sync::atomic::Ordering::SeqCst)
570 }
571
572 #[cfg(feature = "read-tx-timeouts")]
573 pub(crate) fn set_timed_out(&self) {
574 self.timed_out.store(true, std::sync::atomic::Ordering::SeqCst);
575 }
576
577 fn lock(&self) -> MutexGuard<'_, ()> {
580 if let Some(lock) = self.lock.try_lock() {
581 lock
582 } else {
583 tracing::trace!(
584 target: "libmdbx",
585 txn = %self.txn as usize,
586 backtrace = %std::backtrace::Backtrace::capture(),
587 "Transaction lock is already acquired, blocking...
588 To display the full backtrace, run with `RUST_BACKTRACE=full` env variable."
589 );
590 self.lock.lock()
591 }
592 }
593
594 #[inline]
598 pub(crate) fn txn_execute_fail_on_timeout<F, T>(&self, f: F) -> Result<T>
599 where
600 F: FnOnce(*mut ffi::MDBX_txn) -> T,
601 {
602 let _lck = self.lock();
603
604 #[cfg(feature = "read-tx-timeouts")]
608 if self.is_timed_out() {
609 return Err(Error::ReadTransactionTimeout)
610 }
611
612 Ok((f)(self.txn))
613 }
614
615 #[inline]
620 pub(crate) fn txn_execute_renew_on_timeout<F, T>(&self, f: F) -> Result<T>
621 where
622 F: FnOnce(*mut ffi::MDBX_txn) -> T,
623 {
624 let _lck = self.lock();
625
626 #[cfg(feature = "read-tx-timeouts")]
628 if self.is_timed_out() {
629 mdbx_result(unsafe { mdbx_txn_renew(self.txn) })?;
630 }
631
632 Ok((f)(self.txn))
633 }
634}
635
/// Commit timing information, wrapping the FFI struct `ffi::MDBX_commit_latency`.
///
/// `#[repr(transparent)]` gives the wrapper the same layout as the wrapped struct.
#[derive(Debug)]
#[repr(transparent)]
pub struct CommitLatency(ffi::MDBX_commit_latency);
643
644impl CommitLatency {
645 pub(crate) const fn new() -> Self {
647 unsafe { Self(std::mem::zeroed()) }
648 }
649
650 pub(crate) const fn mdbx_commit_latency(&mut self) -> *mut ffi::MDBX_commit_latency {
652 &mut self.0
653 }
654}
655
656impl CommitLatency {
657 #[inline]
660 pub const fn preparation(&self) -> Duration {
661 Self::time_to_duration(self.0.preparation)
662 }
663
664 #[inline]
666 pub const fn gc_wallclock(&self) -> Duration {
667 Self::time_to_duration(self.0.gc_wallclock)
668 }
669
670 #[inline]
672 pub const fn audit(&self) -> Duration {
673 Self::time_to_duration(self.0.audit)
674 }
675
676 #[inline]
679 pub const fn write(&self) -> Duration {
680 Self::time_to_duration(self.0.write)
681 }
682
683 #[inline]
686 pub const fn sync(&self) -> Duration {
687 Self::time_to_duration(self.0.sync)
688 }
689
690 #[inline]
692 pub const fn ending(&self) -> Duration {
693 Self::time_to_duration(self.0.ending)
694 }
695
696 #[inline]
698 pub const fn whole(&self) -> Duration {
699 Self::time_to_duration(self.0.whole)
700 }
701
702 #[inline]
704 pub const fn gc_cputime(&self) -> Duration {
705 Self::time_to_duration(self.0.gc_cputime)
706 }
707
708 #[inline]
709 const fn time_to_duration(time: u32) -> Duration {
710 Duration::from_nanos(time as u64 * (1_000_000_000 / 65_536))
711 }
712}
713
// `TransactionPtr` contains a raw `*mut ffi::MDBX_txn`, so `Send` has to be asserted manually.
// The `txn_execute_*` helpers hand the pointer to a closure only while `lock` is held.
// NOTE(review): soundness also depends on MDBX permitting use of the handle from another
// thread — confirm against the MDBX threading rules.
unsafe impl Send for TransactionPtr {}
716
// `TransactionPtr` contains a raw `*mut ffi::MDBX_txn`, so `Sync` has to be asserted manually.
// Shared access through the `txn_execute_*` helpers is serialised by the `lock` mutex.
// NOTE(review): code that reads the `txn` field directly bypasses that lock — confirm such
// uses are sound.
unsafe impl Sync for TransactionPtr {}
719
#[cfg(test)]
mod tests {
    use super::*;

    /// Compiles only when `T` is both `Send` and `Sync`.
    const fn assert_send_sync<T>()
    where
        T: Send + Sync,
    {
    }

    /// Compile-time check that both transaction kinds can be shared across threads.
    ///
    /// The function is never called; type-checking its body is the whole test, hence the
    /// expected `dead_code` lint.
    #[expect(dead_code)]
    const fn test_txn_send_sync() {
        assert_send_sync::<Transaction<RO>>();
        assert_send_sync::<Transaction<RW>>();
    }
}