reth_provider/providers/rocksdb/
provider.rs

1use super::metrics::{RocksDBMetrics, RocksDBOperation};
2use reth_db_api::{
3    table::{Compress, Decompress, Encode, Table},
4    DatabaseError,
5};
6use reth_storage_errors::{
7    db::{DatabaseErrorInfo, DatabaseWriteError, DatabaseWriteOperation, LogLevel},
8    provider::{ProviderError, ProviderResult},
9};
10use rocksdb::{
11    BlockBasedOptions, Cache, ColumnFamilyDescriptor, CompactionPri, DBCompressionType,
12    IteratorMode, Options, Transaction, TransactionDB, TransactionDBOptions, TransactionOptions,
13    WriteBatchWithTransaction, WriteOptions,
14};
15use std::{
16    fmt,
17    path::{Path, PathBuf},
18    sync::Arc,
19    time::Instant,
20};
21
/// Capacity of the LRU block cache created by default: 128 MiB.
const DEFAULT_CACHE_SIZE: usize = 128 * 1024 * 1024;

/// Block size applied to block-based tables by default: 16 KiB.
const DEFAULT_BLOCK_SIZE: usize = 16 << 10;

/// Default upper bound on `RocksDB` background jobs (compaction and flushing).
const DEFAULT_MAX_BACKGROUND_JOBS: i32 = 6;

/// Default value handed to `Options::set_bytes_per_sync`: 1 MiB.
const DEFAULT_BYTES_PER_SYNC: u64 = 1 << 20;

/// Default number of bloom filter bits per key (roughly a 1% false positive rate).
const DEFAULT_BLOOM_FILTER_BITS: f64 = 10.0;
36
/// Builder for [`RocksDBProvider`].
///
/// Collects the database path, the column families to open and a few tuning switches, then
/// opens the database in [`RocksDBBuilder::build`].
pub struct RocksDBBuilder {
    /// Filesystem path the database is opened at.
    path: PathBuf,
    /// Names of the column families to open, one per registered table.
    column_families: Vec<String>,
    /// Whether the built provider records per-operation metrics.
    enable_metrics: bool,
    /// Whether `RocksDB` internal statistics collection is switched on.
    enable_statistics: bool,
    /// Log level passed to `RocksDB`.
    log_level: rocksdb::LogLevel,
    /// Block cache handed to the database options and to every column family.
    block_cache: Cache,
}
46
47impl fmt::Debug for RocksDBBuilder {
48    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
49        f.debug_struct("RocksDBBuilder")
50            .field("path", &self.path)
51            .field("column_families", &self.column_families)
52            .field("enable_metrics", &self.enable_metrics)
53            .finish()
54    }
55}
56
57impl RocksDBBuilder {
58    /// Creates a new builder with optimized default options.
59    pub fn new(path: impl AsRef<Path>) -> Self {
60        let cache = Cache::new_lru_cache(DEFAULT_CACHE_SIZE);
61        Self {
62            path: path.as_ref().to_path_buf(),
63            column_families: Vec::new(),
64            enable_metrics: false,
65            enable_statistics: false,
66            log_level: rocksdb::LogLevel::Info,
67            block_cache: cache,
68        }
69    }
70
71    /// Creates default table options with shared block cache.
72    fn default_table_options(cache: &Cache) -> BlockBasedOptions {
73        let mut table_options = BlockBasedOptions::default();
74        table_options.set_block_size(DEFAULT_BLOCK_SIZE);
75        table_options.set_cache_index_and_filter_blocks(true);
76        table_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
77        // Shared block cache for all column families.
78        table_options.set_block_cache(cache);
79        // Bloom filter: 10 bits/key = ~1% false positive rate, full filter for better read
80        // performance. this setting is good trade off a little bit of memory for better
81        // point lookup performance. see https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter#configuration-basics
82        table_options.set_bloom_filter(DEFAULT_BLOOM_FILTER_BITS, false);
83        table_options.set_optimize_filters_for_memory(true);
84        table_options
85    }
86
87    /// Creates optimized `RocksDB` options per `RocksDB` wiki recommendations.
88    fn default_options(
89        log_level: rocksdb::LogLevel,
90        cache: &Cache,
91        enable_statistics: bool,
92    ) -> Options {
93        // Follow recommend tuning guide from RocksDB wiki, see https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning
94        let table_options = Self::default_table_options(cache);
95
96        let mut options = Options::default();
97        options.set_block_based_table_factory(&table_options);
98        options.create_if_missing(true);
99        options.create_missing_column_families(true);
100        options.set_max_background_jobs(DEFAULT_MAX_BACKGROUND_JOBS);
101        options.set_bytes_per_sync(DEFAULT_BYTES_PER_SYNC);
102
103        options.set_bottommost_compression_type(DBCompressionType::Zstd);
104        options.set_bottommost_zstd_max_train_bytes(0, true);
105        options.set_compression_type(DBCompressionType::Lz4);
106        options.set_compaction_pri(CompactionPri::MinOverlappingRatio);
107
108        options.set_log_level(log_level);
109
110        // Statistics can view from RocksDB log file
111        if enable_statistics {
112            options.enable_statistics();
113        }
114
115        options
116    }
117
118    /// Creates optimized column family options.
119    fn default_column_family_options(cache: &Cache) -> Options {
120        // Follow recommend tuning guide from RocksDB wiki, see https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning
121        let table_options = Self::default_table_options(cache);
122
123        let mut cf_options = Options::default();
124        cf_options.set_block_based_table_factory(&table_options);
125        cf_options.set_level_compaction_dynamic_level_bytes(true);
126        // Recommend to use Zstd for bottommost compression and Lz4 for other levels, see https://github.com/facebook/rocksdb/wiki/Compression#configuration
127        cf_options.set_compression_type(DBCompressionType::Lz4);
128        cf_options.set_bottommost_compression_type(DBCompressionType::Zstd);
129        // Only use Zstd compression, disable dictionary training
130        cf_options.set_bottommost_zstd_max_train_bytes(0, true);
131
132        cf_options
133    }
134
135    /// Adds a column family for a specific table type.
136    pub fn with_table<T: Table>(mut self) -> Self {
137        self.column_families.push(T::NAME.to_string());
138        self
139    }
140
    /// Enables metrics.
    ///
    /// Sets the flag that makes [`RocksDBBuilder::build`] attach a metrics recorder to the
    /// provider.
    pub const fn with_metrics(mut self) -> Self {
        self.enable_metrics = true;
        self
    }
146
    /// Enables `RocksDB` internal statistics collection.
    ///
    /// The flag is applied to the database options when the provider is built.
    pub const fn with_statistics(mut self) -> Self {
        self.enable_statistics = true;
        self
    }
152
    /// Sets the log level from `DatabaseArgs` configuration.
    ///
    /// The Reth [`LogLevel`] is translated to the `RocksDB` log level by `convert_log_level`
    /// before it is stored.
    pub const fn with_database_log_level(mut self, log_level: LogLevel) -> Self {
        self.log_level = convert_log_level(log_level);
        self
    }
158
159    /// Sets a custom block cache size.
160    pub fn with_block_cache_size(mut self, capacity_bytes: usize) -> Self {
161        self.block_cache = Cache::new_lru_cache(capacity_bytes);
162        self
163    }
164
165    /// Builds the [`RocksDBProvider`].
166    pub fn build(self) -> ProviderResult<RocksDBProvider> {
167        let options =
168            Self::default_options(self.log_level, &self.block_cache, self.enable_statistics);
169
170        let cf_descriptors: Vec<ColumnFamilyDescriptor> = self
171            .column_families
172            .iter()
173            .map(|name| {
174                ColumnFamilyDescriptor::new(
175                    name.clone(),
176                    Self::default_column_family_options(&self.block_cache),
177                )
178            })
179            .collect();
180
181        // Use TransactionDB for MDBX-like transaction semantics (read-your-writes, rollback)
182        let txn_db_options = TransactionDBOptions::default();
183        let db = TransactionDB::open_cf_descriptors(
184            &options,
185            &txn_db_options,
186            &self.path,
187            cf_descriptors,
188        )
189        .map_err(|e| {
190            ProviderError::Database(DatabaseError::Open(DatabaseErrorInfo {
191                message: e.to_string().into(),
192                code: -1,
193            }))
194        })?;
195
196        let metrics = self.enable_metrics.then(RocksDBMetrics::default);
197
198        Ok(RocksDBProvider(Arc::new(RocksDBProviderInner { db, metrics })))
199    }
200}
201
/// Yields the bytes to store for a value, avoiding a copy when the value allows it.
///
/// Some types don't support compression (eg. B256) and expose their bytes through
/// `uncompressable_ref`; for those the macro evaluates to `Some(bytes)` and leaves `$buf`
/// untouched. Every other value is compressed into `$buf` (which is cleared first) and the macro
/// evaluates to `None`, so the caller reads the bytes from `$buf`.
macro_rules! compress_to_buf_or_ref {
    ($buf:expr, $value:expr) => {
        match $value.uncompressable_ref() {
            Some(raw) => Some(raw),
            None => {
                $buf.clear();
                $value.compress_to_buf(&mut $buf);
                None
            }
        }
    };
}
215
/// `RocksDB` provider for auxiliary storage layer beside main database MDBX.
///
/// The inner state lives behind an [`Arc`], so clones of the provider refer to the same
/// database instance.
#[derive(Debug)]
pub struct RocksDBProvider(Arc<RocksDBProviderInner>);
219
/// Inner state for `RocksDB` provider.
///
/// Held behind the `Arc` inside [`RocksDBProvider`].
struct RocksDBProviderInner {
    /// `RocksDB` database instance with transaction support.
    db: TransactionDB,
    /// Recorder for operation latency and counts; `None` when metrics were not enabled on the
    /// builder.
    metrics: Option<RocksDBMetrics>,
}
227
228impl fmt::Debug for RocksDBProviderInner {
229    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
230        f.debug_struct("RocksDBProviderInner")
231            .field("db", &"<TransactionDB>")
232            .field("metrics", &self.metrics)
233            .finish()
234    }
235}
236
237impl Clone for RocksDBProvider {
238    fn clone(&self) -> Self {
239        Self(self.0.clone())
240    }
241}
242
243impl RocksDBProvider {
244    /// Creates a new `RocksDB` provider.
245    pub fn new(path: impl AsRef<Path>) -> ProviderResult<Self> {
246        RocksDBBuilder::new(path).build()
247    }
248
    /// Creates a new `RocksDB` provider builder.
    ///
    /// The builder targets the database at `path`; nothing is opened until
    /// [`RocksDBBuilder::build`] is called.
    pub fn builder(path: impl AsRef<Path>) -> RocksDBBuilder {
        RocksDBBuilder::new(path)
    }
253
254    /// Creates a new transaction with MDBX-like semantics (read-your-writes, rollback).
255    pub fn tx(&self) -> RocksTx<'_> {
256        let write_options = WriteOptions::default();
257        let txn_options = TransactionOptions::default();
258        let inner = self.0.db.transaction_opt(&write_options, &txn_options);
259        RocksTx { inner, provider: self }
260    }
261
262    /// Gets the column family handle for a table.
263    fn get_cf_handle<T: Table>(&self) -> Result<&rocksdb::ColumnFamily, DatabaseError> {
264        self.0
265            .db
266            .cf_handle(T::NAME)
267            .ok_or_else(|| DatabaseError::Other(format!("Column family '{}' not found", T::NAME)))
268    }
269
270    /// Executes a function and records metrics with the given operation and table name.
271    fn execute_with_operation_metric<T>(
272        &self,
273        operation: RocksDBOperation,
274        table: &'static str,
275        f: impl FnOnce(&Self) -> T,
276    ) -> T {
277        let start = self.0.metrics.as_ref().map(|_| Instant::now());
278        let res = f(self);
279
280        if let (Some(start), Some(metrics)) = (start, &self.0.metrics) {
281            metrics.record_operation(operation, table, start.elapsed());
282        }
283
284        res
285    }
286
287    /// Gets a value from the specified table.
288    pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
289        self.get_encoded::<T>(&key.encode())
290    }
291
292    /// Gets a value from the specified table using pre-encoded key.
293    pub fn get_encoded<T: Table>(
294        &self,
295        key: &<T::Key as Encode>::Encoded,
296    ) -> ProviderResult<Option<T::Value>> {
297        self.execute_with_operation_metric(RocksDBOperation::Get, T::NAME, |this| {
298            let result =
299                this.0.db.get_cf(this.get_cf_handle::<T>()?, key.as_ref()).map_err(|e| {
300                    ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
301                        message: e.to_string().into(),
302                        code: -1,
303                    }))
304                })?;
305
306            Ok(result.and_then(|value| T::Value::decompress(&value).ok()))
307        })
308    }
309
310    /// Puts upsert a value into the specified table with the given key.
311    pub fn put<T: Table>(&self, key: T::Key, value: &T::Value) -> ProviderResult<()> {
312        let encoded_key = key.encode();
313        self.put_encoded::<T>(&encoded_key, value)
314    }
315
316    /// Puts a value into the specified table using pre-encoded key.
317    pub fn put_encoded<T: Table>(
318        &self,
319        key: &<T::Key as Encode>::Encoded,
320        value: &T::Value,
321    ) -> ProviderResult<()> {
322        self.execute_with_operation_metric(RocksDBOperation::Put, T::NAME, |this| {
323            // for simplify the code, we need allocate buf here each time because `RocksDBProvider`
324            // is thread safe if user want to avoid allocate buf each time, they can use
325            // write_batch api
326            let mut buf = Vec::new();
327            let value_bytes = compress_to_buf_or_ref!(buf, value).unwrap_or(&buf);
328
329            this.0.db.put_cf(this.get_cf_handle::<T>()?, key, value_bytes).map_err(|e| {
330                ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
331                    info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
332                    operation: DatabaseWriteOperation::PutUpsert,
333                    table_name: T::NAME,
334                    key: key.as_ref().to_vec(),
335                })))
336            })
337        })
338    }
339
340    /// Deletes a value from the specified table.
341    pub fn delete<T: Table>(&self, key: T::Key) -> ProviderResult<()> {
342        self.execute_with_operation_metric(RocksDBOperation::Delete, T::NAME, |this| {
343            this.0.db.delete_cf(this.get_cf_handle::<T>()?, key.encode().as_ref()).map_err(|e| {
344                ProviderError::Database(DatabaseError::Delete(DatabaseErrorInfo {
345                    message: e.to_string().into(),
346                    code: -1,
347                }))
348            })
349        })
350    }
351
352    /// Writes a batch of operations atomically.
353    pub fn write_batch<F>(&self, f: F) -> ProviderResult<()>
354    where
355        F: FnOnce(&mut RocksDBBatch<'_>) -> ProviderResult<()>,
356    {
357        // Note: Using "Batch" as table name for batch operations across multiple tables
358        self.execute_with_operation_metric(RocksDBOperation::BatchWrite, "Batch", |this| {
359            let mut batch_handle = RocksDBBatch {
360                provider: this,
361                inner: WriteBatchWithTransaction::<true>::default(),
362                buf: Vec::new(),
363            };
364
365            f(&mut batch_handle)?;
366
367            this.0.db.write(batch_handle.inner).map_err(|e| {
368                ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
369                    message: e.to_string().into(),
370                    code: -1,
371                }))
372            })
373        })
374    }
375}
376
/// Handle for building a batch of operations atomically.
///
/// Uses `WriteBatchWithTransaction<true>` for compatibility with `TransactionDB`.
pub struct RocksDBBatch<'a> {
    /// Provider used to resolve column family handles.
    provider: &'a RocksDBProvider,
    /// The underlying write batch that operations are appended to.
    inner: WriteBatchWithTransaction<true>,
    /// Scratch buffer reused for compressing values.
    buf: Vec<u8>,
}
385
386impl fmt::Debug for RocksDBBatch<'_> {
387    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
388        f.debug_struct("RocksDBBatch")
389            .field("provider", &self.provider)
390            .field("batch", &"<WriteBatchWithTransaction>")
391            // Number of operations in this batch
392            .field("length", &self.inner.len())
393            // Total serialized size (encoded key + compressed value + metadata) of this batch
394            // in bytes
395            .field("size_in_bytes", &self.inner.size_in_bytes())
396            .finish()
397    }
398}
399
400impl<'a> RocksDBBatch<'a> {
401    /// Puts a value into the batch.
402    pub fn put<T: Table>(&mut self, key: T::Key, value: &T::Value) -> ProviderResult<()> {
403        let encoded_key = key.encode();
404        self.put_encoded::<T>(&encoded_key, value)
405    }
406
407    /// Puts a value into the batch using pre-encoded key.
408    pub fn put_encoded<T: Table>(
409        &mut self,
410        key: &<T::Key as Encode>::Encoded,
411        value: &T::Value,
412    ) -> ProviderResult<()> {
413        let value_bytes = compress_to_buf_or_ref!(self.buf, value).unwrap_or(&self.buf);
414        self.inner.put_cf(self.provider.get_cf_handle::<T>()?, key, value_bytes);
415        Ok(())
416    }
417
418    /// Deletes a value from the batch.
419    pub fn delete<T: Table>(&mut self, key: T::Key) -> ProviderResult<()> {
420        self.inner.delete_cf(self.provider.get_cf_handle::<T>()?, key.encode().as_ref());
421        Ok(())
422    }
423}
424
/// `RocksDB` transaction wrapper providing MDBX-like semantics.
///
/// Supports:
/// - Read-your-writes: reads see uncommitted writes within the same transaction
/// - Atomic commit/rollback
/// - Iteration over uncommitted data
///
/// Note: `Transaction` is `Send` but NOT `Sync`. This wrapper does not implement
/// `DbTx`/`DbTxMut` traits directly; use RocksDB-specific methods instead.
pub struct RocksTx<'db> {
    /// The underlying `RocksDB` transaction.
    inner: Transaction<'db, TransactionDB>,
    /// Provider used to resolve column family handles.
    provider: &'db RocksDBProvider,
}
438
439impl fmt::Debug for RocksTx<'_> {
440    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
441        f.debug_struct("RocksTx").field("provider", &self.provider).finish_non_exhaustive()
442    }
443}
444
445impl<'db> RocksTx<'db> {
446    /// Gets a value from the specified table. Sees uncommitted writes in this transaction.
447    pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
448        let encoded_key = key.encode();
449        self.get_encoded::<T>(&encoded_key)
450    }
451
452    /// Gets a value using pre-encoded key. Sees uncommitted writes in this transaction.
453    pub fn get_encoded<T: Table>(
454        &self,
455        key: &<T::Key as Encode>::Encoded,
456    ) -> ProviderResult<Option<T::Value>> {
457        let cf = self.provider.get_cf_handle::<T>()?;
458        let result = self.inner.get_cf(cf, key.as_ref()).map_err(|e| {
459            ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
460                message: e.to_string().into(),
461                code: -1,
462            }))
463        })?;
464
465        Ok(result.and_then(|value| T::Value::decompress(&value).ok()))
466    }
467
468    /// Puts a value into the specified table.
469    pub fn put<T: Table>(&self, key: T::Key, value: &T::Value) -> ProviderResult<()> {
470        let encoded_key = key.encode();
471        self.put_encoded::<T>(&encoded_key, value)
472    }
473
474    /// Puts a value using pre-encoded key.
475    pub fn put_encoded<T: Table>(
476        &self,
477        key: &<T::Key as Encode>::Encoded,
478        value: &T::Value,
479    ) -> ProviderResult<()> {
480        let cf = self.provider.get_cf_handle::<T>()?;
481        let mut buf = Vec::new();
482        let value_bytes = compress_to_buf_or_ref!(buf, value).unwrap_or(&buf);
483
484        self.inner.put_cf(cf, key.as_ref(), value_bytes).map_err(|e| {
485            ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
486                info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
487                operation: DatabaseWriteOperation::PutUpsert,
488                table_name: T::NAME,
489                key: key.as_ref().to_vec(),
490            })))
491        })
492    }
493
494    /// Deletes a value from the specified table.
495    pub fn delete<T: Table>(&self, key: T::Key) -> ProviderResult<()> {
496        let cf = self.provider.get_cf_handle::<T>()?;
497        self.inner.delete_cf(cf, key.encode().as_ref()).map_err(|e| {
498            ProviderError::Database(DatabaseError::Delete(DatabaseErrorInfo {
499                message: e.to_string().into(),
500                code: -1,
501            }))
502        })
503    }
504
505    /// Creates an iterator for the specified table. Sees uncommitted writes in this transaction.
506    ///
507    /// Returns an iterator that yields `(encoded_key, compressed_value)` pairs.
508    pub fn iter<T: Table>(&self) -> ProviderResult<RocksTxIter<'_, T>> {
509        let cf = self.provider.get_cf_handle::<T>()?;
510        let iter = self.inner.iterator_cf(cf, IteratorMode::Start);
511        Ok(RocksTxIter { inner: iter, _marker: std::marker::PhantomData })
512    }
513
514    /// Creates an iterator starting from the given key (inclusive).
515    pub fn iter_from<T: Table>(&self, key: T::Key) -> ProviderResult<RocksTxIter<'_, T>> {
516        let cf = self.provider.get_cf_handle::<T>()?;
517        let encoded_key = key.encode();
518        let iter = self
519            .inner
520            .iterator_cf(cf, IteratorMode::From(encoded_key.as_ref(), rocksdb::Direction::Forward));
521        Ok(RocksTxIter { inner: iter, _marker: std::marker::PhantomData })
522    }
523
524    /// Commits the transaction, persisting all changes.
525    pub fn commit(self) -> ProviderResult<()> {
526        self.inner.commit().map_err(|e| {
527            ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
528                message: e.to_string().into(),
529                code: -1,
530            }))
531        })
532    }
533
534    /// Rolls back the transaction, discarding all changes.
535    pub fn rollback(self) -> ProviderResult<()> {
536        self.inner.rollback().map_err(|e| {
537            ProviderError::Database(DatabaseError::Other(format!("rollback failed: {e}")))
538        })
539    }
540}
541
/// Iterator over a `RocksDB` table within a transaction.
///
/// Yields decoded `(Key, Value)` pairs. Sees uncommitted writes.
pub struct RocksTxIter<'tx, T: Table> {
    /// The raw `RocksDB` iterator yielding key and value bytes.
    inner: rocksdb::DBIteratorWithThreadMode<'tx, Transaction<'tx, TransactionDB>>,
    /// Ties the iterator to table `T` without storing a value of that type.
    _marker: std::marker::PhantomData<T>,
}
549
550impl<T: Table> fmt::Debug for RocksTxIter<'_, T> {
551    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
552        f.debug_struct("RocksTxIter").field("table", &T::NAME).finish_non_exhaustive()
553    }
554}
555
556impl<T: Table> Iterator for RocksTxIter<'_, T> {
557    type Item = ProviderResult<(T::Key, T::Value)>;
558
559    fn next(&mut self) -> Option<Self::Item> {
560        let (key_bytes, value_bytes) = match self.inner.next()? {
561            Ok(kv) => kv,
562            Err(e) => {
563                return Some(Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
564                    message: e.to_string().into(),
565                    code: -1,
566                }))))
567            }
568        };
569
570        // Decode key
571        let key = match <T::Key as reth_db_api::table::Decode>::decode(&key_bytes) {
572            Ok(k) => k,
573            Err(_) => return Some(Err(ProviderError::Database(DatabaseError::Decode))),
574        };
575
576        // Decompress value
577        let value = match T::Value::decompress(&value_bytes) {
578            Ok(v) => v,
579            Err(_) => return Some(Err(ProviderError::Database(DatabaseError::Decode))),
580        };
581
582        Some(Ok((key, value)))
583    }
584}
585
586/// Converts Reth's [`LogLevel`] to `RocksDB`'s [`rocksdb::LogLevel`].
587const fn convert_log_level(level: LogLevel) -> rocksdb::LogLevel {
588    match level {
589        LogLevel::Fatal => rocksdb::LogLevel::Fatal,
590        LogLevel::Error => rocksdb::LogLevel::Error,
591        LogLevel::Warn => rocksdb::LogLevel::Warn,
592        LogLevel::Notice | LogLevel::Verbose => rocksdb::LogLevel::Info,
593        LogLevel::Debug | LogLevel::Trace | LogLevel::Extra => rocksdb::LogLevel::Debug,
594    }
595}
596
597#[cfg(test)]
598mod tests {
599    use super::*;
600    use alloy_primitives::{TxHash, B256};
601    use reth_db_api::{table::Table, tables};
602    use tempfile::TempDir;
603
    /// Minimal table used by the tests: `u64` keys mapped to raw byte values.
    #[derive(Debug)]
    struct TestTable;

    impl Table for TestTable {
        // The name doubles as the column family name.
        const NAME: &'static str = "TestTable";
        const DUPSORT: bool = false;
        type Key = u64;
        type Value = Vec<u8>;
    }
613
614    #[test]
615    fn test_basic_operations() {
616        let temp_dir = TempDir::new().unwrap();
617
618        let provider = RocksDBBuilder::new(temp_dir.path())
619            .with_table::<TestTable>() // Type-safe!
620            .build()
621            .unwrap();
622
623        let key = 42u64;
624        let value = b"test_value".to_vec();
625
626        // Test write
627        provider.put::<TestTable>(key, &value).unwrap();
628
629        // Test read
630        let result = provider.get::<TestTable>(key).unwrap();
631        assert_eq!(result, Some(value));
632
633        // Test delete
634        provider.delete::<TestTable>(key).unwrap();
635
636        // Verify deletion
637        assert_eq!(provider.get::<TestTable>(key).unwrap(), None);
638    }
639
640    #[test]
641    fn test_batch_operations() {
642        let temp_dir = TempDir::new().unwrap();
643        let provider =
644            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
645
646        // Write multiple entries in a batch
647        provider
648            .write_batch(|batch| {
649                for i in 0..10u64 {
650                    let value = format!("value_{i}").into_bytes();
651                    batch.put::<TestTable>(i, &value)?;
652                }
653                Ok(())
654            })
655            .unwrap();
656
657        // Read all entries
658        for i in 0..10u64 {
659            let value = format!("value_{i}").into_bytes();
660            assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
661        }
662
663        // Delete all entries in a batch
664        provider
665            .write_batch(|batch| {
666                for i in 0..10u64 {
667                    batch.delete::<TestTable>(i)?;
668                }
669                Ok(())
670            })
671            .unwrap();
672
673        // Verify all deleted
674        for i in 0..10u64 {
675            assert_eq!(provider.get::<TestTable>(i).unwrap(), None);
676        }
677    }
678
679    #[test]
680    fn test_with_real_table() {
681        let temp_dir = TempDir::new().unwrap();
682        let provider = RocksDBBuilder::new(temp_dir.path())
683            .with_table::<tables::TransactionHashNumbers>()
684            .with_metrics()
685            .build()
686            .unwrap();
687
688        let tx_hash = TxHash::from(B256::from([1u8; 32]));
689
690        // Insert and retrieve
691        provider.put::<tables::TransactionHashNumbers>(tx_hash, &100).unwrap();
692        assert_eq!(provider.get::<tables::TransactionHashNumbers>(tx_hash).unwrap(), Some(100));
693
694        // Batch insert multiple transactions
695        provider
696            .write_batch(|batch| {
697                for i in 0..10u64 {
698                    let hash = TxHash::from(B256::from([i as u8; 32]));
699                    let value = i * 100;
700                    batch.put::<tables::TransactionHashNumbers>(hash, &value)?;
701                }
702                Ok(())
703            })
704            .unwrap();
705
706        // Verify batch insertions
707        for i in 0..10u64 {
708            let hash = TxHash::from(B256::from([i as u8; 32]));
709            assert_eq!(
710                provider.get::<tables::TransactionHashNumbers>(hash).unwrap(),
711                Some(i * 100)
712            );
713        }
714    }
715    #[test]
716    fn test_statistics_enabled() {
717        let temp_dir = TempDir::new().unwrap();
718        // Just verify that building with statistics doesn't panic
719        let provider = RocksDBBuilder::new(temp_dir.path())
720            .with_table::<TestTable>()
721            .with_statistics()
722            .build()
723            .unwrap();
724
725        // Do operations - data should be immediately readable with TransactionDB
726        for i in 0..10 {
727            let value = vec![i as u8];
728            provider.put::<TestTable>(i, &value).unwrap();
729            // Verify write is visible
730            assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
731        }
732    }
733
734    #[test]
735    fn test_data_persistence() {
736        let temp_dir = TempDir::new().unwrap();
737        let provider =
738            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
739
740        // Insert data - TransactionDB writes are immediately visible
741        let value = vec![42u8; 1000];
742        for i in 0..100 {
743            provider.put::<TestTable>(i, &value).unwrap();
744        }
745
746        // Verify data is readable
747        for i in 0..100 {
748            assert!(provider.get::<TestTable>(i).unwrap().is_some(), "Data should be readable");
749        }
750    }
751
752    #[test]
753    fn test_transaction_read_your_writes() {
754        let temp_dir = TempDir::new().unwrap();
755        let provider =
756            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
757
758        // Create a transaction
759        let tx = provider.tx();
760
761        // Write data within the transaction
762        let key = 42u64;
763        let value = b"test_value".to_vec();
764        tx.put::<TestTable>(key, &value).unwrap();
765
766        // Read-your-writes: should see uncommitted data in same transaction
767        let result = tx.get::<TestTable>(key).unwrap();
768        assert_eq!(
769            result,
770            Some(value.clone()),
771            "Transaction should see its own uncommitted writes"
772        );
773
774        // Data should NOT be visible via provider (outside transaction)
775        let provider_result = provider.get::<TestTable>(key).unwrap();
776        assert_eq!(provider_result, None, "Uncommitted data should not be visible outside tx");
777
778        // Commit the transaction
779        tx.commit().unwrap();
780
781        // Now data should be visible via provider
782        let committed_result = provider.get::<TestTable>(key).unwrap();
783        assert_eq!(committed_result, Some(value), "Committed data should be visible");
784    }
785
786    #[test]
787    fn test_transaction_rollback() {
788        let temp_dir = TempDir::new().unwrap();
789        let provider =
790            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
791
792        // First, put some initial data
793        let key = 100u64;
794        let initial_value = b"initial".to_vec();
795        provider.put::<TestTable>(key, &initial_value).unwrap();
796
797        // Create a transaction and modify data
798        let tx = provider.tx();
799        let new_value = b"modified".to_vec();
800        tx.put::<TestTable>(key, &new_value).unwrap();
801
802        // Verify modification is visible within transaction
803        assert_eq!(tx.get::<TestTable>(key).unwrap(), Some(new_value));
804
805        // Rollback instead of commit
806        tx.rollback().unwrap();
807
808        // Data should be unchanged (initial value)
809        let result = provider.get::<TestTable>(key).unwrap();
810        assert_eq!(result, Some(initial_value), "Rollback should preserve original data");
811    }
812
813    #[test]
814    fn test_transaction_iterator() {
815        let temp_dir = TempDir::new().unwrap();
816        let provider =
817            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
818
819        // Create a transaction
820        let tx = provider.tx();
821
822        // Write multiple entries
823        for i in 0..5u64 {
824            let value = format!("value_{i}").into_bytes();
825            tx.put::<TestTable>(i, &value).unwrap();
826        }
827
828        // Iterate - should see uncommitted writes
829        let mut count = 0;
830        for result in tx.iter::<TestTable>().unwrap() {
831            let (key, value) = result.unwrap();
832            assert_eq!(value, format!("value_{key}").into_bytes());
833            count += 1;
834        }
835        assert_eq!(count, 5, "Iterator should see all uncommitted writes");
836
837        // Commit
838        tx.commit().unwrap();
839    }
840}