use super::metrics::{RocksDBMetrics, RocksDBOperation};
use reth_db_api::{
    table::{Compress, Decompress, Encode, Table},
    tables, DatabaseError,
};
use reth_storage_errors::{
    db::{DatabaseErrorInfo, DatabaseWriteError, DatabaseWriteOperation, LogLevel},
    provider::{ProviderError, ProviderResult},
};
use rocksdb::{
    BlockBasedOptions, Cache, ColumnFamilyDescriptor, CompactionPri, DBCompressionType,
    IteratorMode, Options, Transaction, TransactionDB, TransactionDBOptions, TransactionOptions,
    WriteBatchWithTransaction, WriteOptions,
};
use std::{
    fmt,
    path::{Path, PathBuf},
    sync::Arc,
    time::Instant,
};

/// Default capacity of the shared LRU block cache (128 MiB).
const DEFAULT_CACHE_SIZE: usize = 128 << 20;

/// Default block size for the block-based table format (16 KiB).
const DEFAULT_BLOCK_SIZE: usize = 16 * 1024;

/// Default number of RocksDB background jobs (flushes and compactions).
const DEFAULT_MAX_BACKGROUND_JOBS: i32 = 6;

/// Default number of bytes written before the OS is asked to sync (1 MiB).
const DEFAULT_BYTES_PER_SYNC: u64 = 1_048_576;

/// Default bloom filter bits per key.
const DEFAULT_BLOOM_FILTER_BITS: f64 = 10.0;

/// Default initial capacity, in bytes, of the reusable value compression buffer.
const DEFAULT_COMPRESS_BUF_CAPACITY: usize = 4096;

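/// Builder for a [`RocksDBProvider`].
///
/// Registers one column family per table and applies tuned defaults: a shared LRU block
/// cache, bloom filters, LZ4 compression with Zstd at the bottommost level, and optional
/// metrics and statistics.
///
/// # Example
///
/// A minimal usage sketch (the path is illustrative and error handling is elided):
///
/// ```ignore
/// let provider = RocksDBBuilder::new("/path/to/rocksdb")
///     .with_default_tables()
///     .with_metrics()
///     .build()?;
/// ```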
pub struct RocksDBBuilder {
    /// Path to the database directory.
    path: PathBuf,
    /// Column family names to open, one per registered table.
    column_families: Vec<String>,
    /// Whether to record per-operation metrics.
    enable_metrics: bool,
    /// Whether to enable RocksDB internal statistics.
    enable_statistics: bool,
    /// RocksDB log level.
    log_level: rocksdb::LogLevel,
    /// Block cache shared by all column families.
    block_cache: Cache,
}

impl fmt::Debug for RocksDBBuilder {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RocksDBBuilder")
            .field("path", &self.path)
            .field("column_families", &self.column_families)
            .field("enable_metrics", &self.enable_metrics)
            .finish()
    }
}

impl RocksDBBuilder {
    /// Creates a new builder for a database at the given path.
    pub fn new(path: impl AsRef<Path>) -> Self {
        let cache = Cache::new_lru_cache(DEFAULT_CACHE_SIZE);
        Self {
            path: path.as_ref().to_path_buf(),
            column_families: Vec::new(),
            enable_metrics: false,
            enable_statistics: false,
            log_level: rocksdb::LogLevel::Info,
            block_cache: cache,
        }
    }

    /// Returns the default block-based table options backed by the shared block cache.
    fn default_table_options(cache: &Cache) -> BlockBasedOptions {
        let mut table_options = BlockBasedOptions::default();
        table_options.set_block_size(DEFAULT_BLOCK_SIZE);
        table_options.set_cache_index_and_filter_blocks(true);
        table_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
        table_options.set_block_cache(cache);
        table_options.set_bloom_filter(DEFAULT_BLOOM_FILTER_BITS, false);
        table_options.set_optimize_filters_for_memory(true);
        table_options
    }

    /// Returns the database-wide options: compression, compaction priority, background jobs
    /// and logging.
    fn default_options(
        log_level: rocksdb::LogLevel,
        cache: &Cache,
        enable_statistics: bool,
    ) -> Options {
        let table_options = Self::default_table_options(cache);

        let mut options = Options::default();
        options.set_block_based_table_factory(&table_options);
        options.create_if_missing(true);
        options.create_missing_column_families(true);
        options.set_max_background_jobs(DEFAULT_MAX_BACKGROUND_JOBS);
        options.set_bytes_per_sync(DEFAULT_BYTES_PER_SYNC);

        options.set_bottommost_compression_type(DBCompressionType::Zstd);
        options.set_bottommost_zstd_max_train_bytes(0, true);
        options.set_compression_type(DBCompressionType::Lz4);
        options.set_compaction_pri(CompactionPri::MinOverlappingRatio);

        options.set_log_level(log_level);

        if enable_statistics {
            options.enable_statistics();
        }

        options
    }

    /// Returns the per-column-family options.
    fn default_column_family_options(cache: &Cache) -> Options {
        let table_options = Self::default_table_options(cache);

        let mut cf_options = Options::default();
        cf_options.set_block_based_table_factory(&table_options);
        cf_options.set_level_compaction_dynamic_level_bytes(true);
        cf_options.set_compression_type(DBCompressionType::Lz4);
        cf_options.set_bottommost_compression_type(DBCompressionType::Zstd);
        cf_options.set_bottommost_zstd_max_train_bytes(0, true);

        cf_options
    }

    /// Registers a column family for the given table.
    pub fn with_table<T: Table>(mut self) -> Self {
        self.column_families.push(T::NAME.to_string());
        self
    }

    /// Registers the default set of tables stored in RocksDB: transaction hash numbers and
    /// the account/storage history indices.
    pub fn with_default_tables(self) -> Self {
        self.with_table::<tables::TransactionHashNumbers>()
            .with_table::<tables::AccountsHistory>()
            .with_table::<tables::StoragesHistory>()
    }

    /// Enables per-operation metrics collection.
    pub const fn with_metrics(mut self) -> Self {
        self.enable_metrics = true;
        self
    }

    /// Enables RocksDB internal statistics.
    pub const fn with_statistics(mut self) -> Self {
        self.enable_statistics = true;
        self
    }

    /// Sets the RocksDB log level, if one is provided.
    pub const fn with_database_log_level(mut self, log_level: Option<LogLevel>) -> Self {
        if let Some(level) = log_level {
            self.log_level = convert_log_level(level);
        }
        self
    }

    /// Replaces the shared block cache with an LRU cache of the given capacity, in bytes.
    pub fn with_block_cache_size(mut self, capacity_bytes: usize) -> Self {
        self.block_cache = Cache::new_lru_cache(capacity_bytes);
        self
    }

    /// Opens the database with the configured options and column families.
    pub fn build(self) -> ProviderResult<RocksDBProvider> {
        let options =
            Self::default_options(self.log_level, &self.block_cache, self.enable_statistics);

        let cf_descriptors: Vec<ColumnFamilyDescriptor> = self
            .column_families
            .iter()
            .map(|name| {
                ColumnFamilyDescriptor::new(
                    name.clone(),
                    Self::default_column_family_options(&self.block_cache),
                )
            })
            .collect();

        let txn_db_options = TransactionDBOptions::default();
        let db = TransactionDB::open_cf_descriptors(
            &options,
            &txn_db_options,
            &self.path,
            cf_descriptors,
        )
        .map_err(|e| {
            ProviderError::Database(DatabaseError::Open(DatabaseErrorInfo {
                message: e.to_string().into(),
                code: -1,
            }))
        })?;

        let metrics = self.enable_metrics.then(RocksDBMetrics::default);

        Ok(RocksDBProvider(Arc::new(RocksDBProviderInner { db, metrics })))
    }
}

/// Compresses `$value` into `$buf` unless it can be passed through uncompressed.
///
/// Yields `Some(bytes)` when [`Compress::uncompressable_ref`] provides a pass-through slice,
/// otherwise compresses into `$buf` and yields `None`, in which case the caller reads the
/// compressed bytes from the buffer.
macro_rules! compress_to_buf_or_ref {
    ($buf:expr, $value:expr) => {
        if let Some(value) = $value.uncompressable_ref() {
            Some(value)
        } else {
            $buf.clear();
            $value.compress_to_buf(&mut $buf);
            None
        }
    };
}

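/// Key-value provider backed by a RocksDB [`TransactionDB`].
///
/// The provider is a cheaply cloneable handle (an [`Arc`] over the database and optional
/// metrics) that offers per-table reads and writes, transactions via [`Self::tx`], and
/// atomic write batches via [`Self::batch`] and [`Self::write_batch`].
///
/// # Example
///
/// A sketch of single-key access against one of the registered tables; `TxHash` and `B256`
/// come from `alloy_primitives`, and error handling is elided:
///
/// ```ignore
/// let provider = RocksDBProvider::builder("/path/to/rocksdb")
///     .with_table::<tables::TransactionHashNumbers>()
///     .build()?;
///
/// let hash = TxHash::from(B256::ZERO);
/// provider.put::<tables::TransactionHashNumbers>(hash, &42)?;
/// assert_eq!(provider.get::<tables::TransactionHashNumbers>(hash)?, Some(42));
/// ```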
#[derive(Debug)]
pub struct RocksDBProvider(Arc<RocksDBProviderInner>);

/// Inner state shared by clones of [`RocksDBProvider`].
struct RocksDBProviderInner {
    /// The underlying RocksDB transaction database.
    db: TransactionDB,
    /// Optional per-operation metrics.
    metrics: Option<RocksDBMetrics>,
}

impl fmt::Debug for RocksDBProviderInner {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RocksDBProviderInner")
            .field("db", &"<TransactionDB>")
            .field("metrics", &self.metrics)
            .finish()
    }
}

impl Clone for RocksDBProvider {
    fn clone(&self) -> Self {
        Self(self.0.clone())
    }
}

impl RocksDBProvider {
    /// Opens a RocksDB database at the given path with the default configuration.
    pub fn new(path: impl AsRef<Path>) -> ProviderResult<Self> {
        RocksDBBuilder::new(path).build()
    }

    /// Returns a [`RocksDBBuilder`] for the given path.
    pub fn builder(path: impl AsRef<Path>) -> RocksDBBuilder {
        RocksDBBuilder::new(path)
    }

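    /// Begins a RocksDB transaction with default write and transaction options.
    ///
    /// Writes made through the returned [`RocksTx`] are visible to reads on the same
    /// transaction, but not to the rest of the provider until [`RocksTx::commit`] is called;
    /// [`RocksTx::rollback`] discards them.
    ///
    /// # Example
    ///
    /// Sketch against a registered table (error handling elided):
    ///
    /// ```ignore
    /// let tx = provider.tx();
    /// tx.put::<tables::TransactionHashNumbers>(hash, &1)?;
    /// assert_eq!(tx.get::<tables::TransactionHashNumbers>(hash)?, Some(1)); // read-your-writes
    /// tx.commit()?; // now visible outside the transaction
    /// ```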
    pub fn tx(&self) -> RocksTx<'_> {
        let write_options = WriteOptions::default();
        let txn_options = TransactionOptions::default();
        let inner = self.0.db.transaction_opt(&write_options, &txn_options);
        RocksTx { inner, provider: self }
    }

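    /// Creates a new write batch that is applied atomically by [`RocksDBBatch::commit`].
    ///
    /// Staged entries are not visible to reads until the batch is committed.
    ///
    /// # Example
    ///
    /// Sketch against a registered table (error handling elided):
    ///
    /// ```ignore
    /// let mut batch = provider.batch();
    /// batch.put::<tables::TransactionHashNumbers>(hash_a, &1)?;
    /// batch.put::<tables::TransactionHashNumbers>(hash_b, &2)?;
    /// batch.commit()?; // both entries are written atomically
    /// ```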
    pub fn batch(&self) -> RocksDBBatch<'_> {
        RocksDBBatch {
            provider: self,
            inner: WriteBatchWithTransaction::<true>::default(),
            buf: Vec::with_capacity(DEFAULT_COMPRESS_BUF_CAPACITY),
        }
    }

    /// Returns the column family handle for the given table, or an error if it was not
    /// registered when the database was opened.
    fn get_cf_handle<T: Table>(&self) -> Result<&rocksdb::ColumnFamily, DatabaseError> {
        self.0
            .db
            .cf_handle(T::NAME)
            .ok_or_else(|| DatabaseError::Other(format!("Column family '{}' not found", T::NAME)))
    }

    /// Runs `f`, recording its duration for the given operation and table when metrics are
    /// enabled.
    fn execute_with_operation_metric<T>(
        &self,
        operation: RocksDBOperation,
        table: &'static str,
        f: impl FnOnce(&Self) -> T,
    ) -> T {
        let start = self.0.metrics.as_ref().map(|_| Instant::now());
        let res = f(self);

        if let (Some(start), Some(metrics)) = (start, &self.0.metrics) {
            metrics.record_operation(operation, table, start.elapsed());
        }

        res
    }

    /// Gets the value for the given key from the table, if any.
    pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
        self.get_encoded::<T>(&key.encode())
    }

    /// Gets the value for an already-encoded key from the table, if any.
    pub fn get_encoded<T: Table>(
        &self,
        key: &<T::Key as Encode>::Encoded,
    ) -> ProviderResult<Option<T::Value>> {
        self.execute_with_operation_metric(RocksDBOperation::Get, T::NAME, |this| {
            let result =
                this.0.db.get_cf(this.get_cf_handle::<T>()?, key.as_ref()).map_err(|e| {
                    ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
                        message: e.to_string().into(),
                        code: -1,
                    }))
                })?;

            Ok(result.and_then(|value| T::Value::decompress(&value).ok()))
        })
    }

    /// Puts a key-value pair into the table.
    pub fn put<T: Table>(&self, key: T::Key, value: &T::Value) -> ProviderResult<()> {
        let encoded_key = key.encode();
        self.put_encoded::<T>(&encoded_key, value)
    }

    /// Puts a value for an already-encoded key into the table.
    pub fn put_encoded<T: Table>(
        &self,
        key: &<T::Key as Encode>::Encoded,
        value: &T::Value,
    ) -> ProviderResult<()> {
        self.execute_with_operation_metric(RocksDBOperation::Put, T::NAME, |this| {
            let mut buf = Vec::new();
            let value_bytes = compress_to_buf_or_ref!(buf, value).unwrap_or(&buf);

            this.0.db.put_cf(this.get_cf_handle::<T>()?, key, value_bytes).map_err(|e| {
                ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
                    info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
                    operation: DatabaseWriteOperation::PutUpsert,
                    table_name: T::NAME,
                    key: key.as_ref().to_vec(),
                })))
            })
        })
    }

    /// Deletes the value for the given key from the table.
    pub fn delete<T: Table>(&self, key: T::Key) -> ProviderResult<()> {
        self.execute_with_operation_metric(RocksDBOperation::Delete, T::NAME, |this| {
            this.0.db.delete_cf(this.get_cf_handle::<T>()?, key.encode().as_ref()).map_err(|e| {
                ProviderError::Database(DatabaseError::Delete(DatabaseErrorInfo {
                    message: e.to_string().into(),
                    code: -1,
                }))
            })
        })
    }

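    /// Returns the first key-value pair of the table, ordered by the encoded key bytes.
    ///
    /// # Example
    ///
    /// Sketch (error handling elided):
    ///
    /// ```ignore
    /// if let Some((first_key, _value)) = provider.first::<tables::TransactionHashNumbers>()? {
    ///     // `first_key` is the smallest key in encoded byte order
    /// }
    /// ```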
    pub fn first<T: Table>(&self) -> ProviderResult<Option<(T::Key, T::Value)>> {
        self.execute_with_operation_metric(RocksDBOperation::Get, T::NAME, |this| {
            let cf = this.get_cf_handle::<T>()?;
            let mut iter = this.0.db.iterator_cf(cf, IteratorMode::Start);

            match iter.next() {
                Some(Ok((key_bytes, value_bytes))) => {
                    let key = <T::Key as reth_db_api::table::Decode>::decode(&key_bytes)
                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
                    let value = T::Value::decompress(&value_bytes)
                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
                    Ok(Some((key, value)))
                }
                Some(Err(e)) => {
                    Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
                        message: e.to_string().into(),
                        code: -1,
                    })))
                }
                None => Ok(None),
            }
        })
    }

    /// Returns the last key-value pair of the table, ordered by the encoded key bytes.
    pub fn last<T: Table>(&self) -> ProviderResult<Option<(T::Key, T::Value)>> {
        self.execute_with_operation_metric(RocksDBOperation::Get, T::NAME, |this| {
            let cf = this.get_cf_handle::<T>()?;
            let mut iter = this.0.db.iterator_cf(cf, IteratorMode::End);

            match iter.next() {
                Some(Ok((key_bytes, value_bytes))) => {
                    let key = <T::Key as reth_db_api::table::Decode>::decode(&key_bytes)
                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
                    let value = T::Value::decompress(&value_bytes)
                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
                    Ok(Some((key, value)))
                }
                Some(Err(e)) => {
                    Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
                        message: e.to_string().into(),
                        code: -1,
                    })))
                }
                None => Ok(None),
            }
        })
    }

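    /// Returns an iterator over all entries of the table, in encoded key order.
    ///
    /// # Example
    ///
    /// Sketch (error handling elided):
    ///
    /// ```ignore
    /// for entry in provider.iter::<tables::TransactionHashNumbers>()? {
    ///     let (hash, number) = entry?;
    ///     // process the decoded key-value pair
    /// }
    /// ```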
    pub fn iter<T: Table>(&self) -> ProviderResult<RocksDBIter<'_, T>> {
        let cf = self.get_cf_handle::<T>()?;
        let iter = self.0.db.iterator_cf(cf, IteratorMode::Start);
        Ok(RocksDBIter { inner: iter, _marker: std::marker::PhantomData })
    }

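    /// Runs the given closure against a fresh write batch and commits it.
    ///
    /// If the closure returns an error, the batch is dropped and nothing is written.
    ///
    /// # Example
    ///
    /// Sketch against a registered table (error handling elided):
    ///
    /// ```ignore
    /// provider.write_batch(|batch| {
    ///     batch.put::<tables::TransactionHashNumbers>(hash_a, &1)?;
    ///     batch.delete::<tables::TransactionHashNumbers>(hash_b)?;
    ///     Ok(())
    /// })?;
    /// ```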
    pub fn write_batch<F>(&self, f: F) -> ProviderResult<()>
    where
        F: FnOnce(&mut RocksDBBatch<'_>) -> ProviderResult<()>,
    {
        self.execute_with_operation_metric(RocksDBOperation::BatchWrite, "Batch", |this| {
            let mut batch_handle = this.batch();
            f(&mut batch_handle)?;
            batch_handle.commit()
        })
    }

    /// Atomically commits a previously built [`WriteBatchWithTransaction`], e.g. one obtained
    /// from [`RocksDBBatch::into_inner`].
    pub fn commit_batch(&self, batch: WriteBatchWithTransaction<true>) -> ProviderResult<()> {
        self.0.db.write_opt(batch, &WriteOptions::default()).map_err(|e| {
            ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
                message: e.to_string().into(),
                code: -1,
            }))
        })
    }
}

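/// A batch of puts and deletes that is applied to the database atomically on commit.
///
/// Created via [`RocksDBProvider::batch`] (or implicitly by [`RocksDBProvider::write_batch`]).
/// Dropping the batch without calling [`RocksDBBatch::commit`] discards all staged entries;
/// [`RocksDBBatch::into_inner`] exposes the raw [`WriteBatchWithTransaction`] so it can be
/// committed later through [`RocksDBProvider::commit_batch`].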
#[must_use = "batch must be committed"]
pub struct RocksDBBatch<'a> {
    /// Provider the batch writes to.
    provider: &'a RocksDBProvider,
    /// The staged RocksDB write batch.
    inner: WriteBatchWithTransaction<true>,
    /// Reusable buffer for value compression.
    buf: Vec<u8>,
}

impl fmt::Debug for RocksDBBatch<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RocksDBBatch")
            .field("provider", &self.provider)
            .field("batch", &"<WriteBatchWithTransaction>")
            .field("length", &self.inner.len())
            .field("size_in_bytes", &self.inner.size_in_bytes())
            .finish()
    }
}

impl<'a> RocksDBBatch<'a> {
    /// Stages a key-value pair for the table.
    pub fn put<T: Table>(&mut self, key: T::Key, value: &T::Value) -> ProviderResult<()> {
        let encoded_key = key.encode();
        self.put_encoded::<T>(&encoded_key, value)
    }

    /// Stages a value for an already-encoded key.
    pub fn put_encoded<T: Table>(
        &mut self,
        key: &<T::Key as Encode>::Encoded,
        value: &T::Value,
    ) -> ProviderResult<()> {
        let value_bytes = compress_to_buf_or_ref!(self.buf, value).unwrap_or(&self.buf);
        self.inner.put_cf(self.provider.get_cf_handle::<T>()?, key, value_bytes);
        Ok(())
    }

    /// Stages a deletion of the given key.
    pub fn delete<T: Table>(&mut self, key: T::Key) -> ProviderResult<()> {
        self.inner.delete_cf(self.provider.get_cf_handle::<T>()?, key.encode().as_ref());
        Ok(())
    }

    /// Atomically writes all staged entries to the database.
    pub fn commit(self) -> ProviderResult<()> {
        self.provider.0.db.write_opt(self.inner, &WriteOptions::default()).map_err(|e| {
            ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
                message: e.to_string().into(),
                code: -1,
            }))
        })
    }

    /// Returns the number of staged operations.
    pub fn len(&self) -> usize {
        self.inner.len()
    }

    /// Returns `true` if no operations are staged.
    pub fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }

    /// Returns the provider this batch writes to.
    pub const fn provider(&self) -> &RocksDBProvider {
        self.provider
    }

    /// Consumes the batch and returns the raw [`WriteBatchWithTransaction`] without
    /// committing it.
    pub fn into_inner(self) -> WriteBatchWithTransaction<true> {
        self.inner
    }
}

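/// A RocksDB transaction scoped to a [`RocksDBProvider`].
///
/// Reads observe the transaction's own uncommitted writes. The transaction should be finished
/// explicitly with [`RocksTx::commit`] or [`RocksTx::rollback`].
///
/// # Example
///
/// Sketch of discarding staged changes (error handling elided):
///
/// ```ignore
/// let tx = provider.tx();
/// tx.put::<tables::TransactionHashNumbers>(hash, &7)?;
/// tx.rollback()?; // the staged put is never written to the database
/// ```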
pub struct RocksTx<'db> {
    /// The underlying RocksDB transaction.
    inner: Transaction<'db, TransactionDB>,
    /// Provider the transaction operates on.
    provider: &'db RocksDBProvider,
}

impl fmt::Debug for RocksTx<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RocksTx").field("provider", &self.provider).finish_non_exhaustive()
    }
}

impl<'db> RocksTx<'db> {
    /// Gets the value for the given key from the table, if any.
    pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
        let encoded_key = key.encode();
        self.get_encoded::<T>(&encoded_key)
    }

    /// Gets the value for an already-encoded key from the table, if any.
    pub fn get_encoded<T: Table>(
        &self,
        key: &<T::Key as Encode>::Encoded,
    ) -> ProviderResult<Option<T::Value>> {
        let cf = self.provider.get_cf_handle::<T>()?;
        let result = self.inner.get_cf(cf, key.as_ref()).map_err(|e| {
            ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
                message: e.to_string().into(),
                code: -1,
            }))
        })?;

        Ok(result.and_then(|value| T::Value::decompress(&value).ok()))
    }

    /// Puts a key-value pair into the table within this transaction.
    pub fn put<T: Table>(&self, key: T::Key, value: &T::Value) -> ProviderResult<()> {
        let encoded_key = key.encode();
        self.put_encoded::<T>(&encoded_key, value)
    }

    /// Puts a value for an already-encoded key within this transaction.
    pub fn put_encoded<T: Table>(
        &self,
        key: &<T::Key as Encode>::Encoded,
        value: &T::Value,
    ) -> ProviderResult<()> {
        let cf = self.provider.get_cf_handle::<T>()?;
        let mut buf = Vec::new();
        let value_bytes = compress_to_buf_or_ref!(buf, value).unwrap_or(&buf);

        self.inner.put_cf(cf, key.as_ref(), value_bytes).map_err(|e| {
            ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
                info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
                operation: DatabaseWriteOperation::PutUpsert,
                table_name: T::NAME,
                key: key.as_ref().to_vec(),
            })))
        })
    }

    /// Deletes the value for the given key within this transaction.
    pub fn delete<T: Table>(&self, key: T::Key) -> ProviderResult<()> {
        let cf = self.provider.get_cf_handle::<T>()?;
        self.inner.delete_cf(cf, key.encode().as_ref()).map_err(|e| {
            ProviderError::Database(DatabaseError::Delete(DatabaseErrorInfo {
                message: e.to_string().into(),
                code: -1,
            }))
        })
    }

    /// Returns an iterator over all entries of the table as seen by this transaction.
    pub fn iter<T: Table>(&self) -> ProviderResult<RocksTxIter<'_, T>> {
        let cf = self.provider.get_cf_handle::<T>()?;
        let iter = self.inner.iterator_cf(cf, IteratorMode::Start);
        Ok(RocksTxIter { inner: iter, _marker: std::marker::PhantomData })
    }

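    /// Returns an iterator over the table starting at the first key greater than or equal to
    /// `key` in encoded byte order, walking forward.
    ///
    /// # Example
    ///
    /// Sketch (error handling elided):
    ///
    /// ```ignore
    /// for entry in tx.iter_from::<tables::TransactionHashNumbers>(start_hash)? {
    ///     let (hash, number) = entry?;
    ///     // entries at or after `start_hash`
    /// }
    /// ```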
    pub fn iter_from<T: Table>(&self, key: T::Key) -> ProviderResult<RocksTxIter<'_, T>> {
        let cf = self.provider.get_cf_handle::<T>()?;
        let encoded_key = key.encode();
        let iter = self
            .inner
            .iterator_cf(cf, IteratorMode::From(encoded_key.as_ref(), rocksdb::Direction::Forward));
        Ok(RocksTxIter { inner: iter, _marker: std::marker::PhantomData })
    }

    /// Commits the transaction, making its writes visible to the rest of the provider.
    pub fn commit(self) -> ProviderResult<()> {
        self.inner.commit().map_err(|e| {
            ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
                message: e.to_string().into(),
                code: -1,
            }))
        })
    }

    /// Rolls the transaction back, discarding all of its writes.
    pub fn rollback(self) -> ProviderResult<()> {
        self.inner.rollback().map_err(|e| {
            ProviderError::Database(DatabaseError::Other(format!("rollback failed: {e}")))
        })
    }
}

/// Iterator over all entries of a table, yielding decoded key-value pairs.
pub struct RocksDBIter<'db, T: Table> {
    /// The underlying RocksDB iterator.
    inner: rocksdb::DBIteratorWithThreadMode<'db, TransactionDB>,
    /// Marker tying the iterator to its table type.
    _marker: std::marker::PhantomData<T>,
}

impl<T: Table> fmt::Debug for RocksDBIter<'_, T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RocksDBIter").field("table", &T::NAME).finish_non_exhaustive()
    }
}

impl<T: Table> Iterator for RocksDBIter<'_, T> {
    type Item = ProviderResult<(T::Key, T::Value)>;

    fn next(&mut self) -> Option<Self::Item> {
        let (key_bytes, value_bytes) = match self.inner.next()? {
            Ok(kv) => kv,
            Err(e) => {
                return Some(Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
                    message: e.to_string().into(),
                    code: -1,
                }))))
            }
        };

        let key = match <T::Key as reth_db_api::table::Decode>::decode(&key_bytes) {
            Ok(k) => k,
            Err(_) => return Some(Err(ProviderError::Database(DatabaseError::Decode))),
        };

        let value = match T::Value::decompress(&value_bytes) {
            Ok(v) => v,
            Err(_) => return Some(Err(ProviderError::Database(DatabaseError::Decode))),
        };

        Some(Ok((key, value)))
    }
}

/// Iterator over the entries of a table as seen by a [`RocksTx`], yielding decoded
/// key-value pairs.
pub struct RocksTxIter<'tx, T: Table> {
    /// The underlying RocksDB iterator bound to the transaction.
    inner: rocksdb::DBIteratorWithThreadMode<'tx, Transaction<'tx, TransactionDB>>,
    /// Marker tying the iterator to its table type.
    _marker: std::marker::PhantomData<T>,
}

impl<T: Table> fmt::Debug for RocksTxIter<'_, T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RocksTxIter").field("table", &T::NAME).finish_non_exhaustive()
    }
}

impl<T: Table> Iterator for RocksTxIter<'_, T> {
    type Item = ProviderResult<(T::Key, T::Value)>;

    fn next(&mut self) -> Option<Self::Item> {
        let (key_bytes, value_bytes) = match self.inner.next()? {
            Ok(kv) => kv,
            Err(e) => {
                return Some(Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
                    message: e.to_string().into(),
                    code: -1,
                }))))
            }
        };

        let key = match <T::Key as reth_db_api::table::Decode>::decode(&key_bytes) {
            Ok(k) => k,
            Err(_) => return Some(Err(ProviderError::Database(DatabaseError::Decode))),
        };

        let value = match T::Value::decompress(&value_bytes) {
            Ok(v) => v,
            Err(_) => return Some(Err(ProviderError::Database(DatabaseError::Decode))),
        };

        Some(Ok((key, value)))
    }
}

/// Maps a reth [`LogLevel`] to the corresponding RocksDB log level.
const fn convert_log_level(level: LogLevel) -> rocksdb::LogLevel {
    match level {
        LogLevel::Fatal => rocksdb::LogLevel::Fatal,
        LogLevel::Error => rocksdb::LogLevel::Error,
        LogLevel::Warn => rocksdb::LogLevel::Warn,
        LogLevel::Notice | LogLevel::Verbose => rocksdb::LogLevel::Info,
        LogLevel::Debug | LogLevel::Trace | LogLevel::Extra => rocksdb::LogLevel::Debug,
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use alloy_primitives::{Address, TxHash, B256};
    use reth_db_api::{
        models::{sharded_key::ShardedKey, storage_sharded_key::StorageShardedKey, IntegerList},
        table::Table,
        tables,
    };
    use tempfile::TempDir;

    #[test]
    fn test_with_default_tables_registers_required_column_families() {
        let temp_dir = TempDir::new().unwrap();

        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let tx_hash = TxHash::from(B256::from([1u8; 32]));
        provider.put::<tables::TransactionHashNumbers>(tx_hash, &100).unwrap();
        assert_eq!(provider.get::<tables::TransactionHashNumbers>(tx_hash).unwrap(), Some(100));

        let key = ShardedKey::new(Address::ZERO, 100);
        let value = IntegerList::default();
        provider.put::<tables::AccountsHistory>(key.clone(), &value).unwrap();
        assert!(provider.get::<tables::AccountsHistory>(key).unwrap().is_some());

        let key = StorageShardedKey::new(Address::ZERO, B256::ZERO, 100);
        provider.put::<tables::StoragesHistory>(key.clone(), &value).unwrap();
        assert!(provider.get::<tables::StoragesHistory>(key).unwrap().is_some());
    }

    #[derive(Debug)]
    struct TestTable;

    impl Table for TestTable {
        const NAME: &'static str = "TestTable";
        const DUPSORT: bool = false;
        type Key = u64;
        type Value = Vec<u8>;
    }

    #[test]
    fn test_basic_operations() {
        let temp_dir = TempDir::new().unwrap();

        let provider = RocksDBBuilder::new(temp_dir.path())
            .with_table::<TestTable>()
            .build()
            .unwrap();

        let key = 42u64;
        let value = b"test_value".to_vec();

        provider.put::<TestTable>(key, &value).unwrap();

        let result = provider.get::<TestTable>(key).unwrap();
        assert_eq!(result, Some(value));

        provider.delete::<TestTable>(key).unwrap();

        assert_eq!(provider.get::<TestTable>(key).unwrap(), None);
    }

    #[test]
    fn test_batch_operations() {
        let temp_dir = TempDir::new().unwrap();
        let provider =
            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();

        provider
            .write_batch(|batch| {
                for i in 0..10u64 {
                    let value = format!("value_{i}").into_bytes();
                    batch.put::<TestTable>(i, &value)?;
                }
                Ok(())
            })
            .unwrap();

        for i in 0..10u64 {
            let value = format!("value_{i}").into_bytes();
            assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
        }

        provider
            .write_batch(|batch| {
                for i in 0..10u64 {
                    batch.delete::<TestTable>(i)?;
                }
                Ok(())
            })
            .unwrap();

        for i in 0..10u64 {
            assert_eq!(provider.get::<TestTable>(i).unwrap(), None);
        }
    }

    #[test]
    fn test_with_real_table() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path())
            .with_table::<tables::TransactionHashNumbers>()
            .with_metrics()
            .build()
            .unwrap();

        let tx_hash = TxHash::from(B256::from([1u8; 32]));

        provider.put::<tables::TransactionHashNumbers>(tx_hash, &100).unwrap();
        assert_eq!(provider.get::<tables::TransactionHashNumbers>(tx_hash).unwrap(), Some(100));

        provider
            .write_batch(|batch| {
                for i in 0..10u64 {
                    let hash = TxHash::from(B256::from([i as u8; 32]));
                    let value = i * 100;
                    batch.put::<tables::TransactionHashNumbers>(hash, &value)?;
                }
                Ok(())
            })
            .unwrap();

        for i in 0..10u64 {
            let hash = TxHash::from(B256::from([i as u8; 32]));
            assert_eq!(
                provider.get::<tables::TransactionHashNumbers>(hash).unwrap(),
                Some(i * 100)
            );
        }
    }

    #[test]
    fn test_statistics_enabled() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path())
            .with_table::<TestTable>()
            .with_statistics()
            .build()
            .unwrap();

        for i in 0..10 {
            let value = vec![i as u8];
            provider.put::<TestTable>(i, &value).unwrap();
            assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
        }
    }

    #[test]
    fn test_data_persistence() {
        let temp_dir = TempDir::new().unwrap();
        let provider =
            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();

        let value = vec![42u8; 1000];
        for i in 0..100 {
            provider.put::<TestTable>(i, &value).unwrap();
        }

        for i in 0..100 {
            assert!(provider.get::<TestTable>(i).unwrap().is_some(), "Data should be readable");
        }
    }

    #[test]
    fn test_transaction_read_your_writes() {
        let temp_dir = TempDir::new().unwrap();
        let provider =
            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();

        let tx = provider.tx();

        let key = 42u64;
        let value = b"test_value".to_vec();
        tx.put::<TestTable>(key, &value).unwrap();

        let result = tx.get::<TestTable>(key).unwrap();
        assert_eq!(
            result,
            Some(value.clone()),
            "Transaction should see its own uncommitted writes"
        );

        let provider_result = provider.get::<TestTable>(key).unwrap();
        assert_eq!(provider_result, None, "Uncommitted data should not be visible outside tx");

        tx.commit().unwrap();

        let committed_result = provider.get::<TestTable>(key).unwrap();
        assert_eq!(committed_result, Some(value), "Committed data should be visible");
    }

    #[test]
    fn test_transaction_rollback() {
        let temp_dir = TempDir::new().unwrap();
        let provider =
            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();

        let key = 100u64;
        let initial_value = b"initial".to_vec();
        provider.put::<TestTable>(key, &initial_value).unwrap();

        let tx = provider.tx();
        let new_value = b"modified".to_vec();
        tx.put::<TestTable>(key, &new_value).unwrap();

        assert_eq!(tx.get::<TestTable>(key).unwrap(), Some(new_value));

        tx.rollback().unwrap();

        let result = provider.get::<TestTable>(key).unwrap();
        assert_eq!(result, Some(initial_value), "Rollback should preserve original data");
    }

    #[test]
    fn test_transaction_iterator() {
        let temp_dir = TempDir::new().unwrap();
        let provider =
            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();

        let tx = provider.tx();

        for i in 0..5u64 {
            let value = format!("value_{i}").into_bytes();
            tx.put::<TestTable>(i, &value).unwrap();
        }

        let mut count = 0;
        for result in tx.iter::<TestTable>().unwrap() {
            let (key, value) = result.unwrap();
            assert_eq!(value, format!("value_{key}").into_bytes());
            count += 1;
        }
        assert_eq!(count, 5, "Iterator should see all uncommitted writes");

        tx.commit().unwrap();
    }

    #[test]
    fn test_batch_manual_commit() {
        let temp_dir = TempDir::new().unwrap();
        let provider =
            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();

        let mut batch = provider.batch();

        for i in 0..10u64 {
            let value = format!("batch_value_{i}").into_bytes();
            batch.put::<TestTable>(i, &value).unwrap();
        }

        assert_eq!(batch.len(), 10);
        assert!(!batch.is_empty());

        assert_eq!(provider.get::<TestTable>(0).unwrap(), None);

        batch.commit().unwrap();

        for i in 0..10u64 {
            let value = format!("batch_value_{i}").into_bytes();
            assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
        }
    }

    #[test]
    fn test_first_and_last_entry() {
        let temp_dir = TempDir::new().unwrap();
        let provider =
            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();

        assert_eq!(provider.first::<TestTable>().unwrap(), None);
        assert_eq!(provider.last::<TestTable>().unwrap(), None);

        provider.put::<TestTable>(10, &b"value_10".to_vec()).unwrap();
        provider.put::<TestTable>(20, &b"value_20".to_vec()).unwrap();
        provider.put::<TestTable>(5, &b"value_5".to_vec()).unwrap();

        let first = provider.first::<TestTable>().unwrap();
        assert_eq!(first, Some((5, b"value_5".to_vec())));

        let last = provider.last::<TestTable>().unwrap();
        assert_eq!(last, Some((20, b"value_20".to_vec())));
    }
}