reth_db/implementation/mdbx/
tx.rs

1//! Transaction wrapper for libmdbx-sys.
2
3use super::{cursor::Cursor, utils::*};
4use crate::{
5    metrics::{DatabaseEnvMetrics, Operation, TransactionMode, TransactionOutcome},
6    DatabaseError,
7};
8use reth_db_api::{
9    table::{Compress, DupSort, Encode, IntoVec, Table, TableImporter},
10    transaction::{DbTx, DbTxMut},
11};
12use reth_libmdbx::{ffi::MDBX_dbi, CommitLatency, Transaction, TransactionKind, WriteFlags, RW};
13use reth_storage_errors::db::{DatabaseWriteError, DatabaseWriteOperation};
14use reth_tracing::tracing::{debug, trace, warn};
15use std::{
16    backtrace::Backtrace,
17    collections::HashMap,
18    marker::PhantomData,
19    sync::{
20        atomic::{AtomicBool, Ordering},
21        Arc,
22    },
23    time::{Duration, Instant},
24};
25
/// How long a read transaction may stay open before a warning with its backtrace is logged.
/// Used as the initial `long_transaction_duration` of every metrics handler.
const LONG_TRANSACTION_DURATION: Duration = Duration::new(60, 0);
28
29/// Wrapper for the libmdbx transaction.
30#[derive(Debug)]
31pub struct Tx<K: TransactionKind> {
32    /// Libmdbx-sys transaction.
33    pub inner: Transaction<K>,
34
35    /// Cached MDBX DBIs for reuse.
36    dbis: Arc<HashMap<&'static str, MDBX_dbi>>,
37
38    /// Handler for metrics with its own [Drop] implementation for cases when the transaction isn't
39    /// closed by [`Tx::commit`] or [`Tx::abort`], but we still need to report it in the metrics.
40    ///
41    /// If [Some], then metrics are reported.
42    metrics_handler: Option<MetricsHandler<K>>,
43}
44
45impl<K: TransactionKind> Tx<K> {
46    /// Creates new `Tx` object with a `RO` or `RW` transaction and optionally enables metrics.
47    #[inline]
48    #[track_caller]
49    pub(crate) fn new(
50        inner: Transaction<K>,
51        dbis: Arc<HashMap<&'static str, MDBX_dbi>>,
52        env_metrics: Option<Arc<DatabaseEnvMetrics>>,
53    ) -> reth_libmdbx::Result<Self> {
54        let metrics_handler = env_metrics
55            .map(|env_metrics| {
56                let handler = MetricsHandler::<K>::new(inner.id()?, env_metrics);
57                handler.env_metrics.record_opened_transaction(handler.transaction_mode());
58                handler.log_transaction_opened();
59                Ok(handler)
60            })
61            .transpose()?;
62        Ok(Self { inner, dbis, metrics_handler })
63    }
64
65    /// Gets this transaction ID.
66    pub fn id(&self) -> reth_libmdbx::Result<u64> {
67        self.metrics_handler.as_ref().map_or_else(|| self.inner.id(), |handler| Ok(handler.txn_id))
68    }
69
70    /// Gets a table database handle by name if it exists, otherwise, check the
71    /// database, opening the DB if it exists.
72    pub fn get_dbi_raw(&self, name: &str) -> Result<MDBX_dbi, DatabaseError> {
73        if let Some(dbi) = self.dbis.get(name) {
74            Ok(*dbi)
75        } else {
76            self.inner
77                .open_db(Some(name))
78                .map(|db| db.dbi())
79                .map_err(|e| DatabaseError::Open(e.into()))
80        }
81    }
82
83    /// Gets a table database handle by name if it exists, otherwise, check the
84    /// database, opening the DB if it exists.
85    pub fn get_dbi<T: Table>(&self) -> Result<MDBX_dbi, DatabaseError> {
86        self.get_dbi_raw(T::NAME)
87    }
88
89    /// Create db Cursor
90    pub fn new_cursor<T: Table>(&self) -> Result<Cursor<K, T>, DatabaseError> {
91        let inner = self
92            .inner
93            .cursor_with_dbi(self.get_dbi::<T>()?)
94            .map_err(|e| DatabaseError::InitCursor(e.into()))?;
95
96        Ok(Cursor::new_with_metrics(
97            inner,
98            self.metrics_handler.as_ref().map(|h| h.env_metrics.clone()),
99        ))
100    }
101
102    /// If `self.metrics_handler == Some(_)`, measure the time it takes to execute the closure and
103    /// record a metric with the provided transaction outcome.
104    ///
105    /// Otherwise, just execute the closure.
106    fn execute_with_close_transaction_metric<R>(
107        mut self,
108        outcome: TransactionOutcome,
109        f: impl FnOnce(Self) -> (R, Option<CommitLatency>),
110    ) -> R {
111        let run = |tx| {
112            let start = Instant::now();
113            let (result, commit_latency) = f(tx);
114            let total_duration = start.elapsed();
115
116            if outcome.is_commit() {
117                debug!(
118                    target: "storage::db::mdbx",
119                    ?total_duration,
120                    ?commit_latency,
121                    is_read_only = K::IS_READ_ONLY,
122                    "Commit"
123                );
124            }
125
126            (result, commit_latency, total_duration)
127        };
128
129        if let Some(mut metrics_handler) = self.metrics_handler.take() {
130            metrics_handler.close_recorded = true;
131            metrics_handler.log_backtrace_on_long_read_transaction();
132
133            let (result, commit_latency, close_duration) = run(self);
134            let open_duration = metrics_handler.start.elapsed();
135            metrics_handler.env_metrics.record_closed_transaction(
136                metrics_handler.transaction_mode(),
137                outcome,
138                open_duration,
139                Some(close_duration),
140                commit_latency,
141            );
142
143            result
144        } else {
145            run(self).0
146        }
147    }
148
149    /// If `self.metrics_handler == Some(_)`, measure the time it takes to execute the closure and
150    /// record a metric with the provided operation.
151    ///
152    /// Otherwise, just execute the closure.
153    fn execute_with_operation_metric<T: Table, R>(
154        &self,
155        operation: Operation,
156        value_size: Option<usize>,
157        f: impl FnOnce(&Transaction<K>) -> R,
158    ) -> R {
159        if let Some(metrics_handler) = &self.metrics_handler {
160            metrics_handler.log_backtrace_on_long_read_transaction();
161            metrics_handler
162                .env_metrics
163                .record_operation(T::NAME, operation, value_size, || f(&self.inner))
164        } else {
165            f(&self.inner)
166        }
167    }
168}
169
170#[derive(Debug)]
171struct MetricsHandler<K: TransactionKind> {
172    /// Cached internal transaction ID provided by libmdbx.
173    txn_id: u64,
174    /// The time when transaction has started.
175    start: Instant,
176    /// Duration after which we emit the log about long-lived database transactions.
177    long_transaction_duration: Duration,
178    /// If `true`, the metric about transaction closing has already been recorded and we don't need
179    /// to do anything on [`Drop::drop`].
180    close_recorded: bool,
181    /// If `true`, the backtrace of transaction will be recorded and logged.
182    /// See [`MetricsHandler::log_backtrace_on_long_read_transaction`].
183    record_backtrace: bool,
184    /// If `true`, the backtrace of transaction has already been recorded and logged.
185    /// See [`MetricsHandler::log_backtrace_on_long_read_transaction`].
186    backtrace_recorded: AtomicBool,
187    /// Shared database environment metrics.
188    env_metrics: Arc<DatabaseEnvMetrics>,
189    /// Backtrace of the location where the transaction has been opened. Reported only with debug
190    /// assertions, because capturing the backtrace on every transaction opening is expensive.
191    #[cfg(debug_assertions)]
192    open_backtrace: Backtrace,
193    _marker: PhantomData<K>,
194}
195
196impl<K: TransactionKind> MetricsHandler<K> {
197    fn new(txn_id: u64, env_metrics: Arc<DatabaseEnvMetrics>) -> Self {
198        Self {
199            txn_id,
200            start: Instant::now(),
201            long_transaction_duration: LONG_TRANSACTION_DURATION,
202            close_recorded: false,
203            record_backtrace: true,
204            backtrace_recorded: AtomicBool::new(false),
205            #[cfg(debug_assertions)]
206            open_backtrace: Backtrace::force_capture(),
207            env_metrics,
208            _marker: PhantomData,
209        }
210    }
211
212    const fn transaction_mode(&self) -> TransactionMode {
213        if K::IS_READ_ONLY {
214            TransactionMode::ReadOnly
215        } else {
216            TransactionMode::ReadWrite
217        }
218    }
219
220    /// Logs the caller location and ID of the transaction that was opened.
221    #[track_caller]
222    fn log_transaction_opened(&self) {
223        trace!(
224            target: "storage::db::mdbx",
225            caller = %core::panic::Location::caller(),
226            id = %self.txn_id,
227            mode = %self.transaction_mode().as_str(),
228            "Transaction opened",
229        );
230    }
231
232    /// Logs the backtrace of current call if the duration that the read transaction has been open
233    /// is more than [`LONG_TRANSACTION_DURATION`] and `record_backtrace == true`.
234    /// The backtrace is recorded and logged just once, guaranteed by `backtrace_recorded` atomic.
235    ///
236    /// NOTE: Backtrace is recorded using [`Backtrace::force_capture`], so `RUST_BACKTRACE` env var
237    /// is not needed.
238    fn log_backtrace_on_long_read_transaction(&self) {
239        if self.record_backtrace &&
240            !self.backtrace_recorded.load(Ordering::Relaxed) &&
241            self.transaction_mode().is_read_only()
242        {
243            let open_duration = self.start.elapsed();
244            if open_duration >= self.long_transaction_duration {
245                self.backtrace_recorded.store(true, Ordering::Relaxed);
246                #[cfg(debug_assertions)]
247                let message = format!(
248                    "The database read transaction has been open for too long. Open backtrace:\n{}\n\nCurrent backtrace:\n{}",
249                    self.open_backtrace,
250                    Backtrace::force_capture()
251                );
252                #[cfg(not(debug_assertions))]
253                let message = format!(
254                    "The database read transaction has been open for too long. Backtrace:\n{}",
255                    Backtrace::force_capture()
256                );
257                warn!(
258                    target: "storage::db::mdbx",
259                    ?open_duration,
260                    %self.txn_id,
261                    "{message}"
262                );
263            }
264        }
265    }
266}
267
268impl<K: TransactionKind> Drop for MetricsHandler<K> {
269    fn drop(&mut self) {
270        if !self.close_recorded {
271            self.log_backtrace_on_long_read_transaction();
272            self.env_metrics.record_closed_transaction(
273                self.transaction_mode(),
274                TransactionOutcome::Drop,
275                self.start.elapsed(),
276                None,
277                None,
278            );
279        }
280    }
281}
282
283impl TableImporter for Tx<RW> {}
284
285impl<K: TransactionKind> DbTx for Tx<K> {
286    type Cursor<T: Table> = Cursor<K, T>;
287    type DupCursor<T: DupSort> = Cursor<K, T>;
288
289    fn get<T: Table>(&self, key: T::Key) -> Result<Option<<T as Table>::Value>, DatabaseError> {
290        self.get_by_encoded_key::<T>(&key.encode())
291    }
292
293    fn get_by_encoded_key<T: Table>(
294        &self,
295        key: &<T::Key as Encode>::Encoded,
296    ) -> Result<Option<T::Value>, DatabaseError> {
297        self.execute_with_operation_metric::<T, _>(Operation::Get, None, |tx| {
298            tx.get(self.get_dbi::<T>()?, key.as_ref())
299                .map_err(|e| DatabaseError::Read(e.into()))?
300                .map(decode_one::<T>)
301                .transpose()
302        })
303    }
304
305    fn commit(self) -> Result<(), DatabaseError> {
306        self.execute_with_close_transaction_metric(TransactionOutcome::Commit, |this| {
307            match this.inner.commit().map_err(|e| DatabaseError::Commit(e.into())) {
308                Ok(latency) => (Ok(()), Some(latency)),
309                Err(e) => (Err(e), None),
310            }
311        })
312    }
313
314    fn abort(self) {
315        self.execute_with_close_transaction_metric(TransactionOutcome::Abort, |this| {
316            (drop(this.inner), None)
317        })
318    }
319
320    // Iterate over read only values in database.
321    fn cursor_read<T: Table>(&self) -> Result<Self::Cursor<T>, DatabaseError> {
322        self.new_cursor()
323    }
324
325    /// Iterate over read only values in database.
326    fn cursor_dup_read<T: DupSort>(&self) -> Result<Self::DupCursor<T>, DatabaseError> {
327        self.new_cursor()
328    }
329
330    /// Returns number of entries in the table using cheap DB stats invocation.
331    fn entries<T: Table>(&self) -> Result<usize, DatabaseError> {
332        Ok(self
333            .inner
334            .db_stat_with_dbi(self.get_dbi::<T>()?)
335            .map_err(|e| DatabaseError::Stats(e.into()))?
336            .entries())
337    }
338
339    /// Disables long-lived read transaction safety guarantees, such as backtrace recording and
340    /// timeout.
341    fn disable_long_read_transaction_safety(&mut self) {
342        if let Some(metrics_handler) = self.metrics_handler.as_mut() {
343            metrics_handler.record_backtrace = false;
344        }
345
346        self.inner.disable_timeout();
347    }
348}
349
350#[derive(Clone, Copy)]
351enum PutKind {
352    /// Default kind that inserts a new key-value or overwrites an existed key.
353    Upsert,
354    /// Append the key-value to the end of the table -- fast path when the new
355    /// key is the highest so far, like the latest block number.
356    Append,
357}
358
359impl PutKind {
360    const fn into_operation_and_flags(self) -> (Operation, DatabaseWriteOperation, WriteFlags) {
361        match self {
362            Self::Upsert => {
363                (Operation::PutUpsert, DatabaseWriteOperation::PutUpsert, WriteFlags::UPSERT)
364            }
365            Self::Append => {
366                (Operation::PutAppend, DatabaseWriteOperation::PutAppend, WriteFlags::APPEND)
367            }
368        }
369    }
370}
371
372impl Tx<RW> {
373    /// The inner implementation mapping to `mdbx_put` that supports different
374    /// put kinds like upserting and appending.
375    fn put<T: Table>(
376        &self,
377        kind: PutKind,
378        key: T::Key,
379        value: T::Value,
380    ) -> Result<(), DatabaseError> {
381        let key = key.encode();
382        let value = value.compress();
383        let (operation, write_operation, flags) = kind.into_operation_and_flags();
384        self.execute_with_operation_metric::<T, _>(operation, Some(value.as_ref().len()), |tx| {
385            tx.put(self.get_dbi::<T>()?, key.as_ref(), value, flags).map_err(|e| {
386                DatabaseWriteError {
387                    info: e.into(),
388                    operation: write_operation,
389                    table_name: T::NAME,
390                    key: key.into_vec(),
391                }
392                .into()
393            })
394        })
395    }
396}
397
398impl DbTxMut for Tx<RW> {
399    type CursorMut<T: Table> = Cursor<RW, T>;
400    type DupCursorMut<T: DupSort> = Cursor<RW, T>;
401
402    fn put<T: Table>(&self, key: T::Key, value: T::Value) -> Result<(), DatabaseError> {
403        self.put::<T>(PutKind::Upsert, key, value)
404    }
405
406    fn append<T: Table>(&self, key: T::Key, value: T::Value) -> Result<(), DatabaseError> {
407        self.put::<T>(PutKind::Append, key, value)
408    }
409
410    fn delete<T: Table>(
411        &self,
412        key: T::Key,
413        value: Option<T::Value>,
414    ) -> Result<bool, DatabaseError> {
415        let mut data = None;
416
417        let value = value.map(Compress::compress);
418        if let Some(value) = &value {
419            data = Some(value.as_ref());
420        };
421
422        self.execute_with_operation_metric::<T, _>(Operation::Delete, None, |tx| {
423            tx.del(self.get_dbi::<T>()?, key.encode(), data)
424                .map_err(|e| DatabaseError::Delete(e.into()))
425        })
426    }
427
428    fn clear<T: Table>(&self) -> Result<(), DatabaseError> {
429        self.inner.clear_db(self.get_dbi::<T>()?).map_err(|e| DatabaseError::Delete(e.into()))?;
430
431        Ok(())
432    }
433
434    fn cursor_write<T: Table>(&self) -> Result<Self::CursorMut<T>, DatabaseError> {
435        self.new_cursor()
436    }
437
438    fn cursor_dup_write<T: DupSort>(&self) -> Result<Self::DupCursorMut<T>, DatabaseError> {
439        self.new_cursor()
440    }
441}
442
#[cfg(test)]
mod tests {
    use crate::{mdbx::DatabaseArguments, tables, DatabaseEnv, DatabaseEnvKind};
    use reth_db_api::{database::Database, models::ClientVersion, transaction::DbTx};
    use reth_libmdbx::MaxReadTransactionDuration;
    use reth_storage_errors::db::DatabaseError;
    use std::{sync::atomic::Ordering, thread::sleep, time::Duration};
    use tempfile::tempdir;

    /// Read transactions opened by these tests time out after this long.
    const MAX_DURATION: Duration = Duration::from_secs(1);

    /// Database arguments whose read transactions time out after [`MAX_DURATION`].
    fn args_with_read_timeout() -> DatabaseArguments {
        DatabaseArguments::new(ClientVersion::default()).with_max_read_transaction_duration(Some(
            MaxReadTransactionDuration::Set(MAX_DURATION),
        ))
    }

    #[test]
    fn long_read_transaction_safety_disabled() {
        let dir = tempdir().unwrap();
        let db = DatabaseEnv::open(dir.path(), DatabaseEnvKind::RW, args_with_read_timeout())
            .unwrap()
            .with_metrics();

        let mut tx = db.tx().unwrap();
        tx.metrics_handler.as_mut().unwrap().long_transaction_duration = MAX_DURATION;
        tx.disable_long_read_transaction_safety();
        // Sleep past the timeout, so the `TxnManager` would have timed the transaction out if the
        // safety were still enabled.
        sleep(MAX_DURATION + Duration::from_millis(100));

        // The lookup still reaches libmdbx and fails with `NotFound`, not with a timeout.
        let error = tx.get::<tables::Transactions>(0).unwrap_err();
        assert!(matches!(
            error,
            DatabaseError::Open(info) if info == reth_libmdbx::Error::NotFound.into()
        ));
        // No long-transaction backtrace was logged.
        assert!(!tx.metrics_handler.unwrap().backtrace_recorded.load(Ordering::Relaxed));
    }

    #[test]
    fn long_read_transaction_safety_enabled() {
        let dir = tempdir().unwrap();
        let db = DatabaseEnv::open(dir.path(), DatabaseEnvKind::RW, args_with_read_timeout())
            .unwrap()
            .with_metrics();

        let mut tx = db.tx().unwrap();
        tx.metrics_handler.as_mut().unwrap().long_transaction_duration = MAX_DURATION;
        // Sleep past the timeout so the `TxnManager` times the transaction out.
        sleep(MAX_DURATION + Duration::from_millis(100));

        // The lookup is rejected because the transaction timed out.
        let error = tx.get::<tables::Transactions>(0).unwrap_err();
        assert!(matches!(
            error,
            DatabaseError::Open(info) if info == reth_libmdbx::Error::ReadTransactionTimeout.into()
        ));
        // The long-transaction backtrace was logged.
        assert!(tx.metrics_handler.unwrap().backtrace_recorded.load(Ordering::Relaxed));
    }
}