reth_db/implementation/mdbx/
tx.rs1use super::{cursor::Cursor, utils::*};
4use crate::{
5 metrics::{DatabaseEnvMetrics, Operation, TransactionMode, TransactionOutcome},
6 DatabaseError,
7};
8use reth_db_api::{
9 table::{Compress, DupSort, Encode, Table, TableImporter},
10 transaction::{DbTx, DbTxMut},
11};
12use reth_libmdbx::{ffi::MDBX_dbi, CommitLatency, Transaction, TransactionKind, WriteFlags, RW};
13use reth_storage_errors::db::{DatabaseWriteError, DatabaseWriteOperation};
14use reth_tracing::tracing::{debug, trace, warn};
15use std::{
16 backtrace::Backtrace,
17 collections::HashMap,
18 marker::PhantomData,
19 sync::{
20 atomic::{AtomicBool, Ordering},
21 Arc,
22 },
23 time::{Duration, Instant},
24};
25
26const LONG_TRANSACTION_DURATION: Duration = Duration::from_secs(60);
28
29#[derive(Debug)]
31pub struct Tx<K: TransactionKind> {
32 pub inner: Transaction<K>,
34
35 dbis: Arc<HashMap<&'static str, MDBX_dbi>>,
37
38 metrics_handler: Option<MetricsHandler<K>>,
43}
44
45impl<K: TransactionKind> Tx<K> {
46 #[inline]
48 #[track_caller]
49 pub(crate) fn new(
50 inner: Transaction<K>,
51 dbis: Arc<HashMap<&'static str, MDBX_dbi>>,
52 env_metrics: Option<Arc<DatabaseEnvMetrics>>,
53 ) -> reth_libmdbx::Result<Self> {
54 let metrics_handler = env_metrics
55 .map(|env_metrics| {
56 let handler = MetricsHandler::<K>::new(inner.id()?, env_metrics);
57 handler.env_metrics.record_opened_transaction(handler.transaction_mode());
58 handler.log_transaction_opened();
59 Ok(handler)
60 })
61 .transpose()?;
62 Ok(Self { inner, dbis, metrics_handler })
63 }
64
65 pub fn id(&self) -> reth_libmdbx::Result<u64> {
67 self.metrics_handler.as_ref().map_or_else(|| self.inner.id(), |handler| Ok(handler.txn_id))
68 }
69
70 pub fn get_dbi<T: Table>(&self) -> Result<MDBX_dbi, DatabaseError> {
72 if let Some(dbi) = self.dbis.get(T::NAME) {
73 Ok(*dbi)
74 } else {
75 self.inner
76 .open_db(Some(T::NAME))
77 .map(|db| db.dbi())
78 .map_err(|e| DatabaseError::Open(e.into()))
79 }
80 }
81
82 pub fn new_cursor<T: Table>(&self) -> Result<Cursor<K, T>, DatabaseError> {
84 let inner = self
85 .inner
86 .cursor_with_dbi(self.get_dbi::<T>()?)
87 .map_err(|e| DatabaseError::InitCursor(e.into()))?;
88
89 Ok(Cursor::new_with_metrics(
90 inner,
91 self.metrics_handler.as_ref().map(|h| h.env_metrics.clone()),
92 ))
93 }
94
95 fn execute_with_close_transaction_metric<R>(
100 mut self,
101 outcome: TransactionOutcome,
102 f: impl FnOnce(Self) -> (R, Option<CommitLatency>),
103 ) -> R {
104 let run = |tx| {
105 let start = Instant::now();
106 let (result, commit_latency) = f(tx);
107 let total_duration = start.elapsed();
108
109 if outcome.is_commit() {
110 debug!(
111 target: "storage::db::mdbx",
112 ?total_duration,
113 ?commit_latency,
114 is_read_only = K::IS_READ_ONLY,
115 "Commit"
116 );
117 }
118
119 (result, commit_latency, total_duration)
120 };
121
122 if let Some(mut metrics_handler) = self.metrics_handler.take() {
123 metrics_handler.close_recorded = true;
124 metrics_handler.log_backtrace_on_long_read_transaction();
125
126 let (result, commit_latency, close_duration) = run(self);
127 let open_duration = metrics_handler.start.elapsed();
128 metrics_handler.env_metrics.record_closed_transaction(
129 metrics_handler.transaction_mode(),
130 outcome,
131 open_duration,
132 Some(close_duration),
133 commit_latency,
134 );
135
136 result
137 } else {
138 run(self).0
139 }
140 }
141
142 fn execute_with_operation_metric<T: Table, R>(
147 &self,
148 operation: Operation,
149 value_size: Option<usize>,
150 f: impl FnOnce(&Transaction<K>) -> R,
151 ) -> R {
152 if let Some(metrics_handler) = &self.metrics_handler {
153 metrics_handler.log_backtrace_on_long_read_transaction();
154 metrics_handler
155 .env_metrics
156 .record_operation(T::NAME, operation, value_size, || f(&self.inner))
157 } else {
158 f(&self.inner)
159 }
160 }
161}
162
163#[derive(Debug)]
164struct MetricsHandler<K: TransactionKind> {
165 txn_id: u64,
167 start: Instant,
169 long_transaction_duration: Duration,
171 close_recorded: bool,
174 record_backtrace: bool,
177 backtrace_recorded: AtomicBool,
180 env_metrics: Arc<DatabaseEnvMetrics>,
182 #[cfg(debug_assertions)]
185 open_backtrace: Backtrace,
186 _marker: PhantomData<K>,
187}
188
189impl<K: TransactionKind> MetricsHandler<K> {
190 fn new(txn_id: u64, env_metrics: Arc<DatabaseEnvMetrics>) -> Self {
191 Self {
192 txn_id,
193 start: Instant::now(),
194 long_transaction_duration: LONG_TRANSACTION_DURATION,
195 close_recorded: false,
196 record_backtrace: true,
197 backtrace_recorded: AtomicBool::new(false),
198 #[cfg(debug_assertions)]
199 open_backtrace: Backtrace::force_capture(),
200 env_metrics,
201 _marker: PhantomData,
202 }
203 }
204
205 const fn transaction_mode(&self) -> TransactionMode {
206 if K::IS_READ_ONLY {
207 TransactionMode::ReadOnly
208 } else {
209 TransactionMode::ReadWrite
210 }
211 }
212
213 #[track_caller]
215 fn log_transaction_opened(&self) {
216 trace!(
217 target: "storage::db::mdbx",
218 caller = %core::panic::Location::caller(),
219 id = %self.txn_id,
220 mode = %self.transaction_mode().as_str(),
221 "Transaction opened",
222 );
223 }
224
225 fn log_backtrace_on_long_read_transaction(&self) {
232 if self.record_backtrace &&
233 !self.backtrace_recorded.load(Ordering::Relaxed) &&
234 self.transaction_mode().is_read_only()
235 {
236 let open_duration = self.start.elapsed();
237 if open_duration >= self.long_transaction_duration {
238 self.backtrace_recorded.store(true, Ordering::Relaxed);
239 #[cfg(debug_assertions)]
240 let message = format!(
241 "The database read transaction has been open for too long. Open backtrace:\n{}\n\nCurrent backtrace:\n{}",
242 self.open_backtrace,
243 Backtrace::force_capture()
244 );
245 #[cfg(not(debug_assertions))]
246 let message = format!(
247 "The database read transaction has been open for too long. Backtrace:\n{}",
248 Backtrace::force_capture()
249 );
250 warn!(
251 target: "storage::db::mdbx",
252 ?open_duration,
253 %self.txn_id,
254 "{message}"
255 );
256 }
257 }
258 }
259}
260
261impl<K: TransactionKind> Drop for MetricsHandler<K> {
262 fn drop(&mut self) {
263 if !self.close_recorded {
264 self.log_backtrace_on_long_read_transaction();
265 self.env_metrics.record_closed_transaction(
266 self.transaction_mode(),
267 TransactionOutcome::Drop,
268 self.start.elapsed(),
269 None,
270 None,
271 );
272 }
273 }
274}
275
276impl TableImporter for Tx<RW> {}
277
278impl<K: TransactionKind> DbTx for Tx<K> {
279 type Cursor<T: Table> = Cursor<K, T>;
280 type DupCursor<T: DupSort> = Cursor<K, T>;
281
282 fn get<T: Table>(&self, key: T::Key) -> Result<Option<<T as Table>::Value>, DatabaseError> {
283 self.get_by_encoded_key::<T>(&key.encode())
284 }
285
286 fn get_by_encoded_key<T: Table>(
287 &self,
288 key: &<T::Key as Encode>::Encoded,
289 ) -> Result<Option<T::Value>, DatabaseError> {
290 self.execute_with_operation_metric::<T, _>(Operation::Get, None, |tx| {
291 tx.get(self.get_dbi::<T>()?, key.as_ref())
292 .map_err(|e| DatabaseError::Read(e.into()))?
293 .map(decode_one::<T>)
294 .transpose()
295 })
296 }
297
298 fn commit(self) -> Result<bool, DatabaseError> {
299 self.execute_with_close_transaction_metric(TransactionOutcome::Commit, |this| {
300 match this.inner.commit().map_err(|e| DatabaseError::Commit(e.into())) {
301 Ok((v, latency)) => (Ok(v), Some(latency)),
302 Err(e) => (Err(e), None),
303 }
304 })
305 }
306
307 fn abort(self) {
308 self.execute_with_close_transaction_metric(TransactionOutcome::Abort, |this| {
309 (drop(this.inner), None)
310 })
311 }
312
313 fn cursor_read<T: Table>(&self) -> Result<Self::Cursor<T>, DatabaseError> {
315 self.new_cursor()
316 }
317
318 fn cursor_dup_read<T: DupSort>(&self) -> Result<Self::DupCursor<T>, DatabaseError> {
320 self.new_cursor()
321 }
322
323 fn entries<T: Table>(&self) -> Result<usize, DatabaseError> {
325 Ok(self
326 .inner
327 .db_stat_with_dbi(self.get_dbi::<T>()?)
328 .map_err(|e| DatabaseError::Stats(e.into()))?
329 .entries())
330 }
331
332 fn disable_long_read_transaction_safety(&mut self) {
335 if let Some(metrics_handler) = self.metrics_handler.as_mut() {
336 metrics_handler.record_backtrace = false;
337 }
338
339 self.inner.disable_timeout();
340 }
341}
342
343#[derive(Clone, Copy)]
344enum PutKind {
345 Upsert,
347 Append,
350}
351
352impl PutKind {
353 const fn into_operation_and_flags(self) -> (Operation, DatabaseWriteOperation, WriteFlags) {
354 match self {
355 Self::Upsert => {
356 (Operation::PutUpsert, DatabaseWriteOperation::PutUpsert, WriteFlags::UPSERT)
357 }
358 Self::Append => {
359 (Operation::PutAppend, DatabaseWriteOperation::PutAppend, WriteFlags::APPEND)
360 }
361 }
362 }
363}
364
365impl Tx<RW> {
366 fn put<T: Table>(
369 &self,
370 kind: PutKind,
371 key: T::Key,
372 value: T::Value,
373 ) -> Result<(), DatabaseError> {
374 let key = key.encode();
375 let value = value.compress();
376 let (operation, write_operation, flags) = kind.into_operation_and_flags();
377 self.execute_with_operation_metric::<T, _>(operation, Some(value.as_ref().len()), |tx| {
378 tx.put(self.get_dbi::<T>()?, key.as_ref(), value, flags).map_err(|e| {
379 DatabaseWriteError {
380 info: e.into(),
381 operation: write_operation,
382 table_name: T::NAME,
383 key: key.into(),
384 }
385 .into()
386 })
387 })
388 }
389}
390
391impl DbTxMut for Tx<RW> {
392 type CursorMut<T: Table> = Cursor<RW, T>;
393 type DupCursorMut<T: DupSort> = Cursor<RW, T>;
394
395 fn put<T: Table>(&self, key: T::Key, value: T::Value) -> Result<(), DatabaseError> {
396 self.put::<T>(PutKind::Upsert, key, value)
397 }
398
399 fn append<T: Table>(&self, key: T::Key, value: T::Value) -> Result<(), DatabaseError> {
400 self.put::<T>(PutKind::Append, key, value)
401 }
402
403 fn delete<T: Table>(
404 &self,
405 key: T::Key,
406 value: Option<T::Value>,
407 ) -> Result<bool, DatabaseError> {
408 let mut data = None;
409
410 let value = value.map(Compress::compress);
411 if let Some(value) = &value {
412 data = Some(value.as_ref());
413 };
414
415 self.execute_with_operation_metric::<T, _>(Operation::Delete, None, |tx| {
416 tx.del(self.get_dbi::<T>()?, key.encode(), data)
417 .map_err(|e| DatabaseError::Delete(e.into()))
418 })
419 }
420
421 fn clear<T: Table>(&self) -> Result<(), DatabaseError> {
422 self.inner.clear_db(self.get_dbi::<T>()?).map_err(|e| DatabaseError::Delete(e.into()))?;
423
424 Ok(())
425 }
426
427 fn cursor_write<T: Table>(&self) -> Result<Self::CursorMut<T>, DatabaseError> {
428 self.new_cursor()
429 }
430
431 fn cursor_dup_write<T: DupSort>(&self) -> Result<Self::DupCursorMut<T>, DatabaseError> {
432 self.new_cursor()
433 }
434}
435
436#[cfg(test)]
437mod tests {
438 use crate::{mdbx::DatabaseArguments, tables, DatabaseEnv, DatabaseEnvKind};
439 use reth_db_api::{database::Database, models::ClientVersion, transaction::DbTx};
440 use reth_libmdbx::MaxReadTransactionDuration;
441 use reth_storage_errors::db::DatabaseError;
442 use std::{sync::atomic::Ordering, thread::sleep, time::Duration};
443 use tempfile::tempdir;
444
445 #[test]
446 fn long_read_transaction_safety_disabled() {
447 const MAX_DURATION: Duration = Duration::from_secs(1);
448
449 let dir = tempdir().unwrap();
450 let args = DatabaseArguments::new(ClientVersion::default())
451 .with_max_read_transaction_duration(Some(MaxReadTransactionDuration::Set(
452 MAX_DURATION,
453 )));
454 let db = DatabaseEnv::open(dir.path(), DatabaseEnvKind::RW, args).unwrap().with_metrics();
455
456 let mut tx = db.tx().unwrap();
457 tx.metrics_handler.as_mut().unwrap().long_transaction_duration = MAX_DURATION;
458 tx.disable_long_read_transaction_safety();
459 sleep(MAX_DURATION + Duration::from_millis(100));
461
462 assert_eq!(
464 tx.get::<tables::Transactions>(0),
465 Err(DatabaseError::Open(reth_libmdbx::Error::NotFound.into()))
466 );
467 assert!(!tx.metrics_handler.unwrap().backtrace_recorded.load(Ordering::Relaxed));
469 }
470
471 #[test]
472 fn long_read_transaction_safety_enabled() {
473 const MAX_DURATION: Duration = Duration::from_secs(1);
474
475 let dir = tempdir().unwrap();
476 let args = DatabaseArguments::new(ClientVersion::default())
477 .with_max_read_transaction_duration(Some(MaxReadTransactionDuration::Set(
478 MAX_DURATION,
479 )));
480 let db = DatabaseEnv::open(dir.path(), DatabaseEnvKind::RW, args).unwrap().with_metrics();
481
482 let mut tx = db.tx().unwrap();
483 tx.metrics_handler.as_mut().unwrap().long_transaction_duration = MAX_DURATION;
484 sleep(MAX_DURATION + Duration::from_millis(100));
486
487 assert_eq!(
489 tx.get::<tables::Transactions>(0),
490 Err(DatabaseError::Open(reth_libmdbx::Error::ReadTransactionTimeout.into()))
491 );
492 assert!(tx.metrics_handler.unwrap().backtrace_recorded.load(Ordering::Relaxed));
494 }
495}