1use super::metrics::{RocksDBMetrics, RocksDBOperation};
2use reth_db_api::{
3 table::{Compress, Decompress, Encode, Table},
4 DatabaseError,
5};
6use reth_storage_errors::{
7 db::{DatabaseErrorInfo, DatabaseWriteError, DatabaseWriteOperation, LogLevel},
8 provider::{ProviderError, ProviderResult},
9};
10use rocksdb::{
11 BlockBasedOptions, Cache, ColumnFamilyDescriptor, CompactionPri, DBCompressionType,
12 IteratorMode, Options, Transaction, TransactionDB, TransactionDBOptions, TransactionOptions,
13 WriteBatchWithTransaction, WriteOptions,
14};
15use std::{
16 fmt,
17 path::{Path, PathBuf},
18 sync::Arc,
19 time::Instant,
20};
21
/// Default capacity, in bytes, of the LRU block cache created by the builder (128 MiB).
const DEFAULT_CACHE_SIZE: usize = 128 * 1024 * 1024;

/// Default block size, in bytes, for the block-based table format (16 KiB).
const DEFAULT_BLOCK_SIZE: usize = 16 << 10;

/// Default value handed to `Options::set_max_background_jobs`.
const DEFAULT_MAX_BACKGROUND_JOBS: i32 = 6;

/// Default value handed to `Options::set_bytes_per_sync` (1 MiB).
const DEFAULT_BYTES_PER_SYNC: u64 = 1 << 20;

/// Default bloom filter setting handed to `BlockBasedOptions::set_bloom_filter`.
const DEFAULT_BLOOM_FILTER_BITS: f64 = 10.0;
36
/// Builder that collects the configuration used to open a [`RocksDBProvider`].
pub struct RocksDBBuilder {
    /// Filesystem location handed to RocksDB when the database is opened.
    path: PathBuf,
    /// Names of the column families to open; one entry is pushed per `with_table` call.
    column_families: Vec<String>,
    /// Whether the built provider records per-operation metrics.
    enable_metrics: bool,
    /// Whether RocksDB statistics collection is enabled on the database options.
    enable_statistics: bool,
    /// Log level passed to the RocksDB options.
    log_level: rocksdb::LogLevel,
    /// Block cache shared by the database-wide and per-column-family table options.
    block_cache: Cache,
}
46
47impl fmt::Debug for RocksDBBuilder {
48 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
49 f.debug_struct("RocksDBBuilder")
50 .field("path", &self.path)
51 .field("column_families", &self.column_families)
52 .field("enable_metrics", &self.enable_metrics)
53 .finish()
54 }
55}
56
57impl RocksDBBuilder {
58 pub fn new(path: impl AsRef<Path>) -> Self {
60 let cache = Cache::new_lru_cache(DEFAULT_CACHE_SIZE);
61 Self {
62 path: path.as_ref().to_path_buf(),
63 column_families: Vec::new(),
64 enable_metrics: false,
65 enable_statistics: false,
66 log_level: rocksdb::LogLevel::Info,
67 block_cache: cache,
68 }
69 }
70
71 fn default_table_options(cache: &Cache) -> BlockBasedOptions {
73 let mut table_options = BlockBasedOptions::default();
74 table_options.set_block_size(DEFAULT_BLOCK_SIZE);
75 table_options.set_cache_index_and_filter_blocks(true);
76 table_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
77 table_options.set_block_cache(cache);
79 table_options.set_bloom_filter(DEFAULT_BLOOM_FILTER_BITS, false);
83 table_options.set_optimize_filters_for_memory(true);
84 table_options
85 }
86
87 fn default_options(
89 log_level: rocksdb::LogLevel,
90 cache: &Cache,
91 enable_statistics: bool,
92 ) -> Options {
93 let table_options = Self::default_table_options(cache);
95
96 let mut options = Options::default();
97 options.set_block_based_table_factory(&table_options);
98 options.create_if_missing(true);
99 options.create_missing_column_families(true);
100 options.set_max_background_jobs(DEFAULT_MAX_BACKGROUND_JOBS);
101 options.set_bytes_per_sync(DEFAULT_BYTES_PER_SYNC);
102
103 options.set_bottommost_compression_type(DBCompressionType::Zstd);
104 options.set_bottommost_zstd_max_train_bytes(0, true);
105 options.set_compression_type(DBCompressionType::Lz4);
106 options.set_compaction_pri(CompactionPri::MinOverlappingRatio);
107
108 options.set_log_level(log_level);
109
110 if enable_statistics {
112 options.enable_statistics();
113 }
114
115 options
116 }
117
118 fn default_column_family_options(cache: &Cache) -> Options {
120 let table_options = Self::default_table_options(cache);
122
123 let mut cf_options = Options::default();
124 cf_options.set_block_based_table_factory(&table_options);
125 cf_options.set_level_compaction_dynamic_level_bytes(true);
126 cf_options.set_compression_type(DBCompressionType::Lz4);
128 cf_options.set_bottommost_compression_type(DBCompressionType::Zstd);
129 cf_options.set_bottommost_zstd_max_train_bytes(0, true);
131
132 cf_options
133 }
134
135 pub fn with_table<T: Table>(mut self) -> Self {
137 self.column_families.push(T::NAME.to_string());
138 self
139 }
140
141 pub const fn with_metrics(mut self) -> Self {
143 self.enable_metrics = true;
144 self
145 }
146
147 pub const fn with_statistics(mut self) -> Self {
149 self.enable_statistics = true;
150 self
151 }
152
153 pub const fn with_database_log_level(mut self, log_level: LogLevel) -> Self {
155 self.log_level = convert_log_level(log_level);
156 self
157 }
158
159 pub fn with_block_cache_size(mut self, capacity_bytes: usize) -> Self {
161 self.block_cache = Cache::new_lru_cache(capacity_bytes);
162 self
163 }
164
165 pub fn build(self) -> ProviderResult<RocksDBProvider> {
167 let options =
168 Self::default_options(self.log_level, &self.block_cache, self.enable_statistics);
169
170 let cf_descriptors: Vec<ColumnFamilyDescriptor> = self
171 .column_families
172 .iter()
173 .map(|name| {
174 ColumnFamilyDescriptor::new(
175 name.clone(),
176 Self::default_column_family_options(&self.block_cache),
177 )
178 })
179 .collect();
180
181 let txn_db_options = TransactionDBOptions::default();
183 let db = TransactionDB::open_cf_descriptors(
184 &options,
185 &txn_db_options,
186 &self.path,
187 cf_descriptors,
188 )
189 .map_err(|e| {
190 ProviderError::Database(DatabaseError::Open(DatabaseErrorInfo {
191 message: e.to_string().into(),
192 code: -1,
193 }))
194 })?;
195
196 let metrics = self.enable_metrics.then(RocksDBMetrics::default);
197
198 Ok(RocksDBProvider(Arc::new(RocksDBProviderInner { db, metrics })))
199 }
200}
201
/// Selects the bytes to store for `$value`, avoiding a copy when possible.
///
/// Evaluates to `Some(bytes)` when `$value.uncompressable_ref()` yields a borrowed
/// representation. Otherwise `$buf` is cleared, `$value.compress_to_buf` writes into it, and
/// the macro evaluates to `None`, leaving the caller to read the bytes from `$buf`.
macro_rules! compress_to_buf_or_ref {
    ($buf:expr, $value:expr) => {
        match $value.uncompressable_ref() {
            Some(raw) => Some(raw),
            None => {
                $buf.clear();
                $value.compress_to_buf(&mut $buf);
                None
            }
        }
    };
}
215
/// Handle to an open RocksDB transaction database.
///
/// The state lives behind an [`Arc`], so every clone refers to the same database.
#[derive(Debug)]
pub struct RocksDBProvider(Arc<RocksDBProviderInner>);
219
/// State shared by all clones of a `RocksDBProvider`.
struct RocksDBProviderInner {
    /// The open transaction database.
    db: TransactionDB,
    /// Metrics recorder; `None` unless metrics were enabled on the builder.
    metrics: Option<RocksDBMetrics>,
}
227
228impl fmt::Debug for RocksDBProviderInner {
229 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
230 f.debug_struct("RocksDBProviderInner")
231 .field("db", &"<TransactionDB>")
232 .field("metrics", &self.metrics)
233 .finish()
234 }
235}
236
237impl Clone for RocksDBProvider {
238 fn clone(&self) -> Self {
239 Self(self.0.clone())
240 }
241}
242
243impl RocksDBProvider {
244 pub fn new(path: impl AsRef<Path>) -> ProviderResult<Self> {
246 RocksDBBuilder::new(path).build()
247 }
248
249 pub fn builder(path: impl AsRef<Path>) -> RocksDBBuilder {
251 RocksDBBuilder::new(path)
252 }
253
254 pub fn tx(&self) -> RocksTx<'_> {
256 let write_options = WriteOptions::default();
257 let txn_options = TransactionOptions::default();
258 let inner = self.0.db.transaction_opt(&write_options, &txn_options);
259 RocksTx { inner, provider: self }
260 }
261
262 fn get_cf_handle<T: Table>(&self) -> Result<&rocksdb::ColumnFamily, DatabaseError> {
264 self.0
265 .db
266 .cf_handle(T::NAME)
267 .ok_or_else(|| DatabaseError::Other(format!("Column family '{}' not found", T::NAME)))
268 }
269
270 fn execute_with_operation_metric<T>(
272 &self,
273 operation: RocksDBOperation,
274 table: &'static str,
275 f: impl FnOnce(&Self) -> T,
276 ) -> T {
277 let start = self.0.metrics.as_ref().map(|_| Instant::now());
278 let res = f(self);
279
280 if let (Some(start), Some(metrics)) = (start, &self.0.metrics) {
281 metrics.record_operation(operation, table, start.elapsed());
282 }
283
284 res
285 }
286
287 pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
289 self.get_encoded::<T>(&key.encode())
290 }
291
292 pub fn get_encoded<T: Table>(
294 &self,
295 key: &<T::Key as Encode>::Encoded,
296 ) -> ProviderResult<Option<T::Value>> {
297 self.execute_with_operation_metric(RocksDBOperation::Get, T::NAME, |this| {
298 let result =
299 this.0.db.get_cf(this.get_cf_handle::<T>()?, key.as_ref()).map_err(|e| {
300 ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
301 message: e.to_string().into(),
302 code: -1,
303 }))
304 })?;
305
306 Ok(result.and_then(|value| T::Value::decompress(&value).ok()))
307 })
308 }
309
310 pub fn put<T: Table>(&self, key: T::Key, value: &T::Value) -> ProviderResult<()> {
312 let encoded_key = key.encode();
313 self.put_encoded::<T>(&encoded_key, value)
314 }
315
316 pub fn put_encoded<T: Table>(
318 &self,
319 key: &<T::Key as Encode>::Encoded,
320 value: &T::Value,
321 ) -> ProviderResult<()> {
322 self.execute_with_operation_metric(RocksDBOperation::Put, T::NAME, |this| {
323 let mut buf = Vec::new();
327 let value_bytes = compress_to_buf_or_ref!(buf, value).unwrap_or(&buf);
328
329 this.0.db.put_cf(this.get_cf_handle::<T>()?, key, value_bytes).map_err(|e| {
330 ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
331 info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
332 operation: DatabaseWriteOperation::PutUpsert,
333 table_name: T::NAME,
334 key: key.as_ref().to_vec(),
335 })))
336 })
337 })
338 }
339
340 pub fn delete<T: Table>(&self, key: T::Key) -> ProviderResult<()> {
342 self.execute_with_operation_metric(RocksDBOperation::Delete, T::NAME, |this| {
343 this.0.db.delete_cf(this.get_cf_handle::<T>()?, key.encode().as_ref()).map_err(|e| {
344 ProviderError::Database(DatabaseError::Delete(DatabaseErrorInfo {
345 message: e.to_string().into(),
346 code: -1,
347 }))
348 })
349 })
350 }
351
352 pub fn write_batch<F>(&self, f: F) -> ProviderResult<()>
354 where
355 F: FnOnce(&mut RocksDBBatch<'_>) -> ProviderResult<()>,
356 {
357 self.execute_with_operation_metric(RocksDBOperation::BatchWrite, "Batch", |this| {
359 let mut batch_handle = RocksDBBatch {
360 provider: this,
361 inner: WriteBatchWithTransaction::<true>::default(),
362 buf: Vec::new(),
363 };
364
365 f(&mut batch_handle)?;
366
367 this.0.db.write(batch_handle.inner).map_err(|e| {
368 ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
369 message: e.to_string().into(),
370 code: -1,
371 }))
372 })
373 })
374 }
375}
376
/// Collects puts and deletes that are written together by [`RocksDBProvider::write_batch`].
pub struct RocksDBBatch<'a> {
    /// Provider used to resolve column family handles.
    provider: &'a RocksDBProvider,
    /// The underlying RocksDB write batch.
    inner: WriteBatchWithTransaction<true>,
    /// Scratch buffer reused when compressing values.
    buf: Vec<u8>,
}
385
386impl fmt::Debug for RocksDBBatch<'_> {
387 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
388 f.debug_struct("RocksDBBatch")
389 .field("provider", &self.provider)
390 .field("batch", &"<WriteBatchWithTransaction>")
391 .field("length", &self.inner.len())
393 .field("size_in_bytes", &self.inner.size_in_bytes())
396 .finish()
397 }
398}
399
400impl<'a> RocksDBBatch<'a> {
401 pub fn put<T: Table>(&mut self, key: T::Key, value: &T::Value) -> ProviderResult<()> {
403 let encoded_key = key.encode();
404 self.put_encoded::<T>(&encoded_key, value)
405 }
406
407 pub fn put_encoded<T: Table>(
409 &mut self,
410 key: &<T::Key as Encode>::Encoded,
411 value: &T::Value,
412 ) -> ProviderResult<()> {
413 let value_bytes = compress_to_buf_or_ref!(self.buf, value).unwrap_or(&self.buf);
414 self.inner.put_cf(self.provider.get_cf_handle::<T>()?, key, value_bytes);
415 Ok(())
416 }
417
418 pub fn delete<T: Table>(&mut self, key: T::Key) -> ProviderResult<()> {
420 self.inner.delete_cf(self.provider.get_cf_handle::<T>()?, key.encode().as_ref());
421 Ok(())
422 }
423}
424
/// A RocksDB transaction created by [`RocksDBProvider::tx`].
///
/// Consumed by [`RocksTx::commit`] or [`RocksTx::rollback`].
pub struct RocksTx<'db> {
    /// The underlying RocksDB transaction.
    inner: Transaction<'db, TransactionDB>,
    /// Provider used to resolve column family handles.
    provider: &'db RocksDBProvider,
}
438
439impl fmt::Debug for RocksTx<'_> {
440 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
441 f.debug_struct("RocksTx").field("provider", &self.provider).finish_non_exhaustive()
442 }
443}
444
445impl<'db> RocksTx<'db> {
446 pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
448 let encoded_key = key.encode();
449 self.get_encoded::<T>(&encoded_key)
450 }
451
452 pub fn get_encoded<T: Table>(
454 &self,
455 key: &<T::Key as Encode>::Encoded,
456 ) -> ProviderResult<Option<T::Value>> {
457 let cf = self.provider.get_cf_handle::<T>()?;
458 let result = self.inner.get_cf(cf, key.as_ref()).map_err(|e| {
459 ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
460 message: e.to_string().into(),
461 code: -1,
462 }))
463 })?;
464
465 Ok(result.and_then(|value| T::Value::decompress(&value).ok()))
466 }
467
468 pub fn put<T: Table>(&self, key: T::Key, value: &T::Value) -> ProviderResult<()> {
470 let encoded_key = key.encode();
471 self.put_encoded::<T>(&encoded_key, value)
472 }
473
474 pub fn put_encoded<T: Table>(
476 &self,
477 key: &<T::Key as Encode>::Encoded,
478 value: &T::Value,
479 ) -> ProviderResult<()> {
480 let cf = self.provider.get_cf_handle::<T>()?;
481 let mut buf = Vec::new();
482 let value_bytes = compress_to_buf_or_ref!(buf, value).unwrap_or(&buf);
483
484 self.inner.put_cf(cf, key.as_ref(), value_bytes).map_err(|e| {
485 ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
486 info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
487 operation: DatabaseWriteOperation::PutUpsert,
488 table_name: T::NAME,
489 key: key.as_ref().to_vec(),
490 })))
491 })
492 }
493
494 pub fn delete<T: Table>(&self, key: T::Key) -> ProviderResult<()> {
496 let cf = self.provider.get_cf_handle::<T>()?;
497 self.inner.delete_cf(cf, key.encode().as_ref()).map_err(|e| {
498 ProviderError::Database(DatabaseError::Delete(DatabaseErrorInfo {
499 message: e.to_string().into(),
500 code: -1,
501 }))
502 })
503 }
504
505 pub fn iter<T: Table>(&self) -> ProviderResult<RocksTxIter<'_, T>> {
509 let cf = self.provider.get_cf_handle::<T>()?;
510 let iter = self.inner.iterator_cf(cf, IteratorMode::Start);
511 Ok(RocksTxIter { inner: iter, _marker: std::marker::PhantomData })
512 }
513
514 pub fn iter_from<T: Table>(&self, key: T::Key) -> ProviderResult<RocksTxIter<'_, T>> {
516 let cf = self.provider.get_cf_handle::<T>()?;
517 let encoded_key = key.encode();
518 let iter = self
519 .inner
520 .iterator_cf(cf, IteratorMode::From(encoded_key.as_ref(), rocksdb::Direction::Forward));
521 Ok(RocksTxIter { inner: iter, _marker: std::marker::PhantomData })
522 }
523
524 pub fn commit(self) -> ProviderResult<()> {
526 self.inner.commit().map_err(|e| {
527 ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
528 message: e.to_string().into(),
529 code: -1,
530 }))
531 })
532 }
533
534 pub fn rollback(self) -> ProviderResult<()> {
536 self.inner.rollback().map_err(|e| {
537 ProviderError::Database(DatabaseError::Other(format!("rollback failed: {e}")))
538 })
539 }
540}
541
/// Iterator over the entries of table `T`, created from a [`RocksTx`].
///
/// Each raw entry is decoded into a `(T::Key, T::Value)` pair.
pub struct RocksTxIter<'tx, T: Table> {
    /// The underlying raw RocksDB iterator.
    inner: rocksdb::DBIteratorWithThreadMode<'tx, Transaction<'tx, TransactionDB>>,
    /// Ties the iterator to table `T` without storing a value of that type.
    _marker: std::marker::PhantomData<T>,
}
549
550impl<T: Table> fmt::Debug for RocksTxIter<'_, T> {
551 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
552 f.debug_struct("RocksTxIter").field("table", &T::NAME).finish_non_exhaustive()
553 }
554}
555
556impl<T: Table> Iterator for RocksTxIter<'_, T> {
557 type Item = ProviderResult<(T::Key, T::Value)>;
558
559 fn next(&mut self) -> Option<Self::Item> {
560 let (key_bytes, value_bytes) = match self.inner.next()? {
561 Ok(kv) => kv,
562 Err(e) => {
563 return Some(Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
564 message: e.to_string().into(),
565 code: -1,
566 }))))
567 }
568 };
569
570 let key = match <T::Key as reth_db_api::table::Decode>::decode(&key_bytes) {
572 Ok(k) => k,
573 Err(_) => return Some(Err(ProviderError::Database(DatabaseError::Decode))),
574 };
575
576 let value = match T::Value::decompress(&value_bytes) {
578 Ok(v) => v,
579 Err(_) => return Some(Err(ProviderError::Database(DatabaseError::Decode))),
580 };
581
582 Some(Ok((key, value)))
583 }
584}
585
586const fn convert_log_level(level: LogLevel) -> rocksdb::LogLevel {
588 match level {
589 LogLevel::Fatal => rocksdb::LogLevel::Fatal,
590 LogLevel::Error => rocksdb::LogLevel::Error,
591 LogLevel::Warn => rocksdb::LogLevel::Warn,
592 LogLevel::Notice | LogLevel::Verbose => rocksdb::LogLevel::Info,
593 LogLevel::Debug | LogLevel::Trace | LogLevel::Extra => rocksdb::LogLevel::Debug,
594 }
595}
596
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_primitives::{TxHash, B256};
    use reth_db_api::{table::Table, tables};
    use tempfile::TempDir;

    /// Minimal table used by the tests: `u64` keys mapped to raw byte values.
    #[derive(Debug)]
    struct TestTable;

    impl Table for TestTable {
        const NAME: &'static str = "TestTable";
        const DUPSORT: bool = false;
        type Key = u64;
        type Value = Vec<u8>;
    }

    /// Put, get and delete of a single key directly on the provider.
    #[test]
    fn test_basic_operations() {
        let temp_dir = TempDir::new().unwrap();

        let provider = RocksDBBuilder::new(temp_dir.path())
            .with_table::<TestTable>()
            .build()
            .unwrap();

        let key = 42u64;
        let value = b"test_value".to_vec();

        // Write the value.
        provider.put::<TestTable>(key, &value).unwrap();

        // Read it back.
        let result = provider.get::<TestTable>(key).unwrap();
        assert_eq!(result, Some(value));

        // Remove it.
        provider.delete::<TestTable>(key).unwrap();

        // The key must now be absent.
        assert_eq!(provider.get::<TestTable>(key).unwrap(), None);
    }

    /// Batched puts become visible after the batch is written; batched deletes remove them.
    #[test]
    fn test_batch_operations() {
        let temp_dir = TempDir::new().unwrap();
        let provider =
            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();

        // Insert ten entries in one batch.
        provider
            .write_batch(|batch| {
                for i in 0..10u64 {
                    let value = format!("value_{i}").into_bytes();
                    batch.put::<TestTable>(i, &value)?;
                }
                Ok(())
            })
            .unwrap();

        // Every entry must be readable.
        for i in 0..10u64 {
            let value = format!("value_{i}").into_bytes();
            assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
        }

        // Delete all ten entries in one batch.
        provider
            .write_batch(|batch| {
                for i in 0..10u64 {
                    batch.delete::<TestTable>(i)?;
                }
                Ok(())
            })
            .unwrap();

        // Every entry must be gone.
        for i in 0..10u64 {
            assert_eq!(provider.get::<TestTable>(i).unwrap(), None);
        }
    }

    /// Exercises a table from `reth_db_api::tables` with metrics enabled.
    #[test]
    fn test_with_real_table() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path())
            .with_table::<tables::TransactionHashNumbers>()
            .with_metrics()
            .build()
            .unwrap();

        let tx_hash = TxHash::from(B256::from([1u8; 32]));

        // Single put followed by a read.
        provider.put::<tables::TransactionHashNumbers>(tx_hash, &100).unwrap();
        assert_eq!(provider.get::<tables::TransactionHashNumbers>(tx_hash).unwrap(), Some(100));

        // Batch insert; the hash built from `[1u8; 32]` is overwritten when `i == 1`.
        provider
            .write_batch(|batch| {
                for i in 0..10u64 {
                    let hash = TxHash::from(B256::from([i as u8; 32]));
                    let value = i * 100;
                    batch.put::<tables::TransactionHashNumbers>(hash, &value)?;
                }
                Ok(())
            })
            .unwrap();

        // Verify the batch contents.
        for i in 0..10u64 {
            let hash = TxHash::from(B256::from([i as u8; 32]));
            assert_eq!(
                provider.get::<tables::TransactionHashNumbers>(hash).unwrap(),
                Some(i * 100)
            );
        }
    }
    /// The provider still reads and writes correctly when statistics are enabled.
    #[test]
    fn test_statistics_enabled() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path())
            .with_table::<TestTable>()
            .with_statistics()
            .build()
            .unwrap();

        for i in 0..10 {
            let value = vec![i as u8];
            provider.put::<TestTable>(i, &value).unwrap();
            assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
        }
    }

    /// Writes 100 entries of 1000 bytes each and checks that all of them can be read back.
    ///
    /// NOTE(review): the database is not reopened, so this only checks readability within one
    /// provider instance.
    #[test]
    fn test_data_persistence() {
        let temp_dir = TempDir::new().unwrap();
        let provider =
            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();

        let value = vec![42u8; 1000];
        for i in 0..100 {
            provider.put::<TestTable>(i, &value).unwrap();
        }

        for i in 0..100 {
            assert!(provider.get::<TestTable>(i).unwrap().is_some(), "Data should be readable");
        }
    }

    /// A transaction sees its own writes; the provider only sees them after commit.
    #[test]
    fn test_transaction_read_your_writes() {
        let temp_dir = TempDir::new().unwrap();
        let provider =
            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();

        let tx = provider.tx();

        // Write inside the transaction without committing.
        let key = 42u64;
        let value = b"test_value".to_vec();
        tx.put::<TestTable>(key, &value).unwrap();

        // The transaction itself must observe the write.
        let result = tx.get::<TestTable>(key).unwrap();
        assert_eq!(
            result,
            Some(value.clone()),
            "Transaction should see its own uncommitted writes"
        );

        // A read that bypasses the transaction must not observe it yet.
        let provider_result = provider.get::<TestTable>(key).unwrap();
        assert_eq!(provider_result, None, "Uncommitted data should not be visible outside tx");

        tx.commit().unwrap();

        // After commit the write is visible through the provider.
        let committed_result = provider.get::<TestTable>(key).unwrap();
        assert_eq!(committed_result, Some(value), "Committed data should be visible");
    }

    /// Rolling a transaction back leaves the previously stored value in place.
    #[test]
    fn test_transaction_rollback() {
        let temp_dir = TempDir::new().unwrap();
        let provider =
            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();

        // Seed an initial value outside any transaction.
        let key = 100u64;
        let initial_value = b"initial".to_vec();
        provider.put::<TestTable>(key, &initial_value).unwrap();

        // Overwrite it inside a transaction.
        let tx = provider.tx();
        let new_value = b"modified".to_vec();
        tx.put::<TestTable>(key, &new_value).unwrap();

        // The transaction sees the modified value.
        assert_eq!(tx.get::<TestTable>(key).unwrap(), Some(new_value));

        // Discard the transaction.
        tx.rollback().unwrap();

        // The initial value must still be stored.
        let result = provider.get::<TestTable>(key).unwrap();
        assert_eq!(result, Some(initial_value), "Rollback should preserve original data");
    }

    /// Iterating inside a transaction yields that transaction's uncommitted writes.
    #[test]
    fn test_transaction_iterator() {
        let temp_dir = TempDir::new().unwrap();
        let provider =
            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();

        let tx = provider.tx();

        // Write five entries inside the transaction.
        for i in 0..5u64 {
            let value = format!("value_{i}").into_bytes();
            tx.put::<TestTable>(i, &value).unwrap();
        }

        // Iterate and check each entry against its key.
        let mut count = 0;
        for result in tx.iter::<TestTable>().unwrap() {
            let (key, value) = result.unwrap();
            assert_eq!(value, format!("value_{key}").into_bytes());
            count += 1;
        }
        assert_eq!(count, 5, "Iterator should see all uncommitted writes");

        // Commit at the end.
        tx.commit().unwrap();
    }
}