Skip to main content

reth_db/implementation/mdbx/
tx.rs

1//! Transaction wrapper for libmdbx-sys.
2
3use super::{cursor::Cursor, utils::*};
4use crate::{
5    metrics::{DatabaseEnvMetrics, Operation, TransactionMode, TransactionOutcome},
6    DatabaseError,
7};
8use reth_db_api::{
9    table::{Compress, DupSort, Encode, IntoVec, Table, TableImporter},
10    transaction::{DbTx, DbTxMut},
11};
12use reth_libmdbx::{ffi::MDBX_dbi, CommitLatency, Transaction, TransactionKind, WriteFlags, RW};
13use reth_storage_errors::db::{DatabaseWriteError, DatabaseWriteOperation};
14use reth_tracing::tracing::{debug, instrument, trace, warn};
15use std::{
16    backtrace::Backtrace,
17    collections::HashMap,
18    marker::PhantomData,
19    sync::{
20        atomic::{AtomicBool, Ordering},
21        Arc,
22    },
23    time::{Duration, Instant},
24};
25
/// How long a read transaction may stay open before a warning with its backtrace is logged.
const LONG_TRANSACTION_DURATION: Duration = Duration::from_secs(60);
28
29/// Wrapper for the libmdbx transaction.
30#[derive(Debug)]
31pub struct Tx<K: TransactionKind> {
32    /// Libmdbx-sys transaction.
33    inner: Transaction<K>,
34
35    /// Cached MDBX DBIs for reuse.
36    dbis: Arc<HashMap<&'static str, MDBX_dbi>>,
37
38    /// Handler for metrics with its own [Drop] implementation for cases when the transaction isn't
39    /// closed by [`Tx::commit`] or [`Tx::abort`], but we still need to report it in the metrics.
40    ///
41    /// If [Some], then metrics are reported.
42    metrics_handler: Option<MetricsHandler<K>>,
43}
44
45impl<K: TransactionKind> Tx<K> {
46    /// Creates new `Tx` object with a `RO` or `RW` transaction and optionally enables metrics.
47    #[inline]
48    #[track_caller]
49    pub(crate) fn new(
50        inner: Transaction<K>,
51        dbis: Arc<HashMap<&'static str, MDBX_dbi>>,
52        env_metrics: Option<Arc<DatabaseEnvMetrics>>,
53    ) -> reth_libmdbx::Result<Self> {
54        let metrics_handler = env_metrics
55            .map(|env_metrics| {
56                let handler = MetricsHandler::<K>::new(inner.id()?, env_metrics);
57                handler.env_metrics.record_opened_transaction(handler.transaction_mode());
58                handler.log_transaction_opened();
59                Ok(handler)
60            })
61            .transpose()?;
62        Ok(Self { inner, dbis, metrics_handler })
63    }
64
65    /// Returns a reference to the inner libmdbx transaction.
66    pub const fn inner(&self) -> &Transaction<K> {
67        &self.inner
68    }
69
70    /// Gets this transaction ID.
71    pub fn id(&self) -> reth_libmdbx::Result<u64> {
72        self.metrics_handler.as_ref().map_or_else(|| self.inner.id(), |handler| Ok(handler.txn_id))
73    }
74
75    /// Gets a table database handle by name if it exists, otherwise, check the
76    /// database, opening the DB if it exists.
77    pub fn get_dbi_raw(&self, name: &str) -> Result<MDBX_dbi, DatabaseError> {
78        if let Some(dbi) = self.dbis.get(name) {
79            Ok(*dbi)
80        } else {
81            self.inner
82                .open_db(Some(name))
83                .map(|db| db.dbi())
84                .map_err(|e| DatabaseError::Open(e.into()))
85        }
86    }
87
88    /// Gets a table database handle by name if it exists, otherwise, check the
89    /// database, opening the DB if it exists.
90    pub fn get_dbi<T: Table>(&self) -> Result<MDBX_dbi, DatabaseError> {
91        self.get_dbi_raw(T::NAME)
92    }
93
94    /// Create db Cursor
95    pub fn new_cursor<T: Table>(&self) -> Result<Cursor<K, T>, DatabaseError> {
96        let inner = self
97            .inner
98            .cursor_with_dbi(self.get_dbi::<T>()?)
99            .map_err(|e| DatabaseError::InitCursor(e.into()))?;
100
101        Ok(Cursor::new_with_metrics(
102            inner,
103            self.metrics_handler.as_ref().map(|h| h.env_metrics.clone()),
104        ))
105    }
106
107    /// If `self.metrics_handler == Some(_)`, measure the time it takes to execute the closure and
108    /// record a metric with the provided transaction outcome.
109    ///
110    /// Otherwise, just execute the closure.
111    fn execute_with_close_transaction_metric<R>(
112        mut self,
113        outcome: TransactionOutcome,
114        f: impl FnOnce(Self) -> (R, Option<CommitLatency>),
115    ) -> R {
116        let run = |tx| {
117            let start = Instant::now();
118            let (result, commit_latency) = f(tx);
119            let total_duration = start.elapsed();
120
121            if outcome.is_commit() {
122                debug!(
123                    target: "storage::db::mdbx",
124                    ?total_duration,
125                    ?commit_latency,
126                    is_read_only = K::IS_READ_ONLY,
127                    "Commit"
128                );
129            }
130
131            (result, commit_latency, total_duration)
132        };
133
134        if let Some(mut metrics_handler) = self.metrics_handler.take() {
135            metrics_handler.close_recorded = true;
136            metrics_handler.log_backtrace_on_long_read_transaction();
137
138            let (result, commit_latency, close_duration) = run(self);
139            let open_duration = metrics_handler.start.elapsed();
140            metrics_handler.env_metrics.record_closed_transaction(
141                metrics_handler.transaction_mode(),
142                outcome,
143                open_duration,
144                Some(close_duration),
145                commit_latency,
146            );
147
148            result
149        } else {
150            run(self).0
151        }
152    }
153
154    /// If `self.metrics_handler == Some(_)`, measure the time it takes to execute the closure and
155    /// record a metric with the provided operation.
156    ///
157    /// Otherwise, just execute the closure.
158    fn execute_with_operation_metric<T: Table, R>(
159        &self,
160        operation: Operation,
161        value_size: Option<usize>,
162        f: impl FnOnce(&Transaction<K>) -> R,
163    ) -> R {
164        if let Some(metrics_handler) = &self.metrics_handler {
165            metrics_handler.log_backtrace_on_long_read_transaction();
166            metrics_handler
167                .env_metrics
168                .record_operation(T::NAME, operation, value_size, || f(&self.inner))
169        } else {
170            f(&self.inner)
171        }
172    }
173}
174
175#[derive(Debug)]
176struct MetricsHandler<K: TransactionKind> {
177    /// Cached internal transaction ID provided by libmdbx.
178    txn_id: u64,
179    /// The time when transaction has started.
180    start: Instant,
181    /// Duration after which we emit the log about long-lived database transactions.
182    long_transaction_duration: Duration,
183    /// If `true`, the metric about transaction closing has already been recorded and we don't need
184    /// to do anything on [`Drop::drop`].
185    close_recorded: bool,
186    /// If `true`, the backtrace of transaction will be recorded and logged.
187    /// See [`MetricsHandler::log_backtrace_on_long_read_transaction`].
188    record_backtrace: bool,
189    /// If `true`, the backtrace of transaction has already been recorded and logged.
190    /// See [`MetricsHandler::log_backtrace_on_long_read_transaction`].
191    backtrace_recorded: AtomicBool,
192    /// Shared database environment metrics.
193    env_metrics: Arc<DatabaseEnvMetrics>,
194    /// Backtrace of the location where the transaction has been opened. Reported only with debug
195    /// assertions, because capturing the backtrace on every transaction opening is expensive.
196    #[cfg(debug_assertions)]
197    open_backtrace: Backtrace,
198    _marker: PhantomData<K>,
199}
200
201impl<K: TransactionKind> MetricsHandler<K> {
202    fn new(txn_id: u64, env_metrics: Arc<DatabaseEnvMetrics>) -> Self {
203        Self {
204            txn_id,
205            start: Instant::now(),
206            long_transaction_duration: LONG_TRANSACTION_DURATION,
207            close_recorded: false,
208            record_backtrace: true,
209            backtrace_recorded: AtomicBool::new(false),
210            #[cfg(debug_assertions)]
211            open_backtrace: Backtrace::force_capture(),
212            env_metrics,
213            _marker: PhantomData,
214        }
215    }
216
217    const fn transaction_mode(&self) -> TransactionMode {
218        if K::IS_READ_ONLY {
219            TransactionMode::ReadOnly
220        } else {
221            TransactionMode::ReadWrite
222        }
223    }
224
225    /// Logs the caller location and ID of the transaction that was opened.
226    #[track_caller]
227    fn log_transaction_opened(&self) {
228        trace!(
229            target: "storage::db::mdbx",
230            caller = %core::panic::Location::caller(),
231            id = %self.txn_id,
232            mode = %self.transaction_mode().as_str(),
233            "Transaction opened",
234        );
235    }
236
237    /// Logs the backtrace of current call if the duration that the read transaction has been open
238    /// is more than [`LONG_TRANSACTION_DURATION`] and `record_backtrace == true`.
239    /// The backtrace is recorded and logged just once, guaranteed by `backtrace_recorded` atomic.
240    ///
241    /// NOTE: Backtrace is recorded using [`Backtrace::force_capture`], so `RUST_BACKTRACE` env var
242    /// is not needed.
243    fn log_backtrace_on_long_read_transaction(&self) {
244        if self.record_backtrace &&
245            !self.backtrace_recorded.load(Ordering::Relaxed) &&
246            self.transaction_mode().is_read_only()
247        {
248            let open_duration = self.start.elapsed();
249            if open_duration >= self.long_transaction_duration {
250                self.backtrace_recorded.store(true, Ordering::Relaxed);
251                #[cfg(debug_assertions)]
252                let message = format!(
253                    "The database read transaction has been open for too long. Open backtrace:\n{}\n\nCurrent backtrace:\n{}",
254                    self.open_backtrace,
255                    Backtrace::force_capture()
256                );
257                #[cfg(not(debug_assertions))]
258                let message = format!(
259                    "The database read transaction has been open for too long. Backtrace:\n{}",
260                    Backtrace::force_capture()
261                );
262                warn!(
263                    target: "storage::db::mdbx",
264                    ?open_duration,
265                    %self.txn_id,
266                    "{message}"
267                );
268            }
269        }
270    }
271}
272
273impl<K: TransactionKind> Drop for MetricsHandler<K> {
274    fn drop(&mut self) {
275        if !self.close_recorded {
276            self.log_backtrace_on_long_read_transaction();
277            self.env_metrics.record_closed_transaction(
278                self.transaction_mode(),
279                TransactionOutcome::Drop,
280                self.start.elapsed(),
281                None,
282                None,
283            );
284        }
285    }
286}
287
288impl TableImporter for Tx<RW> {}
289
290impl<K: TransactionKind> DbTx for Tx<K> {
291    type Cursor<T: Table> = Cursor<K, T>;
292    type DupCursor<T: DupSort> = Cursor<K, T>;
293
294    fn get<T: Table>(&self, key: T::Key) -> Result<Option<<T as Table>::Value>, DatabaseError> {
295        self.get_by_encoded_key::<T>(&key.encode())
296    }
297
298    fn get_by_encoded_key<T: Table>(
299        &self,
300        key: &<T::Key as Encode>::Encoded,
301    ) -> Result<Option<T::Value>, DatabaseError> {
302        self.execute_with_operation_metric::<T, _>(Operation::Get, None, |tx| {
303            tx.get(self.get_dbi::<T>()?, key.as_ref())
304                .map_err(|e| DatabaseError::Read(e.into()))?
305                .map(decode_one::<T>)
306                .transpose()
307        })
308    }
309
310    #[instrument(name = "Tx::commit", level = "debug", target = "providers::db", skip_all)]
311    fn commit(self) -> Result<(), DatabaseError> {
312        self.execute_with_close_transaction_metric(TransactionOutcome::Commit, |this| {
313            match this.inner.commit().map_err(|e| DatabaseError::Commit(e.into())) {
314                Ok(latency) => (Ok(()), Some(latency)),
315                Err(e) => (Err(e), None),
316            }
317        })
318    }
319
320    fn abort(self) {
321        self.execute_with_close_transaction_metric(TransactionOutcome::Abort, |this| {
322            (drop(this.inner), None)
323        })
324    }
325
326    // Iterate over read only values in database.
327    fn cursor_read<T: Table>(&self) -> Result<Self::Cursor<T>, DatabaseError> {
328        self.new_cursor()
329    }
330
331    /// Iterate over read only values in database.
332    fn cursor_dup_read<T: DupSort>(&self) -> Result<Self::DupCursor<T>, DatabaseError> {
333        self.new_cursor()
334    }
335
336    /// Returns number of entries in the table using cheap DB stats invocation.
337    fn entries<T: Table>(&self) -> Result<usize, DatabaseError> {
338        Ok(self
339            .inner
340            .db_stat_with_dbi(self.get_dbi::<T>()?)
341            .map_err(|e| DatabaseError::Stats(e.into()))?
342            .entries())
343    }
344
345    /// Disables long-lived read transaction safety guarantees, such as backtrace recording and
346    /// timeout.
347    fn disable_long_read_transaction_safety(&mut self) {
348        if let Some(metrics_handler) = self.metrics_handler.as_mut() {
349            metrics_handler.record_backtrace = false;
350        }
351
352        self.inner.disable_timeout();
353    }
354}
355
/// Selects how `Tx<RW>::put` writes a key-value pair into a table.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PutKind {
    /// Default kind that inserts a new key-value or overwrites an existing key.
    Upsert,
    /// Append the key-value to the end of the table -- fast path when the new
    /// key is the highest so far, like the latest block number.
    Append,
}
364
365impl PutKind {
366    const fn into_operation_and_flags(self) -> (Operation, DatabaseWriteOperation, WriteFlags) {
367        match self {
368            Self::Upsert => {
369                (Operation::PutUpsert, DatabaseWriteOperation::PutUpsert, WriteFlags::UPSERT)
370            }
371            Self::Append => {
372                (Operation::PutAppend, DatabaseWriteOperation::PutAppend, WriteFlags::APPEND)
373            }
374        }
375    }
376}
377
378impl Tx<RW> {
379    /// The inner implementation mapping to `mdbx_put` that supports different
380    /// put kinds like upserting and appending.
381    fn put<T: Table>(
382        &self,
383        kind: PutKind,
384        key: T::Key,
385        value: T::Value,
386    ) -> Result<(), DatabaseError> {
387        let key = key.encode();
388        let value = value.compress();
389        let (operation, write_operation, flags) = kind.into_operation_and_flags();
390        self.execute_with_operation_metric::<T, _>(operation, Some(value.as_ref().len()), |tx| {
391            tx.put(self.get_dbi::<T>()?, key.as_ref(), value, flags).map_err(|e| {
392                DatabaseWriteError {
393                    info: e.into(),
394                    operation: write_operation,
395                    table_name: T::NAME,
396                    key: key.into_vec(),
397                }
398                .into()
399            })
400        })
401    }
402}
403
404impl DbTxMut for Tx<RW> {
405    type CursorMut<T: Table> = Cursor<RW, T>;
406    type DupCursorMut<T: DupSort> = Cursor<RW, T>;
407
408    fn put<T: Table>(&self, key: T::Key, value: T::Value) -> Result<(), DatabaseError> {
409        self.put::<T>(PutKind::Upsert, key, value)
410    }
411
412    fn append<T: Table>(&self, key: T::Key, value: T::Value) -> Result<(), DatabaseError> {
413        self.put::<T>(PutKind::Append, key, value)
414    }
415
416    fn delete<T: Table>(
417        &self,
418        key: T::Key,
419        value: Option<T::Value>,
420    ) -> Result<bool, DatabaseError> {
421        let mut data = None;
422
423        let value = value.map(Compress::compress);
424        if let Some(value) = &value {
425            data = Some(value.as_ref());
426        };
427
428        self.execute_with_operation_metric::<T, _>(Operation::Delete, None, |tx| {
429            tx.del(self.get_dbi::<T>()?, key.encode(), data)
430                .map_err(|e| DatabaseError::Delete(e.into()))
431        })
432    }
433
434    fn clear<T: Table>(&self) -> Result<(), DatabaseError> {
435        self.inner.clear_db(self.get_dbi::<T>()?).map_err(|e| DatabaseError::Delete(e.into()))?;
436
437        Ok(())
438    }
439
440    fn cursor_write<T: Table>(&self) -> Result<Self::CursorMut<T>, DatabaseError> {
441        self.new_cursor()
442    }
443
444    fn cursor_dup_write<T: DupSort>(&self) -> Result<Self::DupCursorMut<T>, DatabaseError> {
445        self.new_cursor()
446    }
447}
448
#[cfg(test)]
mod tests {
    use crate::{mdbx::DatabaseArguments, tables, DatabaseEnv, DatabaseEnvKind};
    use reth_db_api::{database::Database, models::ClientVersion, transaction::DbTx};
    use reth_libmdbx::MaxReadTransactionDuration;
    use reth_storage_errors::db::DatabaseError;
    use std::{sync::atomic::Ordering, thread::sleep, time::Duration};
    use tempfile::{tempdir, TempDir};

    /// Maximum read transaction duration configured for the database in these tests.
    const MAX_DURATION: Duration = Duration::from_secs(1);

    /// Opens a fresh RW database with metrics enabled and the maximum read transaction duration
    /// set to [`MAX_DURATION`].
    ///
    /// The [`TempDir`] is returned too, and must be kept bound so the directory outlives the
    /// environment.
    fn open_db_with_read_timeout() -> (TempDir, DatabaseEnv) {
        let dir = tempdir().unwrap();
        let args = DatabaseArguments::new(ClientVersion::default()).with_max_read_transaction_duration(
            Some(MaxReadTransactionDuration::Set(MAX_DURATION)),
        );
        let db = DatabaseEnv::open(dir.path(), DatabaseEnvKind::RW, args).unwrap().with_metrics();
        (dir, db)
    }

    #[test]
    fn long_read_transaction_safety_disabled() {
        let (_dir, db) = open_db_with_read_timeout();

        let mut tx = db.tx().unwrap();
        // Lower the backtrace threshold so it would trigger within this test if safety were on.
        tx.metrics_handler.as_mut().unwrap().long_transaction_duration = MAX_DURATION;
        tx.disable_long_read_transaction_safety();
        // Give the `TxnManager` some time to time out the transaction.
        sleep(MAX_DURATION + Duration::from_millis(100));

        // Transaction has not timed out: the error is `NotFound`, not a timeout.
        let err = tx.get::<tables::Transactions>(0).unwrap_err();
        assert!(matches!(
            err,
            DatabaseError::Open(err) if err == reth_libmdbx::Error::NotFound.into()));
        // Backtrace is not recorded.
        let handler = tx.metrics_handler.unwrap();
        assert!(!handler.backtrace_recorded.load(Ordering::Relaxed));
    }

    #[test]
    fn long_read_transaction_safety_enabled() {
        let (_dir, db) = open_db_with_read_timeout();

        let mut tx = db.tx().unwrap();
        // Lower the backtrace threshold so it triggers within this test.
        tx.metrics_handler.as_mut().unwrap().long_transaction_duration = MAX_DURATION;
        // Give the `TxnManager` some time to time out the transaction.
        sleep(MAX_DURATION + Duration::from_millis(100));

        // Transaction has timed out.
        let err = tx.get::<tables::Transactions>(0).unwrap_err();
        assert!(matches!(
            err,
            DatabaseError::Open(err) if err == reth_libmdbx::Error::ReadTransactionTimeout.into()));
        // Backtrace is recorded.
        let handler = tx.metrics_handler.unwrap();
        assert!(handler.backtrace_recorded.load(Ordering::Relaxed));
    }
}