use super::metrics::{RocksDBMetrics, RocksDBOperation, ROCKSDB_TABLES};
use crate::providers::{compute_history_rank, needs_prev_shard_check, HistoryInfo};
use alloy_consensus::transaction::TxHashRef;
use alloy_primitives::{Address, BlockNumber, TxNumber, B256};
use itertools::Itertools;
use metrics::Label;
use parking_lot::Mutex;
use reth_chain_state::ExecutedBlock;
use reth_db_api::{
    database_metrics::DatabaseMetrics,
    models::{
        sharded_key::NUM_OF_INDICES_IN_SHARD, storage_sharded_key::StorageShardedKey, ShardedKey,
        StorageSettings,
    },
    table::{Compress, Decode, Decompress, Encode, Table},
    tables, BlockNumberList, DatabaseError,
};
use reth_primitives_traits::BlockBody as _;
use reth_prune_types::PruneMode;
use reth_storage_errors::{
    db::{DatabaseErrorInfo, DatabaseWriteError, DatabaseWriteOperation, LogLevel},
    provider::{ProviderError, ProviderResult},
};
use rocksdb::{
    BlockBasedOptions, Cache, ColumnFamilyDescriptor, CompactionPri, DBCompressionType,
    DBRawIteratorWithThreadMode, IteratorMode, OptimisticTransactionDB,
    OptimisticTransactionOptions, Options, Transaction, WriteBatchWithTransaction, WriteOptions,
    DB,
};
use std::{
    collections::{BTreeMap, HashMap},
    fmt,
    path::{Path, PathBuf},
    sync::Arc,
    thread,
    time::Instant,
};
use tracing::instrument;

/// Shared collection of RocksDB write batches queued for commit.
pub(crate) type PendingRocksDBBatches = Arc<Mutex<Vec<WriteBatchWithTransaction<true>>>>;

/// Per-table (column family) statistics for a RocksDB instance.
#[derive(Debug, Clone)]
pub struct RocksDBTableStats {
    /// Total size of live SST files in bytes.
    pub sst_size_bytes: u64,
    /// Total size of all memtables in bytes.
    pub memtable_size_bytes: u64,
    /// Name of the table (column family).
    pub name: String,
    /// Estimated number of keys.
    pub estimated_num_keys: u64,
    /// Estimated total size in bytes (SST files plus memtables).
    pub estimated_size_bytes: u64,
    /// Estimated bytes pending compaction.
    pub pending_compaction_bytes: u64,
}

/// Aggregated statistics for a RocksDB instance.
#[derive(Debug, Clone)]
pub struct RocksDBStats {
    /// Statistics for each table (column family).
    pub tables: Vec<RocksDBTableStats>,
    /// Total size of write-ahead log files in bytes.
    pub wal_size_bytes: u64,
}

/// Context passed to the RocksDB block-data writers.
#[derive(Clone)]
pub(crate) struct RocksDBWriteCtx {
    /// Number of the first block in the range being written.
    pub first_block_number: BlockNumber,
    /// Prune mode for the transaction lookup table, if configured.
    pub prune_tx_lookup: Option<PruneMode>,
    /// Settings describing which tables live in RocksDB.
    pub storage_settings: StorageSettings,
    /// Write batches produced by the writers, queued for commit.
    pub pending_batches: PendingRocksDBBatches,
}

impl fmt::Debug for RocksDBWriteCtx {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RocksDBWriteCtx")
            .field("first_block_number", &self.first_block_number)
            .field("prune_tx_lookup", &self.prune_tx_lookup)
            .field("storage_settings", &self.storage_settings)
            .field("pending_batches", &"<pending batches>")
            .finish()
    }
}

/// Default block cache size: 128 MiB.
const DEFAULT_CACHE_SIZE: usize = 128 << 20;

/// Default block size: 16 KiB.
const DEFAULT_BLOCK_SIZE: usize = 16 * 1024;

/// Default maximum number of background compaction and flush jobs.
const DEFAULT_MAX_BACKGROUND_JOBS: i32 = 6;

/// Default incremental sync granularity: 1 MiB.
const DEFAULT_BYTES_PER_SYNC: u64 = 1_048_576;

/// Initial capacity of the reusable value-compression buffer.
const DEFAULT_COMPRESS_BUF_CAPACITY: usize = 4096;

/// Default auto-commit threshold for write batches: 4 GiB.
const DEFAULT_AUTO_COMMIT_THRESHOLD: usize = 4 * 1024 * 1024 * 1024;
pub struct RocksDBBuilder {
    path: PathBuf,
    column_families: Vec<String>,
    enable_metrics: bool,
    enable_statistics: bool,
    log_level: rocksdb::LogLevel,
    block_cache: Cache,
    read_only: bool,
}

impl fmt::Debug for RocksDBBuilder {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RocksDBBuilder")
            .field("path", &self.path)
            .field("column_families", &self.column_families)
            .field("enable_metrics", &self.enable_metrics)
            .finish()
    }
}

impl RocksDBBuilder {
    /// Creates a new builder for the database at the given path.
    pub fn new(path: impl AsRef<Path>) -> Self {
        let cache = Cache::new_lru_cache(DEFAULT_CACHE_SIZE);
        Self {
            path: path.as_ref().to_path_buf(),
            column_families: Vec::new(),
            enable_metrics: false,
            enable_statistics: false,
            log_level: rocksdb::LogLevel::Info,
            block_cache: cache,
            read_only: false,
        }
    }

    /// Block-based table options shared by all column families.
    fn default_table_options(cache: &Cache) -> BlockBasedOptions {
        let mut table_options = BlockBasedOptions::default();
        table_options.set_block_size(DEFAULT_BLOCK_SIZE);
        table_options.set_cache_index_and_filter_blocks(true);
        table_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
        table_options.set_block_cache(cache);
        table_options
    }

    /// Database-wide options: compression, background jobs, and WAL settings.
    fn default_options(
        log_level: rocksdb::LogLevel,
        cache: &Cache,
        enable_statistics: bool,
    ) -> Options {
        let table_options = Self::default_table_options(cache);

        let mut options = Options::default();
        options.set_block_based_table_factory(&table_options);
        options.create_if_missing(true);
        options.create_missing_column_families(true);
        options.set_max_background_jobs(DEFAULT_MAX_BACKGROUND_JOBS);
        options.set_bytes_per_sync(DEFAULT_BYTES_PER_SYNC);

        options.set_bottommost_compression_type(DBCompressionType::Zstd);
        options.set_bottommost_zstd_max_train_bytes(0, true);
        options.set_compression_type(DBCompressionType::Lz4);
        options.set_compaction_pri(CompactionPri::MinOverlappingRatio);

        options.set_log_level(log_level);

        // Archived WAL files are not needed; delete them as soon as possible.
        options.set_wal_ttl_seconds(0);
        options.set_wal_size_limit_mb(0);

        if enable_statistics {
            options.enable_statistics();
        }

        options
    }

    /// Default per-column-family options.
    fn default_column_family_options(cache: &Cache) -> Options {
        let table_options = Self::default_table_options(cache);

        let mut cf_options = Options::default();
        cf_options.set_block_based_table_factory(&table_options);
        cf_options.set_level_compaction_dynamic_level_bytes(true);
        cf_options.set_compression_type(DBCompressionType::Lz4);
        cf_options.set_bottommost_compression_type(DBCompressionType::Zstd);
        cf_options.set_bottommost_zstd_max_train_bytes(0, true);

        cf_options
    }

    /// Options for the transaction-hash-to-number column family.
    ///
    /// Transaction hashes are high-entropy and effectively incompressible, so
    /// compression is disabled for this column family.
    fn tx_hash_numbers_column_family_options(cache: &Cache) -> Options {
        let mut table_options = BlockBasedOptions::default();
        table_options.set_block_size(DEFAULT_BLOCK_SIZE);
        table_options.set_cache_index_and_filter_blocks(true);
        table_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
        table_options.set_block_cache(cache);

        let mut cf_options = Options::default();
        cf_options.set_block_based_table_factory(&table_options);
        cf_options.set_level_compaction_dynamic_level_bytes(true);
        cf_options.set_compression_type(DBCompressionType::None);
        cf_options.set_bottommost_compression_type(DBCompressionType::None);

        cf_options
    }

    /// Registers a table (column family) with the database.
    pub fn with_table<T: Table>(mut self) -> Self {
        self.column_families.push(T::NAME.to_string());
        self
    }

    /// Registers the default set of RocksDB-backed tables: transaction hash
    /// numbers, accounts history, and storages history.
    pub fn with_default_tables(self) -> Self {
        self.with_table::<tables::TransactionHashNumbers>()
            .with_table::<tables::AccountsHistory>()
            .with_table::<tables::StoragesHistory>()
    }

    /// Enables operation metrics.
    pub const fn with_metrics(mut self) -> Self {
        self.enable_metrics = true;
        self
    }

    /// Enables internal RocksDB statistics collection.
    pub const fn with_statistics(mut self) -> Self {
        self.enable_statistics = true;
        self
    }

    /// Sets the database log level, if provided.
    pub const fn with_database_log_level(mut self, log_level: Option<LogLevel>) -> Self {
        if let Some(level) = log_level {
            self.log_level = convert_log_level(level);
        }
        self
    }

    /// Replaces the block cache with one of the given capacity in bytes.
    pub fn with_block_cache_size(mut self, capacity_bytes: usize) -> Self {
        self.block_cache = Cache::new_lru_cache(capacity_bytes);
        self
    }

    /// Opens the database in read-only mode when set.
    pub const fn with_read_only(mut self, read_only: bool) -> Self {
        self.read_only = read_only;
        self
    }

    /// Builds the [`RocksDBProvider`], opening the database at the configured path.
    pub fn build(self) -> ProviderResult<RocksDBProvider> {
        let options =
            Self::default_options(self.log_level, &self.block_cache, self.enable_statistics);

        let cf_descriptors: Vec<ColumnFamilyDescriptor> = self
            .column_families
            .iter()
            .map(|name| {
                let cf_options = if name == tables::TransactionHashNumbers::NAME {
                    Self::tx_hash_numbers_column_family_options(&self.block_cache)
                } else {
                    Self::default_column_family_options(&self.block_cache)
                };
                ColumnFamilyDescriptor::new(name.clone(), cf_options)
            })
            .collect();

        let metrics = self.enable_metrics.then(RocksDBMetrics::default);

        if self.read_only {
            let db = DB::open_cf_descriptors_read_only(&options, &self.path, cf_descriptors, false)
                .map_err(|e| {
                    ProviderError::Database(DatabaseError::Open(DatabaseErrorInfo {
                        message: e.to_string().into(),
                        code: -1,
                    }))
                })?;
            Ok(RocksDBProvider(Arc::new(RocksDBProviderInner::ReadOnly { db, metrics })))
        } else {
            let db =
                OptimisticTransactionDB::open_cf_descriptors(&options, &self.path, cf_descriptors)
                    .map_err(|e| {
                        ProviderError::Database(DatabaseError::Open(DatabaseErrorInfo {
                            message: e.to_string().into(),
                            code: -1,
                        }))
                    })?;
            Ok(RocksDBProvider(Arc::new(RocksDBProviderInner::ReadWrite { db, metrics })))
        }
    }
}

/// Compresses a value into the given buffer, returning `None`, or returns the
/// uncompressable reference directly as `Some`.
macro_rules! compress_to_buf_or_ref {
    ($buf:expr, $value:expr) => {
        if let Some(value) = $value.uncompressable_ref() {
            Some(value)
        } else {
            $buf.clear();
            $value.compress_to_buf(&mut $buf);
            None
        }
    };
}

/// Provider for RocksDB database operations.
#[derive(Debug)]
pub struct RocksDBProvider(Arc<RocksDBProviderInner>);

/// Inner provider state: either read-write or read-only.
enum RocksDBProviderInner {
    /// Read-write database with optimistic transaction support.
    ReadWrite {
        /// The underlying database handle.
        db: OptimisticTransactionDB,
        /// Optional operation metrics.
        metrics: Option<RocksDBMetrics>,
    },
    /// Read-only database handle.
    ReadOnly {
        /// The underlying database handle.
        db: DB,
        /// Optional operation metrics.
        metrics: Option<RocksDBMetrics>,
    },
}

impl RocksDBProviderInner {
    const fn metrics(&self) -> Option<&RocksDBMetrics> {
        match self {
            Self::ReadWrite { metrics, .. } | Self::ReadOnly { metrics, .. } => metrics.as_ref(),
        }
    }

    /// Returns the read-write database handle.
    ///
    /// # Panics
    ///
    /// Panics if the provider was opened read-only.
    fn db_rw(&self) -> &OptimisticTransactionDB {
        match self {
            Self::ReadWrite { db, .. } => db,
            Self::ReadOnly { .. } => {
                panic!("Cannot perform write operation on read-only RocksDB provider")
            }
        }
    }

    fn cf_handle<T: Table>(&self) -> Result<&rocksdb::ColumnFamily, DatabaseError> {
        let cf = match self {
            Self::ReadWrite { db, .. } => db.cf_handle(T::NAME),
            Self::ReadOnly { db, .. } => db.cf_handle(T::NAME),
        };
        cf.ok_or_else(|| DatabaseError::Other(format!("Column family '{}' not found", T::NAME)))
    }

    fn cf_handle_rw(&self, name: &str) -> Result<&rocksdb::ColumnFamily, DatabaseError> {
        self.db_rw()
            .cf_handle(name)
            .ok_or_else(|| DatabaseError::Other(format!("Column family '{}' not found", name)))
    }

    fn get_cf(
        &self,
        cf: &rocksdb::ColumnFamily,
        key: impl AsRef<[u8]>,
    ) -> Result<Option<Vec<u8>>, rocksdb::Error> {
        match self {
            Self::ReadWrite { db, .. } => db.get_cf(cf, key),
            Self::ReadOnly { db, .. } => db.get_cf(cf, key),
        }
    }

    fn put_cf(
        &self,
        cf: &rocksdb::ColumnFamily,
        key: impl AsRef<[u8]>,
        value: impl AsRef<[u8]>,
    ) -> Result<(), rocksdb::Error> {
        self.db_rw().put_cf(cf, key, value)
    }

    fn delete_cf(
        &self,
        cf: &rocksdb::ColumnFamily,
        key: impl AsRef<[u8]>,
    ) -> Result<(), rocksdb::Error> {
        self.db_rw().delete_cf(cf, key)
    }

    fn delete_range_cf<K: AsRef<[u8]>>(
        &self,
        cf: &rocksdb::ColumnFamily,
        from: K,
        to: K,
    ) -> Result<(), rocksdb::Error> {
        self.db_rw().delete_range_cf(cf, from, to)
    }

    fn iterator_cf(
        &self,
        cf: &rocksdb::ColumnFamily,
        mode: IteratorMode<'_>,
    ) -> RocksDBIterEnum<'_> {
        match self {
            Self::ReadWrite { db, .. } => RocksDBIterEnum::ReadWrite(db.iterator_cf(cf, mode)),
            Self::ReadOnly { db, .. } => RocksDBIterEnum::ReadOnly(db.iterator_cf(cf, mode)),
        }
    }

    fn path(&self) -> &Path {
        match self {
            Self::ReadWrite { db, .. } => db.path(),
            Self::ReadOnly { db, .. } => db.path(),
        }
    }

    /// Returns the total size of WAL (`*.log`) files in the database
    /// directory, or 0 if the directory cannot be read.
    fn wal_size_bytes(&self) -> u64 {
        let path = self.path();

        match std::fs::read_dir(path) {
            Ok(entries) => entries
                .filter_map(|e| e.ok())
                .filter(|e| e.path().extension().is_some_and(|ext| ext == "log"))
                .filter_map(|e| e.metadata().ok())
                .map(|m| m.len())
                .sum(),
            Err(_) => 0,
        }
    }

    fn table_stats(&self) -> Vec<RocksDBTableStats> {
        let mut stats = Vec::new();

        macro_rules! collect_stats {
            ($db:expr) => {
                for cf_name in ROCKSDB_TABLES {
                    if let Some(cf) = $db.cf_handle(cf_name) {
                        let estimated_num_keys = $db
                            .property_int_value_cf(cf, rocksdb::properties::ESTIMATE_NUM_KEYS)
                            .ok()
                            .flatten()
                            .unwrap_or(0);

                        let sst_size = $db
                            .property_int_value_cf(cf, rocksdb::properties::LIVE_SST_FILES_SIZE)
                            .ok()
                            .flatten()
                            .unwrap_or(0);

                        let memtable_size = $db
                            .property_int_value_cf(cf, rocksdb::properties::SIZE_ALL_MEM_TABLES)
                            .ok()
                            .flatten()
                            .unwrap_or(0);

                        let estimated_size_bytes = sst_size + memtable_size;

                        let pending_compaction_bytes = $db
                            .property_int_value_cf(
                                cf,
                                rocksdb::properties::ESTIMATE_PENDING_COMPACTION_BYTES,
                            )
                            .ok()
                            .flatten()
                            .unwrap_or(0);

                        stats.push(RocksDBTableStats {
                            sst_size_bytes: sst_size,
                            memtable_size_bytes: memtable_size,
                            name: cf_name.to_string(),
                            estimated_num_keys,
                            estimated_size_bytes,
                            pending_compaction_bytes,
                        });
                    }
                }
            };
        }

        match self {
            Self::ReadWrite { db, .. } => collect_stats!(db),
            Self::ReadOnly { db, .. } => collect_stats!(db),
        }

        stats
    }

    fn db_stats(&self) -> RocksDBStats {
        RocksDBStats { tables: self.table_stats(), wal_size_bytes: self.wal_size_bytes() }
    }
}

impl fmt::Debug for RocksDBProviderInner {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::ReadWrite { metrics, .. } => f
                .debug_struct("RocksDBProviderInner::ReadWrite")
                .field("db", &"<OptimisticTransactionDB>")
                .field("metrics", metrics)
                .finish(),
            Self::ReadOnly { metrics, .. } => f
                .debug_struct("RocksDBProviderInner::ReadOnly")
                .field("db", &"<DB (read-only)>")
                .field("metrics", metrics)
                .finish(),
        }
    }
}

impl Drop for RocksDBProviderInner {
    fn drop(&mut self) {
        match self {
            Self::ReadWrite { db, .. } => {
                // Flush the WAL and all memtables so no writes are lost on shutdown.
                if let Err(e) = db.flush_wal(true) {
                    tracing::warn!(target: "providers::rocksdb", ?e, "Failed to flush WAL on drop");
                }
                for cf_name in ROCKSDB_TABLES {
                    if let Some(cf) = db.cf_handle(cf_name) &&
                        let Err(e) = db.flush_cf(&cf)
                    {
                        tracing::warn!(target: "providers::rocksdb", cf = cf_name, ?e, "Failed to flush CF on drop");
                    }
                }
                db.cancel_all_background_work(true);
            }
            Self::ReadOnly { db, .. } => db.cancel_all_background_work(true),
        }
    }
}

impl Clone for RocksDBProvider {
    fn clone(&self) -> Self {
        Self(self.0.clone())
    }
}

impl DatabaseMetrics for RocksDBProvider {
    fn gauge_metrics(&self) -> Vec<(&'static str, f64, Vec<Label>)> {
        let mut metrics = Vec::new();

        for stat in self.table_stats() {
            metrics.push((
                "rocksdb.table_size",
                stat.estimated_size_bytes as f64,
                vec![Label::new("table", stat.name.clone())],
            ));
            metrics.push((
                "rocksdb.table_entries",
                stat.estimated_num_keys as f64,
                vec![Label::new("table", stat.name.clone())],
            ));
            metrics.push((
                "rocksdb.pending_compaction_bytes",
                stat.pending_compaction_bytes as f64,
                vec![Label::new("table", stat.name.clone())],
            ));
            metrics.push((
                "rocksdb.sst_size",
                stat.sst_size_bytes as f64,
                vec![Label::new("table", stat.name.clone())],
            ));
            metrics.push((
                "rocksdb.memtable_size",
                stat.memtable_size_bytes as f64,
                vec![Label::new("table", stat.name)],
            ));
        }

        metrics.push(("rocksdb.wal_size", self.wal_size_bytes() as f64, vec![]));

        metrics
    }
}

impl RocksDBProvider {
    /// Creates a new provider with default options.
    pub fn new(path: impl AsRef<Path>) -> ProviderResult<Self> {
        RocksDBBuilder::new(path).build()
    }

    /// Returns a builder for configuring a provider.
    pub fn builder(path: impl AsRef<Path>) -> RocksDBBuilder {
        RocksDBBuilder::new(path)
    }

    /// Returns `true` if the provider was opened in read-only mode.
    pub fn is_read_only(&self) -> bool {
        matches!(self.0.as_ref(), RocksDBProviderInner::ReadOnly { .. })
    }

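    /// Begins an optimistic transaction with read-your-writes semantics.
    ///
    /// A minimal sketch (assumes a table like `TestTable` is registered;
    /// illustrative only):
    ///
    /// ```ignore
    /// let tx = provider.tx();
    /// tx.put::<TestTable>(1, &b"v".to_vec())?;
    /// assert!(tx.get::<TestTable>(1)?.is_some()); // visible inside the tx
    /// tx.commit()?; // or tx.rollback()?
    /// ```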
    pub fn tx(&self) -> RocksTx<'_> {
        let write_options = WriteOptions::default();
        let txn_options = OptimisticTransactionOptions::default();
        let inner = self.0.db_rw().transaction_opt(&write_options, &txn_options);
        RocksTx { inner, provider: self }
    }

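    /// Creates a write batch that buffers operations in memory until
    /// [`RocksDBBatch::commit`] is called.
    ///
    /// A minimal sketch (assumes a table like `TestTable` is registered;
    /// illustrative only):
    ///
    /// ```ignore
    /// let mut batch = provider.batch();
    /// batch.put::<TestTable>(1, &b"v".to_vec())?;
    /// batch.commit()?;
    /// ```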
    pub fn batch(&self) -> RocksDBBatch<'_> {
        RocksDBBatch {
            provider: self,
            inner: WriteBatchWithTransaction::<true>::default(),
            buf: Vec::with_capacity(DEFAULT_COMPRESS_BUF_CAPACITY),
            auto_commit_threshold: None,
        }
    }

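    /// Creates a write batch that automatically commits once its in-memory
    /// size reaches [`DEFAULT_AUTO_COMMIT_THRESHOLD`], keeping memory bounded
    /// during large writes.
    ///
    /// A minimal sketch (loop bounds and value sizes are illustrative only):
    ///
    /// ```ignore
    /// let mut batch = provider.batch_with_auto_commit();
    /// for i in 0..1_000_000u64 {
    ///     batch.put::<TestTable>(i, &vec![0u8; 32])?; // may auto-commit mid-loop
    /// }
    /// batch.commit()?; // commits whatever remains
    /// ```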
    pub fn batch_with_auto_commit(&self) -> RocksDBBatch<'_> {
        RocksDBBatch {
            provider: self,
            inner: WriteBatchWithTransaction::<true>::default(),
            buf: Vec::with_capacity(DEFAULT_COMPRESS_BUF_CAPACITY),
            auto_commit_threshold: Some(DEFAULT_AUTO_COMMIT_THRESHOLD),
        }
    }

    fn get_cf_handle<T: Table>(&self) -> Result<&rocksdb::ColumnFamily, DatabaseError> {
        self.0.cf_handle::<T>()
    }

    /// Runs `f`, recording its latency under the given operation and table
    /// when metrics are enabled.
    fn execute_with_operation_metric<R>(
        &self,
        operation: RocksDBOperation,
        table: &'static str,
        f: impl FnOnce(&Self) -> R,
    ) -> R {
        let start = self.0.metrics().map(|_| Instant::now());
        let res = f(self);

        if let (Some(start), Some(metrics)) = (start, self.0.metrics()) {
            metrics.record_operation(operation, table, start.elapsed());
        }

        res
    }

    /// Gets a value from the database by key.
    pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
        self.get_encoded::<T>(&key.encode())
    }

    /// Gets a value from the database by an already-encoded key.
    pub fn get_encoded<T: Table>(
        &self,
        key: &<T::Key as Encode>::Encoded,
    ) -> ProviderResult<Option<T::Value>> {
        self.execute_with_operation_metric(RocksDBOperation::Get, T::NAME, |this| {
            let result = this.0.get_cf(this.get_cf_handle::<T>()?, key.as_ref()).map_err(|e| {
                ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
                    message: e.to_string().into(),
                    code: -1,
                }))
            })?;

            Ok(result.and_then(|value| T::Value::decompress(&value).ok()))
        })
    }

    /// Puts a key-value pair into the database.
    pub fn put<T: Table>(&self, key: T::Key, value: &T::Value) -> ProviderResult<()> {
        let encoded_key = key.encode();
        self.put_encoded::<T>(&encoded_key, value)
    }

    /// Puts a key-value pair into the database using an already-encoded key.
    pub fn put_encoded<T: Table>(
        &self,
        key: &<T::Key as Encode>::Encoded,
        value: &T::Value,
    ) -> ProviderResult<()> {
        self.execute_with_operation_metric(RocksDBOperation::Put, T::NAME, |this| {
            let mut buf = Vec::new();
            let value_bytes = compress_to_buf_or_ref!(buf, value).unwrap_or(&buf);

            this.0.put_cf(this.get_cf_handle::<T>()?, key, value_bytes).map_err(|e| {
                ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
                    info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
                    operation: DatabaseWriteOperation::PutUpsert,
                    table_name: T::NAME,
                    key: key.as_ref().to_vec(),
                })))
            })
        })
    }

    /// Deletes a key from the database.
    pub fn delete<T: Table>(&self, key: T::Key) -> ProviderResult<()> {
        self.execute_with_operation_metric(RocksDBOperation::Delete, T::NAME, |this| {
            this.0.delete_cf(this.get_cf_handle::<T>()?, key.encode().as_ref()).map_err(|e| {
                ProviderError::Database(DatabaseError::Delete(DatabaseErrorInfo {
                    message: e.to_string().into(),
                    code: -1,
                }))
            })
        })
    }

    /// Removes all entries from the table by deleting over the full key range.
    pub fn clear<T: Table>(&self) -> ProviderResult<()> {
        let cf = self.get_cf_handle::<T>()?;

        self.0.delete_range_cf(cf, &[] as &[u8], &[0xFF; 256]).map_err(|e| {
            ProviderError::Database(DatabaseError::Delete(DatabaseErrorInfo {
                message: e.to_string().into(),
                code: -1,
            }))
        })?;

        Ok(())
    }

    /// Returns the first key-value pair in the table.
    pub fn first<T: Table>(&self) -> ProviderResult<Option<(T::Key, T::Value)>> {
        self.execute_with_operation_metric(RocksDBOperation::Get, T::NAME, |this| {
            let cf = this.get_cf_handle::<T>()?;
            let mut iter = this.0.iterator_cf(cf, IteratorMode::Start);

            match iter.next() {
                Some(Ok((key_bytes, value_bytes))) => {
                    let key = <T::Key as reth_db_api::table::Decode>::decode(&key_bytes)
                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
                    let value = T::Value::decompress(&value_bytes)
                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
                    Ok(Some((key, value)))
                }
                Some(Err(e)) => {
                    Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
                        message: e.to_string().into(),
                        code: -1,
                    })))
                }
                None => Ok(None),
            }
        })
    }

    /// Returns the last key-value pair in the table.
    pub fn last<T: Table>(&self) -> ProviderResult<Option<(T::Key, T::Value)>> {
        self.execute_with_operation_metric(RocksDBOperation::Get, T::NAME, |this| {
            let cf = this.get_cf_handle::<T>()?;
            let mut iter = this.0.iterator_cf(cf, IteratorMode::End);

            match iter.next() {
                Some(Ok((key_bytes, value_bytes))) => {
                    let key = <T::Key as reth_db_api::table::Decode>::decode(&key_bytes)
                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
                    let value = T::Value::decompress(&value_bytes)
                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
                    Ok(Some((key, value)))
                }
                Some(Err(e)) => {
                    Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
                        message: e.to_string().into(),
                        code: -1,
                    })))
                }
                None => Ok(None),
            }
        })
    }

    /// Returns an iterator over all decoded key-value pairs in the table.
    pub fn iter<T: Table>(&self) -> ProviderResult<RocksDBIter<'_, T>> {
        let cf = self.get_cf_handle::<T>()?;
        let iter = self.0.iterator_cf(cf, IteratorMode::Start);
        Ok(RocksDBIter { inner: iter, _marker: std::marker::PhantomData })
    }

    /// Returns statistics for each table (column family).
    pub fn table_stats(&self) -> Vec<RocksDBTableStats> {
        self.0.table_stats()
    }

    /// Returns the total size of WAL files in bytes.
    pub fn wal_size_bytes(&self) -> u64 {
        self.0.wal_size_bytes()
    }

    /// Returns aggregated database statistics.
    pub fn db_stats(&self) -> RocksDBStats {
        self.0.db_stats()
    }

    /// Flushes the given tables' memtables and the WAL to disk.
    #[instrument(level = "debug", target = "providers::rocksdb", skip_all, fields(tables = ?tables))]
    pub fn flush(&self, tables: &[&'static str]) -> ProviderResult<()> {
        let db = self.0.db_rw();

        for cf_name in tables {
            if let Some(cf) = db.cf_handle(cf_name) {
                db.flush_cf(&cf).map_err(|e| {
                    ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
                        info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
                        operation: DatabaseWriteOperation::Flush,
                        table_name: cf_name,
                        key: Vec::new(),
                    })))
                })?;
            }
        }

        db.flush_wal(true).map_err(|e| {
            ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
                info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
                operation: DatabaseWriteOperation::Flush,
                table_name: "WAL",
                key: Vec::new(),
            })))
        })?;

        Ok(())
    }

    /// Returns an iterator over raw (undecoded) key-value pairs in the table.
    pub fn raw_iter<T: Table>(&self) -> ProviderResult<RocksDBRawIter<'_>> {
        let cf = self.get_cf_handle::<T>()?;
        let iter = self.0.iterator_cf(cf, IteratorMode::Start);
        Ok(RocksDBRawIter { inner: iter })
    }

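    /// Returns all history shards for the given address, in ascending key order.
    ///
    /// Shards are keyed by `ShardedKey::new(address, highest_block_number)`,
    /// with the most recent shard keyed by the `u64::MAX` sentinel. A sketch
    /// of the layout this returns (block numbers are illustrative):
    ///
    /// ```ignore
    /// // ShardedKey(addr, 100)      -> blocks [1, ..., 100]
    /// // ShardedKey(addr, 250)      -> blocks [101, ..., 250]
    /// // ShardedKey(addr, u64::MAX) -> blocks [251, ...]
    /// let shards = provider.account_history_shards(addr)?;
    /// ```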
    pub fn account_history_shards(
        &self,
        address: Address,
    ) -> ProviderResult<Vec<(ShardedKey<Address>, BlockNumberList)>> {
        let cf = self.get_cf_handle::<tables::AccountsHistory>()?;

        // Seek to the first possible shard for this address.
        let start_key = ShardedKey::new(address, 0u64);
        let start_bytes = start_key.encode();

        let iter = self
            .0
            .iterator_cf(cf, IteratorMode::From(start_bytes.as_ref(), rocksdb::Direction::Forward));

        let mut result = Vec::new();
        for item in iter {
            match item {
                Ok((key_bytes, value_bytes)) => {
                    let key = ShardedKey::<Address>::decode(&key_bytes)
                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;

                    // Stop once we leave this address's key range.
                    if key.key != address {
                        break;
                    }

                    let value = BlockNumberList::decompress(&value_bytes)
                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;

                    result.push((key, value));
                }
                Err(e) => {
                    return Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
                        message: e.to_string().into(),
                        code: -1,
                    })));
                }
            }
        }

        Ok(result)
    }

    /// Returns all history shards for the given address and storage key, in
    /// ascending key order.
    pub fn storage_history_shards(
        &self,
        address: Address,
        storage_key: B256,
    ) -> ProviderResult<Vec<(StorageShardedKey, BlockNumberList)>> {
        let cf = self.get_cf_handle::<tables::StoragesHistory>()?;

        let start_key = StorageShardedKey::new(address, storage_key, 0u64);
        let start_bytes = start_key.encode();

        let iter = self
            .0
            .iterator_cf(cf, IteratorMode::From(start_bytes.as_ref(), rocksdb::Direction::Forward));

        let mut result = Vec::new();
        for item in iter {
            match item {
                Ok((key_bytes, value_bytes)) => {
                    let key = StorageShardedKey::decode(&key_bytes)
                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;

                    if key.address != address || key.sharded_key.key != storage_key {
                        break;
                    }

                    let value = BlockNumberList::decompress(&value_bytes)
                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;

                    result.push((key, value));
                }
                Err(e) => {
                    return Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
                        message: e.to_string().into(),
                        code: -1,
                    })));
                }
            }
        }

        Ok(result)
    }

    /// Unwinds the account history index, keeping only entries strictly below
    /// the lowest block number seen per address.
    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
    pub fn unwind_account_history_indices(
        &self,
        last_indices: &[(Address, BlockNumber)],
    ) -> ProviderResult<WriteBatchWithTransaction<true>> {
        // Keep only the lowest block number per address.
        let mut address_min_block: HashMap<Address, BlockNumber> =
            HashMap::with_capacity_and_hasher(last_indices.len(), Default::default());
        for &(address, block_number) in last_indices {
            address_min_block
                .entry(address)
                .and_modify(|min| *min = (*min).min(block_number))
                .or_insert(block_number);
        }

        let mut batch = self.batch();
        for (address, min_block) in address_min_block {
            match min_block.checked_sub(1) {
                Some(keep_to) => batch.unwind_account_history_to(address, keep_to)?,
                None => batch.clear_account_history(address)?,
            }
        }

        Ok(batch.into_inner())
    }

    /// Unwinds the storage history index, keeping only entries strictly below
    /// the lowest block number seen per `(address, storage key)` pair.
    pub fn unwind_storage_history_indices(
        &self,
        storage_changesets: &[(Address, B256, BlockNumber)],
    ) -> ProviderResult<WriteBatchWithTransaction<true>> {
        let mut key_min_block: HashMap<(Address, B256), BlockNumber> =
            HashMap::with_capacity_and_hasher(storage_changesets.len(), Default::default());
        for &(address, storage_key, block_number) in storage_changesets {
            key_min_block
                .entry((address, storage_key))
                .and_modify(|min| *min = (*min).min(block_number))
                .or_insert(block_number);
        }

        let mut batch = self.batch();
        for ((address, storage_key), min_block) in key_min_block {
            match min_block.checked_sub(1) {
                Some(keep_to) => batch.unwind_storage_history_to(address, storage_key, keep_to)?,
                None => batch.clear_storage_history(address, storage_key)?,
            }
        }

        Ok(batch.into_inner())
    }

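    /// Executes `f` against a fresh write batch and commits it atomically:
    /// either every write made in the closure lands, or none do.
    ///
    /// A minimal sketch (assumes a table like `TestTable` is registered;
    /// illustrative only):
    ///
    /// ```ignore
    /// provider.write_batch(|batch| {
    ///     batch.put::<TestTable>(1, &b"a".to_vec())?;
    ///     batch.delete::<TestTable>(2)?;
    ///     Ok(())
    /// })?;
    /// ```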
    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
    pub fn write_batch<F>(&self, f: F) -> ProviderResult<()>
    where
        F: FnOnce(&mut RocksDBBatch<'_>) -> ProviderResult<()>,
    {
        self.execute_with_operation_metric(RocksDBOperation::BatchWrite, "Batch", |this| {
            let mut batch_handle = this.batch();
            f(&mut batch_handle)?;
            batch_handle.commit()
        })
    }

    /// Commits a previously built write batch to the database.
    #[instrument(level = "debug", target = "providers::rocksdb", skip_all, fields(batch_len = batch.len(), batch_size = batch.size_in_bytes()))]
    pub fn commit_batch(&self, batch: WriteBatchWithTransaction<true>) -> ProviderResult<()> {
        self.0.db_rw().write_opt(batch, &WriteOptions::default()).map_err(|e| {
            ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
                message: e.to_string().into(),
                code: -1,
            }))
        })
    }

    /// Writes RocksDB-backed block data (transaction hash numbers, account
    /// history, and storage history) for the given blocks, spawning one
    /// writer thread per enabled table.
    #[instrument(level = "debug", target = "providers::rocksdb", skip_all, fields(num_blocks = blocks.len(), first_block = ctx.first_block_number))]
    pub(crate) fn write_blocks_data<N: reth_node_types::NodePrimitives>(
        &self,
        blocks: &[ExecutedBlock<N>],
        tx_nums: &[TxNumber],
        ctx: RocksDBWriteCtx,
    ) -> ProviderResult<()> {
        if !ctx.storage_settings.any_in_rocksdb() {
            return Ok(());
        }

        thread::scope(|s| {
            let handles: Vec<_> = [
                (ctx.storage_settings.transaction_hash_numbers_in_rocksdb &&
                    ctx.prune_tx_lookup.is_none_or(|m| !m.is_full()))
                .then(|| s.spawn(|| self.write_tx_hash_numbers(blocks, tx_nums, &ctx))),
                ctx.storage_settings
                    .account_history_in_rocksdb
                    .then(|| s.spawn(|| self.write_account_history(blocks, &ctx))),
                ctx.storage_settings
                    .storages_history_in_rocksdb
                    .then(|| s.spawn(|| self.write_storage_history(blocks, &ctx))),
            ]
            .into_iter()
            .enumerate()
            .filter_map(|(i, h)| h.map(|h| (i, h)))
            .collect();

            for (i, handle) in handles {
                handle.join().map_err(|_| {
                    ProviderError::Database(DatabaseError::Other(format!(
                        "rocksdb write thread {i} panicked"
                    )))
                })??;
            }

            Ok(())
        })
    }

    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
    fn write_tx_hash_numbers<N: reth_node_types::NodePrimitives>(
        &self,
        blocks: &[ExecutedBlock<N>],
        tx_nums: &[TxNumber],
        ctx: &RocksDBWriteCtx,
    ) -> ProviderResult<()> {
        let mut batch = self.batch();
        for (block, &first_tx_num) in blocks.iter().zip(tx_nums) {
            let body = block.recovered_block().body();
            let mut tx_num = first_tx_num;
            for transaction in body.transactions_iter() {
                batch.put::<tables::TransactionHashNumbers>(*transaction.tx_hash(), &tx_num)?;
                tx_num += 1;
            }
        }
        ctx.pending_batches.lock().push(batch.into_inner());
        Ok(())
    }

    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
    fn write_account_history<N: reth_node_types::NodePrimitives>(
        &self,
        blocks: &[ExecutedBlock<N>],
        ctx: &RocksDBWriteCtx,
    ) -> ProviderResult<()> {
        let mut batch = self.batch();
        let mut account_history: BTreeMap<Address, Vec<u64>> = BTreeMap::new();
        for (block_idx, block) in blocks.iter().enumerate() {
            let block_number = ctx.first_block_number + block_idx as u64;
            let bundle = &block.execution_outcome().state;
            for &address in bundle.state().keys() {
                account_history.entry(address).or_default().push(block_number);
            }
        }

        for (address, indices) in account_history {
            batch.append_account_history_shard(address, indices)?;
        }
        ctx.pending_batches.lock().push(batch.into_inner());
        Ok(())
    }

    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
    fn write_storage_history<N: reth_node_types::NodePrimitives>(
        &self,
        blocks: &[ExecutedBlock<N>],
        ctx: &RocksDBWriteCtx,
    ) -> ProviderResult<()> {
        let mut batch = self.batch();
        let mut storage_history: BTreeMap<(Address, B256), Vec<u64>> = BTreeMap::new();
        for (block_idx, block) in blocks.iter().enumerate() {
            let block_number = ctx.first_block_number + block_idx as u64;
            let bundle = &block.execution_outcome().state;
            for (&address, account) in bundle.state() {
                for &slot in account.storage.keys() {
                    let key = B256::new(slot.to_be_bytes());
                    storage_history.entry((address, key)).or_default().push(block_number);
                }
            }
        }

        for ((address, slot), indices) in storage_history {
            batch.append_storage_history_shard(address, slot, indices)?;
        }
        ctx.pending_batches.lock().push(batch.into_inner());
        Ok(())
    }
}

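/// A write batch that buffers puts and deletes in memory and applies them
/// atomically on [`RocksDBBatch::commit`].
///
/// A minimal sketch (assumes a table like `TestTable` is registered;
/// illustrative only):
///
/// ```ignore
/// let mut batch = provider.batch();
/// batch.put::<TestTable>(1, &b"a".to_vec())?;
/// batch.put::<TestTable>(2, &b"b".to_vec())?;
/// assert_eq!(batch.len(), 2);
/// batch.commit()?; // both writes land atomically
/// ```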
#[must_use = "batch must be committed"]
pub struct RocksDBBatch<'a> {
    provider: &'a RocksDBProvider,
    inner: WriteBatchWithTransaction<true>,
    buf: Vec<u8>,
    auto_commit_threshold: Option<usize>,
}

impl fmt::Debug for RocksDBBatch<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RocksDBBatch")
            .field("provider", &self.provider)
            .field("batch", &"<WriteBatchWithTransaction>")
            .field("length", &self.inner.len())
            .field("size_in_bytes", &self.inner.size_in_bytes())
            .finish()
    }
}

impl<'a> RocksDBBatch<'a> {
    /// Adds a put operation to the batch.
    pub fn put<T: Table>(&mut self, key: T::Key, value: &T::Value) -> ProviderResult<()> {
        let encoded_key = key.encode();
        self.put_encoded::<T>(&encoded_key, value)
    }

    /// Adds a put operation to the batch using an already-encoded key.
    pub fn put_encoded<T: Table>(
        &mut self,
        key: &<T::Key as Encode>::Encoded,
        value: &T::Value,
    ) -> ProviderResult<()> {
        let value_bytes = compress_to_buf_or_ref!(self.buf, value).unwrap_or(&self.buf);
        self.inner.put_cf(self.provider.get_cf_handle::<T>()?, key, value_bytes);
        self.maybe_auto_commit()?;
        Ok(())
    }

    /// Adds a delete operation to the batch.
    pub fn delete<T: Table>(&mut self, key: T::Key) -> ProviderResult<()> {
        self.inner.delete_cf(self.provider.get_cf_handle::<T>()?, key.encode().as_ref());
        self.maybe_auto_commit()?;
        Ok(())
    }

    /// Commits the current contents and resets the batch if the configured
    /// auto-commit threshold has been reached.
    fn maybe_auto_commit(&mut self) -> ProviderResult<()> {
        if let Some(threshold) = self.auto_commit_threshold &&
            self.inner.size_in_bytes() >= threshold
        {
            tracing::debug!(
                target: "providers::rocksdb",
                batch_size = self.inner.size_in_bytes(),
                threshold,
                "Auto-committing RocksDB batch"
            );
            let old_batch = std::mem::take(&mut self.inner);
            self.provider.0.db_rw().write_opt(old_batch, &WriteOptions::default()).map_err(
                |e| {
                    ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
                        message: e.to_string().into(),
                        code: -1,
                    }))
                },
            )?;
        }
        Ok(())
    }

    /// Commits the batch, applying all buffered operations atomically.
    #[instrument(level = "debug", target = "providers::rocksdb", skip_all, fields(batch_len = self.inner.len(), batch_size = self.inner.size_in_bytes()))]
    pub fn commit(self) -> ProviderResult<()> {
        self.provider.0.db_rw().write_opt(self.inner, &WriteOptions::default()).map_err(|e| {
            ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
                message: e.to_string().into(),
                code: -1,
            }))
        })
    }

    /// Returns the number of operations in the batch.
    pub fn len(&self) -> usize {
        self.inner.len()
    }

    /// Returns `true` if the batch contains no operations.
    pub fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }

    /// Returns the in-memory size of the batch in bytes.
    pub fn size_in_bytes(&self) -> usize {
        self.inner.size_in_bytes()
    }

    /// Returns the provider this batch writes to.
    pub const fn provider(&self) -> &RocksDBProvider {
        self.provider
    }

    /// Consumes the batch, returning the underlying write batch without
    /// committing it.
    pub fn into_inner(self) -> WriteBatchWithTransaction<true> {
        self.inner
    }

    /// Reads a value through the provider. Note that this does not see writes
    /// still buffered in this batch.
    pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
        self.provider.get::<T>(key)
    }

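    /// Appends block numbers to the most recent history shard for `address`,
    /// splitting it into full shards of `NUM_OF_INDICES_IN_SHARD` entries
    /// when it overflows. `indices` must be strictly increasing.
    ///
    /// A sketch of the layout after an overflow (shard sizes are illustrative):
    ///
    /// ```ignore
    /// // before: ShardedKey(addr, u64::MAX) -> [1..=NUM_OF_INDICES_IN_SHARD]
    /// batch.append_account_history_shard(addr, [next_block])?;
    /// // after:  ShardedKey(addr, highest)  -> full shard
    /// //         ShardedKey(addr, u64::MAX) -> [next_block]
    /// ```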
    pub fn append_account_history_shard(
        &mut self,
        address: Address,
        indices: impl IntoIterator<Item = u64>,
    ) -> ProviderResult<()> {
        let indices: Vec<u64> = indices.into_iter().collect();

        if indices.is_empty() {
            return Ok(());
        }

        debug_assert!(
            indices.windows(2).all(|w| w[0] < w[1]),
            "indices must be strictly increasing: {:?}",
            indices
        );

        let last_key = ShardedKey::new(address, u64::MAX);
        let last_shard_opt = self.provider.get::<tables::AccountsHistory>(last_key.clone())?;
        let mut last_shard = last_shard_opt.unwrap_or_else(BlockNumberList::empty);

        last_shard.append(indices).map_err(ProviderError::other)?;

        // Fast path: the shard still fits, write it back in place.
        if last_shard.len() <= NUM_OF_INDICES_IN_SHARD as u64 {
            self.put::<tables::AccountsHistory>(last_key, &last_shard)?;
            return Ok(());
        }

        // The shard overflowed: split into full shards keyed by their highest
        // block number, keeping the final chunk under the `u64::MAX` sentinel.
        let chunks = last_shard.iter().chunks(NUM_OF_INDICES_IN_SHARD);
        let mut chunks_peekable = chunks.into_iter().peekable();

        while let Some(chunk) = chunks_peekable.next() {
            let shard = BlockNumberList::new_pre_sorted(chunk);
            let highest_block_number = if chunks_peekable.peek().is_some() {
                shard.iter().next_back().expect("`chunks` does not return empty list")
            } else {
                u64::MAX
            };

            self.put::<tables::AccountsHistory>(
                ShardedKey::new(address, highest_block_number),
                &shard,
            )?;
        }

        Ok(())
    }

    /// Appends block numbers to the most recent history shard for the given
    /// `(address, storage_key)` pair, splitting full shards exactly like
    /// [`Self::append_account_history_shard`].
    pub fn append_storage_history_shard(
        &mut self,
        address: Address,
        storage_key: B256,
        indices: impl IntoIterator<Item = u64>,
    ) -> ProviderResult<()> {
        let indices: Vec<u64> = indices.into_iter().collect();

        if indices.is_empty() {
            return Ok(());
        }

        debug_assert!(
            indices.windows(2).all(|w| w[0] < w[1]),
            "indices must be strictly increasing: {:?}",
            indices
        );

        let last_key = StorageShardedKey::last(address, storage_key);
        let last_shard_opt = self.provider.get::<tables::StoragesHistory>(last_key.clone())?;
        let mut last_shard = last_shard_opt.unwrap_or_else(BlockNumberList::empty);

        last_shard.append(indices).map_err(ProviderError::other)?;

        // Fast path: the shard still fits, write it back in place.
        if last_shard.len() <= NUM_OF_INDICES_IN_SHARD as u64 {
            self.put::<tables::StoragesHistory>(last_key, &last_shard)?;
            return Ok(());
        }

        // The shard overflowed: split into full shards keyed by their highest
        // block number, keeping the final chunk under the `u64::MAX` sentinel.
        let chunks = last_shard.iter().chunks(NUM_OF_INDICES_IN_SHARD);
        let mut chunks_peekable = chunks.into_iter().peekable();

        while let Some(chunk) = chunks_peekable.next() {
            let shard = BlockNumberList::new_pre_sorted(chunk);
            let highest_block_number = if chunks_peekable.peek().is_some() {
                shard.iter().next_back().expect("`chunks` does not return empty list")
            } else {
                u64::MAX
            };

            self.put::<tables::StoragesHistory>(
                StorageShardedKey::new(address, storage_key, highest_block_number),
                &shard,
            )?;
        }

        Ok(())
    }

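    /// Unwinds the account history index for `address` so that only blocks at
    /// or below `keep_to` remain, re-keying the new final shard under the
    /// `u64::MAX` sentinel.
    ///
    /// A sketch (block numbers are illustrative):
    ///
    /// ```ignore
    /// // before: [.., 100] [.., 200] [.., u64::MAX]
    /// batch.unwind_account_history_to(addr, 150)?;
    /// // after:  [.., 100] [.., u64::MAX] -> blocks 101..=150
    /// ```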
    pub fn unwind_account_history_to(
        &mut self,
        address: Address,
        keep_to: BlockNumber,
    ) -> ProviderResult<()> {
        let shards = self.provider.account_history_shards(address)?;
        if shards.is_empty() {
            return Ok(());
        }

        // First shard that may contain blocks above `keep_to`.
        let boundary_idx = shards.iter().position(|(key, _)| {
            key.highest_block_number == u64::MAX || key.highest_block_number > keep_to
        });

        let Some(boundary_idx) = boundary_idx else {
            // Nothing to unwind; ensure the last shard is keyed by the sentinel.
            let (last_key, last_value) = shards.last().expect("shards is non-empty");
            if last_key.highest_block_number != u64::MAX {
                self.delete::<tables::AccountsHistory>(last_key.clone())?;
                self.put::<tables::AccountsHistory>(
                    ShardedKey::new(address, u64::MAX),
                    last_value,
                )?;
            }
            return Ok(());
        };

        // Delete all shards strictly above the boundary.
        for (key, _) in shards.iter().skip(boundary_idx + 1) {
            self.delete::<tables::AccountsHistory>(key.clone())?;
        }

        let (boundary_key, boundary_list) = &shards[boundary_idx];

        self.delete::<tables::AccountsHistory>(boundary_key.clone())?;

        // Keep only the blocks at or below `keep_to` from the boundary shard.
        let new_last =
            BlockNumberList::new_pre_sorted(boundary_list.iter().take_while(|&b| b <= keep_to));

        if new_last.is_empty() {
            if boundary_idx == 0 {
                // All shards were removed; nothing remains for this address.
                return Ok(());
            }

            // Re-key the previous shard as the new last shard.
            let (prev_key, prev_value) = &shards[boundary_idx - 1];
            if prev_key.highest_block_number != u64::MAX {
                self.delete::<tables::AccountsHistory>(prev_key.clone())?;
                self.put::<tables::AccountsHistory>(
                    ShardedKey::new(address, u64::MAX),
                    prev_value,
                )?;
            }
            return Ok(());
        }

        self.put::<tables::AccountsHistory>(ShardedKey::new(address, u64::MAX), &new_last)?;

        Ok(())
    }

    /// Unwinds the storage history index for `(address, storage_key)` so that
    /// only blocks at or below `keep_to` remain, mirroring
    /// [`Self::unwind_account_history_to`].
    pub fn unwind_storage_history_to(
        &mut self,
        address: Address,
        storage_key: B256,
        keep_to: BlockNumber,
    ) -> ProviderResult<()> {
        let shards = self.provider.storage_history_shards(address, storage_key)?;
        if shards.is_empty() {
            return Ok(());
        }

        // First shard that may contain blocks above `keep_to`.
        let boundary_idx = shards.iter().position(|(key, _)| {
            key.sharded_key.highest_block_number == u64::MAX ||
                key.sharded_key.highest_block_number > keep_to
        });

        let Some(boundary_idx) = boundary_idx else {
            // Nothing to unwind; ensure the last shard is keyed by the sentinel.
            let (last_key, last_value) = shards.last().expect("shards is non-empty");
            if last_key.sharded_key.highest_block_number != u64::MAX {
                self.delete::<tables::StoragesHistory>(last_key.clone())?;
                self.put::<tables::StoragesHistory>(
                    StorageShardedKey::last(address, storage_key),
                    last_value,
                )?;
            }
            return Ok(());
        };

        // Delete all shards strictly above the boundary.
        for (key, _) in shards.iter().skip(boundary_idx + 1) {
            self.delete::<tables::StoragesHistory>(key.clone())?;
        }

        let (boundary_key, boundary_list) = &shards[boundary_idx];

        self.delete::<tables::StoragesHistory>(boundary_key.clone())?;

        // Keep only the blocks at or below `keep_to` from the boundary shard.
        let new_last =
            BlockNumberList::new_pre_sorted(boundary_list.iter().take_while(|&b| b <= keep_to));

        if new_last.is_empty() {
            if boundary_idx == 0 {
                // All shards were removed; nothing remains for this slot.
                return Ok(());
            }

            // Re-key the previous shard as the new last shard.
            let (prev_key, prev_value) = &shards[boundary_idx - 1];
            if prev_key.sharded_key.highest_block_number != u64::MAX {
                self.delete::<tables::StoragesHistory>(prev_key.clone())?;
                self.put::<tables::StoragesHistory>(
                    StorageShardedKey::last(address, storage_key),
                    prev_value,
                )?;
            }
            return Ok(());
        }

        self.put::<tables::StoragesHistory>(
            StorageShardedKey::last(address, storage_key),
            &new_last,
        )?;

        Ok(())
    }

    /// Deletes all account history shards for the given address.
    pub fn clear_account_history(&mut self, address: Address) -> ProviderResult<()> {
        let shards = self.provider.account_history_shards(address)?;
        for (key, _) in shards {
            self.delete::<tables::AccountsHistory>(key)?;
        }
        Ok(())
    }

    /// Deletes all storage history shards for the given address and storage key.
    pub fn clear_storage_history(
        &mut self,
        address: Address,
        storage_key: B256,
    ) -> ProviderResult<()> {
        let shards = self.provider.storage_history_shards(address, storage_key)?;
        for (key, _) in shards {
            self.delete::<tables::StoragesHistory>(key)?;
        }
        Ok(())
    }
}

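/// An optimistic RocksDB transaction with read-your-writes semantics:
/// uncommitted writes are visible to reads and iterators on the same
/// transaction, but not to the provider until [`RocksTx::commit`] succeeds.
///
/// A minimal sketch (assumes a table like `TestTable` is registered;
/// illustrative only):
///
/// ```ignore
/// let tx = provider.tx();
/// tx.put::<TestTable>(1, &b"v".to_vec())?;
/// assert!(provider.get::<TestTable>(1)?.is_none()); // not committed yet
/// tx.rollback()?; // discard the write
/// ```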
pub struct RocksTx<'db> {
    inner: Transaction<'db, OptimisticTransactionDB>,
    provider: &'db RocksDBProvider,
}

impl fmt::Debug for RocksTx<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RocksTx").field("provider", &self.provider).finish_non_exhaustive()
    }
}

impl<'db> RocksTx<'db> {
    /// Gets a value by key, seeing this transaction's uncommitted writes.
    pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
        let encoded_key = key.encode();
        self.get_encoded::<T>(&encoded_key)
    }

    /// Gets a value by an already-encoded key.
    pub fn get_encoded<T: Table>(
        &self,
        key: &<T::Key as Encode>::Encoded,
    ) -> ProviderResult<Option<T::Value>> {
        let cf = self.provider.get_cf_handle::<T>()?;
        let result = self.inner.get_cf(cf, key.as_ref()).map_err(|e| {
            ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
                message: e.to_string().into(),
                code: -1,
            }))
        })?;

        Ok(result.and_then(|value| T::Value::decompress(&value).ok()))
    }

    /// Puts a key-value pair within this transaction.
    pub fn put<T: Table>(&self, key: T::Key, value: &T::Value) -> ProviderResult<()> {
        let encoded_key = key.encode();
        self.put_encoded::<T>(&encoded_key, value)
    }

    /// Puts a key-value pair using an already-encoded key.
    pub fn put_encoded<T: Table>(
        &self,
        key: &<T::Key as Encode>::Encoded,
        value: &T::Value,
    ) -> ProviderResult<()> {
        let cf = self.provider.get_cf_handle::<T>()?;
        let mut buf = Vec::new();
        let value_bytes = compress_to_buf_or_ref!(buf, value).unwrap_or(&buf);

        self.inner.put_cf(cf, key.as_ref(), value_bytes).map_err(|e| {
            ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
                info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
                operation: DatabaseWriteOperation::PutUpsert,
                table_name: T::NAME,
                key: key.as_ref().to_vec(),
            })))
        })
    }

    /// Deletes a key within this transaction.
    pub fn delete<T: Table>(&self, key: T::Key) -> ProviderResult<()> {
        let cf = self.provider.get_cf_handle::<T>()?;
        self.inner.delete_cf(cf, key.encode().as_ref()).map_err(|e| {
            ProviderError::Database(DatabaseError::Delete(DatabaseErrorInfo {
                message: e.to_string().into(),
                code: -1,
            }))
        })
    }

    /// Returns an iterator over the table, including this transaction's
    /// uncommitted writes.
    pub fn iter<T: Table>(&self) -> ProviderResult<RocksTxIter<'_, T>> {
        let cf = self.provider.get_cf_handle::<T>()?;
        let iter = self.inner.iterator_cf(cf, IteratorMode::Start);
        Ok(RocksTxIter { inner: iter, _marker: std::marker::PhantomData })
    }

    /// Returns an iterator over the table starting at the given key.
    pub fn iter_from<T: Table>(&self, key: T::Key) -> ProviderResult<RocksTxIter<'_, T>> {
        let cf = self.provider.get_cf_handle::<T>()?;
        let encoded_key = key.encode();
        let iter = self
            .inner
            .iterator_cf(cf, IteratorMode::From(encoded_key.as_ref(), rocksdb::Direction::Forward));
        Ok(RocksTxIter { inner: iter, _marker: std::marker::PhantomData })
    }

    /// Commits the transaction.
    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
    pub fn commit(self) -> ProviderResult<()> {
        self.inner.commit().map_err(|e| {
            ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
                message: e.to_string().into(),
                code: -1,
            }))
        })
    }

    /// Rolls the transaction back, discarding all uncommitted writes.
    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
    pub fn rollback(self) -> ProviderResult<()> {
        self.inner.rollback().map_err(|e| {
            ProviderError::Database(DatabaseError::Other(format!("rollback failed: {e}")))
        })
    }

    /// Looks up the account history entry for `address` at `block_number`.
    pub fn account_history_info(
        &self,
        address: Address,
        block_number: BlockNumber,
        lowest_available_block_number: Option<BlockNumber>,
    ) -> ProviderResult<HistoryInfo> {
        let key = ShardedKey::new(address, block_number);
        self.history_info::<tables::AccountsHistory>(
            key.encode().as_ref(),
            block_number,
            lowest_available_block_number,
            |key_bytes| Ok(<ShardedKey<Address> as Decode>::decode(key_bytes)?.key == address),
            |prev_bytes| {
                <ShardedKey<Address> as Decode>::decode(prev_bytes)
                    .map(|k| k.key == address)
                    .unwrap_or(false)
            },
        )
    }

    /// Looks up the storage history entry for `(address, storage_key)` at
    /// `block_number`.
    pub fn storage_history_info(
        &self,
        address: Address,
        storage_key: B256,
        block_number: BlockNumber,
        lowest_available_block_number: Option<BlockNumber>,
    ) -> ProviderResult<HistoryInfo> {
        let key = StorageShardedKey::new(address, storage_key, block_number);
        self.history_info::<tables::StoragesHistory>(
            key.encode().as_ref(),
            block_number,
            lowest_available_block_number,
            |key_bytes| {
                let k = <StorageShardedKey as Decode>::decode(key_bytes)?;
                Ok(k.address == address && k.sharded_key.key == storage_key)
            },
            |prev_bytes| {
                <StorageShardedKey as Decode>::decode(prev_bytes)
                    .map(|k| k.address == address && k.sharded_key.key == storage_key)
                    .unwrap_or(false)
            },
        )
    }

    /// Shared history lookup: seeks to the shard that may contain
    /// `block_number`, ranks the block within it, and checks the previous
    /// shard when needed to decide whether this is before the first write.
    fn history_info<T>(
        &self,
        encoded_key: &[u8],
        block_number: BlockNumber,
        lowest_available_block_number: Option<BlockNumber>,
        key_matches: impl FnOnce(&[u8]) -> Result<bool, reth_db_api::DatabaseError>,
        prev_key_matches: impl Fn(&[u8]) -> bool,
    ) -> ProviderResult<HistoryInfo>
    where
        T: Table<Value = BlockNumberList>,
    {
        let is_maybe_pruned = lowest_available_block_number.is_some();
        // If history may be pruned, a missing entry could still be in the
        // plain state; otherwise it simply has not been written yet.
        let fallback = || {
            Ok(if is_maybe_pruned {
                HistoryInfo::MaybeInPlainState
            } else {
                HistoryInfo::NotYetWritten
            })
        };

        let cf = self.provider.0.cf_handle_rw(T::NAME)?;

        let mut iter: DBRawIteratorWithThreadMode<'_, Transaction<'_, OptimisticTransactionDB>> =
            self.inner.raw_iterator_cf(&cf);

        // Seek to the first shard whose key is >= the query key.
        iter.seek(encoded_key);
        Self::raw_iter_status_ok(&iter)?;

        if !iter.valid() {
            return fallback();
        }

        let Some(key_bytes) = iter.key() else {
            return fallback();
        };
        if !key_matches(key_bytes)? {
            return fallback();
        }

        let Some(value_bytes) = iter.value() else {
            return fallback();
        };
        let chunk = BlockNumberList::decompress(value_bytes)?;
        let (rank, found_block) = compute_history_rank(&chunk, block_number);

        let is_before_first_write = if needs_prev_shard_check(rank, found_block, block_number) {
            // Step back one shard to see whether an earlier write exists.
            iter.prev();
            Self::raw_iter_status_ok(&iter)?;
            let has_prev = iter.valid() && iter.key().is_some_and(&prev_key_matches);
            !has_prev
        } else {
            false
        };

        Ok(HistoryInfo::from_lookup(
            found_block,
            is_before_first_write,
            lowest_available_block_number,
        ))
    }

    /// Returns an error if the raw iterator is in an error state.
    fn raw_iter_status_ok(
        iter: &DBRawIteratorWithThreadMode<'_, Transaction<'_, OptimisticTransactionDB>>,
    ) -> ProviderResult<()> {
        iter.status().map_err(|e| {
            ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
                message: e.to_string().into(),
                code: -1,
            }))
        })
    }
}

/// Iterator over either a read-write or a read-only database.
enum RocksDBIterEnum<'db> {
    /// Iterator over a read-write database.
    ReadWrite(rocksdb::DBIteratorWithThreadMode<'db, OptimisticTransactionDB>),
    /// Iterator over a read-only database.
    ReadOnly(rocksdb::DBIteratorWithThreadMode<'db, DB>),
}

impl Iterator for RocksDBIterEnum<'_> {
    type Item = Result<(Box<[u8]>, Box<[u8]>), rocksdb::Error>;

    fn next(&mut self) -> Option<Self::Item> {
        match self {
            Self::ReadWrite(iter) => iter.next(),
            Self::ReadOnly(iter) => iter.next(),
        }
    }
}

/// Iterator over decoded key-value pairs of a table.
pub struct RocksDBIter<'db, T: Table> {
    inner: RocksDBIterEnum<'db>,
    _marker: std::marker::PhantomData<T>,
}

impl<T: Table> fmt::Debug for RocksDBIter<'_, T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RocksDBIter").field("table", &T::NAME).finish_non_exhaustive()
    }
}

impl<T: Table> Iterator for RocksDBIter<'_, T> {
    type Item = ProviderResult<(T::Key, T::Value)>;

    fn next(&mut self) -> Option<Self::Item> {
        let (key_bytes, value_bytes) = match self.inner.next()? {
            Ok(kv) => kv,
            Err(e) => {
                return Some(Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
                    message: e.to_string().into(),
                    code: -1,
                }))))
            }
        };

        let key = match <T::Key as reth_db_api::table::Decode>::decode(&key_bytes) {
            Ok(k) => k,
            Err(_) => return Some(Err(ProviderError::Database(DatabaseError::Decode))),
        };

        let value = match T::Value::decompress(&value_bytes) {
            Ok(v) => v,
            Err(_) => return Some(Err(ProviderError::Database(DatabaseError::Decode))),
        };

        Some(Ok((key, value)))
    }
}

/// Iterator over raw (undecoded) key-value pairs of a table.
pub struct RocksDBRawIter<'db> {
    inner: RocksDBIterEnum<'db>,
}

impl fmt::Debug for RocksDBRawIter<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RocksDBRawIter").finish_non_exhaustive()
    }
}

impl Iterator for RocksDBRawIter<'_> {
    type Item = ProviderResult<(Box<[u8]>, Box<[u8]>)>;

    fn next(&mut self) -> Option<Self::Item> {
        match self.inner.next()? {
            Ok(kv) => Some(Ok(kv)),
            Err(e) => Some(Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
                message: e.to_string().into(),
                code: -1,
            })))),
        }
    }
}

/// Iterator over decoded key-value pairs within a transaction.
pub struct RocksTxIter<'tx, T: Table> {
    inner: rocksdb::DBIteratorWithThreadMode<'tx, Transaction<'tx, OptimisticTransactionDB>>,
    _marker: std::marker::PhantomData<T>,
}

impl<T: Table> fmt::Debug for RocksTxIter<'_, T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RocksTxIter").field("table", &T::NAME).finish_non_exhaustive()
    }
}

impl<T: Table> Iterator for RocksTxIter<'_, T> {
    type Item = ProviderResult<(T::Key, T::Value)>;

    fn next(&mut self) -> Option<Self::Item> {
        let (key_bytes, value_bytes) = match self.inner.next()? {
            Ok(kv) => kv,
            Err(e) => {
                return Some(Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
                    message: e.to_string().into(),
                    code: -1,
                }))))
            }
        };

        let key = match <T::Key as reth_db_api::table::Decode>::decode(&key_bytes) {
            Ok(k) => k,
            Err(_) => return Some(Err(ProviderError::Database(DatabaseError::Decode))),
        };

        let value = match T::Value::decompress(&value_bytes) {
            Ok(v) => v,
            Err(_) => return Some(Err(ProviderError::Database(DatabaseError::Decode))),
        };

        Some(Ok((key, value)))
    }
}

/// Converts a reth [`LogLevel`] to the corresponding RocksDB log level.
const fn convert_log_level(level: LogLevel) -> rocksdb::LogLevel {
    match level {
        LogLevel::Fatal => rocksdb::LogLevel::Fatal,
        LogLevel::Error => rocksdb::LogLevel::Error,
        LogLevel::Warn => rocksdb::LogLevel::Warn,
        LogLevel::Notice | LogLevel::Verbose => rocksdb::LogLevel::Info,
        LogLevel::Debug | LogLevel::Trace | LogLevel::Extra => rocksdb::LogLevel::Debug,
    }
}
2117
2118#[cfg(test)]
2119mod tests {
2120 use super::*;
2121 use crate::providers::HistoryInfo;
2122 use alloy_primitives::{Address, TxHash, B256};
2123 use reth_db_api::{
2124 models::{
2125 sharded_key::{ShardedKey, NUM_OF_INDICES_IN_SHARD},
2126 storage_sharded_key::StorageShardedKey,
2127 IntegerList,
2128 },
2129 table::Table,
2130 tables,
2131 };
2132 use tempfile::TempDir;
2133
2134 #[test]
2135 fn test_with_default_tables_registers_required_column_families() {
2136 let temp_dir = TempDir::new().unwrap();
2137
2138 let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
2140
2141 let tx_hash = TxHash::from(B256::from([1u8; 32]));
2143 provider.put::<tables::TransactionHashNumbers>(tx_hash, &100).unwrap();
2144 assert_eq!(provider.get::<tables::TransactionHashNumbers>(tx_hash).unwrap(), Some(100));
2145
2146 let key = ShardedKey::new(Address::ZERO, 100);
2148 let value = IntegerList::default();
2149 provider.put::<tables::AccountsHistory>(key.clone(), &value).unwrap();
2150 assert!(provider.get::<tables::AccountsHistory>(key).unwrap().is_some());
2151
2152 let key = StorageShardedKey::new(Address::ZERO, B256::ZERO, 100);
2154 provider.put::<tables::StoragesHistory>(key.clone(), &value).unwrap();
2155 assert!(provider.get::<tables::StoragesHistory>(key).unwrap().is_some());
2156 }
2157
2158 #[derive(Debug)]
2159 struct TestTable;
2160
2161 impl Table for TestTable {
2162 const NAME: &'static str = "TestTable";
2163 const DUPSORT: bool = false;
2164 type Key = u64;
2165 type Value = Vec<u8>;
2166 }
2167
2168 #[test]
2169 fn test_basic_operations() {
2170 let temp_dir = TempDir::new().unwrap();
2171
2172 let provider = RocksDBBuilder::new(temp_dir.path())
2173 .with_table::<TestTable>() .build()
2175 .unwrap();
2176
2177 let key = 42u64;
2178 let value = b"test_value".to_vec();
2179
2180 provider.put::<TestTable>(key, &value).unwrap();
2182
2183 let result = provider.get::<TestTable>(key).unwrap();
2185 assert_eq!(result, Some(value));
2186
2187 provider.delete::<TestTable>(key).unwrap();
2189
2190 assert_eq!(provider.get::<TestTable>(key).unwrap(), None);
2192 }
2193
2194 #[test]
2195 fn test_batch_operations() {
2196 let temp_dir = TempDir::new().unwrap();
2197 let provider =
2198 RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
2199
2200 provider
2202 .write_batch(|batch| {
2203 for i in 0..10u64 {
2204 let value = format!("value_{i}").into_bytes();
2205 batch.put::<TestTable>(i, &value)?;
2206 }
2207 Ok(())
2208 })
2209 .unwrap();
2210
2211 for i in 0..10u64 {
2213 let value = format!("value_{i}").into_bytes();
2214 assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
2215 }
2216
2217 provider
2219 .write_batch(|batch| {
2220 for i in 0..10u64 {
2221 batch.delete::<TestTable>(i)?;
2222 }
2223 Ok(())
2224 })
2225 .unwrap();
2226
2227 for i in 0..10u64 {
2229 assert_eq!(provider.get::<TestTable>(i).unwrap(), None);
2230 }
2231 }
2232
2233 #[test]
2234 fn test_with_real_table() {
2235 let temp_dir = TempDir::new().unwrap();
2236 let provider = RocksDBBuilder::new(temp_dir.path())
2237 .with_table::<tables::TransactionHashNumbers>()
2238 .with_metrics()
2239 .build()
2240 .unwrap();
2241
2242 let tx_hash = TxHash::from(B256::from([1u8; 32]));
2243
2244 provider.put::<tables::TransactionHashNumbers>(tx_hash, &100).unwrap();
2246 assert_eq!(provider.get::<tables::TransactionHashNumbers>(tx_hash).unwrap(), Some(100));
2247
2248 provider
2250 .write_batch(|batch| {
2251 for i in 0..10u64 {
2252 let hash = TxHash::from(B256::from([i as u8; 32]));
2253 let value = i * 100;
2254 batch.put::<tables::TransactionHashNumbers>(hash, &value)?;
2255 }
2256 Ok(())
2257 })
2258 .unwrap();
2259
2260 for i in 0..10u64 {
2262 let hash = TxHash::from(B256::from([i as u8; 32]));
2263 assert_eq!(
2264 provider.get::<tables::TransactionHashNumbers>(hash).unwrap(),
2265 Some(i * 100)
2266 );
2267 }
2268 }

    #[test]
    fn test_statistics_enabled() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path())
            .with_table::<TestTable>()
            .with_statistics()
            .build()
            .unwrap();

        // Reads and writes keep working with RocksDB statistics collection turned on.
        for i in 0..10 {
            let value = vec![i as u8];
            provider.put::<TestTable>(i, &value).unwrap();
            assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
        }
    }

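    // Data written through the provider remains readable without an explicit flush or
    // compaction.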
    #[test]
    fn test_data_persistence() {
        let temp_dir = TempDir::new().unwrap();
        let provider =
            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();

        // Write 100 one-kilobyte values.
        let value = vec![42u8; 1000];
        for i in 0..100 {
            provider.put::<TestTable>(i, &value).unwrap();
        }

        for i in 0..100 {
            assert!(provider.get::<TestTable>(i).unwrap().is_some(), "Data should be readable");
        }
    }

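    // Optimistic transaction semantics: a transaction reads its own uncommitted writes,
    // while other readers only see data once the transaction commits.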
    #[test]
    fn test_transaction_read_your_writes() {
        let temp_dir = TempDir::new().unwrap();
        let provider =
            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();

        let tx = provider.tx();

        let key = 42u64;
        let value = b"test_value".to_vec();
        tx.put::<TestTable>(key, &value).unwrap();

        // The transaction sees its own write...
        let result = tx.get::<TestTable>(key).unwrap();
        assert_eq!(
            result,
            Some(value.clone()),
            "Transaction should see its own uncommitted writes"
        );

        // ...but the provider does not until the transaction commits.
        let provider_result = provider.get::<TestTable>(key).unwrap();
        assert_eq!(provider_result, None, "Uncommitted data should not be visible outside tx");

        tx.commit().unwrap();

        let committed_result = provider.get::<TestTable>(key).unwrap();
        assert_eq!(committed_result, Some(value), "Committed data should be visible");
    }

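    // Rollback discards uncommitted writes and preserves previously committed values.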
    #[test]
    fn test_transaction_rollback() {
        let temp_dir = TempDir::new().unwrap();
        let provider =
            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();

        let key = 100u64;
        let initial_value = b"initial".to_vec();
        provider.put::<TestTable>(key, &initial_value).unwrap();

        let tx = provider.tx();
        let new_value = b"modified".to_vec();
        tx.put::<TestTable>(key, &new_value).unwrap();

        assert_eq!(tx.get::<TestTable>(key).unwrap(), Some(new_value));

        tx.rollback().unwrap();

        let result = provider.get::<TestTable>(key).unwrap();
        assert_eq!(result, Some(initial_value), "Rollback should preserve original data");
    }

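    // Iteration inside a transaction also observes its uncommitted writes.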
    #[test]
    fn test_transaction_iterator() {
        let temp_dir = TempDir::new().unwrap();
        let provider =
            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();

        let tx = provider.tx();

        for i in 0..5u64 {
            let value = format!("value_{i}").into_bytes();
            tx.put::<TestTable>(i, &value).unwrap();
        }

        let mut count = 0;
        for result in tx.iter::<TestTable>().unwrap() {
            let (key, value) = result.unwrap();
            assert_eq!(value, format!("value_{key}").into_bytes());
            count += 1;
        }
        assert_eq!(count, 5, "Iterator should see all uncommitted writes");

        tx.commit().unwrap();
    }

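    // A standalone batch buffers writes locally; nothing is visible until `commit()`.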
    #[test]
    fn test_batch_manual_commit() {
        let temp_dir = TempDir::new().unwrap();
        let provider =
            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();

        let mut batch = provider.batch();

        for i in 0..10u64 {
            let value = format!("batch_value_{i}").into_bytes();
            batch.put::<TestTable>(i, &value).unwrap();
        }

        assert_eq!(batch.len(), 10);
        assert!(!batch.is_empty());

        // Nothing is visible before the batch commits.
        assert_eq!(provider.get::<TestTable>(0).unwrap(), None);

        batch.commit().unwrap();

        for i in 0..10u64 {
            let value = format!("batch_value_{i}").into_bytes();
            assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
        }
    }

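    // `first()`/`last()` follow key order, not insertion order.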
    #[test]
    fn test_first_and_last_entry() {
        let temp_dir = TempDir::new().unwrap();
        let provider =
            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();

        // Empty table: both ends are None.
        assert_eq!(provider.first::<TestTable>().unwrap(), None);
        assert_eq!(provider.last::<TestTable>().unwrap(), None);

        // Insert out of key order.
        provider.put::<TestTable>(10, &b"value_10".to_vec()).unwrap();
        provider.put::<TestTable>(20, &b"value_20".to_vec()).unwrap();
        provider.put::<TestTable>(5, &b"value_5".to_vec()).unwrap();

        let first = provider.first::<TestTable>().unwrap();
        assert_eq!(first, Some((5, b"value_5".to_vec())));

        let last = provider.last::<TestTable>().unwrap();
        assert_eq!(last, Some((20, b"value_20".to_vec())));
    }

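    // Lookup below the lowest indexed block: the shard holds blocks 100, 200, 300, so a
    // query at block 50 should resolve to the changeset of the first entry (block 100).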
    #[test]
    fn test_account_history_info_pruned_before_first_entry() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);

        // Sentinel shard holding blocks 100, 200, 300.
        let chunk = IntegerList::new([100, 200, 300]).unwrap();
        let shard_key = ShardedKey::new(address, u64::MAX);
        provider.put::<tables::AccountsHistory>(shard_key, &chunk).unwrap();

        let tx = provider.tx();

        // Block 50 resolves to the changeset at block 100, the first indexed entry.
        let result = tx.account_history_info(address, 50, Some(100)).unwrap();
        assert_eq!(result, HistoryInfo::InChangeset(100));

        tx.rollback().unwrap();
    }

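    // Appending one index more than NUM_OF_INDICES_IN_SHARD splits the shard: the full
    // shard is re-keyed to its highest block number and the overflow stays in the
    // u64::MAX sentinel shard.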
    #[test]
    fn test_account_history_shard_split_at_boundary() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);
        let limit = NUM_OF_INDICES_IN_SHARD;

        // One more index than fits in a single shard.
        let indices: Vec<u64> = (0..=(limit as u64)).collect();
        let mut batch = provider.batch();
        batch.append_account_history_shard(address, indices).unwrap();
        batch.commit().unwrap();

        // The completed shard is keyed by its highest block number.
        let completed_key = ShardedKey::new(address, (limit - 1) as u64);
        let sentinel_key = ShardedKey::new(address, u64::MAX);

        let completed_shard = provider.get::<tables::AccountsHistory>(completed_key).unwrap();
        let sentinel_shard = provider.get::<tables::AccountsHistory>(sentinel_key).unwrap();

        assert!(completed_shard.is_some(), "completed shard should exist");
        assert!(sentinel_shard.is_some(), "sentinel shard should exist");

        let completed_shard = completed_shard.unwrap();
        let sentinel_shard = sentinel_shard.unwrap();

        assert_eq!(completed_shard.len(), limit as u64, "completed shard should be full");
        assert_eq!(sentinel_shard.len(), 1, "sentinel shard should have 1 element");
    }

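    // Two appends that overflow twice leave two completed shards plus the sentinel.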
    #[test]
    fn test_account_history_multiple_shard_splits() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x43; 20]);
        let limit = NUM_OF_INDICES_IN_SHARD;

        // First append exactly fills one shard; it stays in the sentinel position.
        let first_batch_indices: Vec<u64> = (0..limit as u64).collect();
        let mut batch = provider.batch();
        batch.append_account_history_shard(address, first_batch_indices).unwrap();
        batch.commit().unwrap();

        let sentinel_key = ShardedKey::new(address, u64::MAX);
        let shard = provider.get::<tables::AccountsHistory>(sentinel_key.clone()).unwrap();
        assert!(shard.is_some());
        assert_eq!(shard.unwrap().len(), limit as u64);

        // Second append overflows twice, leaving two completed shards behind.
        let second_batch_indices: Vec<u64> = (limit as u64..=(2 * limit) as u64).collect();
        let mut batch = provider.batch();
        batch.append_account_history_shard(address, second_batch_indices).unwrap();
        batch.commit().unwrap();

        let first_completed = ShardedKey::new(address, (limit - 1) as u64);
        let second_completed = ShardedKey::new(address, (2 * limit - 1) as u64);

        assert!(
            provider.get::<tables::AccountsHistory>(first_completed).unwrap().is_some(),
            "first completed shard should exist"
        );
        assert!(
            provider.get::<tables::AccountsHistory>(second_completed).unwrap().is_some(),
            "second completed shard should exist"
        );
        assert!(
            provider.get::<tables::AccountsHistory>(sentinel_key).unwrap().is_some(),
            "sentinel shard should exist"
        );
    }

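    // Storage-history analogues of the two account shard-split tests above, keyed by
    // (address, slot) rather than address alone.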
    #[test]
    fn test_storage_history_shard_split_at_boundary() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x44; 20]);
        let slot = B256::from([0x55; 32]);
        let limit = NUM_OF_INDICES_IN_SHARD;

        let indices: Vec<u64> = (0..=(limit as u64)).collect();
        let mut batch = provider.batch();
        batch.append_storage_history_shard(address, slot, indices).unwrap();
        batch.commit().unwrap();

        let completed_key = StorageShardedKey::new(address, slot, (limit - 1) as u64);
        let sentinel_key = StorageShardedKey::new(address, slot, u64::MAX);

        let completed_shard = provider.get::<tables::StoragesHistory>(completed_key).unwrap();
        let sentinel_shard = provider.get::<tables::StoragesHistory>(sentinel_key).unwrap();

        assert!(completed_shard.is_some(), "completed shard should exist");
        assert!(sentinel_shard.is_some(), "sentinel shard should exist");

        let completed_shard = completed_shard.unwrap();
        let sentinel_shard = sentinel_shard.unwrap();

        assert_eq!(completed_shard.len(), limit as u64, "completed shard should be full");
        assert_eq!(sentinel_shard.len(), 1, "sentinel shard should have 1 element");
    }

    #[test]
    fn test_storage_history_multiple_shard_splits() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x46; 20]);
        let slot = B256::from([0x57; 32]);
        let limit = NUM_OF_INDICES_IN_SHARD;

        let first_batch_indices: Vec<u64> = (0..limit as u64).collect();
        let mut batch = provider.batch();
        batch.append_storage_history_shard(address, slot, first_batch_indices).unwrap();
        batch.commit().unwrap();

        let sentinel_key = StorageShardedKey::new(address, slot, u64::MAX);
        let shard = provider.get::<tables::StoragesHistory>(sentinel_key.clone()).unwrap();
        assert!(shard.is_some());
        assert_eq!(shard.unwrap().len(), limit as u64);

        let second_batch_indices: Vec<u64> = (limit as u64..=(2 * limit) as u64).collect();
        let mut batch = provider.batch();
        batch.append_storage_history_shard(address, slot, second_batch_indices).unwrap();
        batch.commit().unwrap();

        let first_completed = StorageShardedKey::new(address, slot, (limit - 1) as u64);
        let second_completed = StorageShardedKey::new(address, slot, (2 * limit - 1) as u64);

        assert!(
            provider.get::<tables::StoragesHistory>(first_completed).unwrap().is_some(),
            "first completed shard should exist"
        );
        assert!(
            provider.get::<tables::StoragesHistory>(second_completed).unwrap().is_some(),
            "second completed shard should exist"
        );
        assert!(
            provider.get::<tables::StoragesHistory>(sentinel_key).unwrap().is_some(),
            "sentinel shard should exist"
        );
    }

    #[test]
    fn test_clear_table() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);
        let key = ShardedKey::new(address, u64::MAX);
        let blocks = BlockNumberList::new_pre_sorted([1, 2, 3]);

        provider.put::<tables::AccountsHistory>(key.clone(), &blocks).unwrap();
        assert!(provider.get::<tables::AccountsHistory>(key.clone()).unwrap().is_some());

        provider.clear::<tables::AccountsHistory>().unwrap();

        assert!(
            provider.get::<tables::AccountsHistory>(key).unwrap().is_none(),
            "table should be empty after clear"
        );
        assert!(
            provider.first::<tables::AccountsHistory>().unwrap().is_none(),
            "first() should return None after clear"
        );
    }

    #[test]
    fn test_clear_empty_table() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        assert!(provider.first::<tables::AccountsHistory>().unwrap().is_none());

        // Clearing an already-empty table is a no-op rather than an error.
        provider.clear::<tables::AccountsHistory>().unwrap();

        assert!(provider.first::<tables::AccountsHistory>().unwrap().is_none());
    }

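    // The unwind tests below truncate account history to a target block: indices above
    // the target are dropped, shards left empty are deleted, and the highest surviving
    // shard ends up under the u64::MAX sentinel key.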
    #[test]
    fn test_unwind_account_history_to_basic() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);

        let mut batch = provider.batch();
        batch.append_account_history_shard(address, 0..=10).unwrap();
        batch.commit().unwrap();

        let key = ShardedKey::new(address, u64::MAX);
        let result = provider.get::<tables::AccountsHistory>(key.clone()).unwrap();
        assert!(result.is_some());
        let blocks: Vec<u64> = result.unwrap().iter().collect();
        assert_eq!(blocks, (0..=10).collect::<Vec<_>>());

        // Unwind to block 5: blocks 6..=10 are removed.
        let mut batch = provider.batch();
        batch.unwind_account_history_to(address, 5).unwrap();
        batch.commit().unwrap();

        let result = provider.get::<tables::AccountsHistory>(key).unwrap();
        assert!(result.is_some());
        let blocks: Vec<u64> = result.unwrap().iter().collect();
        assert_eq!(blocks, (0..=5).collect::<Vec<_>>());
    }

    #[test]
    fn test_unwind_account_history_to_removes_all() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);

        let mut batch = provider.batch();
        batch.append_account_history_shard(address, 5..=10).unwrap();
        batch.commit().unwrap();

        // Unwinding below the lowest indexed block removes the shard entirely.
        let mut batch = provider.batch();
        batch.unwind_account_history_to(address, 4).unwrap();
        batch.commit().unwrap();

        let key = ShardedKey::new(address, u64::MAX);
        let result = provider.get::<tables::AccountsHistory>(key).unwrap();
        assert!(result.is_none(), "Should have no data after full unwind");
    }

    #[test]
    fn test_unwind_account_history_to_no_op() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);

        let mut batch = provider.batch();
        batch.append_account_history_shard(address, 0..=5).unwrap();
        batch.commit().unwrap();

        // Unwinding above the highest indexed block changes nothing.
        let mut batch = provider.batch();
        batch.unwind_account_history_to(address, 10).unwrap();
        batch.commit().unwrap();

        let key = ShardedKey::new(address, u64::MAX);
        let result = provider.get::<tables::AccountsHistory>(key).unwrap();
        assert!(result.is_some());
        let blocks: Vec<u64> = result.unwrap().iter().collect();
        assert_eq!(blocks, (0..=5).collect::<Vec<_>>());
    }

    #[test]
    fn test_unwind_account_history_to_block_zero() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);

        let mut batch = provider.batch();
        batch.append_account_history_shard(address, 0..=5).unwrap();
        batch.commit().unwrap();

        // Unwinding to block 0 keeps only block 0 itself.
        let mut batch = provider.batch();
        batch.unwind_account_history_to(address, 0).unwrap();
        batch.commit().unwrap();

        let key = ShardedKey::new(address, u64::MAX);
        let result = provider.get::<tables::AccountsHistory>(key).unwrap();
        assert!(result.is_some());
        let blocks: Vec<u64> = result.unwrap().iter().collect();
        assert_eq!(blocks, vec![0]);
    }

    #[test]
    fn test_unwind_account_history_to_multi_shard() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);

        let mut batch = provider.batch();

        // Completed shard covering blocks 1..=50.
        let shard1 = BlockNumberList::new_pre_sorted(1..=50);
        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, 50), &shard1).unwrap();

        // Sentinel shard covering blocks 51..=100.
        let shard2 = BlockNumberList::new_pre_sorted(51..=100);
        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, u64::MAX), &shard2).unwrap();

        batch.commit().unwrap();

        let shards = provider.account_history_shards(address).unwrap();
        assert_eq!(shards.len(), 2);

        // Unwind to 75: the sentinel shard is truncated, the completed shard survives.
        let mut batch = provider.batch();
        batch.unwind_account_history_to(address, 75).unwrap();
        batch.commit().unwrap();

        let shards = provider.account_history_shards(address).unwrap();
        assert_eq!(shards.len(), 2);

        assert_eq!(shards[0].0.highest_block_number, 50);
        assert_eq!(shards[0].1.iter().collect::<Vec<_>>(), (1..=50).collect::<Vec<_>>());

        assert_eq!(shards[1].0.highest_block_number, u64::MAX);
        assert_eq!(shards[1].1.iter().collect::<Vec<_>>(), (51..=75).collect::<Vec<_>>());
    }

    #[test]
    fn test_unwind_account_history_to_multi_shard_boundary_empty() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);

        let mut batch = provider.batch();

        let shard1 = BlockNumberList::new_pre_sorted(1..=50);
        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, 50), &shard1).unwrap();

        let shard2 = BlockNumberList::new_pre_sorted(75..=100);
        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, u64::MAX), &shard2).unwrap();

        batch.commit().unwrap();

        // Unwinding to 60 empties the sentinel shard; the completed shard is re-keyed
        // to the sentinel position.
        let mut batch = provider.batch();
        batch.unwind_account_history_to(address, 60).unwrap();
        batch.commit().unwrap();

        let shards = provider.account_history_shards(address).unwrap();
        assert_eq!(shards.len(), 1);
        assert_eq!(shards[0].0.highest_block_number, u64::MAX);
        assert_eq!(shards[0].1.iter().collect::<Vec<_>>(), (1..=50).collect::<Vec<_>>());
    }

    #[test]
    fn test_account_history_shards_iterator() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);
        let other_address = Address::from([0x43; 20]);

        let mut batch = provider.batch();
        batch.append_account_history_shard(address, 0..=5).unwrap();
        batch.append_account_history_shard(other_address, 10..=15).unwrap();
        batch.commit().unwrap();

        // Shard iteration is scoped to a single address.
        let shards = provider.account_history_shards(address).unwrap();
        assert_eq!(shards.len(), 1);
        assert_eq!(shards[0].0.key, address);

        let shards = provider.account_history_shards(other_address).unwrap();
        assert_eq!(shards.len(), 1);
        assert_eq!(shards[0].0.key, other_address);

        // Unknown addresses yield no shards.
        let non_existent = Address::from([0x99; 20]);
        let shards = provider.account_history_shards(non_existent).unwrap();
        assert!(shards.is_empty());
    }

    #[test]
    fn test_clear_account_history() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);

        let mut batch = provider.batch();
        batch.append_account_history_shard(address, 0..=10).unwrap();
        batch.commit().unwrap();

        let mut batch = provider.batch();
        batch.clear_account_history(address).unwrap();
        batch.commit().unwrap();

        let shards = provider.account_history_shards(address).unwrap();
        assert!(shards.is_empty(), "All shards should be deleted");
    }

    #[test]
    fn test_unwind_non_sentinel_boundary() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);

        let mut batch = provider.batch();

        let shard1 = BlockNumberList::new_pre_sorted(1..=50);
        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, 50), &shard1).unwrap();

        let shard2 = BlockNumberList::new_pre_sorted(51..=100);
        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, 100), &shard2).unwrap();

        let shard3 = BlockNumberList::new_pre_sorted(101..=150);
        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, u64::MAX), &shard3).unwrap();

        batch.commit().unwrap();

        let shards = provider.account_history_shards(address).unwrap();
        assert_eq!(shards.len(), 3);

        // Unwind to 75: the sentinel shard is dropped, the middle shard is truncated
        // and re-keyed to the sentinel position, and the first shard is untouched.
        let mut batch = provider.batch();
        batch.unwind_account_history_to(address, 75).unwrap();
        batch.commit().unwrap();

        let shards = provider.account_history_shards(address).unwrap();
        assert_eq!(shards.len(), 2);

        assert_eq!(shards[0].0.highest_block_number, 50);
        assert_eq!(shards[0].1.iter().collect::<Vec<_>>(), (1..=50).collect::<Vec<_>>());

        assert_eq!(shards[1].0.highest_block_number, u64::MAX);
        assert_eq!(shards[1].1.iter().collect::<Vec<_>>(), (51..=75).collect::<Vec<_>>());
    }

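    // Exercises the auto-commit path: the batch is constructed directly so a tiny
    // threshold (1 KiB instead of the multi-GiB default) can be injected, forcing
    // flushes to the database mid-loop before the final explicit commit.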
    #[test]
    fn test_batch_auto_commit_on_threshold() {
        let temp_dir = TempDir::new().unwrap();
        let provider =
            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();

        // Build the batch by hand to override the auto-commit threshold.
        let mut batch = RocksDBBatch {
            provider: &provider,
            inner: WriteBatchWithTransaction::<true>::default(),
            buf: Vec::new(),
            auto_commit_threshold: Some(1024),
        };

        // Write well past 1024 bytes so at least one auto-commit fires.
        for i in 0..100u64 {
            let value = format!("value_{i:04}").into_bytes();
            batch.put::<TestTable>(i, &value).unwrap();
        }

        // Early keys must already be visible without an explicit commit.
        let first_visible = provider.get::<TestTable>(0).unwrap();
        assert!(first_visible.is_some(), "Auto-committed data should be visible");

        // Commit the remainder and verify everything.
        batch.commit().unwrap();

        for i in 0..100u64 {
            let value = format!("value_{i:04}").into_bytes();
            assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
        }
    }
}