reth_db/implementation/mdbx/
tx.rs
1use super::{cursor::Cursor, utils::*};
4use crate::{
5 metrics::{DatabaseEnvMetrics, Operation, TransactionMode, TransactionOutcome},
6 DatabaseError,
7};
8use reth_db_api::{
9 table::{Compress, DupSort, Encode, Table, TableImporter},
10 transaction::{DbTx, DbTxMut},
11};
12use reth_libmdbx::{ffi::MDBX_dbi, CommitLatency, Transaction, TransactionKind, WriteFlags, RW};
13use reth_storage_errors::db::{DatabaseWriteError, DatabaseWriteOperation};
14use reth_tracing::tracing::{debug, trace, warn};
15use std::{
16 backtrace::Backtrace,
17 marker::PhantomData,
18 sync::{
19 atomic::{AtomicBool, Ordering},
20 Arc,
21 },
22 time::{Duration, Instant},
23};
24
25const LONG_TRANSACTION_DURATION: Duration = Duration::from_secs(60);
27
28#[derive(Debug)]
30pub struct Tx<K: TransactionKind> {
31 pub inner: Transaction<K>,
33
34 metrics_handler: Option<MetricsHandler<K>>,
39}
40
41impl<K: TransactionKind> Tx<K> {
42 #[inline]
44 pub const fn new(inner: Transaction<K>) -> Self {
45 Self::new_inner(inner, None)
46 }
47
48 #[inline]
50 #[track_caller]
51 pub(crate) fn new_with_metrics(
52 inner: Transaction<K>,
53 env_metrics: Option<Arc<DatabaseEnvMetrics>>,
54 ) -> reth_libmdbx::Result<Self> {
55 let metrics_handler = env_metrics
56 .map(|env_metrics| {
57 let handler = MetricsHandler::<K>::new(inner.id()?, env_metrics);
58 handler.env_metrics.record_opened_transaction(handler.transaction_mode());
59 handler.log_transaction_opened();
60 Ok(handler)
61 })
62 .transpose()?;
63 Ok(Self::new_inner(inner, metrics_handler))
64 }
65
66 #[inline]
67 const fn new_inner(inner: Transaction<K>, metrics_handler: Option<MetricsHandler<K>>) -> Self {
68 Self { inner, metrics_handler }
69 }
70
71 pub fn id(&self) -> reth_libmdbx::Result<u64> {
73 self.metrics_handler.as_ref().map_or_else(|| self.inner.id(), |handler| Ok(handler.txn_id))
74 }
75
76 pub fn get_dbi<T: Table>(&self) -> Result<MDBX_dbi, DatabaseError> {
78 self.inner
79 .open_db(Some(T::NAME))
80 .map(|db| db.dbi())
81 .map_err(|e| DatabaseError::Open(e.into()))
82 }
83
84 pub fn new_cursor<T: Table>(&self) -> Result<Cursor<K, T>, DatabaseError> {
86 let inner = self
87 .inner
88 .cursor_with_dbi(self.get_dbi::<T>()?)
89 .map_err(|e| DatabaseError::InitCursor(e.into()))?;
90
91 Ok(Cursor::new_with_metrics(
92 inner,
93 self.metrics_handler.as_ref().map(|h| h.env_metrics.clone()),
94 ))
95 }
96
97 fn execute_with_close_transaction_metric<R>(
102 mut self,
103 outcome: TransactionOutcome,
104 f: impl FnOnce(Self) -> (R, Option<CommitLatency>),
105 ) -> R {
106 let run = |tx| {
107 let start = Instant::now();
108 let (result, commit_latency) = f(tx);
109 let total_duration = start.elapsed();
110
111 if outcome.is_commit() {
112 debug!(
113 target: "storage::db::mdbx",
114 ?total_duration,
115 ?commit_latency,
116 is_read_only = K::IS_READ_ONLY,
117 "Commit"
118 );
119 }
120
121 (result, commit_latency, total_duration)
122 };
123
124 if let Some(mut metrics_handler) = self.metrics_handler.take() {
125 metrics_handler.close_recorded = true;
126 metrics_handler.log_backtrace_on_long_read_transaction();
127
128 let (result, commit_latency, close_duration) = run(self);
129 let open_duration = metrics_handler.start.elapsed();
130 metrics_handler.env_metrics.record_closed_transaction(
131 metrics_handler.transaction_mode(),
132 outcome,
133 open_duration,
134 Some(close_duration),
135 commit_latency,
136 );
137
138 result
139 } else {
140 run(self).0
141 }
142 }
143
144 fn execute_with_operation_metric<T: Table, R>(
149 &self,
150 operation: Operation,
151 value_size: Option<usize>,
152 f: impl FnOnce(&Transaction<K>) -> R,
153 ) -> R {
154 if let Some(metrics_handler) = &self.metrics_handler {
155 metrics_handler.log_backtrace_on_long_read_transaction();
156 metrics_handler
157 .env_metrics
158 .record_operation(T::NAME, operation, value_size, || f(&self.inner))
159 } else {
160 f(&self.inner)
161 }
162 }
163}
164
165#[derive(Debug)]
166struct MetricsHandler<K: TransactionKind> {
167 txn_id: u64,
169 start: Instant,
171 long_transaction_duration: Duration,
173 close_recorded: bool,
176 record_backtrace: bool,
179 backtrace_recorded: AtomicBool,
182 env_metrics: Arc<DatabaseEnvMetrics>,
184 #[cfg(debug_assertions)]
187 open_backtrace: Backtrace,
188 _marker: PhantomData<K>,
189}
190
191impl<K: TransactionKind> MetricsHandler<K> {
192 fn new(txn_id: u64, env_metrics: Arc<DatabaseEnvMetrics>) -> Self {
193 Self {
194 txn_id,
195 start: Instant::now(),
196 long_transaction_duration: LONG_TRANSACTION_DURATION,
197 close_recorded: false,
198 record_backtrace: true,
199 backtrace_recorded: AtomicBool::new(false),
200 #[cfg(debug_assertions)]
201 open_backtrace: Backtrace::force_capture(),
202 env_metrics,
203 _marker: PhantomData,
204 }
205 }
206
207 const fn transaction_mode(&self) -> TransactionMode {
208 if K::IS_READ_ONLY {
209 TransactionMode::ReadOnly
210 } else {
211 TransactionMode::ReadWrite
212 }
213 }
214
215 #[track_caller]
217 fn log_transaction_opened(&self) {
218 trace!(
219 target: "storage::db::mdbx",
220 caller = %core::panic::Location::caller(),
221 id = %self.txn_id,
222 mode = %self.transaction_mode().as_str(),
223 "Transaction opened",
224 );
225 }
226
227 fn log_backtrace_on_long_read_transaction(&self) {
234 if self.record_backtrace &&
235 !self.backtrace_recorded.load(Ordering::Relaxed) &&
236 self.transaction_mode().is_read_only()
237 {
238 let open_duration = self.start.elapsed();
239 if open_duration >= self.long_transaction_duration {
240 self.backtrace_recorded.store(true, Ordering::Relaxed);
241 #[cfg(debug_assertions)]
242 let message = format!(
243 "The database read transaction has been open for too long. Open backtrace:\n{}\n\nCurrent backtrace:\n{}",
244 self.open_backtrace,
245 Backtrace::force_capture()
246 );
247 #[cfg(not(debug_assertions))]
248 let message = format!(
249 "The database read transaction has been open for too long. Backtrace:\n{}",
250 Backtrace::force_capture()
251 );
252 warn!(
253 target: "storage::db::mdbx",
254 ?open_duration,
255 %self.txn_id,
256 "{message}"
257 );
258 }
259 }
260 }
261}
262
263impl<K: TransactionKind> Drop for MetricsHandler<K> {
264 fn drop(&mut self) {
265 if !self.close_recorded {
266 self.log_backtrace_on_long_read_transaction();
267 self.env_metrics.record_closed_transaction(
268 self.transaction_mode(),
269 TransactionOutcome::Drop,
270 self.start.elapsed(),
271 None,
272 None,
273 );
274 }
275 }
276}
277
278impl TableImporter for Tx<RW> {}
279
280impl<K: TransactionKind> DbTx for Tx<K> {
281 type Cursor<T: Table> = Cursor<K, T>;
282 type DupCursor<T: DupSort> = Cursor<K, T>;
283
284 fn get<T: Table>(&self, key: T::Key) -> Result<Option<<T as Table>::Value>, DatabaseError> {
285 self.get_by_encoded_key::<T>(&key.encode())
286 }
287
288 fn get_by_encoded_key<T: Table>(
289 &self,
290 key: &<T::Key as Encode>::Encoded,
291 ) -> Result<Option<T::Value>, DatabaseError> {
292 self.execute_with_operation_metric::<T, _>(Operation::Get, None, |tx| {
293 tx.get(self.get_dbi::<T>()?, key.as_ref())
294 .map_err(|e| DatabaseError::Read(e.into()))?
295 .map(decode_one::<T>)
296 .transpose()
297 })
298 }
299
300 fn commit(self) -> Result<bool, DatabaseError> {
301 self.execute_with_close_transaction_metric(TransactionOutcome::Commit, |this| {
302 match this.inner.commit().map_err(|e| DatabaseError::Commit(e.into())) {
303 Ok((v, latency)) => (Ok(v), Some(latency)),
304 Err(e) => (Err(e), None),
305 }
306 })
307 }
308
309 fn abort(self) {
310 self.execute_with_close_transaction_metric(TransactionOutcome::Abort, |this| {
311 (drop(this.inner), None)
312 })
313 }
314
315 fn cursor_read<T: Table>(&self) -> Result<Self::Cursor<T>, DatabaseError> {
317 self.new_cursor()
318 }
319
320 fn cursor_dup_read<T: DupSort>(&self) -> Result<Self::DupCursor<T>, DatabaseError> {
322 self.new_cursor()
323 }
324
325 fn entries<T: Table>(&self) -> Result<usize, DatabaseError> {
327 Ok(self
328 .inner
329 .db_stat_with_dbi(self.get_dbi::<T>()?)
330 .map_err(|e| DatabaseError::Stats(e.into()))?
331 .entries())
332 }
333
334 fn disable_long_read_transaction_safety(&mut self) {
337 if let Some(metrics_handler) = self.metrics_handler.as_mut() {
338 metrics_handler.record_backtrace = false;
339 }
340
341 self.inner.disable_timeout();
342 }
343}
344
345impl DbTxMut for Tx<RW> {
346 type CursorMut<T: Table> = Cursor<RW, T>;
347 type DupCursorMut<T: DupSort> = Cursor<RW, T>;
348
349 fn put<T: Table>(&self, key: T::Key, value: T::Value) -> Result<(), DatabaseError> {
350 let key = key.encode();
351 let value = value.compress();
352 self.execute_with_operation_metric::<T, _>(
353 Operation::Put,
354 Some(value.as_ref().len()),
355 |tx| {
356 tx.put(self.get_dbi::<T>()?, key.as_ref(), value, WriteFlags::UPSERT).map_err(|e| {
357 DatabaseWriteError {
358 info: e.into(),
359 operation: DatabaseWriteOperation::Put,
360 table_name: T::NAME,
361 key: key.into(),
362 }
363 .into()
364 })
365 },
366 )
367 }
368
369 fn delete<T: Table>(
370 &self,
371 key: T::Key,
372 value: Option<T::Value>,
373 ) -> Result<bool, DatabaseError> {
374 let mut data = None;
375
376 let value = value.map(Compress::compress);
377 if let Some(value) = &value {
378 data = Some(value.as_ref());
379 };
380
381 self.execute_with_operation_metric::<T, _>(Operation::Delete, None, |tx| {
382 tx.del(self.get_dbi::<T>()?, key.encode(), data)
383 .map_err(|e| DatabaseError::Delete(e.into()))
384 })
385 }
386
387 fn clear<T: Table>(&self) -> Result<(), DatabaseError> {
388 self.inner.clear_db(self.get_dbi::<T>()?).map_err(|e| DatabaseError::Delete(e.into()))?;
389
390 Ok(())
391 }
392
393 fn cursor_write<T: Table>(&self) -> Result<Self::CursorMut<T>, DatabaseError> {
394 self.new_cursor()
395 }
396
397 fn cursor_dup_write<T: DupSort>(&self) -> Result<Self::DupCursorMut<T>, DatabaseError> {
398 self.new_cursor()
399 }
400}
401
402#[cfg(test)]
403mod tests {
404 use crate::{mdbx::DatabaseArguments, tables, DatabaseEnv, DatabaseEnvKind};
405 use reth_db_api::{database::Database, models::ClientVersion, transaction::DbTx};
406 use reth_libmdbx::MaxReadTransactionDuration;
407 use reth_storage_errors::db::DatabaseError;
408 use std::{sync::atomic::Ordering, thread::sleep, time::Duration};
409 use tempfile::tempdir;
410
411 #[test]
412 fn long_read_transaction_safety_disabled() {
413 const MAX_DURATION: Duration = Duration::from_secs(1);
414
415 let dir = tempdir().unwrap();
416 let args = DatabaseArguments::new(ClientVersion::default())
417 .with_max_read_transaction_duration(Some(MaxReadTransactionDuration::Set(
418 MAX_DURATION,
419 )));
420 let db = DatabaseEnv::open(dir.path(), DatabaseEnvKind::RW, args).unwrap().with_metrics();
421
422 let mut tx = db.tx().unwrap();
423 tx.metrics_handler.as_mut().unwrap().long_transaction_duration = MAX_DURATION;
424 tx.disable_long_read_transaction_safety();
425 sleep(MAX_DURATION + Duration::from_millis(100));
427
428 assert_eq!(
430 tx.get::<tables::Transactions>(0),
431 Err(DatabaseError::Open(reth_libmdbx::Error::NotFound.into()))
432 );
433 assert!(!tx.metrics_handler.unwrap().backtrace_recorded.load(Ordering::Relaxed));
435 }
436
437 #[test]
438 fn long_read_transaction_safety_enabled() {
439 const MAX_DURATION: Duration = Duration::from_secs(1);
440
441 let dir = tempdir().unwrap();
442 let args = DatabaseArguments::new(ClientVersion::default())
443 .with_max_read_transaction_duration(Some(MaxReadTransactionDuration::Set(
444 MAX_DURATION,
445 )));
446 let db = DatabaseEnv::open(dir.path(), DatabaseEnvKind::RW, args).unwrap().with_metrics();
447
448 let mut tx = db.tx().unwrap();
449 tx.metrics_handler.as_mut().unwrap().long_transaction_duration = MAX_DURATION;
450 sleep(MAX_DURATION + Duration::from_millis(100));
452
453 assert_eq!(
455 tx.get::<tables::Transactions>(0),
456 Err(DatabaseError::Open(reth_libmdbx::Error::ReadTransactionTimeout.into()))
457 );
458 assert!(tx.metrics_handler.unwrap().backtrace_recorded.load(Ordering::Relaxed));
460 }
461}