Skip to main content

reth_libmdbx/
txn_pool.rs

1use crate::error::mdbx_result;
2use crossbeam_queue::ArrayQueue;
3
4/// Lock-free pool of reset read-only MDBX transaction handles.
5///
6/// With `MDBX_NOTLS` (which reth always sets), every `mdbx_txn_begin_ex` for a read transaction
7/// calls `mvcc_bind_slot`, which acquires `lck_rdt_lock` — a pthread mutex. Under high
8/// concurrency (e.g., prewarming), this becomes a contention point.
9///
10/// This pool caches transaction handles that have been reset via `mdbx_txn_reset`. A reset handle
11/// retains its reader slot, so `mdbx_txn_renew` can reactivate it without touching the reader
12/// table mutex.
13pub(crate) struct ReadTxnPool {
14    queue: ArrayQueue<PooledTxn>,
15}
16
17/// Wrapper around a raw txn pointer to satisfy `Send + Sync` for the queue.
18struct PooledTxn(*mut ffi::MDBX_txn);
19
20// SAFETY: MDBX txn pointers are safe to send across threads — we ensure exclusive
21// ownership via the queue's push/pop semantics.
22unsafe impl Send for PooledTxn {}
23unsafe impl Sync for PooledTxn {}
24
25impl ReadTxnPool {
26    pub(crate) fn new() -> Self {
27        Self { queue: ArrayQueue::new(256) }
28    }
29
30    /// Takes a reset transaction handle from the pool, renews it, and returns it ready for use.
31    ///
32    /// Returns `None` if the pool is empty or all renew attempts fail.
33    pub(crate) fn pop(&self) -> Option<*mut ffi::MDBX_txn> {
34        while let Some(handle) = self.queue.pop() {
35            let txn = handle.0;
36            // SAFETY: this pointer was previously created by mdbx_txn_begin_ex and reset
37            // via mdbx_txn_reset. mdbx_txn_renew reuses the existing reader slot without
38            // taking lck_rdt_lock.
39            match mdbx_result(unsafe { ffi::mdbx_txn_renew(txn) }) {
40                Ok(_) => return Some(txn),
41                Err(e) => {
42                    tracing::warn!(target: "libmdbx", %e, "failed to renew pooled read transaction");
43                    abort_txn(txn);
44                }
45            }
46        }
47        None
48    }
49
50    /// Resets an active read transaction handle and returns it to the pool.
51    ///
52    /// If reset fails, the handle is aborted instead.
53    pub(crate) fn push(&self, txn: *mut ffi::MDBX_txn) {
54        // mdbx_txn_reset releases the MVCC snapshot but keeps the reader slot.
55        if let Err(e) = mdbx_result(unsafe { ffi::mdbx_txn_reset(txn) }) {
56            tracing::warn!(target: "libmdbx", %e, "failed to reset read transaction for pooling");
57            abort_txn(txn);
58            return;
59        }
60
61        if self.queue.push(PooledTxn(txn)).is_err() {
62            abort_txn(txn);
63        }
64    }
65
66    /// Aborts all pooled transaction handles. Called during environment shutdown.
67    pub(crate) fn drain(&self) {
68        while let Some(handle) = self.queue.pop() {
69            abort_txn(handle.0);
70        }
71    }
72}
73
74/// Aborts a transaction handle, logging any error.
75fn abort_txn(txn: *mut ffi::MDBX_txn) {
76    if let Err(e) = mdbx_result(unsafe { ffi::mdbx_txn_abort(txn) }) {
77        tracing::error!(target: "libmdbx", %e, "failed to abort transaction");
78    }
79}
80
81impl Drop for ReadTxnPool {
82    fn drop(&mut self) {
83        self.drain();
84    }
85}
86
#[cfg(test)]
mod tests {
    use crate::{Environment, WriteFlags};

    /// Opens a fresh test environment.
    ///
    /// The temporary directory is returned alongside the environment so it stays alive (and on
    /// disk) for as long as the caller keeps the binding.
    fn test_env() -> (tempfile::TempDir, Environment) {
        let dir = tempfile::tempdir().unwrap();
        let env = Environment::builder().open(dir.path()).unwrap();
        (dir, env)
    }

    /// Inserts a single `key -> val` pair so the database isn't empty.
    fn seed(env: &Environment) {
        let tx = env.begin_rw_txn().unwrap();
        let db = tx.open_db(None).unwrap();
        tx.put(db.dbi(), b"key", b"val", WriteFlags::empty()).unwrap();
        tx.commit().unwrap();
    }

    #[test]
    fn get_returns_none_when_empty() {
        let (_dir, env) = test_env();
        // No read txn has been opened yet, so there is nothing to pop.
        assert!(env.ro_txn_pool().pop().is_none());
    }

    #[test]
    fn put_get_roundtrip() {
        let (_dir, env) = test_env();
        seed(&env);

        // Open and drop a read txn — drop returns the handle to the pool.
        let txn = env.begin_ro_txn().unwrap();
        drop(txn);

        // Next begin_ro_txn should reuse the pooled handle.
        let txn = env.begin_ro_txn().unwrap();
        let _id = txn.id().unwrap();
    }

    #[test]
    fn pooled_txn_reads_latest_snapshot() {
        let (_dir, env) = test_env();
        seed(&env);

        // Open a read txn and drop it to pool the handle.
        let txn = env.begin_ro_txn().unwrap();
        drop(txn);

        // Write new data.
        {
            let tx = env.begin_rw_txn().unwrap();
            let db = tx.open_db(None).unwrap();
            tx.put(db.dbi(), b"key2", b"val2", WriteFlags::empty()).unwrap();
            tx.commit().unwrap();
        }

        // The renewed pooled txn must see the new data.
        let txn = env.begin_ro_txn().unwrap();
        let db = txn.open_db(None).unwrap();
        let val: Option<[u8; 4]> = txn.get(db.dbi(), b"key2").unwrap();
        assert_eq!(val, Some(*b"val2"));
    }

    #[test]
    fn multiple_put_get_cycles() {
        let (_dir, env) = test_env();
        seed(&env);

        for _ in 0..50 {
            let txn = env.begin_ro_txn().unwrap();
            let db = txn.open_db(None).unwrap();
            let val: Option<[u8; 3]> = txn.get(db.dbi(), b"key").unwrap();
            assert_eq!(val, Some(*b"val"));
            drop(txn);
        }
    }

    #[test]
    fn concurrent_txns_pool_multiple_handles() {
        let (_dir, env) = test_env();
        seed(&env);

        // Open several txns concurrently — each gets a fresh handle.
        let txns: Vec<_> = (0..8).map(|_| env.begin_ro_txn().unwrap()).collect();

        // Drop them all — pool should accumulate handles.
        let count = txns.len();
        drop(txns);

        // Reopen same number — all should come from the pool.
        let txns: Vec<_> = (0..count).map(|_| env.begin_ro_txn().unwrap()).collect();
        for txn in &txns {
            let db = txn.open_db(None).unwrap();
            let val: Option<[u8; 3]> = txn.get(db.dbi(), b"key").unwrap();
            assert_eq!(val, Some(*b"val"));
        }
    }

    #[test]
    fn drain_empties_pool() {
        let (_dir, env) = test_env();
        seed(&env);

        // Pool a handle.
        let txn = env.begin_ro_txn().unwrap();
        drop(txn);

        env.ro_txn_pool().drain();

        // Pool is empty — pop should return None.
        assert!(env.ro_txn_pool().pop().is_none());
    }

    #[test]
    fn committed_txn_is_not_pooled() {
        let (_dir, env) = test_env();
        seed(&env);

        let txn = env.begin_ro_txn().unwrap();
        txn.commit().unwrap();

        // Committed txns are freed by mdbx, not returned to pool.
        assert!(env.ro_txn_pool().pop().is_none());
    }

    #[test]
    fn multithreaded_pool_usage() {
        let (_dir, env) = test_env();
        seed(&env);

        let env = std::sync::Arc::new(env);
        // The barrier releases all workers at once to maximise overlap.
        let barrier = std::sync::Arc::new(std::sync::Barrier::new(8));

        let handles: Vec<_> = (0..8)
            .map(|_| {
                let env = env.clone();
                let barrier = barrier.clone();
                std::thread::spawn(move || {
                    barrier.wait();
                    for _ in 0..100 {
                        let txn = env.begin_ro_txn().unwrap();
                        let db = txn.open_db(None).unwrap();
                        let val: Option<[u8; 3]> = txn.get(db.dbi(), b"key").unwrap();
                        assert_eq!(val, Some(*b"val"));
                        drop(txn);
                    }
                })
            })
            .collect();

        for h in handles {
            h.join().unwrap();
        }
    }

    #[test]
    fn multithreaded_mixed_read_write() {
        let (_dir, env) = test_env();
        seed(&env);

        let env = std::sync::Arc::new(env);
        // 4 readers + 1 writer.
        let barrier = std::sync::Arc::new(std::sync::Barrier::new(5));

        // Spawn reader threads.
        let mut handles: Vec<_> = (0..4)
            .map(|_| {
                let env = env.clone();
                let barrier = barrier.clone();
                std::thread::spawn(move || {
                    barrier.wait();
                    for _ in 0..50 {
                        let txn = env.begin_ro_txn().unwrap();
                        let db = txn.open_db(None).unwrap();
                        // `key` was committed by `seed` before any thread started, and the
                        // writer below only inserts 4-byte keys, so every snapshot — fresh or
                        // renewed from the pool — must contain it unchanged.
                        let val: Option<[u8; 3]> = txn.get(db.dbi(), b"key").unwrap();
                        assert_eq!(val, Some(*b"val"));
                        drop(txn);
                    }
                })
            })
            .collect();

        // Spawn a writer thread that keeps committing new snapshots while the readers run.
        handles.push(std::thread::spawn(move || {
            barrier.wait();
            for i in 0u32..20 {
                let tx = env.begin_rw_txn().unwrap();
                let db = tx.open_db(None).unwrap();
                tx.put(db.dbi(), i.to_le_bytes(), b"v", WriteFlags::empty()).unwrap();
                tx.commit().unwrap();
            }
        }));

        for h in handles {
            h.join().unwrap();
        }
    }

    #[test]
    fn multithreaded_concurrent_open_close() {
        let (_dir, env) = test_env();
        seed(&env);

        let env = std::sync::Arc::new(env);
        let barrier = std::sync::Arc::new(std::sync::Barrier::new(16));

        // 16 threads each open and close 200 txns — exercises pool contention.
        let handles: Vec<_> = (0..16)
            .map(|_| {
                let env = env.clone();
                let barrier = barrier.clone();
                std::thread::spawn(move || {
                    barrier.wait();
                    for _ in 0..200 {
                        let txn = env.begin_ro_txn().unwrap();
                        let db = txn.open_db(None).unwrap();
                        let val: Option<[u8; 3]> = txn.get(db.dbi(), b"key").unwrap();
                        assert_eq!(val, Some(*b"val"));
                        // Intentionally don't call drop explicitly — let scope handle it.
                    }
                })
            })
            .collect();

        for h in handles {
            h.join().unwrap();
        }
    }

    #[test]
    fn multithreaded_hold_multiple_txns() {
        let (_dir, env) = test_env();
        seed(&env);

        let env = std::sync::Arc::new(env);
        let barrier = std::sync::Arc::new(std::sync::Barrier::new(8));

        // Each thread holds multiple txns open simultaneously, then drops them all.
        let handles: Vec<_> = (0..8)
            .map(|_| {
                let env = env.clone();
                let barrier = barrier.clone();
                std::thread::spawn(move || {
                    barrier.wait();
                    for _ in 0..20 {
                        let txns: Vec<_> = (0..4).map(|_| env.begin_ro_txn().unwrap()).collect();
                        for txn in &txns {
                            let db = txn.open_db(None).unwrap();
                            let val: Option<[u8; 3]> = txn.get(db.dbi(), b"key").unwrap();
                            assert_eq!(val, Some(*b"val"));
                        }
                        drop(txns);
                    }
                })
            })
            .collect();

        for h in handles {
            h.join().unwrap();
        }
    }

    #[test]
    fn multithreaded_drain_under_contention() {
        let (_dir, env) = test_env();
        seed(&env);

        let env = std::sync::Arc::new(env);

        // Fill the pool with handles.
        {
            let txns: Vec<_> = (0..16).map(|_| env.begin_ro_txn().unwrap()).collect();
            drop(txns);
        }

        let barrier = std::sync::Arc::new(std::sync::Barrier::new(5));

        // 4 threads racing to pop from the pool + 1 thread draining.
        let mut handles: Vec<_> = (0..4)
            .map(|_| {
                let env = env.clone();
                let barrier = barrier.clone();
                std::thread::spawn(move || {
                    barrier.wait();
                    for _ in 0..50 {
                        // May or may not get a pooled handle. The result is deliberately not
                        // inspected: this test only checks that racing with `drain` neither
                        // panics nor crashes.
                        let _txn = env.begin_ro_txn();
                    }
                })
            })
            .collect();

        handles.push(std::thread::spawn(move || {
            barrier.wait();
            env.ro_txn_pool().drain();
        }));

        for h in handles {
            h.join().unwrap();
        }
    }

    #[test]
    fn pool_overflow_aborts_excess() {
        /// Expected pool capacity; must match what `ReadTxnPool::new` allocates.
        const POOL_CAPACITY: usize = 256;

        let dir = tempfile::tempdir().unwrap();
        // Raise the reader limit so that 300 simultaneous read txns can be opened.
        let env = Environment::builder().set_max_readers(512).open(dir.path()).unwrap();
        seed(&env);

        // Open more txns than the pool capacity, drop them all.
        let txns: Vec<_> = (0..300).map(|_| env.begin_ro_txn().unwrap()).collect();
        drop(txns);

        // Pool is capped at its capacity; excess handles are aborted.
        assert_eq!(env.ro_txn_pool().queue.len(), POOL_CAPACITY);

        // All pooled handles should still work.
        let txns: Vec<_> = (0..POOL_CAPACITY).map(|_| env.begin_ro_txn().unwrap()).collect();
        for txn in &txns {
            let db = txn.open_db(None).unwrap();
            let val: Option<[u8; 3]> = txn.get(db.dbi(), b"key").unwrap();
            assert_eq!(val, Some(*b"val"));
        }
    }
}