reth_db/implementation/mdbx/
tx.rs

1//! Transaction wrapper for libmdbx-sys.
2
3use super::{cursor::Cursor, utils::*};
4use crate::{
5    metrics::{DatabaseEnvMetrics, Operation, TransactionMode, TransactionOutcome},
6    DatabaseError,
7};
8use reth_db_api::{
9    table::{Compress, DupSort, Encode, Table, TableImporter},
10    transaction::{DbTx, DbTxMut},
11};
12use reth_libmdbx::{ffi::MDBX_dbi, CommitLatency, Transaction, TransactionKind, WriteFlags, RW};
13use reth_storage_errors::db::{DatabaseWriteError, DatabaseWriteOperation};
14use reth_tracing::tracing::{debug, trace, warn};
15use std::{
16    backtrace::Backtrace,
17    collections::HashMap,
18    marker::PhantomData,
19    sync::{
20        atomic::{AtomicBool, Ordering},
21        Arc,
22    },
23    time::{Duration, Instant},
24};
25
/// Default open time after which a long-lived database transaction is reported in the logs.
///
/// Every new metrics handler starts with this value as its `long_transaction_duration`.
const LONG_TRANSACTION_DURATION: Duration = Duration::from_secs(60);
28
29/// Wrapper for the libmdbx transaction.
30#[derive(Debug)]
31pub struct Tx<K: TransactionKind> {
32    /// Libmdbx-sys transaction.
33    pub inner: Transaction<K>,
34
35    /// Cached MDBX DBIs for reuse.
36    dbis: Arc<HashMap<&'static str, MDBX_dbi>>,
37
38    /// Handler for metrics with its own [Drop] implementation for cases when the transaction isn't
39    /// closed by [`Tx::commit`] or [`Tx::abort`], but we still need to report it in the metrics.
40    ///
41    /// If [Some], then metrics are reported.
42    metrics_handler: Option<MetricsHandler<K>>,
43}
44
45impl<K: TransactionKind> Tx<K> {
46    /// Creates new `Tx` object with a `RO` or `RW` transaction and optionally enables metrics.
47    #[inline]
48    #[track_caller]
49    pub(crate) fn new(
50        inner: Transaction<K>,
51        dbis: Arc<HashMap<&'static str, MDBX_dbi>>,
52        env_metrics: Option<Arc<DatabaseEnvMetrics>>,
53    ) -> reth_libmdbx::Result<Self> {
54        let metrics_handler = env_metrics
55            .map(|env_metrics| {
56                let handler = MetricsHandler::<K>::new(inner.id()?, env_metrics);
57                handler.env_metrics.record_opened_transaction(handler.transaction_mode());
58                handler.log_transaction_opened();
59                Ok(handler)
60            })
61            .transpose()?;
62        Ok(Self { inner, dbis, metrics_handler })
63    }
64
65    /// Gets this transaction ID.
66    pub fn id(&self) -> reth_libmdbx::Result<u64> {
67        self.metrics_handler.as_ref().map_or_else(|| self.inner.id(), |handler| Ok(handler.txn_id))
68    }
69
70    /// Gets a table database handle if it exists, otherwise creates it.
71    pub fn get_dbi<T: Table>(&self) -> Result<MDBX_dbi, DatabaseError> {
72        if let Some(dbi) = self.dbis.get(T::NAME) {
73            Ok(*dbi)
74        } else {
75            self.inner
76                .open_db(Some(T::NAME))
77                .map(|db| db.dbi())
78                .map_err(|e| DatabaseError::Open(e.into()))
79        }
80    }
81
82    /// Create db Cursor
83    pub fn new_cursor<T: Table>(&self) -> Result<Cursor<K, T>, DatabaseError> {
84        let inner = self
85            .inner
86            .cursor_with_dbi(self.get_dbi::<T>()?)
87            .map_err(|e| DatabaseError::InitCursor(e.into()))?;
88
89        Ok(Cursor::new_with_metrics(
90            inner,
91            self.metrics_handler.as_ref().map(|h| h.env_metrics.clone()),
92        ))
93    }
94
95    /// If `self.metrics_handler == Some(_)`, measure the time it takes to execute the closure and
96    /// record a metric with the provided transaction outcome.
97    ///
98    /// Otherwise, just execute the closure.
99    fn execute_with_close_transaction_metric<R>(
100        mut self,
101        outcome: TransactionOutcome,
102        f: impl FnOnce(Self) -> (R, Option<CommitLatency>),
103    ) -> R {
104        let run = |tx| {
105            let start = Instant::now();
106            let (result, commit_latency) = f(tx);
107            let total_duration = start.elapsed();
108
109            if outcome.is_commit() {
110                debug!(
111                    target: "storage::db::mdbx",
112                    ?total_duration,
113                    ?commit_latency,
114                    is_read_only = K::IS_READ_ONLY,
115                    "Commit"
116                );
117            }
118
119            (result, commit_latency, total_duration)
120        };
121
122        if let Some(mut metrics_handler) = self.metrics_handler.take() {
123            metrics_handler.close_recorded = true;
124            metrics_handler.log_backtrace_on_long_read_transaction();
125
126            let (result, commit_latency, close_duration) = run(self);
127            let open_duration = metrics_handler.start.elapsed();
128            metrics_handler.env_metrics.record_closed_transaction(
129                metrics_handler.transaction_mode(),
130                outcome,
131                open_duration,
132                Some(close_duration),
133                commit_latency,
134            );
135
136            result
137        } else {
138            run(self).0
139        }
140    }
141
142    /// If `self.metrics_handler == Some(_)`, measure the time it takes to execute the closure and
143    /// record a metric with the provided operation.
144    ///
145    /// Otherwise, just execute the closure.
146    fn execute_with_operation_metric<T: Table, R>(
147        &self,
148        operation: Operation,
149        value_size: Option<usize>,
150        f: impl FnOnce(&Transaction<K>) -> R,
151    ) -> R {
152        if let Some(metrics_handler) = &self.metrics_handler {
153            metrics_handler.log_backtrace_on_long_read_transaction();
154            metrics_handler
155                .env_metrics
156                .record_operation(T::NAME, operation, value_size, || f(&self.inner))
157        } else {
158            f(&self.inner)
159        }
160    }
161}
162
163#[derive(Debug)]
164struct MetricsHandler<K: TransactionKind> {
165    /// Cached internal transaction ID provided by libmdbx.
166    txn_id: u64,
167    /// The time when transaction has started.
168    start: Instant,
169    /// Duration after which we emit the log about long-lived database transactions.
170    long_transaction_duration: Duration,
171    /// If `true`, the metric about transaction closing has already been recorded and we don't need
172    /// to do anything on [`Drop::drop`].
173    close_recorded: bool,
174    /// If `true`, the backtrace of transaction will be recorded and logged.
175    /// See [`MetricsHandler::log_backtrace_on_long_read_transaction`].
176    record_backtrace: bool,
177    /// If `true`, the backtrace of transaction has already been recorded and logged.
178    /// See [`MetricsHandler::log_backtrace_on_long_read_transaction`].
179    backtrace_recorded: AtomicBool,
180    /// Shared database environment metrics.
181    env_metrics: Arc<DatabaseEnvMetrics>,
182    /// Backtrace of the location where the transaction has been opened. Reported only with debug
183    /// assertions, because capturing the backtrace on every transaction opening is expensive.
184    #[cfg(debug_assertions)]
185    open_backtrace: Backtrace,
186    _marker: PhantomData<K>,
187}
188
189impl<K: TransactionKind> MetricsHandler<K> {
190    fn new(txn_id: u64, env_metrics: Arc<DatabaseEnvMetrics>) -> Self {
191        Self {
192            txn_id,
193            start: Instant::now(),
194            long_transaction_duration: LONG_TRANSACTION_DURATION,
195            close_recorded: false,
196            record_backtrace: true,
197            backtrace_recorded: AtomicBool::new(false),
198            #[cfg(debug_assertions)]
199            open_backtrace: Backtrace::force_capture(),
200            env_metrics,
201            _marker: PhantomData,
202        }
203    }
204
205    const fn transaction_mode(&self) -> TransactionMode {
206        if K::IS_READ_ONLY {
207            TransactionMode::ReadOnly
208        } else {
209            TransactionMode::ReadWrite
210        }
211    }
212
213    /// Logs the caller location and ID of the transaction that was opened.
214    #[track_caller]
215    fn log_transaction_opened(&self) {
216        trace!(
217            target: "storage::db::mdbx",
218            caller = %core::panic::Location::caller(),
219            id = %self.txn_id,
220            mode = %self.transaction_mode().as_str(),
221            "Transaction opened",
222        );
223    }
224
225    /// Logs the backtrace of current call if the duration that the read transaction has been open
226    /// is more than [`LONG_TRANSACTION_DURATION`] and `record_backtrace == true`.
227    /// The backtrace is recorded and logged just once, guaranteed by `backtrace_recorded` atomic.
228    ///
229    /// NOTE: Backtrace is recorded using [`Backtrace::force_capture`], so `RUST_BACKTRACE` env var
230    /// is not needed.
231    fn log_backtrace_on_long_read_transaction(&self) {
232        if self.record_backtrace &&
233            !self.backtrace_recorded.load(Ordering::Relaxed) &&
234            self.transaction_mode().is_read_only()
235        {
236            let open_duration = self.start.elapsed();
237            if open_duration >= self.long_transaction_duration {
238                self.backtrace_recorded.store(true, Ordering::Relaxed);
239                #[cfg(debug_assertions)]
240                let message = format!(
241                    "The database read transaction has been open for too long. Open backtrace:\n{}\n\nCurrent backtrace:\n{}",
242                    self.open_backtrace,
243                    Backtrace::force_capture()
244                );
245                #[cfg(not(debug_assertions))]
246                let message = format!(
247                    "The database read transaction has been open for too long. Backtrace:\n{}",
248                    Backtrace::force_capture()
249                );
250                warn!(
251                    target: "storage::db::mdbx",
252                    ?open_duration,
253                    %self.txn_id,
254                    "{message}"
255                );
256            }
257        }
258    }
259}
260
261impl<K: TransactionKind> Drop for MetricsHandler<K> {
262    fn drop(&mut self) {
263        if !self.close_recorded {
264            self.log_backtrace_on_long_read_transaction();
265            self.env_metrics.record_closed_transaction(
266                self.transaction_mode(),
267                TransactionOutcome::Drop,
268                self.start.elapsed(),
269                None,
270                None,
271            );
272        }
273    }
274}
275
276impl TableImporter for Tx<RW> {}
277
278impl<K: TransactionKind> DbTx for Tx<K> {
279    type Cursor<T: Table> = Cursor<K, T>;
280    type DupCursor<T: DupSort> = Cursor<K, T>;
281
282    fn get<T: Table>(&self, key: T::Key) -> Result<Option<<T as Table>::Value>, DatabaseError> {
283        self.get_by_encoded_key::<T>(&key.encode())
284    }
285
286    fn get_by_encoded_key<T: Table>(
287        &self,
288        key: &<T::Key as Encode>::Encoded,
289    ) -> Result<Option<T::Value>, DatabaseError> {
290        self.execute_with_operation_metric::<T, _>(Operation::Get, None, |tx| {
291            tx.get(self.get_dbi::<T>()?, key.as_ref())
292                .map_err(|e| DatabaseError::Read(e.into()))?
293                .map(decode_one::<T>)
294                .transpose()
295        })
296    }
297
298    fn commit(self) -> Result<bool, DatabaseError> {
299        self.execute_with_close_transaction_metric(TransactionOutcome::Commit, |this| {
300            match this.inner.commit().map_err(|e| DatabaseError::Commit(e.into())) {
301                Ok((v, latency)) => (Ok(v), Some(latency)),
302                Err(e) => (Err(e), None),
303            }
304        })
305    }
306
307    fn abort(self) {
308        self.execute_with_close_transaction_metric(TransactionOutcome::Abort, |this| {
309            (drop(this.inner), None)
310        })
311    }
312
313    // Iterate over read only values in database.
314    fn cursor_read<T: Table>(&self) -> Result<Self::Cursor<T>, DatabaseError> {
315        self.new_cursor()
316    }
317
318    /// Iterate over read only values in database.
319    fn cursor_dup_read<T: DupSort>(&self) -> Result<Self::DupCursor<T>, DatabaseError> {
320        self.new_cursor()
321    }
322
323    /// Returns number of entries in the table using cheap DB stats invocation.
324    fn entries<T: Table>(&self) -> Result<usize, DatabaseError> {
325        Ok(self
326            .inner
327            .db_stat_with_dbi(self.get_dbi::<T>()?)
328            .map_err(|e| DatabaseError::Stats(e.into()))?
329            .entries())
330    }
331
332    /// Disables long-lived read transaction safety guarantees, such as backtrace recording and
333    /// timeout.
334    fn disable_long_read_transaction_safety(&mut self) {
335        if let Some(metrics_handler) = self.metrics_handler.as_mut() {
336            metrics_handler.record_backtrace = false;
337        }
338
339        self.inner.disable_timeout();
340    }
341}
342
343impl DbTxMut for Tx<RW> {
344    type CursorMut<T: Table> = Cursor<RW, T>;
345    type DupCursorMut<T: DupSort> = Cursor<RW, T>;
346
347    fn put<T: Table>(&self, key: T::Key, value: T::Value) -> Result<(), DatabaseError> {
348        let key = key.encode();
349        let value = value.compress();
350        self.execute_with_operation_metric::<T, _>(
351            Operation::Put,
352            Some(value.as_ref().len()),
353            |tx| {
354                tx.put(self.get_dbi::<T>()?, key.as_ref(), value, WriteFlags::UPSERT).map_err(|e| {
355                    DatabaseWriteError {
356                        info: e.into(),
357                        operation: DatabaseWriteOperation::Put,
358                        table_name: T::NAME,
359                        key: key.into(),
360                    }
361                    .into()
362                })
363            },
364        )
365    }
366
367    fn delete<T: Table>(
368        &self,
369        key: T::Key,
370        value: Option<T::Value>,
371    ) -> Result<bool, DatabaseError> {
372        let mut data = None;
373
374        let value = value.map(Compress::compress);
375        if let Some(value) = &value {
376            data = Some(value.as_ref());
377        };
378
379        self.execute_with_operation_metric::<T, _>(Operation::Delete, None, |tx| {
380            tx.del(self.get_dbi::<T>()?, key.encode(), data)
381                .map_err(|e| DatabaseError::Delete(e.into()))
382        })
383    }
384
385    fn clear<T: Table>(&self) -> Result<(), DatabaseError> {
386        self.inner.clear_db(self.get_dbi::<T>()?).map_err(|e| DatabaseError::Delete(e.into()))?;
387
388        Ok(())
389    }
390
391    fn cursor_write<T: Table>(&self) -> Result<Self::CursorMut<T>, DatabaseError> {
392        self.new_cursor()
393    }
394
395    fn cursor_dup_write<T: DupSort>(&self) -> Result<Self::DupCursorMut<T>, DatabaseError> {
396        self.new_cursor()
397    }
398}
399
#[cfg(test)]
mod tests {
    use crate::{mdbx::DatabaseArguments, tables, DatabaseEnv, DatabaseEnvKind};
    use reth_db_api::{database::Database, models::ClientVersion, transaction::DbTx};
    use reth_libmdbx::MaxReadTransactionDuration;
    use reth_storage_errors::db::DatabaseError;
    use std::{sync::atomic::Ordering, thread::sleep, time::Duration};
    use tempfile::tempdir;

    /// Maximum read transaction duration configured for the test environments; also used as the
    /// long-transaction threshold of the metrics handler.
    const MAX_DURATION: Duration = Duration::from_secs(1);

    /// Extra time slept on top of [`MAX_DURATION`] before the transaction is used again.
    const GRACE_PERIOD: Duration = Duration::from_millis(100);

    /// With the safety disabled, the transaction stays usable past the configured maximum
    /// duration and no backtrace is recorded.
    #[test]
    fn long_read_transaction_safety_disabled() {
        let dir = tempdir().unwrap();
        let max_read_duration = Some(MaxReadTransactionDuration::Set(MAX_DURATION));
        let args = DatabaseArguments::new(ClientVersion::default())
            .with_max_read_transaction_duration(max_read_duration);
        let db = DatabaseEnv::open(dir.path(), DatabaseEnvKind::RW, args).unwrap().with_metrics();

        let mut tx = db.tx().unwrap();
        tx.metrics_handler.as_mut().unwrap().long_transaction_duration = MAX_DURATION;
        tx.disable_long_read_transaction_safety();
        // Give the `TxnManager` some time to time out the transaction.
        sleep(MAX_DURATION + GRACE_PERIOD);

        // Transaction has not timed out.
        let not_found = DatabaseError::Open(reth_libmdbx::Error::NotFound.into());
        assert_eq!(tx.get::<tables::Transactions>(0), Err(not_found));
        // Backtrace is not recorded.
        assert!(!tx.metrics_handler.unwrap().backtrace_recorded.load(Ordering::Relaxed));
    }

    /// With the safety left enabled, the transaction times out after the configured maximum
    /// duration and the backtrace is recorded.
    #[test]
    fn long_read_transaction_safety_enabled() {
        let dir = tempdir().unwrap();
        let max_read_duration = Some(MaxReadTransactionDuration::Set(MAX_DURATION));
        let args = DatabaseArguments::new(ClientVersion::default())
            .with_max_read_transaction_duration(max_read_duration);
        let db = DatabaseEnv::open(dir.path(), DatabaseEnvKind::RW, args).unwrap().with_metrics();

        let mut tx = db.tx().unwrap();
        tx.metrics_handler.as_mut().unwrap().long_transaction_duration = MAX_DURATION;
        // Give the `TxnManager` some time to time out the transaction.
        sleep(MAX_DURATION + GRACE_PERIOD);

        // Transaction has timed out.
        let timed_out = DatabaseError::Open(reth_libmdbx::Error::ReadTransactionTimeout.into());
        assert_eq!(tx.get::<tables::Transactions>(0), Err(timed_out));
        // Backtrace is recorded.
        assert!(tx.metrics_handler.unwrap().backtrace_recorded.load(Ordering::Relaxed));
    }
}