// reth_provider/providers/rocksdb/provider.rs

1use super::metrics::{RocksDBMetrics, RocksDBOperation};
2use reth_db_api::{
3    table::{Compress, Decompress, Encode, Table},
4    tables, DatabaseError,
5};
6use reth_storage_errors::{
7    db::{DatabaseErrorInfo, DatabaseWriteError, DatabaseWriteOperation, LogLevel},
8    provider::{ProviderError, ProviderResult},
9};
10use rocksdb::{
11    BlockBasedOptions, Cache, ColumnFamilyDescriptor, CompactionPri, DBCompressionType,
12    IteratorMode, Options, Transaction, TransactionDB, TransactionDBOptions, TransactionOptions,
13    WriteBatchWithTransaction, WriteOptions,
14};
15use std::{
16    fmt,
17    path::{Path, PathBuf},
18    sync::Arc,
19    time::Instant,
20};
21
/// Default capacity of the shared LRU block cache (128 MB).
const DEFAULT_CACHE_SIZE: usize = 128 << 20;

/// Default block size for `RocksDB` tables (16 KB).
const DEFAULT_BLOCK_SIZE: usize = 16 * 1024;

/// Default max background jobs for `RocksDB` compaction and flushing.
const DEFAULT_MAX_BACKGROUND_JOBS: i32 = 6;

/// Default bytes per sync for `RocksDB` WAL writes (1 MB).
const DEFAULT_BYTES_PER_SYNC: u64 = 1_048_576;

/// Default bloom filter bits per key (10 bits/key ≈ 1% false positive rate).
const DEFAULT_BLOOM_FILTER_BITS: f64 = 10.0;

/// Default buffer capacity for compression in batches.
/// 4 KiB matches common block/page sizes and comfortably holds typical history values,
/// reducing the first few reallocations without over-allocating.
const DEFAULT_COMPRESS_BUF_CAPACITY: usize = 4096;
41
/// Builder for [`RocksDBProvider`].
pub struct RocksDBBuilder {
    /// Directory where the database files live.
    path: PathBuf,
    /// Column family names, one per registered table.
    column_families: Vec<String>,
    /// Whether per-operation latency metrics are recorded.
    enable_metrics: bool,
    /// Whether `RocksDB`-internal statistics collection is enabled.
    enable_statistics: bool,
    /// Log level forwarded to `RocksDB`'s info log.
    log_level: rocksdb::LogLevel,
    /// LRU block cache shared by the DB and all column families.
    block_cache: Cache,
}
51
52impl fmt::Debug for RocksDBBuilder {
53    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
54        f.debug_struct("RocksDBBuilder")
55            .field("path", &self.path)
56            .field("column_families", &self.column_families)
57            .field("enable_metrics", &self.enable_metrics)
58            .finish()
59    }
60}
61
impl RocksDBBuilder {
    /// Creates a new builder with optimized default options.
    ///
    /// Starts with no column families registered, metrics and statistics
    /// disabled, `Info` logging, and a shared LRU block cache of
    /// [`DEFAULT_CACHE_SIZE`] bytes.
    pub fn new(path: impl AsRef<Path>) -> Self {
        let cache = Cache::new_lru_cache(DEFAULT_CACHE_SIZE);
        Self {
            path: path.as_ref().to_path_buf(),
            column_families: Vec::new(),
            enable_metrics: false,
            enable_statistics: false,
            log_level: rocksdb::LogLevel::Info,
            block_cache: cache,
        }
    }

    /// Creates default table options with the shared block cache.
    fn default_table_options(cache: &Cache) -> BlockBasedOptions {
        let mut table_options = BlockBasedOptions::default();
        table_options.set_block_size(DEFAULT_BLOCK_SIZE);
        table_options.set_cache_index_and_filter_blocks(true);
        table_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
        // Shared block cache for all column families.
        table_options.set_block_cache(cache);
        // Bloom filter: 10 bits/key gives ~1% false-positive rate (full filter, not
        // block-based). Trades a little memory for better point-lookup performance.
        // See https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter#configuration-basics
        table_options.set_bloom_filter(DEFAULT_BLOOM_FILTER_BITS, false);
        table_options.set_optimize_filters_for_memory(true);
        table_options
    }

    /// Creates optimized DB-wide `RocksDB` options per the `RocksDB` wiki recommendations.
    fn default_options(
        log_level: rocksdb::LogLevel,
        cache: &Cache,
        enable_statistics: bool,
    ) -> Options {
        // Follows the recommended tuning guide from the RocksDB wiki, see
        // https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning
        let table_options = Self::default_table_options(cache);

        let mut options = Options::default();
        options.set_block_based_table_factory(&table_options);
        options.create_if_missing(true);
        options.create_missing_column_families(true);
        options.set_max_background_jobs(DEFAULT_MAX_BACKGROUND_JOBS);
        options.set_bytes_per_sync(DEFAULT_BYTES_PER_SYNC);

        // Zstd for the bottommost (largest, coldest) level, Lz4 elsewhere;
        // dictionary training disabled (max_train_bytes = 0).
        options.set_bottommost_compression_type(DBCompressionType::Zstd);
        options.set_bottommost_zstd_max_train_bytes(0, true);
        options.set_compression_type(DBCompressionType::Lz4);
        options.set_compaction_pri(CompactionPri::MinOverlappingRatio);

        options.set_log_level(log_level);

        // Statistics can be viewed in the RocksDB log file.
        if enable_statistics {
            options.enable_statistics();
        }

        options
    }

    /// Creates optimized per-column-family options.
    fn default_column_family_options(cache: &Cache) -> Options {
        // Follows the recommended tuning guide from the RocksDB wiki, see
        // https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning
        let table_options = Self::default_table_options(cache);

        let mut cf_options = Options::default();
        cf_options.set_block_based_table_factory(&table_options);
        cf_options.set_level_compaction_dynamic_level_bytes(true);
        // Recommended: Zstd for bottommost compression and Lz4 for other levels, see
        // https://github.com/facebook/rocksdb/wiki/Compression#configuration
        cf_options.set_compression_type(DBCompressionType::Lz4);
        cf_options.set_bottommost_compression_type(DBCompressionType::Zstd);
        // Only use Zstd compression, disable dictionary training.
        cf_options.set_bottommost_zstd_max_train_bytes(0, true);

        cf_options
    }

    /// Adds a column family for a specific table type.
    pub fn with_table<T: Table>(mut self) -> Self {
        self.column_families.push(T::NAME.to_string());
        self
    }

    /// Registers the default tables used by reth for `RocksDB` storage.
    ///
    /// This registers:
    /// - [`tables::TransactionHashNumbers`] - Transaction hash to number mapping
    /// - [`tables::AccountsHistory`] - Account history index
    /// - [`tables::StoragesHistory`] - Storage history index
    pub fn with_default_tables(self) -> Self {
        self.with_table::<tables::TransactionHashNumbers>()
            .with_table::<tables::AccountsHistory>()
            .with_table::<tables::StoragesHistory>()
    }

    /// Enables per-operation latency metrics.
    pub const fn with_metrics(mut self) -> Self {
        self.enable_metrics = true;
        self
    }

    /// Enables `RocksDB` internal statistics collection.
    pub const fn with_statistics(mut self) -> Self {
        self.enable_statistics = true;
        self
    }

    /// Sets the log level from `DatabaseArgs` configuration.
    ///
    /// `None` leaves the builder's current level (default `Info`) untouched.
    pub const fn with_database_log_level(mut self, log_level: Option<LogLevel>) -> Self {
        if let Some(level) = log_level {
            self.log_level = convert_log_level(level);
        }
        self
    }

    /// Sets a custom block cache size, replacing the default shared cache.
    pub fn with_block_cache_size(mut self, capacity_bytes: usize) -> Self {
        self.block_cache = Cache::new_lru_cache(capacity_bytes);
        self
    }

    /// Builds the [`RocksDBProvider`], opening (or creating) the database on disk.
    pub fn build(self) -> ProviderResult<RocksDBProvider> {
        let options =
            Self::default_options(self.log_level, &self.block_cache, self.enable_statistics);

        // One descriptor per registered table; all share the same block cache.
        let cf_descriptors: Vec<ColumnFamilyDescriptor> = self
            .column_families
            .iter()
            .map(|name| {
                ColumnFamilyDescriptor::new(
                    name.clone(),
                    Self::default_column_family_options(&self.block_cache),
                )
            })
            .collect();

        // Use TransactionDB for MDBX-like transaction semantics (read-your-writes, rollback)
        let txn_db_options = TransactionDBOptions::default();
        let db = TransactionDB::open_cf_descriptors(
            &options,
            &txn_db_options,
            &self.path,
            cf_descriptors,
        )
        .map_err(|e| {
            ProviderError::Database(DatabaseError::Open(DatabaseErrorInfo {
                message: e.to_string().into(),
                code: -1,
            }))
        })?;

        let metrics = self.enable_metrics.then(RocksDBMetrics::default);

        Ok(RocksDBProvider(Arc::new(RocksDBProviderInner { db, metrics })))
    }
}
220
/// Some types don't support compression (eg. B256), and we don't want to be copying them to the
/// allocated buffer when we can just use their reference.
///
/// Evaluates to `Some(&[u8])` when the value exposes an uncompressable reference
/// (use it directly), or `None` after compressing the value into `$buf`
/// (the caller then reads the bytes from `$buf`, typically via `unwrap_or(&$buf)`).
macro_rules! compress_to_buf_or_ref {
    ($buf:expr, $value:expr) => {
        if let Some(value) = $value.uncompressable_ref() {
            Some(value)
        } else {
            // Reuse the caller's buffer: clear previous contents, then compress in place.
            $buf.clear();
            $value.compress_to_buf(&mut $buf);
            None
        }
    };
}
234
/// `RocksDB` provider for the auxiliary storage layer beside the main MDBX database.
///
/// Cheap to clone: wraps the shared state in an [`Arc`].
#[derive(Debug)]
pub struct RocksDBProvider(Arc<RocksDBProviderInner>);
238
/// Inner state for the `RocksDB` provider, shared behind an [`Arc`].
struct RocksDBProviderInner {
    /// `RocksDB` database instance with transaction support.
    db: TransactionDB,
    /// Latency/operation metrics; `None` when metrics are disabled.
    metrics: Option<RocksDBMetrics>,
}
246
247impl fmt::Debug for RocksDBProviderInner {
248    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
249        f.debug_struct("RocksDBProviderInner")
250            .field("db", &"<TransactionDB>")
251            .field("metrics", &self.metrics)
252            .finish()
253    }
254}
255
256impl Clone for RocksDBProvider {
257    fn clone(&self) -> Self {
258        Self(self.0.clone())
259    }
260}
261
262impl RocksDBProvider {
263    /// Creates a new `RocksDB` provider.
264    pub fn new(path: impl AsRef<Path>) -> ProviderResult<Self> {
265        RocksDBBuilder::new(path).build()
266    }
267
268    /// Creates a new `RocksDB` provider builder.
269    pub fn builder(path: impl AsRef<Path>) -> RocksDBBuilder {
270        RocksDBBuilder::new(path)
271    }
272
273    /// Creates a new transaction with MDBX-like semantics (read-your-writes, rollback).
274    pub fn tx(&self) -> RocksTx<'_> {
275        let write_options = WriteOptions::default();
276        let txn_options = TransactionOptions::default();
277        let inner = self.0.db.transaction_opt(&write_options, &txn_options);
278        RocksTx { inner, provider: self }
279    }
280
281    /// Creates a new batch for atomic writes.
282    ///
283    /// Use [`Self::write_batch`] for closure-based atomic writes.
284    /// Use this method when the batch needs to be held by [`crate::EitherWriter`].
285    pub fn batch(&self) -> RocksDBBatch<'_> {
286        RocksDBBatch {
287            provider: self,
288            inner: WriteBatchWithTransaction::<true>::default(),
289            buf: Vec::with_capacity(DEFAULT_COMPRESS_BUF_CAPACITY),
290        }
291    }
292
293    /// Gets the column family handle for a table.
294    fn get_cf_handle<T: Table>(&self) -> Result<&rocksdb::ColumnFamily, DatabaseError> {
295        self.0
296            .db
297            .cf_handle(T::NAME)
298            .ok_or_else(|| DatabaseError::Other(format!("Column family '{}' not found", T::NAME)))
299    }
300
301    /// Executes a function and records metrics with the given operation and table name.
302    fn execute_with_operation_metric<T>(
303        &self,
304        operation: RocksDBOperation,
305        table: &'static str,
306        f: impl FnOnce(&Self) -> T,
307    ) -> T {
308        let start = self.0.metrics.as_ref().map(|_| Instant::now());
309        let res = f(self);
310
311        if let (Some(start), Some(metrics)) = (start, &self.0.metrics) {
312            metrics.record_operation(operation, table, start.elapsed());
313        }
314
315        res
316    }
317
318    /// Gets a value from the specified table.
319    pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
320        self.get_encoded::<T>(&key.encode())
321    }
322
323    /// Gets a value from the specified table using pre-encoded key.
324    pub fn get_encoded<T: Table>(
325        &self,
326        key: &<T::Key as Encode>::Encoded,
327    ) -> ProviderResult<Option<T::Value>> {
328        self.execute_with_operation_metric(RocksDBOperation::Get, T::NAME, |this| {
329            let result =
330                this.0.db.get_cf(this.get_cf_handle::<T>()?, key.as_ref()).map_err(|e| {
331                    ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
332                        message: e.to_string().into(),
333                        code: -1,
334                    }))
335                })?;
336
337            Ok(result.and_then(|value| T::Value::decompress(&value).ok()))
338        })
339    }
340
341    /// Puts upsert a value into the specified table with the given key.
342    pub fn put<T: Table>(&self, key: T::Key, value: &T::Value) -> ProviderResult<()> {
343        let encoded_key = key.encode();
344        self.put_encoded::<T>(&encoded_key, value)
345    }
346
347    /// Puts a value into the specified table using pre-encoded key.
348    pub fn put_encoded<T: Table>(
349        &self,
350        key: &<T::Key as Encode>::Encoded,
351        value: &T::Value,
352    ) -> ProviderResult<()> {
353        self.execute_with_operation_metric(RocksDBOperation::Put, T::NAME, |this| {
354            // for simplify the code, we need allocate buf here each time because `RocksDBProvider`
355            // is thread safe if user want to avoid allocate buf each time, they can use
356            // write_batch api
357            let mut buf = Vec::new();
358            let value_bytes = compress_to_buf_or_ref!(buf, value).unwrap_or(&buf);
359
360            this.0.db.put_cf(this.get_cf_handle::<T>()?, key, value_bytes).map_err(|e| {
361                ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
362                    info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
363                    operation: DatabaseWriteOperation::PutUpsert,
364                    table_name: T::NAME,
365                    key: key.as_ref().to_vec(),
366                })))
367            })
368        })
369    }
370
371    /// Deletes a value from the specified table.
372    pub fn delete<T: Table>(&self, key: T::Key) -> ProviderResult<()> {
373        self.execute_with_operation_metric(RocksDBOperation::Delete, T::NAME, |this| {
374            this.0.db.delete_cf(this.get_cf_handle::<T>()?, key.encode().as_ref()).map_err(|e| {
375                ProviderError::Database(DatabaseError::Delete(DatabaseErrorInfo {
376                    message: e.to_string().into(),
377                    code: -1,
378                }))
379            })
380        })
381    }
382
383    /// Gets the first (smallest key) entry from the specified table.
384    pub fn first<T: Table>(&self) -> ProviderResult<Option<(T::Key, T::Value)>> {
385        self.execute_with_operation_metric(RocksDBOperation::Get, T::NAME, |this| {
386            let cf = this.get_cf_handle::<T>()?;
387            let mut iter = this.0.db.iterator_cf(cf, IteratorMode::Start);
388
389            match iter.next() {
390                Some(Ok((key_bytes, value_bytes))) => {
391                    let key = <T::Key as reth_db_api::table::Decode>::decode(&key_bytes)
392                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
393                    let value = T::Value::decompress(&value_bytes)
394                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
395                    Ok(Some((key, value)))
396                }
397                Some(Err(e)) => {
398                    Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
399                        message: e.to_string().into(),
400                        code: -1,
401                    })))
402                }
403                None => Ok(None),
404            }
405        })
406    }
407
408    /// Gets the last (largest key) entry from the specified table.
409    pub fn last<T: Table>(&self) -> ProviderResult<Option<(T::Key, T::Value)>> {
410        self.execute_with_operation_metric(RocksDBOperation::Get, T::NAME, |this| {
411            let cf = this.get_cf_handle::<T>()?;
412            let mut iter = this.0.db.iterator_cf(cf, IteratorMode::End);
413
414            match iter.next() {
415                Some(Ok((key_bytes, value_bytes))) => {
416                    let key = <T::Key as reth_db_api::table::Decode>::decode(&key_bytes)
417                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
418                    let value = T::Value::decompress(&value_bytes)
419                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
420                    Ok(Some((key, value)))
421                }
422                Some(Err(e)) => {
423                    Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
424                        message: e.to_string().into(),
425                        code: -1,
426                    })))
427                }
428                None => Ok(None),
429            }
430        })
431    }
432
433    /// Creates an iterator over all entries in the specified table.
434    ///
435    /// Returns decoded `(Key, Value)` pairs in key order.
436    pub fn iter<T: Table>(&self) -> ProviderResult<RocksDBIter<'_, T>> {
437        let cf = self.get_cf_handle::<T>()?;
438        let iter = self.0.db.iterator_cf(cf, IteratorMode::Start);
439        Ok(RocksDBIter { inner: iter, _marker: std::marker::PhantomData })
440    }
441
442    /// Writes a batch of operations atomically.
443    pub fn write_batch<F>(&self, f: F) -> ProviderResult<()>
444    where
445        F: FnOnce(&mut RocksDBBatch<'_>) -> ProviderResult<()>,
446    {
447        self.execute_with_operation_metric(RocksDBOperation::BatchWrite, "Batch", |this| {
448            let mut batch_handle = this.batch();
449            f(&mut batch_handle)?;
450            batch_handle.commit()
451        })
452    }
453
454    /// Commits a raw `WriteBatchWithTransaction` to `RocksDB`.
455    ///
456    /// This is used when the batch was extracted via [`RocksDBBatch::into_inner`]
457    /// and needs to be committed at a later point (e.g., at provider commit time).
458    pub fn commit_batch(&self, batch: WriteBatchWithTransaction<true>) -> ProviderResult<()> {
459        self.0.db.write_opt(batch, &WriteOptions::default()).map_err(|e| {
460            ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
461                message: e.to_string().into(),
462                code: -1,
463            }))
464        })
465    }
466}
467
/// Handle for building a batch of operations atomically.
///
/// Uses `WriteBatchWithTransaction` for atomic writes without full transaction overhead.
/// Unlike [`RocksTx`], this does NOT support read-your-writes. Use for write-only flows
/// where you don't need to read back uncommitted data within the same operation
/// (e.g., history index writes).
#[must_use = "batch must be committed"]
pub struct RocksDBBatch<'a> {
    /// Provider the batch will be committed against.
    provider: &'a RocksDBProvider,
    /// Staged (uncommitted) write operations.
    inner: WriteBatchWithTransaction<true>,
    /// Reusable scratch buffer for value compression across puts.
    buf: Vec<u8>,
}
480
481impl fmt::Debug for RocksDBBatch<'_> {
482    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
483        f.debug_struct("RocksDBBatch")
484            .field("provider", &self.provider)
485            .field("batch", &"<WriteBatchWithTransaction>")
486            // Number of operations in this batch
487            .field("length", &self.inner.len())
488            // Total serialized size (encoded key + compressed value + metadata) of this batch
489            // in bytes
490            .field("size_in_bytes", &self.inner.size_in_bytes())
491            .finish()
492    }
493}
494
495impl<'a> RocksDBBatch<'a> {
496    /// Puts a value into the batch.
497    pub fn put<T: Table>(&mut self, key: T::Key, value: &T::Value) -> ProviderResult<()> {
498        let encoded_key = key.encode();
499        self.put_encoded::<T>(&encoded_key, value)
500    }
501
502    /// Puts a value into the batch using pre-encoded key.
503    pub fn put_encoded<T: Table>(
504        &mut self,
505        key: &<T::Key as Encode>::Encoded,
506        value: &T::Value,
507    ) -> ProviderResult<()> {
508        let value_bytes = compress_to_buf_or_ref!(self.buf, value).unwrap_or(&self.buf);
509        self.inner.put_cf(self.provider.get_cf_handle::<T>()?, key, value_bytes);
510        Ok(())
511    }
512
513    /// Deletes a value from the batch.
514    pub fn delete<T: Table>(&mut self, key: T::Key) -> ProviderResult<()> {
515        self.inner.delete_cf(self.provider.get_cf_handle::<T>()?, key.encode().as_ref());
516        Ok(())
517    }
518
519    /// Commits the batch to the database.
520    ///
521    /// This consumes the batch and writes all operations atomically to `RocksDB`.
522    pub fn commit(self) -> ProviderResult<()> {
523        self.provider.0.db.write_opt(self.inner, &WriteOptions::default()).map_err(|e| {
524            ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
525                message: e.to_string().into(),
526                code: -1,
527            }))
528        })
529    }
530
531    /// Returns the number of write operations (puts + deletes) queued in this batch.
532    pub fn len(&self) -> usize {
533        self.inner.len()
534    }
535
536    /// Returns `true` if the batch contains no operations.
537    pub fn is_empty(&self) -> bool {
538        self.inner.is_empty()
539    }
540
541    /// Returns a reference to the underlying `RocksDB` provider.
542    pub const fn provider(&self) -> &RocksDBProvider {
543        self.provider
544    }
545
546    /// Consumes the batch and returns the underlying `WriteBatchWithTransaction`.
547    ///
548    /// This is used to defer commits to the provider level.
549    pub fn into_inner(self) -> WriteBatchWithTransaction<true> {
550        self.inner
551    }
552}
553
/// `RocksDB` transaction wrapper providing MDBX-like semantics.
///
/// Supports:
/// - Read-your-writes: reads see uncommitted writes within the same transaction
/// - Atomic commit/rollback
/// - Iteration over uncommitted data
///
/// Note: `Transaction` is `Send` but NOT `Sync`. This wrapper does not implement
/// `DbTx`/`DbTxMut` traits directly; use RocksDB-specific methods instead.
pub struct RocksTx<'db> {
    /// The underlying `RocksDB` transaction.
    inner: Transaction<'db, TransactionDB>,
    /// Provider the transaction was opened on (used for column family lookups).
    provider: &'db RocksDBProvider,
}
567
568impl fmt::Debug for RocksTx<'_> {
569    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
570        f.debug_struct("RocksTx").field("provider", &self.provider).finish_non_exhaustive()
571    }
572}
573
574impl<'db> RocksTx<'db> {
575    /// Gets a value from the specified table. Sees uncommitted writes in this transaction.
576    pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
577        let encoded_key = key.encode();
578        self.get_encoded::<T>(&encoded_key)
579    }
580
581    /// Gets a value using pre-encoded key. Sees uncommitted writes in this transaction.
582    pub fn get_encoded<T: Table>(
583        &self,
584        key: &<T::Key as Encode>::Encoded,
585    ) -> ProviderResult<Option<T::Value>> {
586        let cf = self.provider.get_cf_handle::<T>()?;
587        let result = self.inner.get_cf(cf, key.as_ref()).map_err(|e| {
588            ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
589                message: e.to_string().into(),
590                code: -1,
591            }))
592        })?;
593
594        Ok(result.and_then(|value| T::Value::decompress(&value).ok()))
595    }
596
597    /// Puts a value into the specified table.
598    pub fn put<T: Table>(&self, key: T::Key, value: &T::Value) -> ProviderResult<()> {
599        let encoded_key = key.encode();
600        self.put_encoded::<T>(&encoded_key, value)
601    }
602
603    /// Puts a value using pre-encoded key.
604    pub fn put_encoded<T: Table>(
605        &self,
606        key: &<T::Key as Encode>::Encoded,
607        value: &T::Value,
608    ) -> ProviderResult<()> {
609        let cf = self.provider.get_cf_handle::<T>()?;
610        let mut buf = Vec::new();
611        let value_bytes = compress_to_buf_or_ref!(buf, value).unwrap_or(&buf);
612
613        self.inner.put_cf(cf, key.as_ref(), value_bytes).map_err(|e| {
614            ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
615                info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
616                operation: DatabaseWriteOperation::PutUpsert,
617                table_name: T::NAME,
618                key: key.as_ref().to_vec(),
619            })))
620        })
621    }
622
623    /// Deletes a value from the specified table.
624    pub fn delete<T: Table>(&self, key: T::Key) -> ProviderResult<()> {
625        let cf = self.provider.get_cf_handle::<T>()?;
626        self.inner.delete_cf(cf, key.encode().as_ref()).map_err(|e| {
627            ProviderError::Database(DatabaseError::Delete(DatabaseErrorInfo {
628                message: e.to_string().into(),
629                code: -1,
630            }))
631        })
632    }
633
634    /// Creates an iterator for the specified table. Sees uncommitted writes in this transaction.
635    ///
636    /// Returns an iterator that yields `(encoded_key, compressed_value)` pairs.
637    pub fn iter<T: Table>(&self) -> ProviderResult<RocksTxIter<'_, T>> {
638        let cf = self.provider.get_cf_handle::<T>()?;
639        let iter = self.inner.iterator_cf(cf, IteratorMode::Start);
640        Ok(RocksTxIter { inner: iter, _marker: std::marker::PhantomData })
641    }
642
643    /// Creates an iterator starting from the given key (inclusive).
644    pub fn iter_from<T: Table>(&self, key: T::Key) -> ProviderResult<RocksTxIter<'_, T>> {
645        let cf = self.provider.get_cf_handle::<T>()?;
646        let encoded_key = key.encode();
647        let iter = self
648            .inner
649            .iterator_cf(cf, IteratorMode::From(encoded_key.as_ref(), rocksdb::Direction::Forward));
650        Ok(RocksTxIter { inner: iter, _marker: std::marker::PhantomData })
651    }
652
653    /// Commits the transaction, persisting all changes.
654    pub fn commit(self) -> ProviderResult<()> {
655        self.inner.commit().map_err(|e| {
656            ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
657                message: e.to_string().into(),
658                code: -1,
659            }))
660        })
661    }
662
663    /// Rolls back the transaction, discarding all changes.
664    pub fn rollback(self) -> ProviderResult<()> {
665        self.inner.rollback().map_err(|e| {
666            ProviderError::Database(DatabaseError::Other(format!("rollback failed: {e}")))
667        })
668    }
669}
670
/// Iterator over a `RocksDB` table (non-transactional).
///
/// Yields decoded `(Key, Value)` pairs in key order.
pub struct RocksDBIter<'db, T: Table> {
    /// Raw `RocksDB` iterator over the table's column family.
    inner: rocksdb::DBIteratorWithThreadMode<'db, TransactionDB>,
    /// Binds the iterator to table `T` without storing a `T`.
    _marker: std::marker::PhantomData<T>,
}
678
679impl<T: Table> fmt::Debug for RocksDBIter<'_, T> {
680    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
681        f.debug_struct("RocksDBIter").field("table", &T::NAME).finish_non_exhaustive()
682    }
683}
684
685impl<T: Table> Iterator for RocksDBIter<'_, T> {
686    type Item = ProviderResult<(T::Key, T::Value)>;
687
688    fn next(&mut self) -> Option<Self::Item> {
689        let (key_bytes, value_bytes) = match self.inner.next()? {
690            Ok(kv) => kv,
691            Err(e) => {
692                return Some(Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
693                    message: e.to_string().into(),
694                    code: -1,
695                }))))
696            }
697        };
698
699        // Decode key
700        let key = match <T::Key as reth_db_api::table::Decode>::decode(&key_bytes) {
701            Ok(k) => k,
702            Err(_) => return Some(Err(ProviderError::Database(DatabaseError::Decode))),
703        };
704
705        // Decompress value
706        let value = match T::Value::decompress(&value_bytes) {
707            Ok(v) => v,
708            Err(_) => return Some(Err(ProviderError::Database(DatabaseError::Decode))),
709        };
710
711        Some(Ok((key, value)))
712    }
713}
714
/// Iterator over a `RocksDB` table within a transaction.
///
/// Yields decoded `(Key, Value)` pairs. Sees uncommitted writes.
pub struct RocksTxIter<'tx, T: Table> {
    /// Raw iterator bound to the transaction, so staged writes are visible.
    inner: rocksdb::DBIteratorWithThreadMode<'tx, Transaction<'tx, TransactionDB>>,
    /// Binds the iterator to table `T` without storing a `T`.
    _marker: std::marker::PhantomData<T>,
}
722
723impl<T: Table> fmt::Debug for RocksTxIter<'_, T> {
724    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
725        f.debug_struct("RocksTxIter").field("table", &T::NAME).finish_non_exhaustive()
726    }
727}
728
729impl<T: Table> Iterator for RocksTxIter<'_, T> {
730    type Item = ProviderResult<(T::Key, T::Value)>;
731
732    fn next(&mut self) -> Option<Self::Item> {
733        let (key_bytes, value_bytes) = match self.inner.next()? {
734            Ok(kv) => kv,
735            Err(e) => {
736                return Some(Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
737                    message: e.to_string().into(),
738                    code: -1,
739                }))))
740            }
741        };
742
743        // Decode key
744        let key = match <T::Key as reth_db_api::table::Decode>::decode(&key_bytes) {
745            Ok(k) => k,
746            Err(_) => return Some(Err(ProviderError::Database(DatabaseError::Decode))),
747        };
748
749        // Decompress value
750        let value = match T::Value::decompress(&value_bytes) {
751            Ok(v) => v,
752            Err(_) => return Some(Err(ProviderError::Database(DatabaseError::Decode))),
753        };
754
755        Some(Ok((key, value)))
756    }
757}
758
759/// Converts Reth's [`LogLevel`] to `RocksDB`'s [`rocksdb::LogLevel`].
760const fn convert_log_level(level: LogLevel) -> rocksdb::LogLevel {
761    match level {
762        LogLevel::Fatal => rocksdb::LogLevel::Fatal,
763        LogLevel::Error => rocksdb::LogLevel::Error,
764        LogLevel::Warn => rocksdb::LogLevel::Warn,
765        LogLevel::Notice | LogLevel::Verbose => rocksdb::LogLevel::Info,
766        LogLevel::Debug | LogLevel::Trace | LogLevel::Extra => rocksdb::LogLevel::Debug,
767    }
768}
769
770#[cfg(test)]
771mod tests {
772    use super::*;
773    use alloy_primitives::{Address, TxHash, B256};
774    use reth_db_api::{
775        models::{sharded_key::ShardedKey, storage_sharded_key::StorageShardedKey, IntegerList},
776        table::Table,
777        tables,
778    };
779    use tempfile::TempDir;
780
    // Verifies that `with_default_tables` registers a working column family for
    // each of the three default tables (write + read round-trips succeed).
    #[test]
    fn test_with_default_tables_registers_required_column_families() {
        let temp_dir = TempDir::new().unwrap();

        // Build with default tables
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        // Should be able to write/read TransactionHashNumbers
        let tx_hash = TxHash::from(B256::from([1u8; 32]));
        provider.put::<tables::TransactionHashNumbers>(tx_hash, &100).unwrap();
        assert_eq!(provider.get::<tables::TransactionHashNumbers>(tx_hash).unwrap(), Some(100));

        // Should be able to write/read AccountsHistory
        let key = ShardedKey::new(Address::ZERO, 100);
        let value = IntegerList::default();
        provider.put::<tables::AccountsHistory>(key.clone(), &value).unwrap();
        assert!(provider.get::<tables::AccountsHistory>(key).unwrap().is_some());

        // Should be able to write/read StoragesHistory
        let key = StorageShardedKey::new(Address::ZERO, B256::ZERO, 100);
        provider.put::<tables::StoragesHistory>(key.clone(), &value).unwrap();
        assert!(provider.get::<tables::StoragesHistory>(key).unwrap().is_some());
    }
804
805    #[derive(Debug)]
806    struct TestTable;
807
808    impl Table for TestTable {
809        const NAME: &'static str = "TestTable";
810        const DUPSORT: bool = false;
811        type Key = u64;
812        type Value = Vec<u8>;
813    }
814
815    #[test]
816    fn test_basic_operations() {
817        let temp_dir = TempDir::new().unwrap();
818
819        let provider = RocksDBBuilder::new(temp_dir.path())
820            .with_table::<TestTable>() // Type-safe!
821            .build()
822            .unwrap();
823
824        let key = 42u64;
825        let value = b"test_value".to_vec();
826
827        // Test write
828        provider.put::<TestTable>(key, &value).unwrap();
829
830        // Test read
831        let result = provider.get::<TestTable>(key).unwrap();
832        assert_eq!(result, Some(value));
833
834        // Test delete
835        provider.delete::<TestTable>(key).unwrap();
836
837        // Verify deletion
838        assert_eq!(provider.get::<TestTable>(key).unwrap(), None);
839    }
840
841    #[test]
842    fn test_batch_operations() {
843        let temp_dir = TempDir::new().unwrap();
844        let provider =
845            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
846
847        // Write multiple entries in a batch
848        provider
849            .write_batch(|batch| {
850                for i in 0..10u64 {
851                    let value = format!("value_{i}").into_bytes();
852                    batch.put::<TestTable>(i, &value)?;
853                }
854                Ok(())
855            })
856            .unwrap();
857
858        // Read all entries
859        for i in 0..10u64 {
860            let value = format!("value_{i}").into_bytes();
861            assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
862        }
863
864        // Delete all entries in a batch
865        provider
866            .write_batch(|batch| {
867                for i in 0..10u64 {
868                    batch.delete::<TestTable>(i)?;
869                }
870                Ok(())
871            })
872            .unwrap();
873
874        // Verify all deleted
875        for i in 0..10u64 {
876            assert_eq!(provider.get::<TestTable>(i).unwrap(), None);
877        }
878    }
879
880    #[test]
881    fn test_with_real_table() {
882        let temp_dir = TempDir::new().unwrap();
883        let provider = RocksDBBuilder::new(temp_dir.path())
884            .with_table::<tables::TransactionHashNumbers>()
885            .with_metrics()
886            .build()
887            .unwrap();
888
889        let tx_hash = TxHash::from(B256::from([1u8; 32]));
890
891        // Insert and retrieve
892        provider.put::<tables::TransactionHashNumbers>(tx_hash, &100).unwrap();
893        assert_eq!(provider.get::<tables::TransactionHashNumbers>(tx_hash).unwrap(), Some(100));
894
895        // Batch insert multiple transactions
896        provider
897            .write_batch(|batch| {
898                for i in 0..10u64 {
899                    let hash = TxHash::from(B256::from([i as u8; 32]));
900                    let value = i * 100;
901                    batch.put::<tables::TransactionHashNumbers>(hash, &value)?;
902                }
903                Ok(())
904            })
905            .unwrap();
906
907        // Verify batch insertions
908        for i in 0..10u64 {
909            let hash = TxHash::from(B256::from([i as u8; 32]));
910            assert_eq!(
911                provider.get::<tables::TransactionHashNumbers>(hash).unwrap(),
912                Some(i * 100)
913            );
914        }
915    }
916    #[test]
917    fn test_statistics_enabled() {
918        let temp_dir = TempDir::new().unwrap();
919        // Just verify that building with statistics doesn't panic
920        let provider = RocksDBBuilder::new(temp_dir.path())
921            .with_table::<TestTable>()
922            .with_statistics()
923            .build()
924            .unwrap();
925
926        // Do operations - data should be immediately readable with TransactionDB
927        for i in 0..10 {
928            let value = vec![i as u8];
929            provider.put::<TestTable>(i, &value).unwrap();
930            // Verify write is visible
931            assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
932        }
933    }
934
935    #[test]
936    fn test_data_persistence() {
937        let temp_dir = TempDir::new().unwrap();
938        let provider =
939            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
940
941        // Insert data - TransactionDB writes are immediately visible
942        let value = vec![42u8; 1000];
943        for i in 0..100 {
944            provider.put::<TestTable>(i, &value).unwrap();
945        }
946
947        // Verify data is readable
948        for i in 0..100 {
949            assert!(provider.get::<TestTable>(i).unwrap().is_some(), "Data should be readable");
950        }
951    }
952
953    #[test]
954    fn test_transaction_read_your_writes() {
955        let temp_dir = TempDir::new().unwrap();
956        let provider =
957            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
958
959        // Create a transaction
960        let tx = provider.tx();
961
962        // Write data within the transaction
963        let key = 42u64;
964        let value = b"test_value".to_vec();
965        tx.put::<TestTable>(key, &value).unwrap();
966
967        // Read-your-writes: should see uncommitted data in same transaction
968        let result = tx.get::<TestTable>(key).unwrap();
969        assert_eq!(
970            result,
971            Some(value.clone()),
972            "Transaction should see its own uncommitted writes"
973        );
974
975        // Data should NOT be visible via provider (outside transaction)
976        let provider_result = provider.get::<TestTable>(key).unwrap();
977        assert_eq!(provider_result, None, "Uncommitted data should not be visible outside tx");
978
979        // Commit the transaction
980        tx.commit().unwrap();
981
982        // Now data should be visible via provider
983        let committed_result = provider.get::<TestTable>(key).unwrap();
984        assert_eq!(committed_result, Some(value), "Committed data should be visible");
985    }
986
987    #[test]
988    fn test_transaction_rollback() {
989        let temp_dir = TempDir::new().unwrap();
990        let provider =
991            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
992
993        // First, put some initial data
994        let key = 100u64;
995        let initial_value = b"initial".to_vec();
996        provider.put::<TestTable>(key, &initial_value).unwrap();
997
998        // Create a transaction and modify data
999        let tx = provider.tx();
1000        let new_value = b"modified".to_vec();
1001        tx.put::<TestTable>(key, &new_value).unwrap();
1002
1003        // Verify modification is visible within transaction
1004        assert_eq!(tx.get::<TestTable>(key).unwrap(), Some(new_value));
1005
1006        // Rollback instead of commit
1007        tx.rollback().unwrap();
1008
1009        // Data should be unchanged (initial value)
1010        let result = provider.get::<TestTable>(key).unwrap();
1011        assert_eq!(result, Some(initial_value), "Rollback should preserve original data");
1012    }
1013
1014    #[test]
1015    fn test_transaction_iterator() {
1016        let temp_dir = TempDir::new().unwrap();
1017        let provider =
1018            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
1019
1020        // Create a transaction
1021        let tx = provider.tx();
1022
1023        // Write multiple entries
1024        for i in 0..5u64 {
1025            let value = format!("value_{i}").into_bytes();
1026            tx.put::<TestTable>(i, &value).unwrap();
1027        }
1028
1029        // Iterate - should see uncommitted writes
1030        let mut count = 0;
1031        for result in tx.iter::<TestTable>().unwrap() {
1032            let (key, value) = result.unwrap();
1033            assert_eq!(value, format!("value_{key}").into_bytes());
1034            count += 1;
1035        }
1036        assert_eq!(count, 5, "Iterator should see all uncommitted writes");
1037
1038        // Commit
1039        tx.commit().unwrap();
1040    }
1041
1042    #[test]
1043    fn test_batch_manual_commit() {
1044        let temp_dir = TempDir::new().unwrap();
1045        let provider =
1046            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
1047
1048        // Create a batch via provider.batch()
1049        let mut batch = provider.batch();
1050
1051        // Add entries
1052        for i in 0..10u64 {
1053            let value = format!("batch_value_{i}").into_bytes();
1054            batch.put::<TestTable>(i, &value).unwrap();
1055        }
1056
1057        // Verify len/is_empty
1058        assert_eq!(batch.len(), 10);
1059        assert!(!batch.is_empty());
1060
1061        // Data should NOT be visible before commit
1062        assert_eq!(provider.get::<TestTable>(0).unwrap(), None);
1063
1064        // Commit the batch
1065        batch.commit().unwrap();
1066
1067        // Now data should be visible
1068        for i in 0..10u64 {
1069            let value = format!("batch_value_{i}").into_bytes();
1070            assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
1071        }
1072    }
1073
1074    #[test]
1075    fn test_first_and_last_entry() {
1076        let temp_dir = TempDir::new().unwrap();
1077        let provider =
1078            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
1079
1080        // Empty table should return None for both
1081        assert_eq!(provider.first::<TestTable>().unwrap(), None);
1082        assert_eq!(provider.last::<TestTable>().unwrap(), None);
1083
1084        // Insert some entries
1085        provider.put::<TestTable>(10, &b"value_10".to_vec()).unwrap();
1086        provider.put::<TestTable>(20, &b"value_20".to_vec()).unwrap();
1087        provider.put::<TestTable>(5, &b"value_5".to_vec()).unwrap();
1088
1089        // First should return the smallest key
1090        let first = provider.first::<TestTable>().unwrap();
1091        assert_eq!(first, Some((5, b"value_5".to_vec())));
1092
1093        // Last should return the largest key
1094        let last = provider.last::<TestTable>().unwrap();
1095        assert_eq!(last, Some((20, b"value_20".to_vec())));
1096    }
1097}