Skip to main content

reth_db_api/
database.rs

1use crate::{
2    table::TableImporter,
3    transaction::{DbTx, DbTxMut},
4    DatabaseError,
5};
6use std::{fmt::Debug, path::PathBuf, sync::Arc};
7
8/// Main Database trait that can open read-only and read-write transactions.
9///
10/// Sealed trait which cannot be implemented by 3rd parties, exposed only for consumption.
11pub trait Database: Send + Sync + Debug {
12    /// Read-Only database transaction
13    type TX: DbTx + Send + Sync + Debug + 'static;
14    /// Read-Write database transaction
15    type TXMut: DbTxMut + DbTx + TableImporter + Send + Sync + Debug + 'static;
16
17    /// Create read only transaction.
18    #[track_caller]
19    fn tx(&self) -> Result<Self::TX, DatabaseError>;
20
21    /// Create read write transaction only possible if database is open with write access.
22    #[track_caller]
23    fn tx_mut(&self) -> Result<Self::TXMut, DatabaseError>;
24
25    /// Returns the path to the database directory.
26    fn path(&self) -> PathBuf;
27
28    /// Returns the transaction ID of the oldest active reader, if available.
29    ///
30    /// This is the committed txnid of the snapshot the reader is pinned to, not a unique per-reader
31    /// identifier, so multiple readers can report the same txnid.
32    ///
33    /// Used to check whether stale readers from a previous write transaction have completed.
34    /// Returns `None` if no readers are active or the backend does not support this query.
35    fn oldest_reader_txnid(&self) -> Option<u64>;
36
37    /// Returns the ID of the most recently committed transaction, if available.
38    fn last_txnid(&self) -> Option<u64>;
39
40    /// Takes a function and passes a read-only transaction into it, making sure it's closed in the
41    /// end of the execution.
42    fn view<T, F>(&self, f: F) -> Result<T, DatabaseError>
43    where
44        F: FnOnce(&mut Self::TX) -> T,
45    {
46        let mut tx = self.tx()?;
47
48        let res = f(&mut tx);
49        tx.commit()?;
50
51        Ok(res)
52    }
53
54    /// Takes a function and passes a write-read transaction into it, making sure it's committed in
55    /// the end of the execution.
56    fn update<T, F>(&self, f: F) -> Result<T, DatabaseError>
57    where
58        F: FnOnce(&Self::TXMut) -> T,
59    {
60        let tx = self.tx_mut()?;
61
62        let res = f(&tx);
63        tx.commit()?;
64
65        Ok(res)
66    }
67}
68
69impl<DB: Database> Database for Arc<DB> {
70    type TX = <DB as Database>::TX;
71    type TXMut = <DB as Database>::TXMut;
72
73    fn tx(&self) -> Result<Self::TX, DatabaseError> {
74        <DB as Database>::tx(self)
75    }
76
77    fn tx_mut(&self) -> Result<Self::TXMut, DatabaseError> {
78        <DB as Database>::tx_mut(self)
79    }
80
81    fn path(&self) -> PathBuf {
82        <DB as Database>::path(self)
83    }
84
85    fn oldest_reader_txnid(&self) -> Option<u64> {
86        <DB as Database>::oldest_reader_txnid(self)
87    }
88
89    fn last_txnid(&self) -> Option<u64> {
90        <DB as Database>::last_txnid(self)
91    }
92}
93
94impl<DB: Database> Database for &DB {
95    type TX = <DB as Database>::TX;
96    type TXMut = <DB as Database>::TXMut;
97
98    fn tx(&self) -> Result<Self::TX, DatabaseError> {
99        <DB as Database>::tx(self)
100    }
101
102    fn tx_mut(&self) -> Result<Self::TXMut, DatabaseError> {
103        <DB as Database>::tx_mut(self)
104    }
105
106    fn path(&self) -> PathBuf {
107        <DB as Database>::path(self)
108    }
109
110    fn oldest_reader_txnid(&self) -> Option<u64> {
111        <DB as Database>::oldest_reader_txnid(self)
112    }
113
114    fn last_txnid(&self) -> Option<u64> {
115        <DB as Database>::last_txnid(self)
116    }
117}
118
119/// Object-safe adapter for reader-txn tracking during unwind.
120pub trait ReaderTxnTracker: Send + Sync {
121    /// Waits until all readers older than the latest committed txnid have drained.
122    fn wait_for_pre_commit_readers(&self);
123}
124
125impl<DB: Database> ReaderTxnTracker for DB {
126    fn wait_for_pre_commit_readers(&self) {
127        if let Some(committed_txnid) = Database::last_txnid(self) {
128            while Database::oldest_reader_txnid(self).is_some_and(|oldest| oldest < committed_txnid)
129            {
130                std::thread::sleep(std::time::Duration::from_millis(10));
131            }
132        }
133    }
134}