Skip to main content

reth_libmdbx/
txn_manager.rs

1use crate::{
2    environment::EnvPtr,
3    error::{mdbx_result, Result},
4    CommitLatency,
5};
6use std::{
7    fmt, ptr,
8    sync::mpsc::{sync_channel, Receiver, SyncSender},
9};
10
11#[derive(Copy, Clone, Debug)]
12pub(crate) struct TxnPtr(pub(crate) *mut ffi::MDBX_txn);
13unsafe impl Send for TxnPtr {}
14unsafe impl Sync for TxnPtr {}
15
16pub(crate) enum TxnManagerMessage {
17    Begin { parent: TxnPtr, flags: ffi::MDBX_txn_flags_t, sender: SyncSender<Result<TxnPtr>> },
18    Abort { tx: TxnPtr, sender: SyncSender<Result<bool>> },
19    Commit { tx: TxnPtr, sender: SyncSender<Result<(bool, CommitLatency)>> },
20}
21
22impl fmt::Debug for TxnManagerMessage {
23    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
24        match self {
25            Self::Begin { parent, flags, sender: _ } => {
26                f.debug_struct("Begin").field("parent", parent).field("flags", flags).finish()
27            }
28            Self::Abort { tx, sender: _ } => f.debug_struct("Abort").field("tx", tx).finish(),
29            Self::Commit { tx, sender: _ } => f.debug_struct("Commit").field("tx", tx).finish(),
30        }
31    }
32}
33
34/// Manages transactions by doing two things:
35/// - Opening, aborting, and committing transactions using [`TxnManager::send_message`] with the
36///   corresponding [`TxnManagerMessage`]
37/// - Aborting long-lived read transactions (if the `read-tx-timeouts` feature is enabled and
38///   `TxnManager::with_max_read_transaction_duration` is called)
39#[derive(Debug)]
40pub(crate) struct TxnManager {
41    sender: SyncSender<TxnManagerMessage>,
42    #[cfg(feature = "read-tx-timeouts")]
43    read_transactions: Option<std::sync::Arc<read_transactions::ReadTransactions>>,
44}
45
46impl TxnManager {
47    pub(crate) fn new(env: EnvPtr) -> Self {
48        let (tx, rx) = sync_channel(0);
49        let txn_manager = Self {
50            sender: tx,
51            #[cfg(feature = "read-tx-timeouts")]
52            read_transactions: None,
53        };
54
55        txn_manager.start_message_listener(env, rx);
56
57        txn_manager
58    }
59
60    /// Spawns a new [`std::thread`] that listens to incoming [`TxnManagerMessage`] messages,
61    /// executes an FFI function, and returns the result on the provided channel.
62    ///
63    /// - [`TxnManagerMessage::Begin`] opens a new transaction with [`ffi::mdbx_txn_begin_ex`]
64    /// - [`TxnManagerMessage::Abort`] aborts a transaction with [`ffi::mdbx_txn_abort`]
65    /// - [`TxnManagerMessage::Commit`] commits a transaction with [`ffi::mdbx_txn_commit_ex`]
66    fn start_message_listener(&self, env: EnvPtr, rx: Receiver<TxnManagerMessage>) {
67        let task = move || {
68            let env = env;
69            loop {
70                let msg = rx.recv();
71                tracing::debug!(target: "libmdbx::txn", ?msg, "txn-mngr received");
72                match msg {
73                    Ok(msg) => match msg {
74                        TxnManagerMessage::Begin { parent, flags, sender } => {
75                            let _span =
76                                tracing::debug_span!(target: "libmdbx::txn", "begin", flags)
77                                    .entered();
78                            let mut txn: *mut ffi::MDBX_txn = ptr::null_mut();
79                            let res = mdbx_result(unsafe {
80                                ffi::mdbx_txn_begin_ex(
81                                    env.0,
82                                    parent.0,
83                                    flags,
84                                    &mut txn,
85                                    ptr::null_mut(),
86                                )
87                            })
88                            .map(|_| TxnPtr(txn));
89                            sender.send(res).unwrap();
90                        }
91                        TxnManagerMessage::Abort { tx, sender } => {
92                            let _span =
93                                tracing::debug_span!(target: "libmdbx::txn", "abort").entered();
94                            sender.send(mdbx_result(unsafe { ffi::mdbx_txn_abort(tx.0) })).unwrap();
95                        }
96                        TxnManagerMessage::Commit { tx, sender } => {
97                            let _span =
98                                tracing::debug_span!(target: "libmdbx::txn", "commit").entered();
99                            sender
100                                .send({
101                                    let mut latency = CommitLatency::new();
102                                    mdbx_result(unsafe {
103                                        ffi::mdbx_txn_commit_ex(tx.0, latency.mdb_commit_latency())
104                                    })
105                                    .map(|v| (v, latency))
106                                })
107                                .unwrap();
108                        }
109                    },
110                    Err(_) => return,
111                }
112            }
113        };
114        std::thread::Builder::new().name("mdbx-rs-txn-mgr".to_string()).spawn(task).unwrap();
115    }
116
117    pub(crate) fn send_message(&self, message: TxnManagerMessage) {
118        self.sender.send(message).unwrap()
119    }
120}
121
#[cfg(feature = "read-tx-timeouts")]
mod read_transactions {
    use crate::{
        environment::EnvPtr, error::mdbx_result, transaction::TransactionPtr,
        txn_manager::TxnManager,
    };
    use dashmap::{DashMap, DashSet};
    use std::{
        backtrace::Backtrace,
        sync::{mpsc::sync_channel, Arc},
        time::{Duration, Instant},
    };
    use tracing::{error, trace, warn};

    /// Upper bound on how long the monitor thread sleeps between two passes over the active read
    /// transactions.
    const READ_TRANSACTIONS_CHECK_INTERVAL: Duration = Duration::from_secs(5);

    impl TxnManager {
        /// Returns a new instance for which the maximum duration that a read transaction can be
        /// open is set.
        ///
        /// Spawns the read-transaction monitor thread in addition to the listener thread.
        pub(crate) fn new_with_max_read_transaction_duration(
            env: EnvPtr,
            duration: Duration,
        ) -> Self {
            let read_transactions = Arc::new(ReadTransactions::new(duration));
            read_transactions.clone().start_monitor();

            // Zero capacity: every `send_message` is a rendezvous with the listener thread.
            let (tx, rx) = sync_channel(0);

            let txn_manager = Self { sender: tx, read_transactions: Some(read_transactions) };

            txn_manager.start_message_listener(env, rx);

            txn_manager
        }

        /// Adds a new transaction to the list of active read transactions.
        ///
        /// Does nothing if read-transaction timeouts are disabled for this manager.
        pub(crate) fn add_active_read_transaction(
            &self,
            ptr: *mut ffi::MDBX_txn,
            tx: TransactionPtr,
        ) {
            if let Some(read_transactions) = &self.read_transactions {
                read_transactions.add_active(ptr, tx);
            }
        }

        /// Removes a transaction from the list of active read transactions.
        ///
        /// Returns `true` if the transaction was found and removed. Always returns `false` if
        /// read-transaction timeouts are disabled for this manager.
        pub(crate) fn remove_active_read_transaction(&self, ptr: *mut ffi::MDBX_txn) -> bool {
            self.read_transactions.as_ref().is_some_and(|txs| txs.remove_active(ptr))
        }

        /// Returns the number of timed out transactions that were not aborted by the user yet,
        /// or `None` if read-transaction timeouts are disabled for this manager.
        pub(crate) fn timed_out_not_aborted_read_transactions(&self) -> Option<usize> {
            self.read_transactions.as_ref().map(|txs| txs.timed_out_not_aborted())
        }
    }

    /// Tracks open read transactions so that a monitor thread can time out the ones that stay
    /// open for too long.
    #[derive(Debug, Default)]
    pub(super) struct ReadTransactions {
        /// Maximum duration that a read transaction can be open until the
        /// [`ReadTransactions::start_monitor`] thread times it out.
        max_duration: Duration,
        /// Currently active read transactions, keyed by transaction pointer address.
        ///
        /// The key is the pointer cast to `usize` rather than the raw pointer itself, because raw
        /// pointers are neither `Send` nor `Sync`. As keys, they would prevent sharing this struct
        /// with the monitor thread. Each value holds the transaction handle and the time it was
        /// opened. When debug assertions are enabled, it also holds the backtrace of where the
        /// transaction was opened.
        active: DashMap<usize, (TransactionPtr, Instant, Option<Arc<Backtrace>>)>,
        /// Transactions that were reset by the monitor but not yet closed by the user, hence
        /// still holding an unusable read transaction pointer.
        timed_out_not_aborted: DashSet<usize>,
    }

    impl ReadTransactions {
        /// Creates an empty tracker that times out read transactions open for longer than
        /// `max_duration`.
        pub(super) fn new(max_duration: Duration) -> Self {
            Self { max_duration, ..Default::default() }
        }

        /// Adds a new transaction to the list of active read transactions.
        ///
        /// Records the current time as its opening time and, with debug assertions enabled,
        /// the current backtrace.
        pub(super) fn add_active(&self, ptr: *mut ffi::MDBX_txn, tx: TransactionPtr) {
            let _ = self.active.insert(
                ptr as usize,
                (
                    tx,
                    Instant::now(),
                    cfg!(debug_assertions).then(|| Arc::new(Backtrace::force_capture())),
                ),
            );
        }

        /// Removes a transaction from the list of active read transactions and from the list of
        /// timed out transactions that were not aborted yet.
        ///
        /// Returns `true` if the transaction was in the list of active read transactions.
        pub(super) fn remove_active(&self, ptr: *mut ffi::MDBX_txn) -> bool {
            let key = ptr as usize;
            // Order matters: remove from `active` before `timed_out_not_aborted`. The monitor
            // inserts into `timed_out_not_aborted` *before* removing from `active`, and undoes
            // the insert if the `active` entry was already gone. So whichever side removes the
            // `active` entry, no stale `timed_out_not_aborted` entry survives a concurrent close.
            let was_active = self.active.remove(&key).is_some();
            self.timed_out_not_aborted.remove(&key);
            was_active
        }

        /// Returns the number of timed out transactions that were not aborted by the user yet.
        pub(super) fn timed_out_not_aborted(&self) -> usize {
            self.timed_out_not_aborted.len()
        }

        /// Spawns a new [`std::thread`] that monitors the list of active read transactions and
        /// times out those that are open for longer than `ReadTransactions.max_duration`.
        ///
        /// Each timed out transaction goes through these steps:
        /// - it is reset with `mdbx_txn_reset` and marked as timed out;
        /// - it is moved from `active` to `timed_out_not_aborted`;
        /// - it is reported with a warning.
        ///
        /// The thread loops forever.
        ///
        /// # Panics
        ///
        /// Panics if the thread cannot be spawned.
        pub(super) fn start_monitor(self: Arc<Self>) {
            let task = move || {
                // Scratch buffer reused across passes to avoid reallocating on every check.
                let mut timed_out_active = Vec::new();

                loop {
                    let now = Instant::now();
                    // Longest open duration among transactions that have not timed out yet.
                    let mut max_active_transaction_duration = None;

                    // Iterate through active read transactions and time out those that are open
                    // for longer than `self.max_duration`.
                    //
                    // NOTE(review): a `DashMap` shard guard is held while
                    // `txn_execute_fail_on_timeout` takes the transaction's lock. If any caller
                    // holds that transaction lock while calling `remove_active` (e.g. a drop or
                    // abort path outside this file), the lock orders are inverted and the two
                    // threads could deadlock — confirm against the users of `TransactionPtr`.
                    for entry in &self.active {
                        let (tx, start, backtrace) = entry.value();
                        // `start` can be later than `now` if the transaction was added during
                        // this pass; saturate to zero in that case.
                        let duration = now.saturating_duration_since(*start);

                        if duration > self.max_duration {
                            let outcome = tx.txn_execute_fail_on_timeout(|txn_ptr| {
                                // Time out the transaction.
                                //
                                // We use `mdbx_txn_reset` instead of `mdbx_txn_abort` here to
                                // prevent MDBX from reusing the pointer of the aborted
                                // transaction for new read-only transactions. This is
                                // important because we store the pointer in the `active` list
                                // and assume that it is unique.
                                //
                                // See https://libmdbx.dqdkfa.ru/group__c__transactions.html#gae9f34737fe60b0ba538d5a09b6a25c8d for more info.
                                let result = mdbx_result(unsafe { ffi::mdbx_txn_reset(txn_ptr) });
                                if result.is_ok() {
                                    tx.set_timed_out();
                                }
                                (txn_ptr, duration, result)
                            });

                            match outcome {
                                Ok((txn_ptr, duration, result)) => {
                                    // Defer the removal from `active`: the map cannot be
                                    // modified while we iterate through it.
                                    timed_out_active.push((
                                        txn_ptr,
                                        duration,
                                        backtrace.clone(),
                                        result,
                                    ));
                                }
                                Err(err) => {
                                    error!(target: "libmdbx", %err, ?backtrace, "Failed to abort the long-lived read transaction")
                                }
                            }
                        } else {
                            max_active_transaction_duration = Some(
                                duration.max(max_active_transaction_duration.unwrap_or_default()),
                            );
                        }
                    }

                    // Walk through timed out transactions, and move them out of the list of
                    // active transactions. `drain` empties the buffer but keeps its capacity.
                    for (ptr, open_duration, backtrace, result) in timed_out_active.drain(..) {
                        let key = ptr as usize;
                        match result {
                            Ok(_) => {
                                // Happy path, the transaction has been timed out by us with no
                                // errors.
                                warn!(target: "libmdbx", ?open_duration, ?backtrace, "Long-lived read transaction has been timed out");
                                // Record it as timed out *before* taking it out of `active`;
                                // `remove_active` removes in the opposite order, so a concurrent
                                // close by the user cannot leave a stale entry behind.
                                self.timed_out_not_aborted.insert(key);
                                if self.active.remove(&key).is_none() {
                                    // The user already closed the transaction concurrently, so
                                    // it is no longer "timed out but not aborted".
                                    self.timed_out_not_aborted.remove(&key);
                                }
                            }
                            Err(err) => {
                                // If the transaction was still in the list of active
                                // transactions, then the user didn't abort it and we failed to
                                // do so.
                                if self.remove_active(ptr) {
                                    error!(target: "libmdbx", %err, ?open_duration, ?backtrace, "Failed to time out the long-lived read transaction");
                                }
                            }
                        }
                    }

                    if !self.active.is_empty() {
                        trace!(
                            target: "libmdbx",
                            elapsed = ?now.elapsed(),
                            active = ?self.active.iter().map(|entry| {
                                let (tx, start, _) = entry.value();
                                (tx.clone(), start.elapsed())
                            }).collect::<Vec<_>>(),
                            "Read transactions"
                        );
                    }

                    // Sleep for at most `READ_TRANSACTIONS_CHECK_INTERVAL`, waking up no later
                    // than the deadline of the oldest transaction that is still active. The
                    // subtraction cannot underflow by construction; it saturates for robustness.
                    let sleep_duration = READ_TRANSACTIONS_CHECK_INTERVAL.min(
                        self.max_duration
                            .saturating_sub(max_active_transaction_duration.unwrap_or_default()),
                    );
                    trace!(target: "libmdbx", ?sleep_duration, elapsed = ?now.elapsed(), "Putting transaction monitor to sleep");
                    std::thread::sleep(sleep_duration);
                }
            };
            std::thread::Builder::new()
                .name("mdbx-rs-read-tx-timeouts".to_string())
                .spawn(task)
                .expect("failed to spawn the mdbx-rs-read-tx-timeouts thread");
        }
    }

    #[cfg(test)]
    mod tests {
        use crate::{
            txn_manager::read_transactions::READ_TRANSACTIONS_CHECK_INTERVAL, Environment, Error,
            MaxReadTransactionDuration,
        };
        use std::{thread::sleep, time::Duration};
        use tempfile::tempdir;

        #[test]
        fn txn_manager_read_transactions_duration_set() {
            const MAX_DURATION: Duration = Duration::from_secs(1);

            let dir = tempdir().unwrap();
            let env = Environment::builder()
                .set_max_read_transaction_duration(MaxReadTransactionDuration::Set(MAX_DURATION))
                .open(dir.path())
                .unwrap();

            let read_transactions = env.txn_manager().read_transactions.as_ref().unwrap();

            // Create a read-only transaction, successfully use it, close it by dropping.
            {
                let tx = env.begin_ro_txn().unwrap();
                let tx_ptr = tx.txn() as usize;
                assert!(read_transactions.active.contains_key(&tx_ptr));

                tx.open_db(None).unwrap();
                drop(tx);

                assert!(!read_transactions.active.contains_key(&tx_ptr));
            }

            // Create a read-only transaction, successfully use it, close it by committing.
            {
                let tx = env.begin_ro_txn().unwrap();
                let tx_ptr = tx.txn() as usize;
                assert!(read_transactions.active.contains_key(&tx_ptr));

                tx.open_db(None).unwrap();
                tx.commit().unwrap();

                assert!(!read_transactions.active.contains_key(&tx_ptr));
            }

            {
                // Create a read-only transaction and observe it's in the list of active
                // transactions.
                let tx = env.begin_ro_txn().unwrap();
                let tx_ptr = tx.txn() as usize;
                assert!(read_transactions.active.contains_key(&tx_ptr));

                // Wait until the transaction is timed out by the manager.
                sleep(MAX_DURATION + READ_TRANSACTIONS_CHECK_INTERVAL);

                // Ensure that the transaction is not in the list of active transactions anymore,
                // and is in the list of timed out but not aborted transactions.
                assert!(!read_transactions.active.contains_key(&tx_ptr));
                assert!(read_transactions.timed_out_not_aborted.contains(&tx_ptr));

                // Use the timed out transaction and observe the `Error::ReadTransactionTimeout`
                assert_eq!(tx.open_db(None).err(), Some(Error::ReadTransactionTimeout));
                assert!(!read_transactions.active.contains_key(&tx_ptr));
                assert!(read_transactions.timed_out_not_aborted.contains(&tx_ptr));

                assert_eq!(tx.id().err(), Some(Error::ReadTransactionTimeout));
                assert!(!read_transactions.active.contains_key(&tx_ptr));
                assert!(read_transactions.timed_out_not_aborted.contains(&tx_ptr));

                // Ensure that the transaction pointer is not reused when opening a new read-only
                // transaction.
                let new_tx = env.begin_ro_txn().unwrap();
                let new_tx_ptr = new_tx.txn() as usize;
                assert!(read_transactions.active.contains_key(&new_tx_ptr));
                assert_ne!(tx_ptr, new_tx_ptr);

                // Drop the transaction and ensure that it's not in the list of timed out but not
                // aborted transactions anymore.
                drop(tx);
                assert!(!read_transactions.timed_out_not_aborted.contains(&tx_ptr));
            }
        }

        #[test]
        fn txn_manager_read_transactions_duration_unbounded() {
            let dir = tempdir().unwrap();
            let env = Environment::builder()
                .set_max_read_transaction_duration(MaxReadTransactionDuration::Unbounded)
                .open(dir.path())
                .unwrap();

            assert!(env.txn_manager().read_transactions.is_none());

            let tx = env.begin_ro_txn().unwrap();
            sleep(READ_TRANSACTIONS_CHECK_INTERVAL);
            assert!(tx.commit().is_ok())
        }
    }
}