reth_libmdbx/txn_manager.rs
1use crate::{
2 environment::EnvPtr,
3 error::{mdbx_result, Result},
4 CommitLatency,
5};
6use std::{
7 ptr,
8 sync::mpsc::{sync_channel, Receiver, SyncSender},
9};
10
11#[derive(Copy, Clone, Debug)]
12pub(crate) struct TxnPtr(pub(crate) *mut ffi::MDBX_txn);
13unsafe impl Send for TxnPtr {}
14unsafe impl Sync for TxnPtr {}
15
16pub(crate) enum TxnManagerMessage {
17 Begin { parent: TxnPtr, flags: ffi::MDBX_txn_flags_t, sender: SyncSender<Result<TxnPtr>> },
18 Abort { tx: TxnPtr, sender: SyncSender<Result<bool>> },
19 Commit { tx: TxnPtr, sender: SyncSender<Result<(bool, CommitLatency)>> },
20}
21
22/// Manages transactions by doing two things:
23/// - Opening, aborting, and committing transactions using [`TxnManager::send_message`] with the
24/// corresponding [`TxnManagerMessage`]
25/// - Aborting long-lived read transactions (if the `read-tx-timeouts` feature is enabled and
26/// `TxnManager::with_max_read_transaction_duration` is called)
27#[derive(Debug)]
28pub(crate) struct TxnManager {
29 sender: SyncSender<TxnManagerMessage>,
30 #[cfg(feature = "read-tx-timeouts")]
31 read_transactions: Option<std::sync::Arc<read_transactions::ReadTransactions>>,
32}
33
34impl TxnManager {
35 pub(crate) fn new(env: EnvPtr) -> Self {
36 let (tx, rx) = sync_channel(0);
37 let txn_manager = Self {
38 sender: tx,
39 #[cfg(feature = "read-tx-timeouts")]
40 read_transactions: None,
41 };
42
43 txn_manager.start_message_listener(env, rx);
44
45 txn_manager
46 }
47
48 /// Spawns a new [`std::thread`] that listens to incoming [`TxnManagerMessage`] messages,
49 /// executes an FFI function, and returns the result on the provided channel.
50 ///
51 /// - [`TxnManagerMessage::Begin`] opens a new transaction with [`ffi::mdbx_txn_begin_ex`]
52 /// - [`TxnManagerMessage::Abort`] aborts a transaction with [`ffi::mdbx_txn_abort`]
53 /// - [`TxnManagerMessage::Commit`] commits a transaction with [`ffi::mdbx_txn_commit_ex`]
54 fn start_message_listener(&self, env: EnvPtr, rx: Receiver<TxnManagerMessage>) {
55 let task = move || {
56 let env = env;
57 loop {
58 match rx.recv() {
59 Ok(msg) => match msg {
60 TxnManagerMessage::Begin { parent, flags, sender } => {
61 let mut txn: *mut ffi::MDBX_txn = ptr::null_mut();
62 let res = mdbx_result(unsafe {
63 ffi::mdbx_txn_begin_ex(
64 env.0,
65 parent.0,
66 flags,
67 &mut txn,
68 ptr::null_mut(),
69 )
70 })
71 .map(|_| TxnPtr(txn));
72 sender.send(res).unwrap();
73 }
74 TxnManagerMessage::Abort { tx, sender } => {
75 sender.send(mdbx_result(unsafe { ffi::mdbx_txn_abort(tx.0) })).unwrap();
76 }
77 TxnManagerMessage::Commit { tx, sender } => {
78 sender
79 .send({
80 let mut latency = CommitLatency::new();
81 mdbx_result(unsafe {
82 ffi::mdbx_txn_commit_ex(tx.0, latency.mdb_commit_latency())
83 })
84 .map(|v| (v, latency))
85 })
86 .unwrap();
87 }
88 },
89 Err(_) => return,
90 }
91 }
92 };
93 std::thread::Builder::new().name("mdbx-rs-txn-manager".to_string()).spawn(task).unwrap();
94 }
95
96 pub(crate) fn send_message(&self, message: TxnManagerMessage) {
97 self.sender.send(message).unwrap()
98 }
99}
100
101#[cfg(feature = "read-tx-timeouts")]
102mod read_transactions {
103 use crate::{
104 environment::EnvPtr, error::mdbx_result, transaction::TransactionPtr,
105 txn_manager::TxnManager,
106 };
107 use dashmap::{DashMap, DashSet};
108 use std::{
109 backtrace::Backtrace,
110 sync::{mpsc::sync_channel, Arc},
111 time::{Duration, Instant},
112 };
113 use tracing::{error, trace, warn};
114
/// Upper bound on how long the read transaction monitor sleeps between two scans of the active
/// read transactions.
const READ_TRANSACTIONS_CHECK_INTERVAL: Duration = Duration::from_secs(5);
116
117 impl TxnManager {
118 /// Returns a new instance for which the maximum duration that a read transaction can be
119 /// open is set.
120 pub(crate) fn new_with_max_read_transaction_duration(
121 env: EnvPtr,
122 duration: Duration,
123 ) -> Self {
124 let read_transactions = Arc::new(ReadTransactions::new(duration));
125 read_transactions.clone().start_monitor();
126
127 let (tx, rx) = sync_channel(0);
128
129 let txn_manager = Self { sender: tx, read_transactions: Some(read_transactions) };
130
131 txn_manager.start_message_listener(env, rx);
132
133 txn_manager
134 }
135
136 /// Adds a new transaction to the list of active read transactions.
137 pub(crate) fn add_active_read_transaction(
138 &self,
139 ptr: *mut ffi::MDBX_txn,
140 tx: TransactionPtr,
141 ) {
142 if let Some(read_transactions) = &self.read_transactions {
143 read_transactions.add_active(ptr, tx);
144 }
145 }
146
147 /// Removes a transaction from the list of active read transactions.
148 ///
149 /// Returns `true` if the transaction was found and removed.
150 pub(crate) fn remove_active_read_transaction(&self, ptr: *mut ffi::MDBX_txn) -> bool {
151 self.read_transactions.as_ref().is_some_and(|txs| txs.remove_active(ptr))
152 }
153
154 /// Returns the number of timed out transactions that were not aborted by the user yet.
155 pub(crate) fn timed_out_not_aborted_read_transactions(&self) -> Option<usize> {
156 self.read_transactions
157 .as_ref()
158 .map(|read_transactions| read_transactions.timed_out_not_aborted())
159 }
160 }
161
162 #[derive(Debug, Default)]
163 pub(super) struct ReadTransactions {
164 /// Maximum duration that a read transaction can be open until the
165 /// [`ReadTransactions::start_monitor`] aborts it.
166 max_duration: Duration,
167 /// List of currently active read transactions.
168 ///
169 /// We store `usize` instead of a raw pointer as a key, because pointers are not
170 /// comparable. The time of transaction opening is stored as a value.
171 ///
172 /// The backtrace of the transaction opening is recorded only when debug assertions are
173 /// enabled.
174 active: DashMap<usize, (TransactionPtr, Instant, Option<Arc<Backtrace>>)>,
175 /// List of timed out transactions that were not aborted by the user yet, hence have a
176 /// dangling read transaction pointer.
177 timed_out_not_aborted: DashSet<usize>,
178 }
179
180 impl ReadTransactions {
181 pub(super) fn new(max_duration: Duration) -> Self {
182 Self { max_duration, ..Default::default() }
183 }
184
185 /// Adds a new transaction to the list of active read transactions.
186 pub(super) fn add_active(&self, ptr: *mut ffi::MDBX_txn, tx: TransactionPtr) {
187 let _ = self.active.insert(
188 ptr as usize,
189 (
190 tx,
191 Instant::now(),
192 cfg!(debug_assertions).then(|| Arc::new(Backtrace::force_capture())),
193 ),
194 );
195 }
196
197 /// Removes a transaction from the list of active read transactions.
198 pub(super) fn remove_active(&self, ptr: *mut ffi::MDBX_txn) -> bool {
199 self.timed_out_not_aborted.remove(&(ptr as usize));
200 self.active.remove(&(ptr as usize)).is_some()
201 }
202
203 /// Returns the number of timed out transactions that were not aborted by the user yet.
204 pub(super) fn timed_out_not_aborted(&self) -> usize {
205 self.timed_out_not_aborted.len()
206 }
207
208 /// Spawns a new [`std::thread`] that monitors the list of active read transactions and
209 /// timeouts those that are open for longer than `ReadTransactions.max_duration`.
210 pub(super) fn start_monitor(self: Arc<Self>) {
211 let task = move || {
212 let mut timed_out_active = Vec::new();
213
214 loop {
215 let now = Instant::now();
216 let mut max_active_transaction_duration = None;
217
218 // Iterate through active read transactions and time out those that's open for
219 // longer than `self.max_duration`.
220 for entry in &self.active {
221 let (tx, start, backtrace) = entry.value();
222 let duration = now - *start;
223
224 if duration > self.max_duration {
225 let result = tx.txn_execute_fail_on_timeout(|txn_ptr| {
226 // Time out the transaction.
227 //
228 // We use `mdbx_txn_reset` instead of `mdbx_txn_abort` here to
229 // prevent MDBX from reusing the pointer of the aborted
230 // transaction for new read-only transactions. This is
231 // important because we store the pointer in the `active` list
232 // and assume that it is unique.
233 //
234 // See https://libmdbx.dqdkfa.ru/group__c__transactions.html#gae9f34737fe60b0ba538d5a09b6a25c8d for more info.
235 let result = mdbx_result(unsafe { ffi::mdbx_txn_reset(txn_ptr) });
236 if result.is_ok() {
237 tx.set_timed_out();
238 }
239 (txn_ptr, duration, result)
240 });
241
242 match result {
243 Ok((txn_ptr, duration, error)) => {
244 // Add the transaction to `timed_out_active`. We can't remove it
245 // instantly from the list of active transactions, because we
246 // iterate through it.
247 timed_out_active.push((
248 txn_ptr,
249 duration,
250 backtrace.clone(),
251 error,
252 ));
253 }
254 Err(err) => {
255 error!(target: "libmdbx", %err, ?backtrace, "Failed to abort the long-lived read transaction")
256 }
257 }
258 } else {
259 max_active_transaction_duration = Some(
260 duration.max(max_active_transaction_duration.unwrap_or_default()),
261 );
262 }
263 }
264
265 // Walk through timed out transactions, and delete them from the list of active
266 // transactions.
267 for (ptr, open_duration, backtrace, err) in timed_out_active.iter().cloned() {
268 // Try deleting the transaction from the list of active transactions.
269 let was_in_active = self.remove_active(ptr);
270 if let Err(err) = err {
271 if was_in_active {
272 // If the transaction was in the list of active transactions,
273 // then user didn't abort it and we failed to do so.
274 error!(target: "libmdbx", %err, ?open_duration, ?backtrace, "Failed to time out the long-lived read transaction");
275 }
276 } else {
277 // Happy path, the transaction has been timed out by us with no errors.
278 warn!(target: "libmdbx", ?open_duration, ?backtrace, "Long-lived read transaction has been timed out");
279 // Add transaction to the list of timed out transactions that were not
280 // aborted by the user yet.
281 self.timed_out_not_aborted.insert(ptr as usize);
282 }
283 }
284
285 // Clear the list of timed out transactions, but not de-allocate the reserved
286 // capacity to save on further pushes.
287 timed_out_active.clear();
288
289 if !self.active.is_empty() {
290 trace!(
291 target: "libmdbx",
292 elapsed = ?now.elapsed(),
293 active = ?self.active.iter().map(|entry| {
294 let (tx, start, _) = entry.value();
295 (tx.clone(), start.elapsed())
296 }).collect::<Vec<_>>(),
297 "Read transactions"
298 );
299 }
300
301 // Sleep not more than `READ_TRANSACTIONS_CHECK_INTERVAL`, but at least until
302 // the closest deadline of an active read transaction
303 let sleep_duration = READ_TRANSACTIONS_CHECK_INTERVAL.min(
304 self.max_duration - max_active_transaction_duration.unwrap_or_default(),
305 );
306 trace!(target: "libmdbx", ?sleep_duration, elapsed = ?now.elapsed(), "Putting transaction monitor to sleep");
307 std::thread::sleep(sleep_duration);
308 }
309 };
310 std::thread::Builder::new()
311 .name("mdbx-rs-read-tx-timeouts".to_string())
312 .spawn(task)
313 .unwrap();
314 }
315 }
316
#[cfg(test)]
mod tests {
    use crate::{
        txn_manager::read_transactions::READ_TRANSACTIONS_CHECK_INTERVAL, Environment, Error,
        MaxReadTransactionDuration,
    };
    use std::{thread::sleep, time::Duration};
    use tempfile::tempdir;

    /// With a maximum duration configured, read transactions are tracked while open, untracked
    /// once closed, and timed out by the monitor when they stay open for too long.
    #[test]
    fn txn_manager_read_transactions_duration_set() {
        const MAX_DURATION: Duration = Duration::from_secs(1);

        let dir = tempdir().unwrap();
        let env = Environment::builder()
            .set_max_read_transaction_duration(MaxReadTransactionDuration::Set(MAX_DURATION))
            .open(dir.path())
            .unwrap();

        let tracker = env.txn_manager().read_transactions.as_ref().unwrap();
        let is_active = |ptr: usize| tracker.active.contains_key(&ptr);
        let is_timed_out = |ptr: usize| tracker.timed_out_not_aborted.contains(&ptr);

        // Create a read-only transaction, successfully use it, close it by dropping.
        {
            let tx = env.begin_ro_txn().unwrap();
            let tx_ptr = tx.txn() as usize;
            assert!(is_active(tx_ptr));

            tx.open_db(None).unwrap();
            drop(tx);

            assert!(!is_active(tx_ptr));
        }

        // Create a read-only transaction, successfully use it, close it by committing.
        {
            let tx = env.begin_ro_txn().unwrap();
            let tx_ptr = tx.txn() as usize;
            assert!(is_active(tx_ptr));

            tx.open_db(None).unwrap();
            tx.commit().unwrap();

            assert!(!is_active(tx_ptr));
        }

        {
            // Create a read-only transaction and observe it's in the list of active
            // transactions.
            let tx = env.begin_ro_txn().unwrap();
            let tx_ptr = tx.txn() as usize;
            assert!(is_active(tx_ptr));

            // Wait until the transaction is timed out by the manager.
            sleep(MAX_DURATION + READ_TRANSACTIONS_CHECK_INTERVAL);

            // The transaction must no longer be listed as active, and must be listed as timed
            // out but not aborted.
            let assert_only_timed_out = || {
                assert!(!is_active(tx_ptr));
                assert!(is_timed_out(tx_ptr));
            };
            assert_only_timed_out();

            // Use the timed out transaction and observe the `Error::ReadTransactionTimeout`;
            // the bookkeeping must stay unchanged after every attempt.
            assert_eq!(tx.open_db(None).err(), Some(Error::ReadTransactionTimeout));
            assert_only_timed_out();

            assert_eq!(tx.id().err(), Some(Error::ReadTransactionTimeout));
            assert_only_timed_out();

            // Ensure that the transaction pointer is not reused when opening a new read-only
            // transaction.
            let new_tx = env.begin_ro_txn().unwrap();
            let new_tx_ptr = new_tx.txn() as usize;
            assert!(is_active(new_tx_ptr));
            assert_ne!(tx_ptr, new_tx_ptr);

            // Drop the transaction and ensure that it's not in the list of timed out but not
            // aborted transactions anymore.
            drop(tx);
            assert!(!is_timed_out(tx_ptr));
        }
    }

    /// Without a maximum duration no tracker exists, and a read transaction survives a full
    /// check interval.
    #[test]
    fn txn_manager_read_transactions_duration_unbounded() {
        let dir = tempdir().unwrap();
        let env = Environment::builder()
            .set_max_read_transaction_duration(MaxReadTransactionDuration::Unbounded)
            .open(dir.path())
            .unwrap();

        assert!(env.txn_manager().read_transactions.is_none());

        let tx = env.begin_ro_txn().unwrap();
        sleep(READ_TRANSACTIONS_CHECK_INTERVAL);
        assert!(tx.commit().is_ok())
    }
}
415}