reth_db/implementation/mdbx/
tx.rs1use super::{cursor::Cursor, utils::*};
4use crate::{
5 metrics::{DatabaseEnvMetrics, Operation, TransactionMode, TransactionOutcome},
6 DatabaseError,
7};
8use reth_db_api::{
9 table::{Compress, DupSort, Encode, IntoVec, Table, TableImporter},
10 transaction::{DbTx, DbTxMut},
11};
12use reth_libmdbx::{ffi::MDBX_dbi, CommitLatency, Transaction, TransactionKind, WriteFlags, RW};
13use reth_storage_errors::db::{DatabaseWriteError, DatabaseWriteOperation};
14use reth_tracing::tracing::{debug, trace, warn};
15use std::{
16 backtrace::Backtrace,
17 collections::HashMap,
18 marker::PhantomData,
19 sync::{
20 atomic::{AtomicBool, Ordering},
21 Arc,
22 },
23 time::{Duration, Instant},
24};
25
26const LONG_TRANSACTION_DURATION: Duration = Duration::from_secs(60);
28
29#[derive(Debug)]
31pub struct Tx<K: TransactionKind> {
32 pub inner: Transaction<K>,
34
35 dbis: Arc<HashMap<&'static str, MDBX_dbi>>,
37
38 metrics_handler: Option<MetricsHandler<K>>,
43}
44
45impl<K: TransactionKind> Tx<K> {
46 #[inline]
48 #[track_caller]
49 pub(crate) fn new(
50 inner: Transaction<K>,
51 dbis: Arc<HashMap<&'static str, MDBX_dbi>>,
52 env_metrics: Option<Arc<DatabaseEnvMetrics>>,
53 ) -> reth_libmdbx::Result<Self> {
54 let metrics_handler = env_metrics
55 .map(|env_metrics| {
56 let handler = MetricsHandler::<K>::new(inner.id()?, env_metrics);
57 handler.env_metrics.record_opened_transaction(handler.transaction_mode());
58 handler.log_transaction_opened();
59 Ok(handler)
60 })
61 .transpose()?;
62 Ok(Self { inner, dbis, metrics_handler })
63 }
64
65 pub fn id(&self) -> reth_libmdbx::Result<u64> {
67 self.metrics_handler.as_ref().map_or_else(|| self.inner.id(), |handler| Ok(handler.txn_id))
68 }
69
70 pub fn get_dbi_raw(&self, name: &str) -> Result<MDBX_dbi, DatabaseError> {
73 if let Some(dbi) = self.dbis.get(name) {
74 Ok(*dbi)
75 } else {
76 self.inner
77 .open_db(Some(name))
78 .map(|db| db.dbi())
79 .map_err(|e| DatabaseError::Open(e.into()))
80 }
81 }
82
83 pub fn get_dbi<T: Table>(&self) -> Result<MDBX_dbi, DatabaseError> {
86 self.get_dbi_raw(T::NAME)
87 }
88
89 pub fn new_cursor<T: Table>(&self) -> Result<Cursor<K, T>, DatabaseError> {
91 let inner = self
92 .inner
93 .cursor_with_dbi(self.get_dbi::<T>()?)
94 .map_err(|e| DatabaseError::InitCursor(e.into()))?;
95
96 Ok(Cursor::new_with_metrics(
97 inner,
98 self.metrics_handler.as_ref().map(|h| h.env_metrics.clone()),
99 ))
100 }
101
102 fn execute_with_close_transaction_metric<R>(
107 mut self,
108 outcome: TransactionOutcome,
109 f: impl FnOnce(Self) -> (R, Option<CommitLatency>),
110 ) -> R {
111 let run = |tx| {
112 let start = Instant::now();
113 let (result, commit_latency) = f(tx);
114 let total_duration = start.elapsed();
115
116 if outcome.is_commit() {
117 debug!(
118 target: "storage::db::mdbx",
119 ?total_duration,
120 ?commit_latency,
121 is_read_only = K::IS_READ_ONLY,
122 "Commit"
123 );
124 }
125
126 (result, commit_latency, total_duration)
127 };
128
129 if let Some(mut metrics_handler) = self.metrics_handler.take() {
130 metrics_handler.close_recorded = true;
131 metrics_handler.log_backtrace_on_long_read_transaction();
132
133 let (result, commit_latency, close_duration) = run(self);
134 let open_duration = metrics_handler.start.elapsed();
135 metrics_handler.env_metrics.record_closed_transaction(
136 metrics_handler.transaction_mode(),
137 outcome,
138 open_duration,
139 Some(close_duration),
140 commit_latency,
141 );
142
143 result
144 } else {
145 run(self).0
146 }
147 }
148
149 fn execute_with_operation_metric<T: Table, R>(
154 &self,
155 operation: Operation,
156 value_size: Option<usize>,
157 f: impl FnOnce(&Transaction<K>) -> R,
158 ) -> R {
159 if let Some(metrics_handler) = &self.metrics_handler {
160 metrics_handler.log_backtrace_on_long_read_transaction();
161 metrics_handler
162 .env_metrics
163 .record_operation(T::NAME, operation, value_size, || f(&self.inner))
164 } else {
165 f(&self.inner)
166 }
167 }
168}
169
170#[derive(Debug)]
171struct MetricsHandler<K: TransactionKind> {
172 txn_id: u64,
174 start: Instant,
176 long_transaction_duration: Duration,
178 close_recorded: bool,
181 record_backtrace: bool,
184 backtrace_recorded: AtomicBool,
187 env_metrics: Arc<DatabaseEnvMetrics>,
189 #[cfg(debug_assertions)]
192 open_backtrace: Backtrace,
193 _marker: PhantomData<K>,
194}
195
196impl<K: TransactionKind> MetricsHandler<K> {
197 fn new(txn_id: u64, env_metrics: Arc<DatabaseEnvMetrics>) -> Self {
198 Self {
199 txn_id,
200 start: Instant::now(),
201 long_transaction_duration: LONG_TRANSACTION_DURATION,
202 close_recorded: false,
203 record_backtrace: true,
204 backtrace_recorded: AtomicBool::new(false),
205 #[cfg(debug_assertions)]
206 open_backtrace: Backtrace::force_capture(),
207 env_metrics,
208 _marker: PhantomData,
209 }
210 }
211
212 const fn transaction_mode(&self) -> TransactionMode {
213 if K::IS_READ_ONLY {
214 TransactionMode::ReadOnly
215 } else {
216 TransactionMode::ReadWrite
217 }
218 }
219
220 #[track_caller]
222 fn log_transaction_opened(&self) {
223 trace!(
224 target: "storage::db::mdbx",
225 caller = %core::panic::Location::caller(),
226 id = %self.txn_id,
227 mode = %self.transaction_mode().as_str(),
228 "Transaction opened",
229 );
230 }
231
232 fn log_backtrace_on_long_read_transaction(&self) {
239 if self.record_backtrace &&
240 !self.backtrace_recorded.load(Ordering::Relaxed) &&
241 self.transaction_mode().is_read_only()
242 {
243 let open_duration = self.start.elapsed();
244 if open_duration >= self.long_transaction_duration {
245 self.backtrace_recorded.store(true, Ordering::Relaxed);
246 #[cfg(debug_assertions)]
247 let message = format!(
248 "The database read transaction has been open for too long. Open backtrace:\n{}\n\nCurrent backtrace:\n{}",
249 self.open_backtrace,
250 Backtrace::force_capture()
251 );
252 #[cfg(not(debug_assertions))]
253 let message = format!(
254 "The database read transaction has been open for too long. Backtrace:\n{}",
255 Backtrace::force_capture()
256 );
257 warn!(
258 target: "storage::db::mdbx",
259 ?open_duration,
260 %self.txn_id,
261 "{message}"
262 );
263 }
264 }
265 }
266}
267
268impl<K: TransactionKind> Drop for MetricsHandler<K> {
269 fn drop(&mut self) {
270 if !self.close_recorded {
271 self.log_backtrace_on_long_read_transaction();
272 self.env_metrics.record_closed_transaction(
273 self.transaction_mode(),
274 TransactionOutcome::Drop,
275 self.start.elapsed(),
276 None,
277 None,
278 );
279 }
280 }
281}
282
283impl TableImporter for Tx<RW> {}
284
285impl<K: TransactionKind> DbTx for Tx<K> {
286 type Cursor<T: Table> = Cursor<K, T>;
287 type DupCursor<T: DupSort> = Cursor<K, T>;
288
289 fn get<T: Table>(&self, key: T::Key) -> Result<Option<<T as Table>::Value>, DatabaseError> {
290 self.get_by_encoded_key::<T>(&key.encode())
291 }
292
293 fn get_by_encoded_key<T: Table>(
294 &self,
295 key: &<T::Key as Encode>::Encoded,
296 ) -> Result<Option<T::Value>, DatabaseError> {
297 self.execute_with_operation_metric::<T, _>(Operation::Get, None, |tx| {
298 tx.get(self.get_dbi::<T>()?, key.as_ref())
299 .map_err(|e| DatabaseError::Read(e.into()))?
300 .map(decode_one::<T>)
301 .transpose()
302 })
303 }
304
305 fn commit(self) -> Result<(), DatabaseError> {
306 self.execute_with_close_transaction_metric(TransactionOutcome::Commit, |this| {
307 match this.inner.commit().map_err(|e| DatabaseError::Commit(e.into())) {
308 Ok(latency) => (Ok(()), Some(latency)),
309 Err(e) => (Err(e), None),
310 }
311 })
312 }
313
314 fn abort(self) {
315 self.execute_with_close_transaction_metric(TransactionOutcome::Abort, |this| {
316 (drop(this.inner), None)
317 })
318 }
319
320 fn cursor_read<T: Table>(&self) -> Result<Self::Cursor<T>, DatabaseError> {
322 self.new_cursor()
323 }
324
325 fn cursor_dup_read<T: DupSort>(&self) -> Result<Self::DupCursor<T>, DatabaseError> {
327 self.new_cursor()
328 }
329
330 fn entries<T: Table>(&self) -> Result<usize, DatabaseError> {
332 Ok(self
333 .inner
334 .db_stat_with_dbi(self.get_dbi::<T>()?)
335 .map_err(|e| DatabaseError::Stats(e.into()))?
336 .entries())
337 }
338
339 fn disable_long_read_transaction_safety(&mut self) {
342 if let Some(metrics_handler) = self.metrics_handler.as_mut() {
343 metrics_handler.record_backtrace = false;
344 }
345
346 self.inner.disable_timeout();
347 }
348}
349
350#[derive(Clone, Copy)]
351enum PutKind {
352 Upsert,
354 Append,
357}
358
359impl PutKind {
360 const fn into_operation_and_flags(self) -> (Operation, DatabaseWriteOperation, WriteFlags) {
361 match self {
362 Self::Upsert => {
363 (Operation::PutUpsert, DatabaseWriteOperation::PutUpsert, WriteFlags::UPSERT)
364 }
365 Self::Append => {
366 (Operation::PutAppend, DatabaseWriteOperation::PutAppend, WriteFlags::APPEND)
367 }
368 }
369 }
370}
371
372impl Tx<RW> {
373 fn put<T: Table>(
376 &self,
377 kind: PutKind,
378 key: T::Key,
379 value: T::Value,
380 ) -> Result<(), DatabaseError> {
381 let key = key.encode();
382 let value = value.compress();
383 let (operation, write_operation, flags) = kind.into_operation_and_flags();
384 self.execute_with_operation_metric::<T, _>(operation, Some(value.as_ref().len()), |tx| {
385 tx.put(self.get_dbi::<T>()?, key.as_ref(), value, flags).map_err(|e| {
386 DatabaseWriteError {
387 info: e.into(),
388 operation: write_operation,
389 table_name: T::NAME,
390 key: key.into_vec(),
391 }
392 .into()
393 })
394 })
395 }
396}
397
398impl DbTxMut for Tx<RW> {
399 type CursorMut<T: Table> = Cursor<RW, T>;
400 type DupCursorMut<T: DupSort> = Cursor<RW, T>;
401
402 fn put<T: Table>(&self, key: T::Key, value: T::Value) -> Result<(), DatabaseError> {
403 self.put::<T>(PutKind::Upsert, key, value)
404 }
405
406 fn append<T: Table>(&self, key: T::Key, value: T::Value) -> Result<(), DatabaseError> {
407 self.put::<T>(PutKind::Append, key, value)
408 }
409
410 fn delete<T: Table>(
411 &self,
412 key: T::Key,
413 value: Option<T::Value>,
414 ) -> Result<bool, DatabaseError> {
415 let mut data = None;
416
417 let value = value.map(Compress::compress);
418 if let Some(value) = &value {
419 data = Some(value.as_ref());
420 };
421
422 self.execute_with_operation_metric::<T, _>(Operation::Delete, None, |tx| {
423 tx.del(self.get_dbi::<T>()?, key.encode(), data)
424 .map_err(|e| DatabaseError::Delete(e.into()))
425 })
426 }
427
428 fn clear<T: Table>(&self) -> Result<(), DatabaseError> {
429 self.inner.clear_db(self.get_dbi::<T>()?).map_err(|e| DatabaseError::Delete(e.into()))?;
430
431 Ok(())
432 }
433
434 fn cursor_write<T: Table>(&self) -> Result<Self::CursorMut<T>, DatabaseError> {
435 self.new_cursor()
436 }
437
438 fn cursor_dup_write<T: DupSort>(&self) -> Result<Self::DupCursorMut<T>, DatabaseError> {
439 self.new_cursor()
440 }
441}
442
443#[cfg(test)]
444mod tests {
445 use crate::{mdbx::DatabaseArguments, tables, DatabaseEnv, DatabaseEnvKind};
446 use reth_db_api::{database::Database, models::ClientVersion, transaction::DbTx};
447 use reth_libmdbx::MaxReadTransactionDuration;
448 use reth_storage_errors::db::DatabaseError;
449 use std::{sync::atomic::Ordering, thread::sleep, time::Duration};
450 use tempfile::tempdir;
451
452 #[test]
453 fn long_read_transaction_safety_disabled() {
454 const MAX_DURATION: Duration = Duration::from_secs(1);
455
456 let dir = tempdir().unwrap();
457 let args = DatabaseArguments::new(ClientVersion::default())
458 .with_max_read_transaction_duration(Some(MaxReadTransactionDuration::Set(
459 MAX_DURATION,
460 )));
461 let db = DatabaseEnv::open(dir.path(), DatabaseEnvKind::RW, args).unwrap().with_metrics();
462
463 let mut tx = db.tx().unwrap();
464 tx.metrics_handler.as_mut().unwrap().long_transaction_duration = MAX_DURATION;
465 tx.disable_long_read_transaction_safety();
466 sleep(MAX_DURATION + Duration::from_millis(100));
468
469 assert!(matches!(
471 tx.get::<tables::Transactions>(0).unwrap_err(),
472 DatabaseError::Open(err) if err == reth_libmdbx::Error::NotFound.into()));
473 assert!(!tx.metrics_handler.unwrap().backtrace_recorded.load(Ordering::Relaxed));
475 }
476
477 #[test]
478 fn long_read_transaction_safety_enabled() {
479 const MAX_DURATION: Duration = Duration::from_secs(1);
480
481 let dir = tempdir().unwrap();
482 let args = DatabaseArguments::new(ClientVersion::default())
483 .with_max_read_transaction_duration(Some(MaxReadTransactionDuration::Set(
484 MAX_DURATION,
485 )));
486 let db = DatabaseEnv::open(dir.path(), DatabaseEnvKind::RW, args).unwrap().with_metrics();
487
488 let mut tx = db.tx().unwrap();
489 tx.metrics_handler.as_mut().unwrap().long_transaction_duration = MAX_DURATION;
490 sleep(MAX_DURATION + Duration::from_millis(100));
492
493 assert!(matches!(
495 tx.get::<tables::Transactions>(0).unwrap_err(),
496 DatabaseError::Open(err) if err == reth_libmdbx::Error::ReadTransactionTimeout.into()));
497 assert!(tx.metrics_handler.unwrap().backtrace_recorded.load(Ordering::Relaxed));
499 }
500}