reth_db/implementation/mdbx/
tx.rs1use super::{cursor::Cursor, utils::*};
4use crate::{
5 metrics::{DatabaseEnvMetrics, Operation, TransactionMode, TransactionOutcome},
6 DatabaseError,
7};
8use reth_db_api::{
9 table::{Compress, DupSort, Encode, IntoVec, Table, TableImporter},
10 transaction::{DbTx, DbTxMut},
11};
12use reth_libmdbx::{ffi::MDBX_dbi, CommitLatency, Transaction, TransactionKind, WriteFlags, RW};
13use reth_storage_errors::db::{DatabaseWriteError, DatabaseWriteOperation};
14use reth_tracing::tracing::{debug, instrument, trace, warn};
15use std::{
16 backtrace::Backtrace,
17 collections::HashMap,
18 marker::PhantomData,
19 sync::{
20 atomic::{AtomicBool, Ordering},
21 Arc,
22 },
23 time::{Duration, Instant},
24};
25
/// Default threshold after which an open read transaction is considered "long": once a read
/// transaction has been open for at least this long, a warning with a backtrace is logged.
const LONG_TRANSACTION_DURATION: Duration = Duration::from_secs(60);
28
29#[derive(Debug)]
31pub struct Tx<K: TransactionKind> {
32 inner: Transaction<K>,
34
35 dbis: Arc<HashMap<&'static str, MDBX_dbi>>,
37
38 metrics_handler: Option<MetricsHandler<K>>,
43}
44
45impl<K: TransactionKind> Tx<K> {
46 #[inline]
48 #[track_caller]
49 pub(crate) fn new(
50 inner: Transaction<K>,
51 dbis: Arc<HashMap<&'static str, MDBX_dbi>>,
52 env_metrics: Option<Arc<DatabaseEnvMetrics>>,
53 ) -> reth_libmdbx::Result<Self> {
54 let metrics_handler = env_metrics
55 .map(|env_metrics| {
56 let handler = MetricsHandler::<K>::new(inner.id()?, env_metrics);
57 handler.env_metrics.record_opened_transaction(handler.transaction_mode());
58 handler.log_transaction_opened();
59 Ok(handler)
60 })
61 .transpose()?;
62 Ok(Self { inner, dbis, metrics_handler })
63 }
64
65 pub const fn inner(&self) -> &Transaction<K> {
67 &self.inner
68 }
69
70 pub fn id(&self) -> reth_libmdbx::Result<u64> {
72 self.metrics_handler.as_ref().map_or_else(|| self.inner.id(), |handler| Ok(handler.txn_id))
73 }
74
75 pub fn get_dbi_raw(&self, name: &str) -> Result<MDBX_dbi, DatabaseError> {
78 if let Some(dbi) = self.dbis.get(name) {
79 Ok(*dbi)
80 } else {
81 self.inner
82 .open_db(Some(name))
83 .map(|db| db.dbi())
84 .map_err(|e| DatabaseError::Open(e.into()))
85 }
86 }
87
88 pub fn get_dbi<T: Table>(&self) -> Result<MDBX_dbi, DatabaseError> {
91 self.get_dbi_raw(T::NAME)
92 }
93
94 pub fn new_cursor<T: Table>(&self) -> Result<Cursor<K, T>, DatabaseError> {
96 let inner = self
97 .inner
98 .cursor_with_dbi(self.get_dbi::<T>()?)
99 .map_err(|e| DatabaseError::InitCursor(e.into()))?;
100
101 Ok(Cursor::new_with_metrics(
102 inner,
103 self.metrics_handler.as_ref().map(|h| h.env_metrics.clone()),
104 ))
105 }
106
107 fn execute_with_close_transaction_metric<R>(
112 mut self,
113 outcome: TransactionOutcome,
114 f: impl FnOnce(Self) -> (R, Option<CommitLatency>),
115 ) -> R {
116 let run = |tx| {
117 let start = Instant::now();
118 let (result, commit_latency) = f(tx);
119 let total_duration = start.elapsed();
120
121 if outcome.is_commit() {
122 debug!(
123 target: "storage::db::mdbx",
124 ?total_duration,
125 ?commit_latency,
126 is_read_only = K::IS_READ_ONLY,
127 "Commit"
128 );
129 }
130
131 (result, commit_latency, total_duration)
132 };
133
134 if let Some(mut metrics_handler) = self.metrics_handler.take() {
135 metrics_handler.close_recorded = true;
136 metrics_handler.log_backtrace_on_long_read_transaction();
137
138 let (result, commit_latency, close_duration) = run(self);
139 let open_duration = metrics_handler.start.elapsed();
140 metrics_handler.env_metrics.record_closed_transaction(
141 metrics_handler.transaction_mode(),
142 outcome,
143 open_duration,
144 Some(close_duration),
145 commit_latency,
146 );
147
148 result
149 } else {
150 run(self).0
151 }
152 }
153
154 fn execute_with_operation_metric<T: Table, R>(
159 &self,
160 operation: Operation,
161 value_size: Option<usize>,
162 f: impl FnOnce(&Transaction<K>) -> R,
163 ) -> R {
164 if let Some(metrics_handler) = &self.metrics_handler {
165 metrics_handler.log_backtrace_on_long_read_transaction();
166 metrics_handler
167 .env_metrics
168 .record_operation(T::NAME, operation, value_size, || f(&self.inner))
169 } else {
170 f(&self.inner)
171 }
172 }
173}
174
175#[derive(Debug)]
176struct MetricsHandler<K: TransactionKind> {
177 txn_id: u64,
179 start: Instant,
181 long_transaction_duration: Duration,
183 close_recorded: bool,
186 record_backtrace: bool,
189 backtrace_recorded: AtomicBool,
192 env_metrics: Arc<DatabaseEnvMetrics>,
194 #[cfg(debug_assertions)]
197 open_backtrace: Backtrace,
198 _marker: PhantomData<K>,
199}
200
201impl<K: TransactionKind> MetricsHandler<K> {
202 fn new(txn_id: u64, env_metrics: Arc<DatabaseEnvMetrics>) -> Self {
203 Self {
204 txn_id,
205 start: Instant::now(),
206 long_transaction_duration: LONG_TRANSACTION_DURATION,
207 close_recorded: false,
208 record_backtrace: true,
209 backtrace_recorded: AtomicBool::new(false),
210 #[cfg(debug_assertions)]
211 open_backtrace: Backtrace::force_capture(),
212 env_metrics,
213 _marker: PhantomData,
214 }
215 }
216
217 const fn transaction_mode(&self) -> TransactionMode {
218 if K::IS_READ_ONLY {
219 TransactionMode::ReadOnly
220 } else {
221 TransactionMode::ReadWrite
222 }
223 }
224
225 #[track_caller]
227 fn log_transaction_opened(&self) {
228 trace!(
229 target: "storage::db::mdbx",
230 caller = %core::panic::Location::caller(),
231 id = %self.txn_id,
232 mode = %self.transaction_mode().as_str(),
233 "Transaction opened",
234 );
235 }
236
237 fn log_backtrace_on_long_read_transaction(&self) {
244 if self.record_backtrace &&
245 !self.backtrace_recorded.load(Ordering::Relaxed) &&
246 self.transaction_mode().is_read_only()
247 {
248 let open_duration = self.start.elapsed();
249 if open_duration >= self.long_transaction_duration {
250 self.backtrace_recorded.store(true, Ordering::Relaxed);
251 #[cfg(debug_assertions)]
252 let message = format!(
253 "The database read transaction has been open for too long. Open backtrace:\n{}\n\nCurrent backtrace:\n{}",
254 self.open_backtrace,
255 Backtrace::force_capture()
256 );
257 #[cfg(not(debug_assertions))]
258 let message = format!(
259 "The database read transaction has been open for too long. Backtrace:\n{}",
260 Backtrace::force_capture()
261 );
262 warn!(
263 target: "storage::db::mdbx",
264 ?open_duration,
265 %self.txn_id,
266 "{message}"
267 );
268 }
269 }
270 }
271}
272
273impl<K: TransactionKind> Drop for MetricsHandler<K> {
274 fn drop(&mut self) {
275 if !self.close_recorded {
276 self.log_backtrace_on_long_read_transaction();
277 self.env_metrics.record_closed_transaction(
278 self.transaction_mode(),
279 TransactionOutcome::Drop,
280 self.start.elapsed(),
281 None,
282 None,
283 );
284 }
285 }
286}
287
288impl TableImporter for Tx<RW> {}
289
290impl<K: TransactionKind> DbTx for Tx<K> {
291 type Cursor<T: Table> = Cursor<K, T>;
292 type DupCursor<T: DupSort> = Cursor<K, T>;
293
294 fn get<T: Table>(&self, key: T::Key) -> Result<Option<<T as Table>::Value>, DatabaseError> {
295 self.get_by_encoded_key::<T>(&key.encode())
296 }
297
298 fn get_by_encoded_key<T: Table>(
299 &self,
300 key: &<T::Key as Encode>::Encoded,
301 ) -> Result<Option<T::Value>, DatabaseError> {
302 self.execute_with_operation_metric::<T, _>(Operation::Get, None, |tx| {
303 tx.get(self.get_dbi::<T>()?, key.as_ref())
304 .map_err(|e| DatabaseError::Read(e.into()))?
305 .map(decode_one::<T>)
306 .transpose()
307 })
308 }
309
310 #[instrument(name = "Tx::commit", level = "debug", target = "providers::db", skip_all)]
311 fn commit(self) -> Result<(), DatabaseError> {
312 self.execute_with_close_transaction_metric(TransactionOutcome::Commit, |this| {
313 match this.inner.commit().map_err(|e| DatabaseError::Commit(e.into())) {
314 Ok(latency) => (Ok(()), Some(latency)),
315 Err(e) => (Err(e), None),
316 }
317 })
318 }
319
320 fn abort(self) {
321 self.execute_with_close_transaction_metric(TransactionOutcome::Abort, |this| {
322 (drop(this.inner), None)
323 })
324 }
325
326 fn cursor_read<T: Table>(&self) -> Result<Self::Cursor<T>, DatabaseError> {
328 self.new_cursor()
329 }
330
331 fn cursor_dup_read<T: DupSort>(&self) -> Result<Self::DupCursor<T>, DatabaseError> {
333 self.new_cursor()
334 }
335
336 fn entries<T: Table>(&self) -> Result<usize, DatabaseError> {
338 Ok(self
339 .inner
340 .db_stat_with_dbi(self.get_dbi::<T>()?)
341 .map_err(|e| DatabaseError::Stats(e.into()))?
342 .entries())
343 }
344
345 fn disable_long_read_transaction_safety(&mut self) {
348 if let Some(metrics_handler) = self.metrics_handler.as_mut() {
349 metrics_handler.record_backtrace = false;
350 }
351
352 self.inner.disable_timeout();
353 }
354}
355
/// Selects which kind of write [`Tx::put`] performs.
#[derive(Clone, Copy)]
enum PutKind {
    /// Write using the `UPSERT` flag.
    Upsert,
    /// Write using the `APPEND` flag.
    Append,
}
364
365impl PutKind {
366 const fn into_operation_and_flags(self) -> (Operation, DatabaseWriteOperation, WriteFlags) {
367 match self {
368 Self::Upsert => {
369 (Operation::PutUpsert, DatabaseWriteOperation::PutUpsert, WriteFlags::UPSERT)
370 }
371 Self::Append => {
372 (Operation::PutAppend, DatabaseWriteOperation::PutAppend, WriteFlags::APPEND)
373 }
374 }
375 }
376}
377
378impl Tx<RW> {
379 fn put<T: Table>(
382 &self,
383 kind: PutKind,
384 key: T::Key,
385 value: T::Value,
386 ) -> Result<(), DatabaseError> {
387 let key = key.encode();
388 let value = value.compress();
389 let (operation, write_operation, flags) = kind.into_operation_and_flags();
390 self.execute_with_operation_metric::<T, _>(operation, Some(value.as_ref().len()), |tx| {
391 tx.put(self.get_dbi::<T>()?, key.as_ref(), value, flags).map_err(|e| {
392 DatabaseWriteError {
393 info: e.into(),
394 operation: write_operation,
395 table_name: T::NAME,
396 key: key.into_vec(),
397 }
398 .into()
399 })
400 })
401 }
402}
403
404impl DbTxMut for Tx<RW> {
405 type CursorMut<T: Table> = Cursor<RW, T>;
406 type DupCursorMut<T: DupSort> = Cursor<RW, T>;
407
408 fn put<T: Table>(&self, key: T::Key, value: T::Value) -> Result<(), DatabaseError> {
409 self.put::<T>(PutKind::Upsert, key, value)
410 }
411
412 fn append<T: Table>(&self, key: T::Key, value: T::Value) -> Result<(), DatabaseError> {
413 self.put::<T>(PutKind::Append, key, value)
414 }
415
416 fn delete<T: Table>(
417 &self,
418 key: T::Key,
419 value: Option<T::Value>,
420 ) -> Result<bool, DatabaseError> {
421 let mut data = None;
422
423 let value = value.map(Compress::compress);
424 if let Some(value) = &value {
425 data = Some(value.as_ref());
426 };
427
428 self.execute_with_operation_metric::<T, _>(Operation::Delete, None, |tx| {
429 tx.del(self.get_dbi::<T>()?, key.encode(), data)
430 .map_err(|e| DatabaseError::Delete(e.into()))
431 })
432 }
433
434 fn clear<T: Table>(&self) -> Result<(), DatabaseError> {
435 self.inner.clear_db(self.get_dbi::<T>()?).map_err(|e| DatabaseError::Delete(e.into()))?;
436
437 Ok(())
438 }
439
440 fn cursor_write<T: Table>(&self) -> Result<Self::CursorMut<T>, DatabaseError> {
441 self.new_cursor()
442 }
443
444 fn cursor_dup_write<T: DupSort>(&self) -> Result<Self::DupCursorMut<T>, DatabaseError> {
445 self.new_cursor()
446 }
447}
448
#[cfg(test)]
mod tests {
    use crate::{mdbx::DatabaseArguments, tables, DatabaseEnv, DatabaseEnvKind};
    use reth_db_api::{database::Database, models::ClientVersion, transaction::DbTx};
    use reth_libmdbx::MaxReadTransactionDuration;
    use reth_storage_errors::db::DatabaseError;
    use std::{path::Path, sync::atomic::Ordering, thread::sleep, time::Duration};
    use tempfile::tempdir;

    /// Maximum read transaction duration configured for the test environments.
    const MAX_DURATION: Duration = Duration::from_secs(1);

    /// Opens a read-write environment with metrics at `path`, limiting read transactions to
    /// [`MAX_DURATION`].
    fn open_env(path: &Path) -> DatabaseEnv {
        let args = DatabaseArguments::new(ClientVersion::default())
            .with_max_read_transaction_duration(Some(MaxReadTransactionDuration::Set(
                MAX_DURATION,
            )));
        DatabaseEnv::open(path, DatabaseEnvKind::RW, args).unwrap().with_metrics()
    }

    /// With the safety disabled, a read issued after the maximum duration fails with
    /// `Open(NotFound)` and no backtrace is recorded.
    #[test]
    fn long_read_transaction_safety_disabled() {
        let dir = tempdir().unwrap();
        let db = open_env(dir.path());

        let mut tx = db.tx().unwrap();
        tx.metrics_handler.as_mut().unwrap().long_transaction_duration = MAX_DURATION;
        tx.disable_long_read_transaction_safety();
        sleep(MAX_DURATION + Duration::from_millis(100));

        let error = tx.get::<tables::Transactions>(0).unwrap_err();
        assert!(matches!(
            error,
            DatabaseError::Open(err) if err == reth_libmdbx::Error::NotFound.into()));

        let handler = tx.metrics_handler.unwrap();
        assert!(!handler.backtrace_recorded.load(Ordering::Relaxed));
    }

    /// With the safety left enabled, a read issued after the maximum duration fails with
    /// `Open(ReadTransactionTimeout)` and the backtrace is recorded.
    #[test]
    fn long_read_transaction_safety_enabled() {
        let dir = tempdir().unwrap();
        let db = open_env(dir.path());

        let mut tx = db.tx().unwrap();
        tx.metrics_handler.as_mut().unwrap().long_transaction_duration = MAX_DURATION;
        sleep(MAX_DURATION + Duration::from_millis(100));

        let error = tx.get::<tables::Transactions>(0).unwrap_err();
        assert!(matches!(
            error,
            DatabaseError::Open(err) if err == reth_libmdbx::Error::ReadTransactionTimeout.into()));

        let handler = tx.metrics_handler.unwrap();
        assert!(handler.backtrace_recorded.load(Ordering::Relaxed));
    }
}