1use crate::{
2 database::Database,
3 environment::Environment,
4 error::{mdbx_result, Result},
5 flags::{DatabaseFlags, WriteFlags},
6 txn_manager::{TxnManagerMessage, TxnPtr},
7 Cursor, Error, Stat, TableObject,
8};
9use ffi::{MDBX_txn_flags_t, MDBX_TXN_RDONLY, MDBX_TXN_READWRITE};
10use parking_lot::{Mutex, MutexGuard};
11use std::{
12 ffi::{c_uint, c_void},
13 fmt::{self, Debug},
14 mem::size_of,
15 ptr, slice,
16 sync::{atomic::AtomicBool, mpsc::sync_channel, Arc},
17 time::Duration,
18};
19
20#[cfg(feature = "read-tx-timeouts")]
21use ffi::mdbx_txn_renew;
22
23mod private {
24 use super::*;
25
26 pub trait Sealed {}
27
28 impl Sealed for RO {}
29 impl Sealed for RW {}
30}
31
32pub trait TransactionKind: private::Sealed + Send + Sync + Debug + 'static {
33 #[doc(hidden)]
34 const OPEN_FLAGS: MDBX_txn_flags_t;
35
36 #[doc(hidden)]
38 const IS_READ_ONLY: bool;
39}
40
/// Marker type selecting the read-only transaction kind.
#[non_exhaustive]
#[derive(Debug)]
pub struct RO;
44
/// Marker type selecting the read-write transaction kind.
#[non_exhaustive]
#[derive(Debug)]
pub struct RW;
48
49impl TransactionKind for RO {
50 const OPEN_FLAGS: MDBX_txn_flags_t = MDBX_TXN_RDONLY;
51 const IS_READ_ONLY: bool = true;
52}
53impl TransactionKind for RW {
54 const OPEN_FLAGS: MDBX_txn_flags_t = MDBX_TXN_READWRITE;
55 const IS_READ_ONLY: bool = false;
56}
57
58pub struct Transaction<K>
62where
63 K: TransactionKind,
64{
65 inner: Arc<TransactionInner<K>>,
66}
67
68impl<K> Transaction<K>
69where
70 K: TransactionKind,
71{
72 pub(crate) fn new(env: Environment) -> Result<Self> {
73 let mut txn: *mut ffi::MDBX_txn = ptr::null_mut();
74 unsafe {
75 mdbx_result(ffi::mdbx_txn_begin_ex(
76 env.env_ptr(),
77 ptr::null_mut(),
78 K::OPEN_FLAGS,
79 &mut txn,
80 ptr::null_mut(),
81 ))?;
82 Ok(Self::new_from_ptr(env, txn))
83 }
84 }
85
86 pub(crate) fn new_from_ptr(env: Environment, txn_ptr: *mut ffi::MDBX_txn) -> Self {
87 let txn = TransactionPtr::new(txn_ptr);
88
89 #[cfg(feature = "read-tx-timeouts")]
90 if K::IS_READ_ONLY {
91 env.txn_manager().add_active_read_transaction(txn_ptr, txn.clone())
92 }
93
94 let inner = TransactionInner {
95 txn,
96 committed: AtomicBool::new(false),
97 env,
98 _marker: Default::default(),
99 };
100
101 Self { inner: Arc::new(inner) }
102 }
103
104 #[inline]
109 pub fn txn_execute<F, T>(&self, f: F) -> Result<T>
110 where
111 F: FnOnce(*mut ffi::MDBX_txn) -> T,
112 {
113 self.inner.txn_execute(f)
114 }
115
116 #[inline]
121 pub(crate) fn txn_execute_renew_on_timeout<F, T>(&self, f: F) -> Result<T>
122 where
123 F: FnOnce(*mut ffi::MDBX_txn) -> T,
124 {
125 self.inner.txn_execute_renew_on_timeout(f)
126 }
127
128 #[doc(hidden)]
130 #[cfg(test)]
131 pub fn txn(&self) -> *mut ffi::MDBX_txn {
132 self.inner.txn.txn
133 }
134
135 pub fn env(&self) -> &Environment {
137 &self.inner.env
138 }
139
140 pub fn id(&self) -> Result<u64> {
142 self.txn_execute(|txn| unsafe { ffi::mdbx_txn_id(txn) })
143 }
144
145 pub fn get<Key>(&self, dbi: ffi::MDBX_dbi, key: &[u8]) -> Result<Option<Key>>
154 where
155 Key: TableObject,
156 {
157 let key_val: ffi::MDBX_val =
158 ffi::MDBX_val { iov_len: key.len(), iov_base: key.as_ptr() as *mut c_void };
159 let mut data_val: ffi::MDBX_val = ffi::MDBX_val { iov_len: 0, iov_base: ptr::null_mut() };
160
161 self.txn_execute(|txn| unsafe {
162 match ffi::mdbx_get(txn, dbi, &key_val, &mut data_val) {
163 ffi::MDBX_SUCCESS => Key::decode_val::<K>(txn, data_val).map(Some),
164 ffi::MDBX_NOTFOUND => Ok(None),
165 err_code => Err(Error::from_err_code(err_code)),
166 }
167 })?
168 }
169
170 pub fn commit(self) -> Result<CommitLatency> {
174 match self.txn_execute(|txn| {
175 if K::IS_READ_ONLY {
176 #[cfg(feature = "read-tx-timeouts")]
177 self.env().txn_manager().remove_active_read_transaction(txn);
178
179 let mut latency = CommitLatency::new();
180 mdbx_result(unsafe { ffi::mdbx_txn_commit_ex(txn, latency.mdb_commit_latency()) })
181 .map(|v| (v, latency))
182 } else {
183 let (sender, rx) = sync_channel(0);
184 self.env()
185 .txn_manager()
186 .send_message(TxnManagerMessage::Commit { tx: TxnPtr(txn), sender });
187 rx.recv().unwrap()
188 }
189 })? {
190 Ok((false, lat)) => {
192 self.inner.set_committed();
193 Ok(lat)
194 }
195 Ok((true, _)) => {
196 self.inner.set_committed();
200 Err(Error::BotchedTransaction)
201 }
202 Err(e) => Err(e),
203 }
204 }
205
206 pub fn open_db(&self, name: Option<&str>) -> Result<Database> {
218 Database::new(self, name, 0)
219 }
220
221 pub fn db_flags(&self, dbi: ffi::MDBX_dbi) -> Result<DatabaseFlags> {
223 let mut flags: c_uint = 0;
224 unsafe {
225 self.txn_execute(|txn| {
226 let mut _status: c_uint = 0;
230 mdbx_result(ffi::mdbx_dbi_flags_ex(txn, dbi, &mut flags, &mut _status))
231 })??;
232 }
233
234 #[cfg_attr(not(windows), allow(clippy::useless_conversion))]
236 Ok(DatabaseFlags::from_bits_truncate(flags.try_into().unwrap()))
237 }
238
239 pub fn db_stat(&self, dbi: ffi::MDBX_dbi) -> Result<Stat> {
241 self.db_stat_with_dbi(dbi)
242 }
243
244 pub fn db_stat_with_dbi(&self, dbi: ffi::MDBX_dbi) -> Result<Stat> {
246 unsafe {
247 let mut stat = Stat::new();
248 self.txn_execute(|txn| {
249 mdbx_result(ffi::mdbx_dbi_stat(txn, dbi, stat.mdb_stat(), size_of::<Stat>()))
250 })??;
251 Ok(stat)
252 }
253 }
254
255 pub fn cursor(&self, dbi: ffi::MDBX_dbi) -> Result<Cursor<K>> {
257 Cursor::new(self.clone(), dbi)
258 }
259
260 pub fn cursor_with_dbi(&self, dbi: ffi::MDBX_dbi) -> Result<Cursor<K>> {
262 Cursor::new(self.clone(), dbi)
263 }
264
265 #[cfg(feature = "read-tx-timeouts")]
267 pub fn disable_timeout(&self) {
268 if K::IS_READ_ONLY {
269 self.env().txn_manager().remove_active_read_transaction(self.inner.txn.txn);
270 }
271 }
272}
273
274impl<K> Clone for Transaction<K>
275where
276 K: TransactionKind,
277{
278 fn clone(&self) -> Self {
279 Self { inner: Arc::clone(&self.inner) }
280 }
281}
282
283impl<K> fmt::Debug for Transaction<K>
284where
285 K: TransactionKind,
286{
287 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
288 f.debug_struct("RoTransaction").finish_non_exhaustive()
289 }
290}
291
292struct TransactionInner<K>
294where
295 K: TransactionKind,
296{
297 txn: TransactionPtr,
299 committed: AtomicBool,
301 env: Environment,
302 _marker: std::marker::PhantomData<fn(K)>,
303}
304
305impl<K> TransactionInner<K>
306where
307 K: TransactionKind,
308{
309 fn set_committed(&self) {
311 self.committed.store(true, std::sync::atomic::Ordering::SeqCst);
312 }
313
314 fn has_committed(&self) -> bool {
315 self.committed.load(std::sync::atomic::Ordering::SeqCst)
316 }
317
318 #[inline]
319 fn txn_execute<F, T>(&self, f: F) -> Result<T>
320 where
321 F: FnOnce(*mut ffi::MDBX_txn) -> T,
322 {
323 self.txn.txn_execute_fail_on_timeout(f)
324 }
325
326 #[inline]
327 fn txn_execute_renew_on_timeout<F, T>(&self, f: F) -> Result<T>
328 where
329 F: FnOnce(*mut ffi::MDBX_txn) -> T,
330 {
331 self.txn.txn_execute_renew_on_timeout(f)
332 }
333}
334
335impl<K> Drop for TransactionInner<K>
336where
337 K: TransactionKind,
338{
339 fn drop(&mut self) {
340 let _ = self.txn.txn_execute_renew_on_timeout(|txn| {
348 if !self.has_committed() {
349 if K::IS_READ_ONLY {
350 #[cfg(feature = "read-tx-timeouts")]
351 self.env.txn_manager().remove_active_read_transaction(txn);
352
353 unsafe {
354 ffi::mdbx_txn_abort(txn);
355 }
356 } else {
357 let (sender, rx) = sync_channel(0);
358 self.env
359 .txn_manager()
360 .send_message(TxnManagerMessage::Abort { tx: TxnPtr(txn), sender });
361 if let Ok(Err(e)) = rx.recv() {
362 tracing::error!(target: "libmdbx", %e, "failed to abort transaction in drop");
363 }
364 }
365 }
366 });
367 }
368}
369
370impl Transaction<RW> {
371 fn open_db_with_flags(&self, name: Option<&str>, flags: DatabaseFlags) -> Result<Database> {
372 Database::new(self, name, flags.bits())
373 }
374
375 pub fn create_db(&self, name: Option<&str>, flags: DatabaseFlags) -> Result<Database> {
388 self.open_db_with_flags(name, flags | DatabaseFlags::CREATE)
389 }
390
391 pub fn put(
398 &self,
399 dbi: ffi::MDBX_dbi,
400 key: impl AsRef<[u8]>,
401 data: impl AsRef<[u8]>,
402 flags: WriteFlags,
403 ) -> Result<()> {
404 let key = key.as_ref();
405 let data = data.as_ref();
406 let key_val: ffi::MDBX_val =
407 ffi::MDBX_val { iov_len: key.len(), iov_base: key.as_ptr() as *mut c_void };
408 let mut data_val: ffi::MDBX_val =
409 ffi::MDBX_val { iov_len: data.len(), iov_base: data.as_ptr() as *mut c_void };
410 mdbx_result(self.txn_execute(|txn| unsafe {
411 ffi::mdbx_put(txn, dbi, &key_val, &mut data_val, flags.bits())
412 })?)?;
413
414 Ok(())
415 }
416
417 #[allow(clippy::mut_from_ref)]
429 pub unsafe fn reserve(
430 &self,
431 dbi: ffi::MDBX_dbi,
432 key: impl AsRef<[u8]>,
433 len: usize,
434 flags: WriteFlags,
435 ) -> Result<&mut [u8]> {
436 let key = key.as_ref();
437 let key_val: ffi::MDBX_val =
438 ffi::MDBX_val { iov_len: key.len(), iov_base: key.as_ptr() as *mut c_void };
439 let mut data_val: ffi::MDBX_val =
440 ffi::MDBX_val { iov_len: len, iov_base: ptr::null_mut::<c_void>() };
441 unsafe {
442 mdbx_result(self.txn_execute(|txn| {
443 ffi::mdbx_put(txn, dbi, &key_val, &mut data_val, flags.bits() | ffi::MDBX_RESERVE)
444 })?)?;
445 Ok(slice::from_raw_parts_mut(data_val.iov_base as *mut u8, data_val.iov_len))
446 }
447 }
448
449 pub fn del(
459 &self,
460 dbi: ffi::MDBX_dbi,
461 key: impl AsRef<[u8]>,
462 data: Option<&[u8]>,
463 ) -> Result<bool> {
464 let key = key.as_ref();
465 let key_val: ffi::MDBX_val =
466 ffi::MDBX_val { iov_len: key.len(), iov_base: key.as_ptr() as *mut c_void };
467 let data_val: Option<ffi::MDBX_val> = data.map(|data| ffi::MDBX_val {
468 iov_len: data.len(),
469 iov_base: data.as_ptr() as *mut c_void,
470 });
471
472 mdbx_result({
473 self.txn_execute(|txn| {
474 if let Some(d) = data_val {
475 unsafe { ffi::mdbx_del(txn, dbi, &key_val, &d) }
476 } else {
477 unsafe { ffi::mdbx_del(txn, dbi, &key_val, ptr::null()) }
478 }
479 })?
480 })
481 .map(|_| true)
482 .or_else(|e| match e {
483 Error::NotFound => Ok(false),
484 other => Err(other),
485 })
486 }
487
488 pub fn clear_db(&self, dbi: ffi::MDBX_dbi) -> Result<()> {
490 mdbx_result(self.txn_execute(|txn| unsafe { ffi::mdbx_drop(txn, dbi, false) })?)?;
491
492 Ok(())
493 }
494
495 pub unsafe fn drop_db(&self, dbi: ffi::MDBX_dbi) -> Result<()> {
501 mdbx_result(self.txn_execute(|txn| unsafe { ffi::mdbx_drop(txn, dbi, true) })?)?;
502
503 Ok(())
504 }
505}
506
507impl Transaction<RO> {
508 pub unsafe fn close_db(&self, dbi: ffi::MDBX_dbi) -> Result<()> {
514 mdbx_result(unsafe { ffi::mdbx_dbi_close(self.env().env_ptr(), dbi) })?;
515
516 Ok(())
517 }
518}
519
520impl Transaction<RW> {
521 pub fn begin_nested_txn(&mut self) -> Result<Self> {
523 if self.inner.env.is_write_map() {
524 return Err(Error::NestedTransactionsUnsupportedWithWriteMap)
525 }
526 self.txn_execute(|txn| {
527 let (tx, rx) = sync_channel(0);
528 self.env().txn_manager().send_message(TxnManagerMessage::Begin {
529 parent: TxnPtr(txn),
530 flags: RW::OPEN_FLAGS,
531 sender: tx,
532 });
533
534 rx.recv().unwrap().map(|ptr| Self::new_from_ptr(self.env().clone(), ptr.0))
535 })?
536 }
537}
538
539#[derive(Debug, Clone)]
541pub(crate) struct TransactionPtr {
542 txn: *mut ffi::MDBX_txn,
543 #[cfg(feature = "read-tx-timeouts")]
544 timed_out: Arc<AtomicBool>,
545 lock: Arc<Mutex<()>>,
546}
547
548impl TransactionPtr {
549 fn new(txn: *mut ffi::MDBX_txn) -> Self {
550 Self {
551 txn,
552 #[cfg(feature = "read-tx-timeouts")]
553 timed_out: Arc::new(AtomicBool::new(false)),
554 lock: Arc::new(Mutex::new(())),
555 }
556 }
557
558 #[cfg(feature = "read-tx-timeouts")]
567 fn is_timed_out(&self) -> bool {
568 self.timed_out.load(std::sync::atomic::Ordering::SeqCst)
569 }
570
571 #[cfg(feature = "read-tx-timeouts")]
572 pub(crate) fn set_timed_out(&self) {
573 self.timed_out.store(true, std::sync::atomic::Ordering::SeqCst);
574 }
575
576 fn lock(&self) -> MutexGuard<'_, ()> {
579 if let Some(lock) = self.lock.try_lock() {
580 lock
581 } else {
582 tracing::trace!(
583 target: "libmdbx",
584 txn = %self.txn as usize,
585 backtrace = %std::backtrace::Backtrace::capture(),
586 "Transaction lock is already acquired, blocking...
587 To display the full backtrace, run with `RUST_BACKTRACE=full` env variable."
588 );
589 self.lock.lock()
590 }
591 }
592
593 #[inline]
597 pub(crate) fn txn_execute_fail_on_timeout<F, T>(&self, f: F) -> Result<T>
598 where
599 F: FnOnce(*mut ffi::MDBX_txn) -> T,
600 {
601 let _lck = self.lock();
602
603 #[cfg(feature = "read-tx-timeouts")]
607 if self.is_timed_out() {
608 return Err(Error::ReadTransactionTimeout)
609 }
610
611 Ok((f)(self.txn))
612 }
613
614 #[inline]
619 pub(crate) fn txn_execute_renew_on_timeout<F, T>(&self, f: F) -> Result<T>
620 where
621 F: FnOnce(*mut ffi::MDBX_txn) -> T,
622 {
623 let _lck = self.lock();
624
625 #[cfg(feature = "read-tx-timeouts")]
627 if self.is_timed_out() {
628 mdbx_result(unsafe { mdbx_txn_renew(self.txn) })?;
629 }
630
631 Ok((f)(self.txn))
632 }
633}
634
635#[derive(Debug)]
640#[repr(transparent)]
641pub struct CommitLatency(ffi::MDBX_commit_latency);
642
643impl CommitLatency {
644 pub(crate) const fn new() -> Self {
646 unsafe { Self(std::mem::zeroed()) }
647 }
648
649 pub(crate) const fn mdb_commit_latency(&mut self) -> *mut ffi::MDBX_commit_latency {
651 &mut self.0
652 }
653}
654
655impl CommitLatency {
656 #[inline]
659 pub const fn preparation(&self) -> Duration {
660 Self::time_to_duration(self.0.preparation)
661 }
662
663 #[inline]
665 pub const fn gc_wallclock(&self) -> Duration {
666 Self::time_to_duration(self.0.gc_wallclock)
667 }
668
669 #[inline]
671 pub const fn audit(&self) -> Duration {
672 Self::time_to_duration(self.0.audit)
673 }
674
675 #[inline]
678 pub const fn write(&self) -> Duration {
679 Self::time_to_duration(self.0.write)
680 }
681
682 #[inline]
685 pub const fn sync(&self) -> Duration {
686 Self::time_to_duration(self.0.sync)
687 }
688
689 #[inline]
691 pub const fn ending(&self) -> Duration {
692 Self::time_to_duration(self.0.ending)
693 }
694
695 #[inline]
697 pub const fn whole(&self) -> Duration {
698 Self::time_to_duration(self.0.whole)
699 }
700
701 #[inline]
703 pub const fn gc_cputime(&self) -> Duration {
704 Self::time_to_duration(self.0.gc_cputime)
705 }
706
707 #[inline]
708 const fn time_to_duration(time: u32) -> Duration {
709 Duration::from_nanos(time as u64 * (1_000_000_000 / 65_536))
710 }
711}
712
713unsafe impl Send for TransactionPtr {}
715
716unsafe impl Sync for TransactionPtr {}
718
#[cfg(test)]
mod tests {
    use super::{Transaction, RO, RW};

    /// Compiles only when `T` is both `Send` and `Sync`.
    const fn assert_send_sync<T>()
    where
        T: Send + Sync,
    {
    }

    /// Never called: the check happens at compile time, when the bounds on the calls below
    /// are resolved.
    #[expect(dead_code)]
    const fn test_txn_send_sync() {
        assert_send_sync::<Transaction<RW>>();
        assert_send_sync::<Transaction<RO>>();
    }
}