// reth_libmdbx/txn_manager.rs

1use crate::{
2    environment::EnvPtr,
3    error::{mdbx_result, Result},
4    CommitLatency,
5};
6use std::{
7    ptr,
8    sync::mpsc::{sync_channel, Receiver, SyncSender},
9};
10
11#[derive(Copy, Clone, Debug)]
12pub(crate) struct TxnPtr(pub(crate) *mut ffi::MDBX_txn);
13unsafe impl Send for TxnPtr {}
14unsafe impl Sync for TxnPtr {}
15
16pub(crate) enum TxnManagerMessage {
17    Begin { parent: TxnPtr, flags: ffi::MDBX_txn_flags_t, sender: SyncSender<Result<TxnPtr>> },
18    Abort { tx: TxnPtr, sender: SyncSender<Result<bool>> },
19    Commit { tx: TxnPtr, sender: SyncSender<Result<(bool, CommitLatency)>> },
20}
21
22/// Manages transactions by doing two things:
23/// - Opening, aborting, and committing transactions using [`TxnManager::send_message`] with the
24///   corresponding [`TxnManagerMessage`]
25/// - Aborting long-lived read transactions (if the `read-tx-timeouts` feature is enabled and
26///   `TxnManager::with_max_read_transaction_duration` is called)
27#[derive(Debug)]
28pub(crate) struct TxnManager {
29    sender: SyncSender<TxnManagerMessage>,
30    #[cfg(feature = "read-tx-timeouts")]
31    read_transactions: Option<std::sync::Arc<read_transactions::ReadTransactions>>,
32}
33
34impl TxnManager {
35    pub(crate) fn new(env: EnvPtr) -> Self {
36        let (tx, rx) = sync_channel(0);
37        let txn_manager = Self {
38            sender: tx,
39            #[cfg(feature = "read-tx-timeouts")]
40            read_transactions: None,
41        };
42
43        txn_manager.start_message_listener(env, rx);
44
45        txn_manager
46    }
47
48    /// Spawns a new [`std::thread`] that listens to incoming [`TxnManagerMessage`] messages,
49    /// executes an FFI function, and returns the result on the provided channel.
50    ///
51    /// - [`TxnManagerMessage::Begin`] opens a new transaction with [`ffi::mdbx_txn_begin_ex`]
52    /// - [`TxnManagerMessage::Abort`] aborts a transaction with [`ffi::mdbx_txn_abort`]
53    /// - [`TxnManagerMessage::Commit`] commits a transaction with [`ffi::mdbx_txn_commit_ex`]
54    fn start_message_listener(&self, env: EnvPtr, rx: Receiver<TxnManagerMessage>) {
55        let task = move || {
56            let env = env;
57            loop {
58                match rx.recv() {
59                    Ok(msg) => match msg {
60                        TxnManagerMessage::Begin { parent, flags, sender } => {
61                            let mut txn: *mut ffi::MDBX_txn = ptr::null_mut();
62                            let res = mdbx_result(unsafe {
63                                ffi::mdbx_txn_begin_ex(
64                                    env.0,
65                                    parent.0,
66                                    flags,
67                                    &mut txn,
68                                    ptr::null_mut(),
69                                )
70                            })
71                            .map(|_| TxnPtr(txn));
72                            sender.send(res).unwrap();
73                        }
74                        TxnManagerMessage::Abort { tx, sender } => {
75                            sender.send(mdbx_result(unsafe { ffi::mdbx_txn_abort(tx.0) })).unwrap();
76                        }
77                        TxnManagerMessage::Commit { tx, sender } => {
78                            sender
79                                .send({
80                                    let mut latency = CommitLatency::new();
81                                    mdbx_result(unsafe {
82                                        ffi::mdbx_txn_commit_ex(tx.0, latency.mdb_commit_latency())
83                                    })
84                                    .map(|v| (v, latency))
85                                })
86                                .unwrap();
87                        }
88                    },
89                    Err(_) => return,
90                }
91            }
92        };
93        std::thread::Builder::new().name("mdbx-rs-txn-manager".to_string()).spawn(task).unwrap();
94    }
95
96    pub(crate) fn send_message(&self, message: TxnManagerMessage) {
97        self.sender.send(message).unwrap()
98    }
99}
100
101#[cfg(feature = "read-tx-timeouts")]
102mod read_transactions {
103    use crate::{
104        environment::EnvPtr, error::mdbx_result, transaction::TransactionPtr,
105        txn_manager::TxnManager,
106    };
107    use dashmap::{DashMap, DashSet};
108    use std::{
109        backtrace::Backtrace,
110        sync::{mpsc::sync_channel, Arc},
111        time::{Duration, Instant},
112    };
113    use tracing::{error, trace, warn};
114
115    const READ_TRANSACTIONS_CHECK_INTERVAL: Duration = Duration::from_secs(5);
116
117    impl TxnManager {
118        /// Returns a new instance for which the maximum duration that a read transaction can be
119        /// open is set.
120        pub(crate) fn new_with_max_read_transaction_duration(
121            env: EnvPtr,
122            duration: Duration,
123        ) -> Self {
124            let read_transactions = Arc::new(ReadTransactions::new(duration));
125            read_transactions.clone().start_monitor();
126
127            let (tx, rx) = sync_channel(0);
128
129            let txn_manager = Self { sender: tx, read_transactions: Some(read_transactions) };
130
131            txn_manager.start_message_listener(env, rx);
132
133            txn_manager
134        }
135
136        /// Adds a new transaction to the list of active read transactions.
137        pub(crate) fn add_active_read_transaction(
138            &self,
139            ptr: *mut ffi::MDBX_txn,
140            tx: TransactionPtr,
141        ) {
142            if let Some(read_transactions) = &self.read_transactions {
143                read_transactions.add_active(ptr, tx);
144            }
145        }
146
147        /// Removes a transaction from the list of active read transactions.
148        ///
149        /// Returns `true` if the transaction was found and removed.
150        pub(crate) fn remove_active_read_transaction(&self, ptr: *mut ffi::MDBX_txn) -> bool {
151            self.read_transactions.as_ref().is_some_and(|txs| txs.remove_active(ptr))
152        }
153
154        /// Returns the number of timed out transactions that were not aborted by the user yet.
155        pub(crate) fn timed_out_not_aborted_read_transactions(&self) -> Option<usize> {
156            self.read_transactions
157                .as_ref()
158                .map(|read_transactions| read_transactions.timed_out_not_aborted())
159        }
160    }
161
162    #[derive(Debug, Default)]
163    pub(super) struct ReadTransactions {
164        /// Maximum duration that a read transaction can be open until the
165        /// [`ReadTransactions::start_monitor`] aborts it.
166        max_duration: Duration,
167        /// List of currently active read transactions.
168        ///
169        /// We store `usize` instead of a raw pointer as a key, because pointers are not
170        /// comparable. The time of transaction opening is stored as a value.
171        ///
172        /// The backtrace of the transaction opening is recorded only when debug assertions are
173        /// enabled.
174        active: DashMap<usize, (TransactionPtr, Instant, Option<Arc<Backtrace>>)>,
175        /// List of timed out transactions that were not aborted by the user yet, hence have a
176        /// dangling read transaction pointer.
177        timed_out_not_aborted: DashSet<usize>,
178    }
179
180    impl ReadTransactions {
181        pub(super) fn new(max_duration: Duration) -> Self {
182            Self { max_duration, ..Default::default() }
183        }
184
185        /// Adds a new transaction to the list of active read transactions.
186        pub(super) fn add_active(&self, ptr: *mut ffi::MDBX_txn, tx: TransactionPtr) {
187            let _ = self.active.insert(
188                ptr as usize,
189                (
190                    tx,
191                    Instant::now(),
192                    cfg!(debug_assertions).then(|| Arc::new(Backtrace::force_capture())),
193                ),
194            );
195        }
196
197        /// Removes a transaction from the list of active read transactions.
198        pub(super) fn remove_active(&self, ptr: *mut ffi::MDBX_txn) -> bool {
199            self.timed_out_not_aborted.remove(&(ptr as usize));
200            self.active.remove(&(ptr as usize)).is_some()
201        }
202
203        /// Returns the number of timed out transactions that were not aborted by the user yet.
204        pub(super) fn timed_out_not_aborted(&self) -> usize {
205            self.timed_out_not_aborted.len()
206        }
207
208        /// Spawns a new [`std::thread`] that monitors the list of active read transactions and
209        /// timeouts those that are open for longer than `ReadTransactions.max_duration`.
210        pub(super) fn start_monitor(self: Arc<Self>) {
211            let task = move || {
212                let mut timed_out_active = Vec::new();
213
214                loop {
215                    let now = Instant::now();
216                    let mut max_active_transaction_duration = None;
217
218                    // Iterate through active read transactions and time out those that's open for
219                    // longer than `self.max_duration`.
220                    for entry in &self.active {
221                        let (tx, start, backtrace) = entry.value();
222                        let duration = now - *start;
223
224                        if duration > self.max_duration {
225                            let result = tx.txn_execute_fail_on_timeout(|txn_ptr| {
226                                // Time out the transaction.
227                                //
228                                // We use `mdbx_txn_reset` instead of `mdbx_txn_abort` here to
229                                // prevent MDBX from reusing the pointer of the aborted
230                                // transaction for new read-only transactions. This is
231                                // important because we store the pointer in the `active` list
232                                // and assume that it is unique.
233                                //
234                                // See https://libmdbx.dqdkfa.ru/group__c__transactions.html#gae9f34737fe60b0ba538d5a09b6a25c8d for more info.
235                                let result = mdbx_result(unsafe { ffi::mdbx_txn_reset(txn_ptr) });
236                                if result.is_ok() {
237                                    tx.set_timed_out();
238                                }
239                                (txn_ptr, duration, result)
240                            });
241
242                            match result {
243                                Ok((txn_ptr, duration, error)) => {
244                                    // Add the transaction to `timed_out_active`. We can't remove it
245                                    // instantly from the list of active transactions, because we
246                                    // iterate through it.
247                                    timed_out_active.push((
248                                        txn_ptr,
249                                        duration,
250                                        backtrace.clone(),
251                                        error,
252                                    ));
253                                }
254                                Err(err) => {
255                                    error!(target: "libmdbx", %err, ?backtrace, "Failed to abort the long-lived read transaction")
256                                }
257                            }
258                        } else {
259                            max_active_transaction_duration = Some(
260                                duration.max(max_active_transaction_duration.unwrap_or_default()),
261                            );
262                        }
263                    }
264
265                    // Walk through timed out transactions, and delete them from the list of active
266                    // transactions.
267                    for (ptr, open_duration, backtrace, err) in timed_out_active.iter().cloned() {
268                        // Try deleting the transaction from the list of active transactions.
269                        let was_in_active = self.remove_active(ptr);
270                        if let Err(err) = err {
271                            if was_in_active {
272                                // If the transaction was in the list of active transactions,
273                                // then user didn't abort it and we failed to do so.
274                                error!(target: "libmdbx", %err, ?open_duration, ?backtrace, "Failed to time out the long-lived read transaction");
275                            }
276                        } else {
277                            // Happy path, the transaction has been timed out by us with no errors.
278                            warn!(target: "libmdbx", ?open_duration, ?backtrace, "Long-lived read transaction has been timed out");
279                            // Add transaction to the list of timed out transactions that were not
280                            // aborted by the user yet.
281                            self.timed_out_not_aborted.insert(ptr as usize);
282                        }
283                    }
284
285                    // Clear the list of timed out transactions, but not de-allocate the reserved
286                    // capacity to save on further pushes.
287                    timed_out_active.clear();
288
289                    if !self.active.is_empty() {
290                        trace!(
291                            target: "libmdbx",
292                            elapsed = ?now.elapsed(),
293                            active = ?self.active.iter().map(|entry| {
294                                let (tx, start, _) = entry.value();
295                                (tx.clone(), start.elapsed())
296                            }).collect::<Vec<_>>(),
297                            "Read transactions"
298                        );
299                    }
300
301                    // Sleep not more than `READ_TRANSACTIONS_CHECK_INTERVAL`, but at least until
302                    // the closest deadline of an active read transaction
303                    let sleep_duration = READ_TRANSACTIONS_CHECK_INTERVAL.min(
304                        self.max_duration - max_active_transaction_duration.unwrap_or_default(),
305                    );
306                    trace!(target: "libmdbx", ?sleep_duration, elapsed = ?now.elapsed(), "Putting transaction monitor to sleep");
307                    std::thread::sleep(sleep_duration);
308                }
309            };
310            std::thread::Builder::new()
311                .name("mdbx-rs-read-tx-timeouts".to_string())
312                .spawn(task)
313                .unwrap();
314        }
315    }
316
317    #[cfg(test)]
318    mod tests {
319        use crate::{
320            txn_manager::read_transactions::READ_TRANSACTIONS_CHECK_INTERVAL, Environment, Error,
321            MaxReadTransactionDuration,
322        };
323        use std::{thread::sleep, time::Duration};
324        use tempfile::tempdir;
325
326        #[test]
327        fn txn_manager_read_transactions_duration_set() {
328            const MAX_DURATION: Duration = Duration::from_secs(1);
329
330            let dir = tempdir().unwrap();
331            let env = Environment::builder()
332                .set_max_read_transaction_duration(MaxReadTransactionDuration::Set(MAX_DURATION))
333                .open(dir.path())
334                .unwrap();
335
336            let read_transactions = env.txn_manager().read_transactions.as_ref().unwrap();
337
338            // Create a read-only transaction, successfully use it, close it by dropping.
339            {
340                let tx = env.begin_ro_txn().unwrap();
341                let tx_ptr = tx.txn() as usize;
342                assert!(read_transactions.active.contains_key(&tx_ptr));
343
344                tx.open_db(None).unwrap();
345                drop(tx);
346
347                assert!(!read_transactions.active.contains_key(&tx_ptr));
348            }
349
350            // Create a read-only transaction, successfully use it, close it by committing.
351            {
352                let tx = env.begin_ro_txn().unwrap();
353                let tx_ptr = tx.txn() as usize;
354                assert!(read_transactions.active.contains_key(&tx_ptr));
355
356                tx.open_db(None).unwrap();
357                tx.commit().unwrap();
358
359                assert!(!read_transactions.active.contains_key(&tx_ptr));
360            }
361
362            {
363                // Create a read-only transaction and observe it's in the list of active
364                // transactions.
365                let tx = env.begin_ro_txn().unwrap();
366                let tx_ptr = tx.txn() as usize;
367                assert!(read_transactions.active.contains_key(&tx_ptr));
368
369                // Wait until the transaction is timed out by the manager.
370                sleep(MAX_DURATION + READ_TRANSACTIONS_CHECK_INTERVAL);
371
372                // Ensure that the transaction is not in the list of active transactions anymore,
373                // and is in the list of timed out but not aborted transactions.
374                assert!(!read_transactions.active.contains_key(&tx_ptr));
375                assert!(read_transactions.timed_out_not_aborted.contains(&tx_ptr));
376
377                // Use the timed out transaction and observe the `Error::ReadTransactionTimeout`
378                assert_eq!(tx.open_db(None).err(), Some(Error::ReadTransactionTimeout));
379                assert!(!read_transactions.active.contains_key(&tx_ptr));
380                assert!(read_transactions.timed_out_not_aborted.contains(&tx_ptr));
381
382                assert_eq!(tx.id().err(), Some(Error::ReadTransactionTimeout));
383                assert!(!read_transactions.active.contains_key(&tx_ptr));
384                assert!(read_transactions.timed_out_not_aborted.contains(&tx_ptr));
385
386                // Ensure that the transaction pointer is not reused when opening a new read-only
387                // transaction.
388                let new_tx = env.begin_ro_txn().unwrap();
389                let new_tx_ptr = new_tx.txn() as usize;
390                assert!(read_transactions.active.contains_key(&new_tx_ptr));
391                assert_ne!(tx_ptr, new_tx_ptr);
392
393                // Drop the transaction and ensure that it's not in the list of timed out but not
394                // aborted transactions anymore.
395                drop(tx);
396                assert!(!read_transactions.timed_out_not_aborted.contains(&tx_ptr));
397            }
398        }
399
400        #[test]
401        fn txn_manager_read_transactions_duration_unbounded() {
402            let dir = tempdir().unwrap();
403            let env = Environment::builder()
404                .set_max_read_transaction_duration(MaxReadTransactionDuration::Unbounded)
405                .open(dir.path())
406                .unwrap();
407
408            assert!(env.txn_manager().read_transactions.is_none());
409
410            let tx = env.begin_ro_txn().unwrap();
411            sleep(READ_TRANSACTIONS_CHECK_INTERVAL);
412            assert!(tx.commit().is_ok())
413        }
414    }
415}