1use crate::{
2 environment::EnvPtr,
3 error::{mdbx_result, Result},
4 CommitLatency,
5};
6use std::{
7 fmt, ptr,
8 sync::mpsc::{sync_channel, Receiver, SyncSender},
9};
10
11#[derive(Copy, Clone, Debug)]
12pub(crate) struct TxnPtr(pub(crate) *mut ffi::MDBX_txn);
13unsafe impl Send for TxnPtr {}
14unsafe impl Sync for TxnPtr {}
15
16pub(crate) enum TxnManagerMessage {
17 Begin { parent: TxnPtr, flags: ffi::MDBX_txn_flags_t, sender: SyncSender<Result<TxnPtr>> },
18 Abort { tx: TxnPtr, sender: SyncSender<Result<bool>> },
19 Commit { tx: TxnPtr, sender: SyncSender<Result<(bool, CommitLatency)>> },
20}
21
22impl fmt::Debug for TxnManagerMessage {
23 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
24 match self {
25 Self::Begin { parent, flags, sender: _ } => {
26 f.debug_struct("Begin").field("parent", parent).field("flags", flags).finish()
27 }
28 Self::Abort { tx, sender: _ } => f.debug_struct("Abort").field("tx", tx).finish(),
29 Self::Commit { tx, sender: _ } => f.debug_struct("Commit").field("tx", tx).finish(),
30 }
31 }
32}
33
34#[derive(Debug)]
40pub(crate) struct TxnManager {
41 sender: SyncSender<TxnManagerMessage>,
42 #[cfg(feature = "read-tx-timeouts")]
43 read_transactions: Option<std::sync::Arc<read_transactions::ReadTransactions>>,
44}
45
46impl TxnManager {
47 pub(crate) fn new(env: EnvPtr) -> Self {
48 let (tx, rx) = sync_channel(0);
49 let txn_manager = Self {
50 sender: tx,
51 #[cfg(feature = "read-tx-timeouts")]
52 read_transactions: None,
53 };
54
55 txn_manager.start_message_listener(env, rx);
56
57 txn_manager
58 }
59
60 fn start_message_listener(&self, env: EnvPtr, rx: Receiver<TxnManagerMessage>) {
67 let task = move || {
68 let env = env;
69 loop {
70 let msg = rx.recv();
71 tracing::debug!(target: "libmdbx::txn", ?msg, "txn-mngr received");
72 match msg {
73 Ok(msg) => match msg {
74 TxnManagerMessage::Begin { parent, flags, sender } => {
75 let _span =
76 tracing::debug_span!(target: "libmdbx::txn", "begin", flags)
77 .entered();
78 let mut txn: *mut ffi::MDBX_txn = ptr::null_mut();
79 let res = mdbx_result(unsafe {
80 ffi::mdbx_txn_begin_ex(
81 env.0,
82 parent.0,
83 flags,
84 &mut txn,
85 ptr::null_mut(),
86 )
87 })
88 .map(|_| TxnPtr(txn));
89 sender.send(res).unwrap();
90 }
91 TxnManagerMessage::Abort { tx, sender } => {
92 let _span =
93 tracing::debug_span!(target: "libmdbx::txn", "abort").entered();
94 sender.send(mdbx_result(unsafe { ffi::mdbx_txn_abort(tx.0) })).unwrap();
95 }
96 TxnManagerMessage::Commit { tx, sender } => {
97 let _span =
98 tracing::debug_span!(target: "libmdbx::txn", "commit").entered();
99 sender
100 .send({
101 let mut latency = CommitLatency::new();
102 mdbx_result(unsafe {
103 ffi::mdbx_txn_commit_ex(tx.0, latency.mdb_commit_latency())
104 })
105 .map(|v| (v, latency))
106 })
107 .unwrap();
108 }
109 },
110 Err(_) => return,
111 }
112 }
113 };
114 std::thread::Builder::new().name("mdbx-rs-txn-mgr".to_string()).spawn(task).unwrap();
115 }
116
117 pub(crate) fn send_message(&self, message: TxnManagerMessage) {
118 self.sender.send(message).unwrap()
119 }
120}
121
122#[cfg(feature = "read-tx-timeouts")]
123mod read_transactions {
124 use crate::{
125 environment::EnvPtr, error::mdbx_result, transaction::TransactionPtr,
126 txn_manager::TxnManager,
127 };
128 use dashmap::{DashMap, DashSet};
129 use std::{
130 backtrace::Backtrace,
131 sync::{mpsc::sync_channel, Arc},
132 time::{Duration, Instant},
133 };
134 use tracing::{error, trace, warn};
135
136 const READ_TRANSACTIONS_CHECK_INTERVAL: Duration = Duration::from_secs(5);
137
138 impl TxnManager {
139 pub(crate) fn new_with_max_read_transaction_duration(
142 env: EnvPtr,
143 duration: Duration,
144 ) -> Self {
145 let read_transactions = Arc::new(ReadTransactions::new(duration));
146 read_transactions.clone().start_monitor();
147
148 let (tx, rx) = sync_channel(0);
149
150 let txn_manager = Self { sender: tx, read_transactions: Some(read_transactions) };
151
152 txn_manager.start_message_listener(env, rx);
153
154 txn_manager
155 }
156
157 pub(crate) fn add_active_read_transaction(
159 &self,
160 ptr: *mut ffi::MDBX_txn,
161 tx: TransactionPtr,
162 ) {
163 if let Some(read_transactions) = &self.read_transactions {
164 read_transactions.add_active(ptr, tx);
165 }
166 }
167
168 pub(crate) fn remove_active_read_transaction(&self, ptr: *mut ffi::MDBX_txn) -> bool {
172 self.read_transactions.as_ref().is_some_and(|txs| txs.remove_active(ptr))
173 }
174
175 pub(crate) fn timed_out_not_aborted_read_transactions(&self) -> Option<usize> {
177 self.read_transactions
178 .as_ref()
179 .map(|read_transactions| read_transactions.timed_out_not_aborted())
180 }
181 }
182
183 #[derive(Debug, Default)]
184 pub(super) struct ReadTransactions {
185 max_duration: Duration,
188 active: DashMap<usize, (TransactionPtr, Instant, Option<Arc<Backtrace>>)>,
196 timed_out_not_aborted: DashSet<usize>,
199 }
200
201 impl ReadTransactions {
202 pub(super) fn new(max_duration: Duration) -> Self {
203 Self { max_duration, ..Default::default() }
204 }
205
206 pub(super) fn add_active(&self, ptr: *mut ffi::MDBX_txn, tx: TransactionPtr) {
208 let _ = self.active.insert(
209 ptr as usize,
210 (
211 tx,
212 Instant::now(),
213 cfg!(debug_assertions).then(|| Arc::new(Backtrace::force_capture())),
214 ),
215 );
216 }
217
218 pub(super) fn remove_active(&self, ptr: *mut ffi::MDBX_txn) -> bool {
220 self.timed_out_not_aborted.remove(&(ptr as usize));
221 self.active.remove(&(ptr as usize)).is_some()
222 }
223
224 pub(super) fn timed_out_not_aborted(&self) -> usize {
226 self.timed_out_not_aborted.len()
227 }
228
229 pub(super) fn start_monitor(self: Arc<Self>) {
232 let task = move || {
233 let mut timed_out_active = Vec::new();
234
235 loop {
236 let now = Instant::now();
237 let mut max_active_transaction_duration = None;
238
239 for entry in &self.active {
242 let (tx, start, backtrace) = entry.value();
243 let duration = now - *start;
244
245 if duration > self.max_duration {
246 let result = tx.txn_execute_fail_on_timeout(|txn_ptr| {
247 let result = mdbx_result(unsafe { ffi::mdbx_txn_reset(txn_ptr) });
257 if result.is_ok() {
258 tx.set_timed_out();
259 }
260 (txn_ptr, duration, result)
261 });
262
263 match result {
264 Ok((txn_ptr, duration, error)) => {
265 timed_out_active.push((
269 txn_ptr,
270 duration,
271 backtrace.clone(),
272 error,
273 ));
274 }
275 Err(err) => {
276 error!(target: "libmdbx", %err, ?backtrace, "Failed to abort the long-lived read transaction")
277 }
278 }
279 } else {
280 max_active_transaction_duration = Some(
281 duration.max(max_active_transaction_duration.unwrap_or_default()),
282 );
283 }
284 }
285
286 for (ptr, open_duration, backtrace, err) in timed_out_active.iter().cloned() {
289 let was_in_active = self.remove_active(ptr);
291 if let Err(err) = err {
292 if was_in_active {
293 error!(target: "libmdbx", %err, ?open_duration, ?backtrace, "Failed to time out the long-lived read transaction");
296 }
297 } else {
298 warn!(target: "libmdbx", ?open_duration, ?backtrace, "Long-lived read transaction has been timed out");
300 self.timed_out_not_aborted.insert(ptr as usize);
303 }
304 }
305
306 timed_out_active.clear();
309
310 if !self.active.is_empty() {
311 trace!(
312 target: "libmdbx",
313 elapsed = ?now.elapsed(),
314 active = ?self.active.iter().map(|entry| {
315 let (tx, start, _) = entry.value();
316 (tx.clone(), start.elapsed())
317 }).collect::<Vec<_>>(),
318 "Read transactions"
319 );
320 }
321
322 let sleep_duration = READ_TRANSACTIONS_CHECK_INTERVAL.min(
325 self.max_duration - max_active_transaction_duration.unwrap_or_default(),
326 );
327 trace!(target: "libmdbx", ?sleep_duration, elapsed = ?now.elapsed(), "Putting transaction monitor to sleep");
328 std::thread::sleep(sleep_duration);
329 }
330 };
331 std::thread::Builder::new()
332 .name("mdbx-rs-read-tx-timeouts".to_string())
333 .spawn(task)
334 .unwrap();
335 }
336 }
337
338 #[cfg(test)]
339 mod tests {
340 use crate::{
341 txn_manager::read_transactions::READ_TRANSACTIONS_CHECK_INTERVAL, Environment, Error,
342 MaxReadTransactionDuration,
343 };
344 use std::{thread::sleep, time::Duration};
345 use tempfile::tempdir;
346
347 #[test]
348 fn txn_manager_read_transactions_duration_set() {
349 const MAX_DURATION: Duration = Duration::from_secs(1);
350
351 let dir = tempdir().unwrap();
352 let env = Environment::builder()
353 .set_max_read_transaction_duration(MaxReadTransactionDuration::Set(MAX_DURATION))
354 .open(dir.path())
355 .unwrap();
356
357 let read_transactions = env.txn_manager().read_transactions.as_ref().unwrap();
358
359 {
361 let tx = env.begin_ro_txn().unwrap();
362 let tx_ptr = tx.txn() as usize;
363 assert!(read_transactions.active.contains_key(&tx_ptr));
364
365 tx.open_db(None).unwrap();
366 drop(tx);
367
368 assert!(!read_transactions.active.contains_key(&tx_ptr));
369 }
370
371 {
373 let tx = env.begin_ro_txn().unwrap();
374 let tx_ptr = tx.txn() as usize;
375 assert!(read_transactions.active.contains_key(&tx_ptr));
376
377 tx.open_db(None).unwrap();
378 tx.commit().unwrap();
379
380 assert!(!read_transactions.active.contains_key(&tx_ptr));
381 }
382
383 {
384 let tx = env.begin_ro_txn().unwrap();
387 let tx_ptr = tx.txn() as usize;
388 assert!(read_transactions.active.contains_key(&tx_ptr));
389
390 sleep(MAX_DURATION + READ_TRANSACTIONS_CHECK_INTERVAL);
392
393 assert!(!read_transactions.active.contains_key(&tx_ptr));
396 assert!(read_transactions.timed_out_not_aborted.contains(&tx_ptr));
397
398 assert_eq!(tx.open_db(None).err(), Some(Error::ReadTransactionTimeout));
400 assert!(!read_transactions.active.contains_key(&tx_ptr));
401 assert!(read_transactions.timed_out_not_aborted.contains(&tx_ptr));
402
403 assert_eq!(tx.id().err(), Some(Error::ReadTransactionTimeout));
404 assert!(!read_transactions.active.contains_key(&tx_ptr));
405 assert!(read_transactions.timed_out_not_aborted.contains(&tx_ptr));
406
407 let new_tx = env.begin_ro_txn().unwrap();
410 let new_tx_ptr = new_tx.txn() as usize;
411 assert!(read_transactions.active.contains_key(&new_tx_ptr));
412 assert_ne!(tx_ptr, new_tx_ptr);
413
414 drop(tx);
417 assert!(!read_transactions.timed_out_not_aborted.contains(&tx_ptr));
418 }
419 }
420
421 #[test]
422 fn txn_manager_read_transactions_duration_unbounded() {
423 let dir = tempdir().unwrap();
424 let env = Environment::builder()
425 .set_max_read_transaction_duration(MaxReadTransactionDuration::Unbounded)
426 .open(dir.path())
427 .unwrap();
428
429 assert!(env.txn_manager().read_transactions.is_none());
430
431 let tx = env.begin_ro_txn().unwrap();
432 sleep(READ_TRANSACTIONS_CHECK_INTERVAL);
433 assert!(tx.commit().is_ok())
434 }
435 }
436}