1use crate::error::mdbx_result;
2use crossbeam_queue::ArrayQueue;
3
/// Pool of raw MDBX transaction handles that are kept around for reuse.
///
/// Handles are parked in a bounded [`ArrayQueue`]. [`ReadTxnPool::push`] resets a handle before
/// storing it and [`ReadTxnPool::pop`] renews a stored handle before handing it back out.
pub(crate) struct ReadTxnPool {
    /// Handles currently parked in the pool, waiting to be renewed by `pop`.
    queue: ArrayQueue<PooledTxn>,
}
16
/// Newtype around a raw `MDBX_txn` pointer so it can be stored in the pool's queue.
///
/// Raw pointers are neither `Send` nor `Sync` by default; the manual impls below are what allow
/// a queue of these handles to be shared between threads.
struct PooledTxn(*mut ffi::MDBX_txn);

// SAFETY: NOTE(review): soundness relies on a parked transaction handle being usable from a
// thread other than the one that created it. Presumably the environment is opened in a mode
// without thread-local reader slots; confirm against the environment flags and libmdbx docs.
unsafe impl Send for PooledTxn {}
// SAFETY: NOTE(review): the wrapper exposes no `&self` methods in this file; the pointer is only
// read after the value has been moved out of the queue. Confirm no other code borrows it.
unsafe impl Sync for PooledTxn {}
24
25impl ReadTxnPool {
26 pub(crate) fn new() -> Self {
27 Self { queue: ArrayQueue::new(256) }
28 }
29
30 pub(crate) fn pop(&self) -> Option<*mut ffi::MDBX_txn> {
34 while let Some(handle) = self.queue.pop() {
35 let txn = handle.0;
36 match mdbx_result(unsafe { ffi::mdbx_txn_renew(txn) }) {
40 Ok(_) => return Some(txn),
41 Err(e) => {
42 tracing::warn!(target: "libmdbx", %e, "failed to renew pooled read transaction");
43 abort_txn(txn);
44 }
45 }
46 }
47 None
48 }
49
50 pub(crate) fn push(&self, txn: *mut ffi::MDBX_txn) {
54 if let Err(e) = mdbx_result(unsafe { ffi::mdbx_txn_reset(txn) }) {
56 tracing::warn!(target: "libmdbx", %e, "failed to reset read transaction for pooling");
57 abort_txn(txn);
58 return;
59 }
60
61 if self.queue.push(PooledTxn(txn)).is_err() {
62 abort_txn(txn);
63 }
64 }
65
66 pub(crate) fn drain(&self) {
68 while let Some(handle) = self.queue.pop() {
69 abort_txn(handle.0);
70 }
71 }
72}
73
74fn abort_txn(txn: *mut ffi::MDBX_txn) {
76 if let Err(e) = mdbx_result(unsafe { ffi::mdbx_txn_abort(txn) }) {
77 tracing::error!(target: "libmdbx", %e, "failed to abort transaction");
78 }
79}
80
81impl Drop for ReadTxnPool {
82 fn drop(&mut self) {
83 self.drain();
84 }
85}
86
// Unit tests for the read-transaction pool.
//
// NOTE(review): most tests only touch the pool indirectly through `Environment::begin_ro_txn`
// and by dropping the returned transaction. Presumably a dropped read-only transaction is handed
// to `ReadTxnPool::push` and `begin_ro_txn` first tries `ReadTxnPool::pop`; that wiring lives
// outside this file, so confirm there.
#[cfg(test)]
mod tests {
    use crate::{Environment, WriteFlags};

    /// Opens an environment with default builder settings inside a fresh temporary directory.
    ///
    /// The directory guard is returned alongside the environment so callers can keep it alive
    /// for the duration of the test.
    fn test_env() -> (tempfile::TempDir, Environment) {
        let dir = tempfile::tempdir().unwrap();
        let env = Environment::builder().open(dir.path()).unwrap();
        (dir, env)
    }

    /// Commits a single `key` -> `val` entry to the default database (`open_db(None)`).
    fn seed(env: &Environment) {
        let tx = env.begin_rw_txn().unwrap();
        let db = tx.open_db(None).unwrap();
        tx.put(db.dbi(), b"key", b"val", WriteFlags::empty()).unwrap();
        tx.commit().unwrap();
    }

    /// A freshly opened environment has nothing parked in its pool.
    #[test]
    fn get_returns_none_when_empty() {
        let (_dir, env) = test_env();
        assert!(env.ro_txn_pool().pop().is_none());
    }

    /// Begins a read transaction, drops it, and begins another one that must still be usable.
    #[test]
    fn put_get_roundtrip() {
        let (_dir, env) = test_env();
        seed(&env);

        let txn = env.begin_ro_txn().unwrap();
        // Presumably parks the handle in the pool — see the module note.
        drop(txn);

        let txn = env.begin_ro_txn().unwrap();
        // Only checks that querying the transaction id succeeds.
        let _id = txn.id().unwrap();
    }

    /// A read transaction begun after an intervening write must observe that write.
    #[test]
    fn pooled_txn_reads_latest_snapshot() {
        let (_dir, env) = test_env();
        seed(&env);

        let txn = env.begin_ro_txn().unwrap();
        drop(txn);

        // Commit a new entry while no read transaction is open.
        {
            let tx = env.begin_rw_txn().unwrap();
            let db = tx.open_db(None).unwrap();
            tx.put(db.dbi(), b"key2", b"val2", WriteFlags::empty()).unwrap();
            tx.commit().unwrap();
        }

        // The next read transaction must see `key2`, i.e. it must not be stuck on the snapshot
        // of the transaction dropped above.
        let txn = env.begin_ro_txn().unwrap();
        let db = txn.open_db(None).unwrap();
        let val: Option<[u8; 4]> = txn.get(db.dbi(), b"key2").unwrap();
        assert_eq!(val, Some(*b"val2"));
    }

    /// Repeats begin / read / drop 50 times on one thread and checks the seeded value each time.
    #[test]
    fn multiple_put_get_cycles() {
        let (_dir, env) = test_env();
        seed(&env);

        for _ in 0..50 {
            let txn = env.begin_ro_txn().unwrap();
            let db = txn.open_db(None).unwrap();
            let val: Option<[u8; 3]> = txn.get(db.dbi(), b"key").unwrap();
            assert_eq!(val, Some(*b"val"));
            drop(txn);
        }
    }

    /// Holds 8 read transactions at once, drops them all, then opens 8 more and reads through
    /// each of them.
    #[test]
    fn concurrent_txns_pool_multiple_handles() {
        let (_dir, env) = test_env();
        seed(&env);

        let txns: Vec<_> = (0..8).map(|_| env.begin_ro_txn().unwrap()).collect();

        let count = txns.len();
        drop(txns);

        // Open the same number of transactions again; every one must be readable.
        let txns: Vec<_> = (0..count).map(|_| env.begin_ro_txn().unwrap()).collect();
        for txn in &txns {
            let db = txn.open_db(None).unwrap();
            let val: Option<[u8; 3]> = txn.get(db.dbi(), b"key").unwrap();
            assert_eq!(val, Some(*b"val"));
        }
    }

    /// After an explicit `drain()` the pool has nothing left to pop.
    #[test]
    fn drain_empties_pool() {
        let (_dir, env) = test_env();
        seed(&env);

        let txn = env.begin_ro_txn().unwrap();
        drop(txn);

        env.ro_txn_pool().drain();

        assert!(env.ro_txn_pool().pop().is_none());
    }

    /// A read transaction that is committed (rather than dropped) must leave the pool empty.
    #[test]
    fn committed_txn_is_not_pooled() {
        let (_dir, env) = test_env();
        seed(&env);

        let txn = env.begin_ro_txn().unwrap();
        txn.commit().unwrap();

        assert!(env.ro_txn_pool().pop().is_none());
    }

    /// 8 threads each run 100 begin / read / drop cycles against a shared environment.
    #[test]
    fn multithreaded_pool_usage() {
        let (_dir, env) = test_env();
        seed(&env);

        let env = std::sync::Arc::new(env);
        // One barrier slot per worker so all threads start their loops together.
        let barrier = std::sync::Arc::new(std::sync::Barrier::new(8));

        let handles: Vec<_> = (0..8)
            .map(|_| {
                let env = env.clone();
                let barrier = barrier.clone();
                std::thread::spawn(move || {
                    barrier.wait();
                    for _ in 0..100 {
                        let txn = env.begin_ro_txn().unwrap();
                        let db = txn.open_db(None).unwrap();
                        let val: Option<[u8; 3]> = txn.get(db.dbi(), b"key").unwrap();
                        assert_eq!(val, Some(*b"val"));
                        drop(txn);
                    }
                })
            })
            .collect();

        // `join().unwrap()` propagates any panic (failed assertion) from a worker thread.
        for h in handles {
            h.join().unwrap();
        }
    }

    /// 4 reader threads cycle read transactions while 1 writer thread commits 20 entries.
    #[test]
    fn multithreaded_mixed_read_write() {
        let (_dir, env) = test_env();
        seed(&env);

        let env = std::sync::Arc::new(env);
        // 5 participants: 4 readers plus the writer spawned below.
        let barrier = std::sync::Arc::new(std::sync::Barrier::new(5));

        let mut handles: Vec<_> = (0..4)
            .map(|_| {
                let env = env.clone();
                let barrier = barrier.clone();
                std::thread::spawn(move || {
                    barrier.wait();
                    for _ in 0..50 {
                        let txn = env.begin_ro_txn().unwrap();
                        let db = txn.open_db(None).unwrap();
                        // The value itself is not asserted; only that the read succeeds.
                        let _val: Option<[u8; 3]> = txn.get(db.dbi(), b"key").unwrap();
                        drop(txn);
                    }
                })
            })
            .collect();

        // The writer takes ownership of the remaining `env` and `barrier` handles.
        handles.push(std::thread::spawn(move || {
            barrier.wait();
            for i in 0u32..20 {
                let tx = env.begin_rw_txn().unwrap();
                let db = tx.open_db(None).unwrap();
                tx.put(db.dbi(), i.to_le_bytes(), b"v", WriteFlags::empty()).unwrap();
                tx.commit().unwrap();
            }
        }));

        for h in handles {
            h.join().unwrap();
        }
    }

    /// 16 threads each run 200 begin / read cycles; the transaction is dropped implicitly at
    /// the end of every loop iteration.
    #[test]
    fn multithreaded_concurrent_open_close() {
        let (_dir, env) = test_env();
        seed(&env);

        let env = std::sync::Arc::new(env);
        let barrier = std::sync::Arc::new(std::sync::Barrier::new(16));

        let handles: Vec<_> = (0..16)
            .map(|_| {
                let env = env.clone();
                let barrier = barrier.clone();
                std::thread::spawn(move || {
                    barrier.wait();
                    for _ in 0..200 {
                        let txn = env.begin_ro_txn().unwrap();
                        let db = txn.open_db(None).unwrap();
                        let val: Option<[u8; 3]> = txn.get(db.dbi(), b"key").unwrap();
                        assert_eq!(val, Some(*b"val"));
                    }
                })
            })
            .collect();

        for h in handles {
            h.join().unwrap();
        }
    }

    /// 8 threads each repeatedly hold 4 read transactions at the same time (20 rounds), read
    /// through all of them, and then drop the whole batch.
    #[test]
    fn multithreaded_hold_multiple_txns() {
        let (_dir, env) = test_env();
        seed(&env);

        let env = std::sync::Arc::new(env);
        let barrier = std::sync::Arc::new(std::sync::Barrier::new(8));

        let handles: Vec<_> = (0..8)
            .map(|_| {
                let env = env.clone();
                let barrier = barrier.clone();
                std::thread::spawn(move || {
                    barrier.wait();
                    for _ in 0..20 {
                        let txns: Vec<_> = (0..4).map(|_| env.begin_ro_txn().unwrap()).collect();
                        for txn in &txns {
                            let db = txn.open_db(None).unwrap();
                            let val: Option<[u8; 3]> = txn.get(db.dbi(), b"key").unwrap();
                            assert_eq!(val, Some(*b"val"));
                        }
                        drop(txns);
                    }
                })
            })
            .collect();

        for h in handles {
            h.join().unwrap();
        }
    }

    /// Calls `drain()` from one thread while 4 other threads keep beginning and dropping read
    /// transactions; the test passes as long as no thread panics.
    #[test]
    fn multithreaded_drain_under_contention() {
        let (_dir, env) = test_env();
        seed(&env);

        let env = std::sync::Arc::new(env);

        // Open and drop 16 transactions up front (presumably to pre-fill the pool).
        {
            let txns: Vec<_> = (0..16).map(|_| env.begin_ro_txn().unwrap()).collect();
            drop(txns);
        }

        // 5 participants: 4 workers plus the draining thread.
        let barrier = std::sync::Arc::new(std::sync::Barrier::new(5));

        let mut handles: Vec<_> = (0..4)
            .map(|_| {
                let env = env.clone();
                let barrier = barrier.clone();
                std::thread::spawn(move || {
                    barrier.wait();
                    for _ in 0..50 {
                        // The result is deliberately not unwrapped: an error from
                        // `begin_ro_txn` does not fail this test.
                        let _txn = env.begin_ro_txn(); }
                })
            })
            .collect();

        handles.push(std::thread::spawn(move || {
            barrier.wait();
            env.ro_txn_pool().drain();
        }));

        for h in handles {
            h.join().unwrap();
        }
    }

    /// Dropping more transactions than the pool can hold must leave exactly 256 parked handles,
    /// and those handles must still be usable afterwards.
    #[test]
    fn pool_overflow_aborts_excess() {
        let dir = tempfile::tempdir().unwrap();
        // Raise the reader limit to 512 so that 300 simultaneous read transactions fit.
        let env = Environment::builder().set_max_readers(512).open(dir.path()).unwrap();
        seed(&env);

        // 300 exceeds the queue capacity of 256 set in `ReadTxnPool::new`.
        let txns: Vec<_> = (0..300).map(|_| env.begin_ro_txn().unwrap()).collect();
        drop(txns);

        assert_eq!(env.ro_txn_pool().queue.len(), 256);

        // Open as many transactions as the pool reported and read through each of them.
        let txns: Vec<_> = (0..256).map(|_| env.begin_ro_txn().unwrap()).collect();
        for txn in &txns {
            let db = txn.open_db(None).unwrap();
            let val: Option<[u8; 3]> = txn.get(db.dbi(), b"key").unwrap();
            assert_eq!(val, Some(*b"val"));
        }
    }
}