reth_db/implementation/mdbx/
tx.rs

1//! Transaction wrapper for libmdbx-sys.
2
3use super::{cursor::Cursor, utils::*};
4use crate::{
5    metrics::{DatabaseEnvMetrics, Operation, TransactionMode, TransactionOutcome},
6    DatabaseError,
7};
8use reth_db_api::{
9    table::{Compress, DupSort, Encode, Table, TableImporter},
10    transaction::{DbTx, DbTxMut},
11};
12use reth_libmdbx::{ffi::MDBX_dbi, CommitLatency, Transaction, TransactionKind, WriteFlags, RW};
13use reth_storage_errors::db::{DatabaseWriteError, DatabaseWriteOperation};
14use reth_tracing::tracing::{debug, trace, warn};
15use std::{
16    backtrace::Backtrace,
17    marker::PhantomData,
18    sync::{
19        atomic::{AtomicBool, Ordering},
20        Arc,
21    },
22    time::{Duration, Instant},
23};
24
/// Default threshold (one minute) after which a still-open read transaction is considered
/// long-lived and gets a one-time backtrace log.
const LONG_TRANSACTION_DURATION: Duration = Duration::new(60, 0);
27
/// Wrapper for the libmdbx transaction.
///
/// Pairs a [`reth_libmdbx`] transaction with an optional metrics handler. Fields are dropped in
/// declaration order, so `inner` is dropped before `metrics_handler`.
#[derive(Debug)]
pub struct Tx<K: TransactionKind> {
    /// The underlying `reth_libmdbx` transaction.
    pub inner: Transaction<K>,

    /// Handler for metrics with its own [Drop] implementation for cases when the transaction isn't
    /// closed by [`Tx::commit`] or [`Tx::abort`], but we still need to report it in the metrics.
    ///
    /// If [Some], then metrics are reported.
    metrics_handler: Option<MetricsHandler<K>>,
}
40
41impl<K: TransactionKind> Tx<K> {
42    /// Creates new `Tx` object with a `RO` or `RW` transaction.
43    #[inline]
44    pub const fn new(inner: Transaction<K>) -> Self {
45        Self::new_inner(inner, None)
46    }
47
48    /// Creates new `Tx` object with a `RO` or `RW` transaction and optionally enables metrics.
49    #[inline]
50    #[track_caller]
51    pub(crate) fn new_with_metrics(
52        inner: Transaction<K>,
53        env_metrics: Option<Arc<DatabaseEnvMetrics>>,
54    ) -> reth_libmdbx::Result<Self> {
55        let metrics_handler = env_metrics
56            .map(|env_metrics| {
57                let handler = MetricsHandler::<K>::new(inner.id()?, env_metrics);
58                handler.env_metrics.record_opened_transaction(handler.transaction_mode());
59                handler.log_transaction_opened();
60                Ok(handler)
61            })
62            .transpose()?;
63        Ok(Self::new_inner(inner, metrics_handler))
64    }
65
66    #[inline]
67    const fn new_inner(inner: Transaction<K>, metrics_handler: Option<MetricsHandler<K>>) -> Self {
68        Self { inner, metrics_handler }
69    }
70
71    /// Gets this transaction ID.
72    pub fn id(&self) -> reth_libmdbx::Result<u64> {
73        self.metrics_handler.as_ref().map_or_else(|| self.inner.id(), |handler| Ok(handler.txn_id))
74    }
75
76    /// Gets a table database handle if it exists, otherwise creates it.
77    pub fn get_dbi<T: Table>(&self) -> Result<MDBX_dbi, DatabaseError> {
78        self.inner
79            .open_db(Some(T::NAME))
80            .map(|db| db.dbi())
81            .map_err(|e| DatabaseError::Open(e.into()))
82    }
83
84    /// Create db Cursor
85    pub fn new_cursor<T: Table>(&self) -> Result<Cursor<K, T>, DatabaseError> {
86        let inner = self
87            .inner
88            .cursor_with_dbi(self.get_dbi::<T>()?)
89            .map_err(|e| DatabaseError::InitCursor(e.into()))?;
90
91        Ok(Cursor::new_with_metrics(
92            inner,
93            self.metrics_handler.as_ref().map(|h| h.env_metrics.clone()),
94        ))
95    }
96
97    /// If `self.metrics_handler == Some(_)`, measure the time it takes to execute the closure and
98    /// record a metric with the provided transaction outcome.
99    ///
100    /// Otherwise, just execute the closure.
101    fn execute_with_close_transaction_metric<R>(
102        mut self,
103        outcome: TransactionOutcome,
104        f: impl FnOnce(Self) -> (R, Option<CommitLatency>),
105    ) -> R {
106        let run = |tx| {
107            let start = Instant::now();
108            let (result, commit_latency) = f(tx);
109            let total_duration = start.elapsed();
110
111            if outcome.is_commit() {
112                debug!(
113                    target: "storage::db::mdbx",
114                    ?total_duration,
115                    ?commit_latency,
116                    is_read_only = K::IS_READ_ONLY,
117                    "Commit"
118                );
119            }
120
121            (result, commit_latency, total_duration)
122        };
123
124        if let Some(mut metrics_handler) = self.metrics_handler.take() {
125            metrics_handler.close_recorded = true;
126            metrics_handler.log_backtrace_on_long_read_transaction();
127
128            let (result, commit_latency, close_duration) = run(self);
129            let open_duration = metrics_handler.start.elapsed();
130            metrics_handler.env_metrics.record_closed_transaction(
131                metrics_handler.transaction_mode(),
132                outcome,
133                open_duration,
134                Some(close_duration),
135                commit_latency,
136            );
137
138            result
139        } else {
140            run(self).0
141        }
142    }
143
144    /// If `self.metrics_handler == Some(_)`, measure the time it takes to execute the closure and
145    /// record a metric with the provided operation.
146    ///
147    /// Otherwise, just execute the closure.
148    fn execute_with_operation_metric<T: Table, R>(
149        &self,
150        operation: Operation,
151        value_size: Option<usize>,
152        f: impl FnOnce(&Transaction<K>) -> R,
153    ) -> R {
154        if let Some(metrics_handler) = &self.metrics_handler {
155            metrics_handler.log_backtrace_on_long_read_transaction();
156            metrics_handler
157                .env_metrics
158                .record_operation(T::NAME, operation, value_size, || f(&self.inner))
159        } else {
160            f(&self.inner)
161        }
162    }
163}
164
/// Per-transaction metrics and diagnostics state, held by [`Tx`] when metrics are enabled.
///
/// Tracks when the transaction was opened and whether its close has already been reported to
/// [`DatabaseEnvMetrics`]. Read transactions that stay open too long get a one-time backtrace log.
#[derive(Debug)]
struct MetricsHandler<K: TransactionKind> {
    /// Cached internal transaction ID provided by libmdbx.
    txn_id: u64,
    /// The time when transaction has started.
    start: Instant,
    /// Duration after which we emit the log about long-lived database transactions.
    long_transaction_duration: Duration,
    /// If `true`, the metric about transaction closing has already been recorded and we don't need
    /// to do anything on [`Drop::drop`].
    close_recorded: bool,
    /// If `true`, the backtrace of transaction will be recorded and logged.
    /// See [`MetricsHandler::log_backtrace_on_long_read_transaction`].
    record_backtrace: bool,
    /// If `true`, the backtrace of transaction has already been recorded and logged.
    /// See [`MetricsHandler::log_backtrace_on_long_read_transaction`].
    backtrace_recorded: AtomicBool,
    /// Shared database environment metrics.
    env_metrics: Arc<DatabaseEnvMetrics>,
    /// Backtrace of the location where the transaction has been opened. Reported only with debug
    /// assertions, because capturing the backtrace on every transaction opening is expensive.
    #[cfg(debug_assertions)]
    open_backtrace: Backtrace,
    /// Ties the handler to the transaction kind `K` without storing a value of it.
    _marker: PhantomData<K>,
}
190
191impl<K: TransactionKind> MetricsHandler<K> {
192    fn new(txn_id: u64, env_metrics: Arc<DatabaseEnvMetrics>) -> Self {
193        Self {
194            txn_id,
195            start: Instant::now(),
196            long_transaction_duration: LONG_TRANSACTION_DURATION,
197            close_recorded: false,
198            record_backtrace: true,
199            backtrace_recorded: AtomicBool::new(false),
200            #[cfg(debug_assertions)]
201            open_backtrace: Backtrace::force_capture(),
202            env_metrics,
203            _marker: PhantomData,
204        }
205    }
206
207    const fn transaction_mode(&self) -> TransactionMode {
208        if K::IS_READ_ONLY {
209            TransactionMode::ReadOnly
210        } else {
211            TransactionMode::ReadWrite
212        }
213    }
214
215    /// Logs the caller location and ID of the transaction that was opened.
216    #[track_caller]
217    fn log_transaction_opened(&self) {
218        trace!(
219            target: "storage::db::mdbx",
220            caller = %core::panic::Location::caller(),
221            id = %self.txn_id,
222            mode = %self.transaction_mode().as_str(),
223            "Transaction opened",
224        );
225    }
226
227    /// Logs the backtrace of current call if the duration that the read transaction has been open
228    /// is more than [`LONG_TRANSACTION_DURATION`] and `record_backtrace == true`.
229    /// The backtrace is recorded and logged just once, guaranteed by `backtrace_recorded` atomic.
230    ///
231    /// NOTE: Backtrace is recorded using [`Backtrace::force_capture`], so `RUST_BACKTRACE` env var
232    /// is not needed.
233    fn log_backtrace_on_long_read_transaction(&self) {
234        if self.record_backtrace &&
235            !self.backtrace_recorded.load(Ordering::Relaxed) &&
236            self.transaction_mode().is_read_only()
237        {
238            let open_duration = self.start.elapsed();
239            if open_duration >= self.long_transaction_duration {
240                self.backtrace_recorded.store(true, Ordering::Relaxed);
241                #[cfg(debug_assertions)]
242                let message = format!(
243                   "The database read transaction has been open for too long. Open backtrace:\n{}\n\nCurrent backtrace:\n{}",
244                   self.open_backtrace,
245                   Backtrace::force_capture()
246                );
247                #[cfg(not(debug_assertions))]
248                let message = format!(
249                    "The database read transaction has been open for too long. Backtrace:\n{}",
250                    Backtrace::force_capture()
251                );
252                warn!(
253                    target: "storage::db::mdbx",
254                    ?open_duration,
255                    %self.txn_id,
256                    "{message}"
257                );
258            }
259        }
260    }
261}
262
263impl<K: TransactionKind> Drop for MetricsHandler<K> {
264    fn drop(&mut self) {
265        if !self.close_recorded {
266            self.log_backtrace_on_long_read_transaction();
267            self.env_metrics.record_closed_transaction(
268                self.transaction_mode(),
269                TransactionOutcome::Drop,
270                self.start.elapsed(),
271                None,
272                None,
273            );
274        }
275    }
276}
277
// Only read-write transactions can import tables; the empty body relies on the trait's default
// methods.
impl TableImporter for Tx<RW> {}
279
280impl<K: TransactionKind> DbTx for Tx<K> {
281    type Cursor<T: Table> = Cursor<K, T>;
282    type DupCursor<T: DupSort> = Cursor<K, T>;
283
284    fn get<T: Table>(&self, key: T::Key) -> Result<Option<<T as Table>::Value>, DatabaseError> {
285        self.get_by_encoded_key::<T>(&key.encode())
286    }
287
288    fn get_by_encoded_key<T: Table>(
289        &self,
290        key: &<T::Key as Encode>::Encoded,
291    ) -> Result<Option<T::Value>, DatabaseError> {
292        self.execute_with_operation_metric::<T, _>(Operation::Get, None, |tx| {
293            tx.get(self.get_dbi::<T>()?, key.as_ref())
294                .map_err(|e| DatabaseError::Read(e.into()))?
295                .map(decode_one::<T>)
296                .transpose()
297        })
298    }
299
300    fn commit(self) -> Result<bool, DatabaseError> {
301        self.execute_with_close_transaction_metric(TransactionOutcome::Commit, |this| {
302            match this.inner.commit().map_err(|e| DatabaseError::Commit(e.into())) {
303                Ok((v, latency)) => (Ok(v), Some(latency)),
304                Err(e) => (Err(e), None),
305            }
306        })
307    }
308
309    fn abort(self) {
310        self.execute_with_close_transaction_metric(TransactionOutcome::Abort, |this| {
311            (drop(this.inner), None)
312        })
313    }
314
315    // Iterate over read only values in database.
316    fn cursor_read<T: Table>(&self) -> Result<Self::Cursor<T>, DatabaseError> {
317        self.new_cursor()
318    }
319
320    /// Iterate over read only values in database.
321    fn cursor_dup_read<T: DupSort>(&self) -> Result<Self::DupCursor<T>, DatabaseError> {
322        self.new_cursor()
323    }
324
325    /// Returns number of entries in the table using cheap DB stats invocation.
326    fn entries<T: Table>(&self) -> Result<usize, DatabaseError> {
327        Ok(self
328            .inner
329            .db_stat_with_dbi(self.get_dbi::<T>()?)
330            .map_err(|e| DatabaseError::Stats(e.into()))?
331            .entries())
332    }
333
334    /// Disables long-lived read transaction safety guarantees, such as backtrace recording and
335    /// timeout.
336    fn disable_long_read_transaction_safety(&mut self) {
337        if let Some(metrics_handler) = self.metrics_handler.as_mut() {
338            metrics_handler.record_backtrace = false;
339        }
340
341        self.inner.disable_timeout();
342    }
343}
344
345impl DbTxMut for Tx<RW> {
346    type CursorMut<T: Table> = Cursor<RW, T>;
347    type DupCursorMut<T: DupSort> = Cursor<RW, T>;
348
349    fn put<T: Table>(&self, key: T::Key, value: T::Value) -> Result<(), DatabaseError> {
350        let key = key.encode();
351        let value = value.compress();
352        self.execute_with_operation_metric::<T, _>(
353            Operation::Put,
354            Some(value.as_ref().len()),
355            |tx| {
356                tx.put(self.get_dbi::<T>()?, key.as_ref(), value, WriteFlags::UPSERT).map_err(|e| {
357                    DatabaseWriteError {
358                        info: e.into(),
359                        operation: DatabaseWriteOperation::Put,
360                        table_name: T::NAME,
361                        key: key.into(),
362                    }
363                    .into()
364                })
365            },
366        )
367    }
368
369    fn delete<T: Table>(
370        &self,
371        key: T::Key,
372        value: Option<T::Value>,
373    ) -> Result<bool, DatabaseError> {
374        let mut data = None;
375
376        let value = value.map(Compress::compress);
377        if let Some(value) = &value {
378            data = Some(value.as_ref());
379        };
380
381        self.execute_with_operation_metric::<T, _>(Operation::Delete, None, |tx| {
382            tx.del(self.get_dbi::<T>()?, key.encode(), data)
383                .map_err(|e| DatabaseError::Delete(e.into()))
384        })
385    }
386
387    fn clear<T: Table>(&self) -> Result<(), DatabaseError> {
388        self.inner.clear_db(self.get_dbi::<T>()?).map_err(|e| DatabaseError::Delete(e.into()))?;
389
390        Ok(())
391    }
392
393    fn cursor_write<T: Table>(&self) -> Result<Self::CursorMut<T>, DatabaseError> {
394        self.new_cursor()
395    }
396
397    fn cursor_dup_write<T: DupSort>(&self) -> Result<Self::DupCursorMut<T>, DatabaseError> {
398        self.new_cursor()
399    }
400}
401
#[cfg(test)]
mod tests {
    use crate::{mdbx::DatabaseArguments, tables, DatabaseEnv, DatabaseEnvKind};
    use reth_db_api::{database::Database, models::ClientVersion, transaction::DbTx};
    use reth_libmdbx::MaxReadTransactionDuration;
    use reth_storage_errors::db::DatabaseError;
    use std::{path::Path, sync::atomic::Ordering, thread::sleep, time::Duration};
    use tempfile::tempdir;

    /// Read-transaction timeout configured on the test database. It is also used as the
    /// long-transaction threshold for backtrace logging.
    const MAX_DURATION: Duration = Duration::from_secs(1);

    /// Opens a metrics-enabled RW database at `path` whose read transactions time out after
    /// [`MAX_DURATION`].
    fn open_db(path: &Path) -> DatabaseEnv {
        let args = DatabaseArguments::new(ClientVersion::default())
            .with_max_read_transaction_duration(Some(MaxReadTransactionDuration::Set(
                MAX_DURATION,
            )));
        DatabaseEnv::open(path, DatabaseEnvKind::RW, args).unwrap().with_metrics()
    }

    /// Gives the `TxnManager` some time to time out the transaction.
    fn wait_past_timeout() {
        sleep(MAX_DURATION + Duration::from_millis(100));
    }

    #[test]
    fn long_read_transaction_safety_disabled() {
        // `dir` must outlive the database, so it stays owned here.
        let dir = tempdir().unwrap();
        let db = open_db(dir.path());

        let mut tx = db.tx().unwrap();
        tx.metrics_handler.as_mut().unwrap().long_transaction_duration = MAX_DURATION;
        tx.disable_long_read_transaction_safety();
        wait_past_timeout();

        // Transaction has not timed out: the lookup fails only because the table does not exist.
        assert_eq!(
            tx.get::<tables::Transactions>(0),
            Err(DatabaseError::Open(reth_libmdbx::Error::NotFound.into()))
        );
        // Backtrace is not recorded.
        let handler = tx.metrics_handler.unwrap();
        assert!(!handler.backtrace_recorded.load(Ordering::Relaxed));
    }

    #[test]
    fn long_read_transaction_safety_enabled() {
        // `dir` must outlive the database, so it stays owned here.
        let dir = tempdir().unwrap();
        let db = open_db(dir.path());

        let mut tx = db.tx().unwrap();
        tx.metrics_handler.as_mut().unwrap().long_transaction_duration = MAX_DURATION;
        wait_past_timeout();

        // Transaction has timed out.
        assert_eq!(
            tx.get::<tables::Transactions>(0),
            Err(DatabaseError::Open(reth_libmdbx::Error::ReadTransactionTimeout.into()))
        );
        // Backtrace is recorded.
        let handler = tx.metrics_handler.unwrap();
        assert!(handler.backtrace_recorded.load(Ordering::Relaxed));
    }
}