1use super::metrics::{RocksDBMetrics, RocksDBOperation, ROCKSDB_TABLES};
2use crate::providers::{compute_history_rank, needs_prev_shard_check, HistoryInfo};
3use alloy_consensus::transaction::TxHashRef;
4use alloy_primitives::{
5 map::{AddressMap, HashMap},
6 Address, BlockNumber, TxNumber, B256,
7};
8use itertools::Itertools;
9use metrics::Label;
10use parking_lot::Mutex;
11use reth_chain_state::ExecutedBlock;
12use reth_db_api::{
13 database_metrics::DatabaseMetrics,
14 models::{
15 sharded_key::NUM_OF_INDICES_IN_SHARD, storage_sharded_key::StorageShardedKey, ShardedKey,
16 StorageSettings,
17 },
18 table::{Compress, Decode, Decompress, Encode, Table},
19 tables, BlockNumberList, DatabaseError,
20};
21use reth_primitives_traits::{BlockBody as _, FastInstant as Instant};
22use reth_prune_types::PruneMode;
23use reth_storage_errors::{
24 db::{DatabaseErrorInfo, DatabaseWriteError, DatabaseWriteOperation, LogLevel},
25 provider::{ProviderError, ProviderResult},
26};
27use rocksdb::{
28 BlockBasedOptions, Cache, ColumnFamilyDescriptor, CompactionPri, DBCompressionType,
29 DBRawIteratorWithThreadMode, IteratorMode, OptimisticTransactionDB,
30 OptimisticTransactionOptions, Options, SnapshotWithThreadMode, Transaction,
31 WriteBatchWithTransaction, WriteOptions, DB,
32};
33use std::{
34 collections::BTreeMap,
35 fmt,
36 path::{Path, PathBuf},
37 sync::Arc,
38};
39use tracing::instrument;
40
/// Shared queue of staged RocksDB write batches, collected by parallel writers and
/// committed later by the owner of the queue.
pub(crate) type PendingRocksDBBatches = Arc<Mutex<Vec<WriteBatchWithTransaction<true>>>>;

/// Raw key/value pair as yielded by a RocksDB iterator.
type RawKVResult = Result<(Box<[u8]>, Box<[u8]>), rocksdb::Error>;
46
/// Per-column-family size/keys statistics, sampled from RocksDB internal properties.
#[derive(Debug, Clone)]
pub struct RocksDBTableStats {
    /// Total size of live SST files for this column family, in bytes.
    pub sst_size_bytes: u64,
    /// Total size of all memtables (active + immutable) for this column family.
    pub memtable_size_bytes: u64,
    /// Column family (table) name.
    pub name: String,
    /// Estimated number of keys (RocksDB estimate, not an exact count).
    pub estimated_num_keys: u64,
    /// Estimated on-disk + in-memory size: `sst_size_bytes + memtable_size_bytes`.
    pub estimated_size_bytes: u64,
    /// Estimated bytes that compaction still needs to rewrite.
    pub pending_compaction_bytes: u64,
}
63
/// Database-wide statistics: all per-table stats plus the WAL footprint.
#[derive(Debug, Clone)]
pub struct RocksDBStats {
    /// Statistics for each known column family.
    pub tables: Vec<RocksDBTableStats>,
    /// Combined size of `*.log` (WAL) files in the database directory.
    pub wal_size_bytes: u64,
}
76
/// Context shared by the parallel per-table writers in [`RocksDBProvider::write_blocks_data`].
#[derive(Clone)]
pub(crate) struct RocksDBWriteCtx {
    /// Block number of the first block in the batch being written.
    pub first_block_number: BlockNumber,
    /// Prune mode for the tx-hash lookup table; a "full" mode skips that table entirely.
    pub prune_tx_lookup: Option<PruneMode>,
    /// Storage feature flags; writes are gated on `storage_v2`.
    pub storage_settings: StorageSettings,
    /// Destination for the batches produced by each writer (committed later).
    pub pending_batches: PendingRocksDBBatches,
}
89
90impl fmt::Debug for RocksDBWriteCtx {
91 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
92 f.debug_struct("RocksDBWriteCtx")
93 .field("first_block_number", &self.first_block_number)
94 .field("prune_tx_lookup", &self.prune_tx_lookup)
95 .field("storage_settings", &self.storage_settings)
96 .field("pending_batches", &"<pending batches>")
97 .finish()
98 }
99}
100
/// Default shared LRU block-cache capacity (128 MiB).
const DEFAULT_CACHE_SIZE: usize = 128 << 20;

/// Default block size for block-based tables (16 KiB).
const DEFAULT_BLOCK_SIZE: usize = 16 * 1024;

/// Default number of background flush/compaction jobs.
const DEFAULT_MAX_BACKGROUND_JOBS: i32 = 6;

/// Default cap on open files (read-only secondary mode lifts this to unlimited).
const DEFAULT_MAX_OPEN_FILES: i32 = 512;

/// Default incremental sync granularity for SST writes (1 MiB).
const DEFAULT_BYTES_PER_SYNC: u64 = 1_048_576;

/// Default per-column-family memtable (write buffer) size (128 MiB).
const DEFAULT_WRITE_BUFFER_SIZE: usize = 128 << 20;

/// Initial capacity of the scratch buffer batches use when compressing values.
const DEFAULT_COMPRESS_BUF_CAPACITY: usize = 4096;

/// Size threshold (4 GiB) handed to auto-committing batches; the commit itself
/// happens inside `RocksDBBatch` — see its definition for the exact trigger.
const DEFAULT_AUTO_COMMIT_THRESHOLD: usize = 4 * 1024 * 1024 * 1024;
/// Builder for [`RocksDBProvider`]: collects column families and tuning options
/// before opening the database in read-write or read-only (secondary) mode.
pub struct RocksDBBuilder {
    // Directory of the RocksDB database.
    path: PathBuf,
    // Column-family (table) names to open/create.
    column_families: Vec<String>,
    // Whether to record per-operation metrics.
    enable_metrics: bool,
    // Whether to enable RocksDB's internal statistics (has runtime overhead).
    enable_statistics: bool,
    // RocksDB info-log verbosity.
    log_level: rocksdb::LogLevel,
    // LRU block cache shared by all column families.
    block_cache: Cache,
    // Open as a read-only secondary instance instead of read-write.
    read_only: bool,
}
151
152impl fmt::Debug for RocksDBBuilder {
153 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
154 f.debug_struct("RocksDBBuilder")
155 .field("path", &self.path)
156 .field("column_families", &self.column_families)
157 .field("enable_metrics", &self.enable_metrics)
158 .finish()
159 }
160}
161
162impl RocksDBBuilder {
163 pub fn new(path: impl AsRef<Path>) -> Self {
165 let cache = Cache::new_lru_cache(DEFAULT_CACHE_SIZE);
166 Self {
167 path: path.as_ref().to_path_buf(),
168 column_families: Vec::new(),
169 enable_metrics: false,
170 enable_statistics: false,
171 log_level: rocksdb::LogLevel::Info,
172 block_cache: cache,
173 read_only: false,
174 }
175 }
176
177 fn default_table_options(cache: &Cache) -> BlockBasedOptions {
179 let mut table_options = BlockBasedOptions::default();
180 table_options.set_block_size(DEFAULT_BLOCK_SIZE);
181 table_options.set_cache_index_and_filter_blocks(true);
182 table_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
183 table_options.set_block_cache(cache);
185 table_options
186 }
187
188 fn default_options(
190 log_level: rocksdb::LogLevel,
191 cache: &Cache,
192 enable_statistics: bool,
193 ) -> Options {
194 let table_options = Self::default_table_options(cache);
196
197 let mut options = Options::default();
198 options.set_block_based_table_factory(&table_options);
199 options.create_if_missing(true);
200 options.create_missing_column_families(true);
201 options.set_max_background_jobs(DEFAULT_MAX_BACKGROUND_JOBS);
202 options.set_bytes_per_sync(DEFAULT_BYTES_PER_SYNC);
203
204 options.set_bottommost_compression_type(DBCompressionType::Zstd);
205 options.set_bottommost_zstd_max_train_bytes(0, true);
206 options.set_compression_type(DBCompressionType::Lz4);
207 options.set_compaction_pri(CompactionPri::MinOverlappingRatio);
208
209 options.set_log_level(log_level);
210
211 options.set_max_open_files(DEFAULT_MAX_OPEN_FILES);
212
213 options.set_wal_ttl_seconds(0);
216 options.set_wal_size_limit_mb(0);
217
218 if enable_statistics {
220 options.enable_statistics();
221 }
222
223 options
224 }
225
226 fn default_column_family_options(cache: &Cache) -> Options {
228 let table_options = Self::default_table_options(cache);
230
231 let mut cf_options = Options::default();
232 cf_options.set_block_based_table_factory(&table_options);
233 cf_options.set_level_compaction_dynamic_level_bytes(true);
234 cf_options.set_compression_type(DBCompressionType::Lz4);
236 cf_options.set_bottommost_compression_type(DBCompressionType::Zstd);
237 cf_options.set_bottommost_zstd_max_train_bytes(0, true);
239 cf_options.set_write_buffer_size(DEFAULT_WRITE_BUFFER_SIZE);
240
241 cf_options
242 }
243
244 fn tx_hash_numbers_column_family_options(cache: &Cache) -> Options {
251 let mut table_options = BlockBasedOptions::default();
252 table_options.set_block_size(DEFAULT_BLOCK_SIZE);
253 table_options.set_cache_index_and_filter_blocks(true);
254 table_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
255 table_options.set_block_cache(cache);
256 let mut cf_options = Options::default();
260 cf_options.set_block_based_table_factory(&table_options);
261 cf_options.set_level_compaction_dynamic_level_bytes(true);
262 cf_options.set_compression_type(DBCompressionType::None);
265 cf_options.set_bottommost_compression_type(DBCompressionType::None);
266
267 cf_options
268 }
269
270 pub fn with_table<T: Table>(mut self) -> Self {
272 self.column_families.push(T::NAME.to_string());
273 self
274 }
275
276 pub fn with_default_tables(self) -> Self {
283 self.with_table::<tables::TransactionHashNumbers>()
284 .with_table::<tables::AccountsHistory>()
285 .with_table::<tables::StoragesHistory>()
286 }
287
288 pub const fn with_metrics(mut self) -> Self {
290 self.enable_metrics = true;
291 self
292 }
293
294 pub const fn with_statistics(mut self) -> Self {
296 self.enable_statistics = true;
297 self
298 }
299
300 pub const fn with_database_log_level(mut self, log_level: Option<LogLevel>) -> Self {
302 if let Some(level) = log_level {
303 self.log_level = convert_log_level(level);
304 }
305 self
306 }
307
308 pub fn with_block_cache_size(mut self, capacity_bytes: usize) -> Self {
310 self.block_cache = Cache::new_lru_cache(capacity_bytes);
311 self
312 }
313
314 pub const fn with_read_only(mut self, read_only: bool) -> Self {
322 self.read_only = read_only;
323 self
324 }
325
326 pub fn build(self) -> ProviderResult<RocksDBProvider> {
328 let options =
329 Self::default_options(self.log_level, &self.block_cache, self.enable_statistics);
330
331 let cf_descriptors: Vec<ColumnFamilyDescriptor> = self
332 .column_families
333 .iter()
334 .map(|name| {
335 let cf_options = if name == tables::TransactionHashNumbers::NAME {
336 Self::tx_hash_numbers_column_family_options(&self.block_cache)
337 } else {
338 Self::default_column_family_options(&self.block_cache)
339 };
340 ColumnFamilyDescriptor::new(name.clone(), cf_options)
341 })
342 .collect();
343
344 let metrics = self.enable_metrics.then(RocksDBMetrics::default);
345
346 if self.read_only {
347 let mut options = options;
350 options.set_max_open_files(-1);
351
352 let secondary_path = self
353 .path
354 .parent()
355 .unwrap_or(&self.path)
356 .join(format!("rocksdb-secondary-tmp-{}", std::process::id()));
357 reth_fs_util::create_dir_all(&secondary_path).map_err(ProviderError::other)?;
358
359 let db = DB::open_cf_descriptors_as_secondary(
360 &options,
361 &self.path,
362 &secondary_path,
363 cf_descriptors,
364 )
365 .map_err(|e| {
366 ProviderError::Database(DatabaseError::Open(DatabaseErrorInfo {
367 message: e.to_string().into(),
368 code: -1,
369 }))
370 })?;
371 Ok(RocksDBProvider(Arc::new(RocksDBProviderInner::Secondary {
372 db,
373 metrics,
374 secondary_path,
375 })))
376 } else {
377 let db =
382 OptimisticTransactionDB::open_cf_descriptors(&options, &self.path, cf_descriptors)
383 .map_err(|e| {
384 ProviderError::Database(DatabaseError::Open(DatabaseErrorInfo {
385 message: e.to_string().into(),
386 code: -1,
387 }))
388 })?;
389 Ok(RocksDBProvider(Arc::new(RocksDBProviderInner::ReadWrite { db, metrics })))
390 }
391 }
392}
393
/// Compresses `$value` into `$buf` and evaluates to `Option<&[u8]>`:
/// `Some(bytes)` if the value exposes an uncompressable reference (use it directly),
/// `None` if the value was compressed into `$buf` (caller reads `$buf`).
macro_rules! compress_to_buf_or_ref {
    ($buf:expr, $value:expr) => {
        if let Some(value) = $value.uncompressable_ref() {
            Some(value)
        } else {
            $buf.clear();
            $value.compress_to_buf(&mut $buf);
            None
        }
    };
}
407
/// Cheaply cloneable handle to a RocksDB database (read-write or read-only secondary).
#[derive(Debug)]
pub struct RocksDBProvider(Arc<RocksDBProviderInner>);
411
/// The two modes a provider can run in.
enum RocksDBProviderInner {
    /// Normal read-write mode backed by an optimistic-transaction database.
    ReadWrite {
        db: OptimisticTransactionDB,
        metrics: Option<RocksDBMetrics>,
    },
    /// Read-only secondary instance that tails the primary's files.
    Secondary {
        db: DB,
        metrics: Option<RocksDBMetrics>,
        // Scratch directory for the secondary instance; removed on drop.
        secondary_path: PathBuf,
    },
}
433
impl RocksDBProviderInner {
    /// Metrics recorder, if enabled at build time (present in both modes).
    const fn metrics(&self) -> Option<&RocksDBMetrics> {
        match self {
            Self::ReadWrite { metrics, .. } | Self::Secondary { metrics, .. } => metrics.as_ref(),
        }
    }

    /// Returns the read-write database handle.
    ///
    /// # Panics
    /// Panics when invoked on a `Secondary` (read-only) provider; callers must
    /// check `is_read_only` before attempting writes.
    fn db_rw(&self) -> &OptimisticTransactionDB {
        match self {
            Self::ReadWrite { db, .. } => db,
            Self::Secondary { .. } => {
                panic!("Cannot perform write operation on secondary RocksDB provider")
            }
        }
    }

    /// Resolves the column-family handle for table `T`, erroring if the CF was not
    /// registered when the database was opened.
    fn cf_handle<T: Table>(&self) -> Result<&rocksdb::ColumnFamily, DatabaseError> {
        let cf = match self {
            Self::ReadWrite { db, .. } => db.cf_handle(T::NAME),
            Self::Secondary { db, .. } => db.cf_handle(T::NAME),
        };
        cf.ok_or_else(|| DatabaseError::Other(format!("Column family '{}' not found", T::NAME)))
    }

    /// Point lookup in `cf`; works in both modes.
    fn get_cf(
        &self,
        cf: &rocksdb::ColumnFamily,
        key: impl AsRef<[u8]>,
    ) -> Result<Option<Vec<u8>>, rocksdb::Error> {
        match self {
            Self::ReadWrite { db, .. } => db.get_cf(cf, key),
            Self::Secondary { db, .. } => db.get_cf(cf, key),
        }
    }

    /// Upsert into `cf`; panics in read-only mode (via `db_rw`).
    fn put_cf(
        &self,
        cf: &rocksdb::ColumnFamily,
        key: impl AsRef<[u8]>,
        value: impl AsRef<[u8]>,
    ) -> Result<(), rocksdb::Error> {
        self.db_rw().put_cf(cf, key, value)
    }

    /// Single-key delete; panics in read-only mode (via `db_rw`).
    fn delete_cf(
        &self,
        cf: &rocksdb::ColumnFamily,
        key: impl AsRef<[u8]>,
    ) -> Result<(), rocksdb::Error> {
        self.db_rw().delete_cf(cf, key)
    }

    /// Range delete `[from, to)`; panics in read-only mode (via `db_rw`).
    fn delete_range_cf<K: AsRef<[u8]>>(
        &self,
        cf: &rocksdb::ColumnFamily,
        from: K,
        to: K,
    ) -> Result<(), rocksdb::Error> {
        self.db_rw().delete_range_cf(cf, from, to)
    }

    /// Decoding iterator over `cf`, wrapped per-mode in an enum.
    fn iterator_cf(
        &self,
        cf: &rocksdb::ColumnFamily,
        mode: IteratorMode<'_>,
    ) -> RocksDBIterEnum<'_> {
        match self {
            Self::ReadWrite { db, .. } => RocksDBIterEnum::ReadWrite(db.iterator_cf(cf, mode)),
            Self::Secondary { db, .. } => RocksDBIterEnum::ReadOnly(db.iterator_cf(cf, mode)),
        }
    }

    /// Raw (cursor-style) iterator over `cf`, wrapped per-mode in an enum.
    fn raw_iterator_cf(&self, cf: &rocksdb::ColumnFamily) -> RocksDBRawIterEnum<'_> {
        match self {
            Self::ReadWrite { db, .. } => RocksDBRawIterEnum::ReadWrite(db.raw_iterator_cf(cf)),
            Self::Secondary { db, .. } => RocksDBRawIterEnum::ReadOnly(db.raw_iterator_cf(cf)),
        }
    }

    /// Consistent read view. Read-write mode takes a real snapshot; secondary mode
    /// just reads the DB directly (its view only moves on explicit catch-up).
    fn snapshot(&self) -> RocksReadSnapshotInner<'_> {
        match self {
            Self::ReadWrite { db, .. } => RocksReadSnapshotInner::ReadWrite(db.snapshot()),
            Self::Secondary { db, .. } => RocksReadSnapshotInner::Secondary(db),
        }
    }

    /// Filesystem path of the database directory.
    fn path(&self) -> &Path {
        match self {
            Self::ReadWrite { db, .. } => db.path(),
            Self::Secondary { db, .. } => db.path(),
        }
    }

    /// Sums the sizes of `*.log` (WAL) files in the database directory.
    /// Best-effort: unreadable directories or entries count as 0.
    fn wal_size_bytes(&self) -> u64 {
        let path = self.path();

        match std::fs::read_dir(path) {
            Ok(entries) => entries
                .filter_map(|e| e.ok())
                .filter(|e| e.path().extension().is_some_and(|ext| ext == "log"))
                .filter_map(|e| e.metadata().ok())
                .map(|m| m.len())
                .sum(),
            Err(_) => 0,
        }
    }

    /// Samples RocksDB internal properties for every known table.
    /// The macro exists because the two DB types share no common trait here.
    fn table_stats(&self) -> Vec<RocksDBTableStats> {
        let mut stats = Vec::new();

        macro_rules! collect_stats {
            ($db:expr) => {
                for cf_name in ROCKSDB_TABLES {
                    if let Some(cf) = $db.cf_handle(cf_name) {
                        // All properties are RocksDB estimates; failures fall back to 0.
                        let estimated_num_keys = $db
                            .property_int_value_cf(cf, rocksdb::properties::ESTIMATE_NUM_KEYS)
                            .ok()
                            .flatten()
                            .unwrap_or(0);

                        let sst_size = $db
                            .property_int_value_cf(cf, rocksdb::properties::LIVE_SST_FILES_SIZE)
                            .ok()
                            .flatten()
                            .unwrap_or(0);

                        let memtable_size = $db
                            .property_int_value_cf(cf, rocksdb::properties::SIZE_ALL_MEM_TABLES)
                            .ok()
                            .flatten()
                            .unwrap_or(0);

                        let estimated_size_bytes = sst_size + memtable_size;

                        let pending_compaction_bytes = $db
                            .property_int_value_cf(
                                cf,
                                rocksdb::properties::ESTIMATE_PENDING_COMPACTION_BYTES,
                            )
                            .ok()
                            .flatten()
                            .unwrap_or(0);

                        stats.push(RocksDBTableStats {
                            sst_size_bytes: sst_size,
                            memtable_size_bytes: memtable_size,
                            name: cf_name.to_string(),
                            estimated_num_keys,
                            estimated_size_bytes,
                            pending_compaction_bytes,
                        });
                    }
                }
            };
        }

        match self {
            Self::ReadWrite { db, .. } => collect_stats!(db),
            Self::Secondary { db, .. } => collect_stats!(db),
        }

        stats
    }

    /// Combined per-table and WAL statistics.
    fn db_stats(&self) -> RocksDBStats {
        RocksDBStats { tables: self.table_stats(), wal_size_bytes: self.wal_size_bytes() }
    }
}
622
623impl fmt::Debug for RocksDBProviderInner {
624 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
625 match self {
626 Self::ReadWrite { metrics, .. } => f
627 .debug_struct("RocksDBProviderInner::ReadWrite")
628 .field("db", &"<OptimisticTransactionDB>")
629 .field("metrics", metrics)
630 .finish(),
631 Self::Secondary { metrics, .. } => f
632 .debug_struct("RocksDBProviderInner::Secondary")
633 .field("db", &"<DB (secondary)>")
634 .field("metrics", metrics)
635 .finish(),
636 }
637 }
638}
639
impl Drop for RocksDBProviderInner {
    fn drop(&mut self) {
        match self {
            Self::ReadWrite { db, .. } => {
                // Flush the WAL first so committed writes are durable, then flush
                // each CF's memtable. Failures are logged rather than propagated:
                // panicking inside drop would abort the process.
                if let Err(e) = db.flush_wal(true) {
                    tracing::warn!(target: "providers::rocksdb", ?e, "Failed to flush WAL on drop");
                }
                for cf_name in ROCKSDB_TABLES {
                    if let Some(cf) = db.cf_handle(cf_name) &&
                        let Err(e) = db.flush_cf(&cf)
                    {
                        tracing::warn!(target: "providers::rocksdb", cf = cf_name, ?e, "Failed to flush CF on drop");
                    }
                }
                // Wait for background compactions to stop before the handle dies.
                db.cancel_all_background_work(true);
            }
            Self::Secondary { db, secondary_path, .. } => {
                db.cancel_all_background_work(true);
                // Best-effort cleanup of the per-process secondary scratch directory.
                let _ = std::fs::remove_dir_all(secondary_path);
            }
        }
    }
}
665
666impl Clone for RocksDBProvider {
667 fn clone(&self) -> Self {
668 Self(self.0.clone())
669 }
670}
671
672impl DatabaseMetrics for RocksDBProvider {
673 fn gauge_metrics(&self) -> Vec<(&'static str, f64, Vec<Label>)> {
674 let mut metrics = Vec::new();
675
676 for stat in self.table_stats() {
677 metrics.push((
678 "rocksdb.table_size",
679 stat.estimated_size_bytes as f64,
680 vec![Label::new("table", stat.name.clone())],
681 ));
682 metrics.push((
683 "rocksdb.table_entries",
684 stat.estimated_num_keys as f64,
685 vec![Label::new("table", stat.name.clone())],
686 ));
687 metrics.push((
688 "rocksdb.pending_compaction_bytes",
689 stat.pending_compaction_bytes as f64,
690 vec![Label::new("table", stat.name.clone())],
691 ));
692 metrics.push((
693 "rocksdb.sst_size",
694 stat.sst_size_bytes as f64,
695 vec![Label::new("table", stat.name.clone())],
696 ));
697 metrics.push((
698 "rocksdb.memtable_size",
699 stat.memtable_size_bytes as f64,
700 vec![Label::new("table", stat.name)],
701 ));
702 }
703
704 metrics.push(("rocksdb.wal_size", self.wal_size_bytes() as f64, vec![]));
706
707 metrics
708 }
709}
710
711impl RocksDBProvider {
712 pub fn new(path: impl AsRef<Path>) -> ProviderResult<Self> {
714 RocksDBBuilder::new(path).build()
715 }
716
    /// Returns a [`RocksDBBuilder`] for configuring a database at `path`.
    pub fn builder(path: impl AsRef<Path>) -> RocksDBBuilder {
        RocksDBBuilder::new(path)
    }
721
722 pub fn exists(path: impl AsRef<Path>) -> bool {
727 path.as_ref().join("CURRENT").exists()
728 }
729
    /// Returns `true` when this provider was opened as a read-only secondary
    /// instance (all write paths would panic).
    pub fn is_read_only(&self) -> bool {
        matches!(self.0.as_ref(), RocksDBProviderInner::Secondary { .. })
    }
734
    /// Advances a read-only secondary instance to the primary's latest state.
    /// No-op on a read-write provider.
    ///
    /// # Errors
    /// Returns a database read error if the catch-up fails.
    pub fn try_catch_up_with_primary(&self) -> ProviderResult<()> {
        match self.0.as_ref() {
            RocksDBProviderInner::Secondary { db, .. } => {
                db.try_catch_up_with_primary().map_err(|e| {
                    ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
                        message: e.to_string().into(),
                        code: -1,
                    }))
                })
            }
            _ => Ok(()),
        }
    }
752
    /// Returns a consistent read view of the database (a real snapshot in
    /// read-write mode; the secondary's current view otherwise).
    pub fn snapshot(&self) -> RocksReadSnapshot<'_> {
        RocksReadSnapshot { inner: self.0.snapshot(), provider: self }
    }
759
    /// Begins an optimistic transaction with default write/transaction options.
    ///
    /// Panics if the provider is read-only (see `db_rw`).
    pub fn tx(&self) -> RocksTx<'_> {
        let write_options = WriteOptions::default();
        let txn_options = OptimisticTransactionOptions::default();
        let inner = self.0.db_rw().transaction_opt(&write_options, &txn_options);
        RocksTx { inner, provider: self }
    }
773
    /// Creates a write batch that accumulates writes until explicitly committed
    /// (no auto-commit threshold).
    pub fn batch(&self) -> RocksDBBatch<'_> {
        RocksDBBatch {
            provider: self,
            inner: WriteBatchWithTransaction::<true>::default(),
            // Scratch buffer reused across value compressions.
            buf: Vec::with_capacity(DEFAULT_COMPRESS_BUF_CAPACITY),
            auto_commit_threshold: None,
        }
    }
789
    /// Creates a write batch that commits itself once it grows past
    /// [`DEFAULT_AUTO_COMMIT_THRESHOLD`] bytes, bounding memory use during bulk writes.
    pub fn batch_with_auto_commit(&self) -> RocksDBBatch<'_> {
        RocksDBBatch {
            provider: self,
            inner: WriteBatchWithTransaction::<true>::default(),
            buf: Vec::with_capacity(DEFAULT_COMPRESS_BUF_CAPACITY),
            auto_commit_threshold: Some(DEFAULT_AUTO_COMMIT_THRESHOLD),
        }
    }
803
    /// Resolves the column-family handle for table `T`.
    fn get_cf_handle<T: Table>(&self) -> Result<&rocksdb::ColumnFamily, DatabaseError> {
        self.0.cf_handle::<T>()
    }
808
809 fn execute_with_operation_metric<R>(
811 &self,
812 operation: RocksDBOperation,
813 table: &'static str,
814 f: impl FnOnce(&Self) -> R,
815 ) -> R {
816 let start = self.0.metrics().map(|_| Instant::now());
817 let res = f(self);
818
819 if let (Some(start), Some(metrics)) = (start, self.0.metrics()) {
820 metrics.record_operation(operation, table, start.elapsed());
821 }
822
823 res
824 }
825
    /// Fetches the value stored for `key` in table `T`, if any.
    pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
        self.get_encoded::<T>(&key.encode())
    }
830
831 pub fn get_encoded<T: Table>(
833 &self,
834 key: &<T::Key as Encode>::Encoded,
835 ) -> ProviderResult<Option<T::Value>> {
836 self.execute_with_operation_metric(RocksDBOperation::Get, T::NAME, |this| {
837 let result = this.0.get_cf(this.get_cf_handle::<T>()?, key.as_ref()).map_err(|e| {
838 ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
839 message: e.to_string().into(),
840 code: -1,
841 }))
842 })?;
843
844 Ok(result.and_then(|value| T::Value::decompress(&value).ok()))
845 })
846 }
847
    /// Encodes `key` and upserts `value` into table `T`.
    pub fn put<T: Table>(&self, key: T::Key, value: &T::Value) -> ProviderResult<()> {
        let encoded_key = key.encode();
        self.put_encoded::<T>(&encoded_key, value)
    }
856
    /// Upserts `value` under an already-encoded key in table `T`.
    ///
    /// Panics if the provider is read-only (see `db_rw`).
    pub fn put_encoded<T: Table>(
        &self,
        key: &<T::Key as Encode>::Encoded,
        value: &T::Value,
    ) -> ProviderResult<()> {
        self.execute_with_operation_metric(RocksDBOperation::Put, T::NAME, |this| {
            // `Vec::new` allocates lazily: nothing is allocated when the value
            // exposes an uncompressable reference and the buffer goes unused.
            let mut buf = Vec::new();
            let value_bytes = compress_to_buf_or_ref!(buf, value).unwrap_or(&buf);

            this.0.put_cf(this.get_cf_handle::<T>()?, key, value_bytes).map_err(|e| {
                ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
                    info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
                    operation: DatabaseWriteOperation::PutUpsert,
                    table_name: T::NAME,
                    key: key.as_ref().to_vec(),
                })))
            })
        })
    }
883
    /// Deletes `key` from table `T` (no-op if absent).
    ///
    /// Panics if the provider is read-only (see `db_rw`).
    pub fn delete<T: Table>(&self, key: T::Key) -> ProviderResult<()> {
        self.execute_with_operation_metric(RocksDBOperation::Delete, T::NAME, |this| {
            this.0.delete_cf(this.get_cf_handle::<T>()?, key.encode().as_ref()).map_err(|e| {
                ProviderError::Database(DatabaseError::Delete(DatabaseErrorInfo {
                    message: e.to_string().into(),
                    code: -1,
                }))
            })
        })
    }
898
    /// Deletes every entry in table `T` via a single range delete.
    ///
    /// NOTE(review): `delete_range_cf` is end-exclusive and the upper bound is 256
    /// bytes of `0xFF`, so a key that is >= that bound would survive. The encoded
    /// keys of the tables used here are far shorter, so this holds in practice —
    /// but verify before reusing for tables with long keys.
    pub fn clear<T: Table>(&self) -> ProviderResult<()> {
        let cf = self.get_cf_handle::<T>()?;

        self.0.delete_range_cf(cf, &[] as &[u8], &[0xFF; 256]).map_err(|e| {
            ProviderError::Database(DatabaseError::Delete(DatabaseErrorInfo {
                message: e.to_string().into(),
                code: -1,
            }))
        })?;

        Ok(())
    }
916
    /// Returns the first entry of table `T` in the given iteration `mode`
    /// (shared implementation of [`Self::first`] and [`Self::last`]).
    fn get_boundary<T: Table>(
        &self,
        mode: IteratorMode<'_>,
    ) -> ProviderResult<Option<(T::Key, T::Value)>> {
        self.execute_with_operation_metric(RocksDBOperation::Get, T::NAME, |this| {
            let cf = this.get_cf_handle::<T>()?;
            let mut iter = this.0.iterator_cf(cf, mode);

            match iter.next() {
                Some(Ok((key_bytes, value_bytes))) => {
                    let key = <T::Key as reth_db_api::table::Decode>::decode(&key_bytes)
                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
                    let value = T::Value::decompress(&value_bytes)
                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
                    Ok(Some((key, value)))
                }
                Some(Err(e)) => {
                    Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
                        message: e.to_string().into(),
                        code: -1,
                    })))
                }
                // Empty table.
                None => Ok(None),
            }
        })
    }
944
    /// Returns the first (lowest-key) entry of table `T`, if the table is non-empty.
    #[inline]
    pub fn first<T: Table>(&self) -> ProviderResult<Option<(T::Key, T::Value)>> {
        self.get_boundary::<T>(IteratorMode::Start)
    }
950
    /// Returns the last (highest-key) entry of table `T`, if the table is non-empty.
    #[inline]
    pub fn last<T: Table>(&self) -> ProviderResult<Option<(T::Key, T::Value)>> {
        self.get_boundary::<T>(IteratorMode::End)
    }
956
    /// Returns a decoding iterator over all entries of table `T`, in key order.
    pub fn iter<T: Table>(&self) -> ProviderResult<RocksDBIter<'_, T>> {
        let cf = self.get_cf_handle::<T>()?;
        let iter = self.0.iterator_cf(cf, IteratorMode::Start);
        Ok(RocksDBIter { inner: iter, _marker: std::marker::PhantomData })
    }
965
    /// Per-table statistics sampled from RocksDB internal properties (estimates).
    pub fn table_stats(&self) -> Vec<RocksDBTableStats> {
        self.0.table_stats()
    }
972
    /// Combined size of WAL (`*.log`) files in the database directory (best-effort).
    pub fn wal_size_bytes(&self) -> u64 {
        self.0.wal_size_bytes()
    }
981
    /// Combined per-table and WAL statistics for the whole database.
    pub fn db_stats(&self) -> RocksDBStats {
        self.0.db_stats()
    }
988
    /// Flushes the memtables of the given tables, then the WAL, to disk.
    ///
    /// Unknown table names are silently skipped (`cf_handle` returns `None`).
    /// Panics if the provider is read-only (see `db_rw`).
    #[instrument(level = "debug", target = "providers::rocksdb", skip_all, fields(tables = ?tables))]
    pub fn flush(&self, tables: &[&'static str]) -> ProviderResult<()> {
        let db = self.0.db_rw();

        for cf_name in tables {
            if let Some(cf) = db.cf_handle(cf_name) {
                db.flush_cf(&cf).map_err(|e| {
                    ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
                        info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
                        operation: DatabaseWriteOperation::Flush,
                        table_name: cf_name,
                        key: Vec::new(),
                    })))
                })?;
            }
        }

        // Sync the WAL after the memtable flushes so all durable state is on disk.
        db.flush_wal(true).map_err(|e| {
            ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
                info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
                operation: DatabaseWriteOperation::Flush,
                table_name: "WAL",
                key: Vec::new(),
            })))
        })?;

        Ok(())
    }
1027
    /// Flushes all known tables and then triggers a full-range compaction on each.
    ///
    /// Compaction is issued synchronously per CF but the work itself runs in
    /// RocksDB's background threads. Panics if the provider is read-only.
    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
    pub fn flush_and_compact(&self) -> ProviderResult<()> {
        self.flush(ROCKSDB_TABLES)?;

        let db = self.0.db_rw();

        for cf_name in ROCKSDB_TABLES {
            if let Some(cf) = db.cf_handle(cf_name) {
                // `None..None` compacts the entire key range of the CF.
                db.compact_range_cf(&cf, None::<&[u8]>, None::<&[u8]>);
            }
        }

        Ok(())
    }
1053
    /// Returns an iterator over the raw (undecoded) entries of table `T`.
    ///
    /// NOTE(review): this wraps `iterator_cf`, not `raw_iterator_cf`, so
    /// `RocksDBRawIter` is presumably a raw-bytes view over the regular iterator
    /// rather than a cursor-style raw iterator — confirm against its definition.
    pub fn raw_iter<T: Table>(&self) -> ProviderResult<RocksDBRawIter<'_>> {
        let cf = self.get_cf_handle::<T>()?;
        let iter = self.0.iterator_cf(cf, IteratorMode::Start);
        Ok(RocksDBRawIter { inner: iter })
    }
1062
    /// Collects all account-history shards for `address`, in ascending shard order.
    ///
    /// Shards are keyed by `(address, highest_block_in_shard)`, so seeking to
    /// `(address, 0)` and scanning forward until the address changes yields exactly
    /// this address's shards.
    pub fn account_history_shards(
        &self,
        address: Address,
    ) -> ProviderResult<Vec<(ShardedKey<Address>, BlockNumberList)>> {
        let cf = self.get_cf_handle::<tables::AccountsHistory>()?;

        let start_key = ShardedKey::new(address, 0u64);
        let start_bytes = start_key.encode();

        let iter = self
            .0
            .iterator_cf(cf, IteratorMode::From(start_bytes.as_ref(), rocksdb::Direction::Forward));

        let mut result = Vec::new();
        for item in iter {
            match item {
                Ok((key_bytes, value_bytes)) => {
                    let key = ShardedKey::<Address>::decode(&key_bytes)
                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;

                    // Keys are address-prefixed; a different address ends the scan.
                    if key.key != address {
                        break;
                    }

                    let value = BlockNumberList::decompress(&value_bytes)
                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;

                    result.push((key, value));
                }
                Err(e) => {
                    return Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
                        message: e.to_string().into(),
                        code: -1,
                    })));
                }
            }
        }

        Ok(result)
    }
1114
    /// Collects all storage-history shards for `(address, storage_key)`, in
    /// ascending shard order (mirrors [`Self::account_history_shards`]).
    pub fn storage_history_shards(
        &self,
        address: Address,
        storage_key: B256,
    ) -> ProviderResult<Vec<(StorageShardedKey, BlockNumberList)>> {
        let cf = self.get_cf_handle::<tables::StoragesHistory>()?;

        let start_key = StorageShardedKey::new(address, storage_key, 0u64);
        let start_bytes = start_key.encode();

        let iter = self
            .0
            .iterator_cf(cf, IteratorMode::From(start_bytes.as_ref(), rocksdb::Direction::Forward));

        let mut result = Vec::new();
        for item in iter {
            match item {
                Ok((key_bytes, value_bytes)) => {
                    let key = StorageShardedKey::decode(&key_bytes)
                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;

                    // Stop once the scan leaves this (address, slot) prefix.
                    if key.address != address || key.sharded_key.key != storage_key {
                        break;
                    }

                    let value = BlockNumberList::decompress(&value_bytes)
                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;

                    result.push((key, value));
                }
                Err(e) => {
                    return Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
                        message: e.to_string().into(),
                        code: -1,
                    })));
                }
            }
        }

        Ok(result)
    }
1160
    /// Builds (but does not commit) a batch that unwinds account-history indices so
    /// that no index at or above each address's smallest listed block remains.
    ///
    /// `min_block == 0` means the address's whole history is cleared (there is no
    /// block below 0 to keep).
    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
    pub fn unwind_account_history_indices(
        &self,
        last_indices: &[(Address, BlockNumber)],
    ) -> ProviderResult<WriteBatchWithTransaction<true>> {
        // Deduplicate to the smallest block number seen per address.
        let mut address_min_block: AddressMap<BlockNumber> =
            AddressMap::with_capacity_and_hasher(last_indices.len(), Default::default());
        for &(address, block_number) in last_indices {
            address_min_block
                .entry(address)
                .and_modify(|min| *min = (*min).min(block_number))
                .or_insert(block_number);
        }

        let mut batch = self.batch();
        for (address, min_block) in address_min_block {
            match min_block.checked_sub(1) {
                Some(keep_to) => batch.unwind_account_history_to(address, keep_to)?,
                None => batch.clear_account_history(address)?,
            }
        }

        Ok(batch.into_inner())
    }
1192
    /// Builds (but does not commit) a batch that unwinds storage-history indices,
    /// the per-slot analogue of [`Self::unwind_account_history_indices`].
    pub fn unwind_storage_history_indices(
        &self,
        storage_changesets: &[(Address, B256, BlockNumber)],
    ) -> ProviderResult<WriteBatchWithTransaction<true>> {
        // Deduplicate to the smallest block number seen per (address, slot).
        let mut key_min_block: HashMap<(Address, B256), BlockNumber> =
            HashMap::with_capacity_and_hasher(storage_changesets.len(), Default::default());
        for &(address, storage_key, block_number) in storage_changesets {
            key_min_block
                .entry((address, storage_key))
                .and_modify(|min| *min = (*min).min(block_number))
                .or_insert(block_number);
        }

        let mut batch = self.batch();
        for ((address, storage_key), min_block) in key_min_block {
            match min_block.checked_sub(1) {
                Some(keep_to) => batch.unwind_storage_history_to(address, storage_key, keep_to)?,
                None => batch.clear_storage_history(address, storage_key)?,
            }
        }

        Ok(batch.into_inner())
    }
1223
    /// Populates a fresh batch via `f` and commits it atomically, recording the
    /// whole round trip under the `BatchWrite` metric.
    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
    pub fn write_batch<F>(&self, f: F) -> ProviderResult<()>
    where
        F: FnOnce(&mut RocksDBBatch<'_>) -> ProviderResult<()>,
    {
        self.execute_with_operation_metric(RocksDBOperation::BatchWrite, "Batch", |this| {
            let mut batch_handle = this.batch();
            f(&mut batch_handle)?;
            batch_handle.commit()
        })
    }
1236
    /// Atomically commits a pre-built write batch.
    ///
    /// Panics if the provider is read-only (see `db_rw`).
    #[instrument(level = "debug", target = "providers::rocksdb", skip_all, fields(batch_len = batch.len(), batch_size = batch.size_in_bytes()))]
    pub fn commit_batch(&self, batch: WriteBatchWithTransaction<true>) -> ProviderResult<()> {
        self.0.db_rw().write_opt(batch, &WriteOptions::default()).map_err(|e| {
            ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
                message: e.to_string().into(),
                code: -1,
            }))
        })
    }
1253
    /// Stages tx-hash and account/storage-history batches for `blocks` in parallel,
    /// pushing each finished batch into `ctx.pending_batches` for a later commit.
    ///
    /// No-op unless `storage_v2` is enabled. The tx-hash table is skipped when
    /// tx-lookup pruning is configured as "full". Each writer runs on the storage
    /// thread pool; a missing result after the scope ends means that writer's
    /// closure panicked, which is surfaced as a database error.
    #[instrument(level = "debug", target = "providers::rocksdb", skip_all, fields(num_blocks = blocks.len(), first_block = ctx.first_block_number))]
    pub(crate) fn write_blocks_data<N: reth_node_types::NodePrimitives>(
        &self,
        blocks: &[ExecutedBlock<N>],
        tx_nums: &[TxNumber],
        ctx: RocksDBWriteCtx,
        runtime: &reth_tasks::Runtime,
    ) -> ProviderResult<()> {
        if !ctx.storage_settings.storage_v2 {
            return Ok(());
        }

        // `None` after the scope => the corresponding spawned task panicked.
        let mut r_tx_hash = None;
        let mut r_account_history = None;
        let mut r_storage_history = None;

        let write_tx_hash =
            ctx.storage_settings.storage_v2 && ctx.prune_tx_lookup.is_none_or(|m| !m.is_full());
        let write_account_history = ctx.storage_settings.storage_v2;
        let write_storage_history = ctx.storage_settings.storage_v2;

        // Propagate the current tracing span into the pool threads.
        let span = tracing::Span::current();
        runtime.storage_pool().in_place_scope(|s| {
            if write_tx_hash {
                s.spawn(|_| {
                    let _guard = span.enter();
                    r_tx_hash = Some(self.write_tx_hash_numbers(blocks, tx_nums, &ctx));
                });
            }

            if write_account_history {
                s.spawn(|_| {
                    let _guard = span.enter();
                    r_account_history = Some(self.write_account_history(blocks, &ctx));
                });
            }

            if write_storage_history {
                s.spawn(|_| {
                    let _guard = span.enter();
                    r_storage_history = Some(self.write_storage_history(blocks, &ctx));
                });
            }
        });

        if write_tx_hash {
            r_tx_hash.ok_or_else(|| {
                ProviderError::Database(DatabaseError::Other(
                    "rocksdb tx-hash write thread panicked".into(),
                ))
            })??;
        }
        if write_account_history {
            r_account_history.ok_or_else(|| {
                ProviderError::Database(DatabaseError::Other(
                    "rocksdb account-history write thread panicked".into(),
                ))
            })??;
        }
        if write_storage_history {
            r_storage_history.ok_or_else(|| {
                ProviderError::Database(DatabaseError::Other(
                    "rocksdb storage-history write thread panicked".into(),
                ))
            })??;
        }

        Ok(())
    }
1330
    /// Stages `tx_hash -> tx_number` mappings for every transaction in `blocks`.
    ///
    /// `tx_nums[i]` is the number of the first transaction of `blocks[i]`;
    /// subsequent transactions in a block get consecutive numbers. The batch is
    /// pushed to `ctx.pending_batches`, not committed here.
    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
    fn write_tx_hash_numbers<N: reth_node_types::NodePrimitives>(
        &self,
        blocks: &[ExecutedBlock<N>],
        tx_nums: &[TxNumber],
        ctx: &RocksDBWriteCtx,
    ) -> ProviderResult<()> {
        let mut batch = self.batch();
        for (block, &first_tx_num) in blocks.iter().zip(tx_nums) {
            let body = block.recovered_block().body();
            for (tx_num, transaction) in (first_tx_num..).zip(body.transactions_iter()) {
                batch.put::<tables::TransactionHashNumbers>(*transaction.tx_hash(), &tx_num)?;
            }
        }
        ctx.pending_batches.lock().push(batch.into_inner());
        Ok(())
    }
1349
1350 #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
1354 fn write_account_history<N: reth_node_types::NodePrimitives>(
1355 &self,
1356 blocks: &[ExecutedBlock<N>],
1357 ctx: &RocksDBWriteCtx,
1358 ) -> ProviderResult<()> {
1359 let mut batch = self.batch();
1360 let mut account_history: BTreeMap<Address, Vec<u64>> = BTreeMap::new();
1361
1362 for (block_idx, block) in blocks.iter().enumerate() {
1363 let block_number = ctx.first_block_number + block_idx as u64;
1364 let reverts = block.execution_outcome().state.reverts.to_plain_state_reverts();
1365
1366 for account_block_reverts in reverts.accounts {
1369 for (address, _) in account_block_reverts {
1370 account_history.entry(address).or_default().push(block_number);
1371 }
1372 }
1373 }
1374
1375 for (address, indices) in account_history {
1377 batch.append_account_history_shard(address, indices)?;
1378 }
1379 ctx.pending_batches.lock().push(batch.into_inner());
1380 Ok(())
1381 }
1382
1383 #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
1387 fn write_storage_history<N: reth_node_types::NodePrimitives>(
1388 &self,
1389 blocks: &[ExecutedBlock<N>],
1390 ctx: &RocksDBWriteCtx,
1391 ) -> ProviderResult<()> {
1392 let mut batch = self.batch();
1393 let mut storage_history: BTreeMap<(Address, B256), Vec<u64>> = BTreeMap::new();
1394
1395 for (block_idx, block) in blocks.iter().enumerate() {
1396 let block_number = ctx.first_block_number + block_idx as u64;
1397 let reverts = block.execution_outcome().state.reverts.to_plain_state_reverts();
1398
1399 for storage_block_reverts in reverts.storage {
1402 for revert in storage_block_reverts {
1403 for (slot, _) in revert.storage_revert {
1404 let plain_key = B256::new(slot.to_be_bytes());
1405 storage_history
1406 .entry((revert.address, plain_key))
1407 .or_default()
1408 .push(block_number);
1409 }
1410 }
1411 }
1412 }
1413
1414 for ((address, slot), indices) in storage_history {
1416 batch.append_storage_history_shard(address, slot, indices)?;
1417 }
1418 ctx.pending_batches.lock().push(batch.into_inner());
1419 Ok(())
1420 }
1421}
1422
/// A read view over the RocksDB provider tables.
pub struct RocksReadSnapshot<'db> {
    /// Backend-specific read handle.
    inner: RocksReadSnapshotInner<'db>,
    /// Provider used to resolve column-family handles for table reads.
    provider: &'db RocksDBProvider,
}

/// Backend for [`RocksReadSnapshot`]: either a real point-in-time snapshot of the
/// read-write database, or a read-only secondary instance.
enum RocksReadSnapshotInner<'db> {
    /// Snapshot taken on the primary (read-write) `OptimisticTransactionDB`.
    ReadWrite(SnapshotWithThreadMode<'db, OptimisticTransactionDB>),
    /// Read-only secondary `DB`; reads go directly against it.
    /// NOTE(review): this arm is not an actual RocksDB snapshot, so consistency
    /// presumably relies on how the secondary is caught up — confirm at call sites.
    Secondary(&'db DB),
}

impl<'db> RocksReadSnapshotInner<'db> {
    /// Creates a raw iterator over `cf`, dispatching to the appropriate backend.
    fn raw_iterator_cf(&self, cf: &rocksdb::ColumnFamily) -> RocksDBRawIterEnum<'_> {
        match self {
            Self::ReadWrite(snap) => RocksDBRawIterEnum::ReadWrite(snap.raw_iterator_cf(cf)),
            Self::Secondary(db) => RocksDBRawIterEnum::ReadOnly(db.raw_iterator_cf(cf)),
        }
    }
}

impl fmt::Debug for RocksReadSnapshot<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RocksReadSnapshot")
            .field("provider", &self.provider)
            .finish_non_exhaustive()
    }
}
1460
1461impl<'db> RocksReadSnapshot<'db> {
    /// Resolves the column-family handle for table `T` via the owning provider.
    fn cf_handle<T: Table>(&self) -> Result<&'db rocksdb::ColumnFamily, DatabaseError> {
        self.provider.get_cf_handle::<T>()
    }
1466
1467 pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
1469 let encoded_key = key.encode();
1470 let cf = self.cf_handle::<T>()?;
1471 let result = match &self.inner {
1472 RocksReadSnapshotInner::ReadWrite(snap) => snap.get_cf(cf, encoded_key.as_ref()),
1473 RocksReadSnapshotInner::Secondary(db) => db.get_cf(cf, encoded_key.as_ref()),
1474 }
1475 .map_err(|e| {
1476 ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
1477 message: e.to_string().into(),
1478 code: -1,
1479 }))
1480 })?;
1481
1482 Ok(result.and_then(|value| T::Value::decompress(&value).ok()))
1483 }
1484
    /// Looks up account-history information for `address` at `block_number`.
    ///
    /// `lowest_available_block_number` is the pruning floor (if any) and `visible_tip`
    /// caps which indexed blocks are considered visible. Delegates to [`Self::history_info`]
    /// with closures that verify a shard key actually belongs to `address` (a seek can
    /// land on a neighboring address's shard).
    pub fn account_history_info(
        &self,
        address: Address,
        block_number: BlockNumber,
        lowest_available_block_number: Option<BlockNumber>,
        visible_tip: BlockNumber,
    ) -> ProviderResult<HistoryInfo> {
        let key = ShardedKey::new(address, block_number);
        self.history_info::<tables::AccountsHistory>(
            key.encode().as_ref(),
            block_number,
            lowest_available_block_number,
            visible_tip,
            // Found-key check: propagate decode errors.
            |key_bytes| Ok(<ShardedKey<Address> as Decode>::decode(key_bytes)?.key == address),
            // Previous-key check: a decode failure is treated as "no previous shard".
            |prev_bytes| {
                <ShardedKey<Address> as Decode>::decode(prev_bytes)
                    .map(|k| k.key == address)
                    .unwrap_or(false)
            },
        )
    }

    /// Looks up storage-history information for `address`/`storage_key` at `block_number`.
    ///
    /// Same contract as [`Self::account_history_info`], with shard-key ownership checked
    /// against both the address and the storage slot.
    pub fn storage_history_info(
        &self,
        address: Address,
        storage_key: B256,
        block_number: BlockNumber,
        lowest_available_block_number: Option<BlockNumber>,
        visible_tip: BlockNumber,
    ) -> ProviderResult<HistoryInfo> {
        let key = StorageShardedKey::new(address, storage_key, block_number);
        self.history_info::<tables::StoragesHistory>(
            key.encode().as_ref(),
            block_number,
            lowest_available_block_number,
            visible_tip,
            |key_bytes| {
                let k = <StorageShardedKey as Decode>::decode(key_bytes)?;
                Ok(k.address == address && k.sharded_key.key == storage_key)
            },
            |prev_bytes| {
                <StorageShardedKey as Decode>::decode(prev_bytes)
                    .map(|k| k.address == address && k.sharded_key.key == storage_key)
                    .unwrap_or(false)
            },
        )
    }
1540
    /// Shared shard-lookup logic behind account/storage history queries.
    ///
    /// Seeks to the first shard whose key is >= `encoded_key`, verifies via
    /// `key_matches` that it belongs to the queried entity, then ranks
    /// `block_number` inside the shard's block list. `prev_key_matches` is used
    /// to decide whether an earlier shard of the same entity exists when the
    /// hit may be the entity's very first write.
    ///
    /// When no relevant shard exists, returns `MaybeInPlainState` if history may
    /// have been pruned (`lowest_available_block_number` is `Some`), otherwise
    /// `NotYetWritten`.
    fn history_info<T>(
        &self,
        encoded_key: &[u8],
        block_number: BlockNumber,
        lowest_available_block_number: Option<BlockNumber>,
        visible_tip: BlockNumber,
        key_matches: impl FnOnce(&[u8]) -> Result<bool, reth_db_api::DatabaseError>,
        prev_key_matches: impl Fn(&[u8]) -> bool,
    ) -> ProviderResult<HistoryInfo>
    where
        T: Table<Value = BlockNumberList>,
    {
        let is_maybe_pruned = lowest_available_block_number.is_some();
        // Result used whenever the lookup finds no applicable shard.
        let fallback = || {
            Ok(if is_maybe_pruned {
                HistoryInfo::MaybeInPlainState
            } else {
                HistoryInfo::NotYetWritten
            })
        };

        let cf = self.cf_handle::<T>()?;
        let mut iter = self.inner.raw_iterator_cf(cf);

        iter.seek(encoded_key);
        // An invalid iterator can mean either "past the end" or an I/O error;
        // `status()` distinguishes the two.
        iter.status().map_err(|e| {
            ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
                message: e.to_string().into(),
                code: -1,
            }))
        })?;

        if !iter.valid() {
            return fallback();
        }

        let Some(key_bytes) = iter.key() else {
            return fallback();
        };
        // The seek may have landed on another entity's shard.
        if !key_matches(key_bytes)? {
            return fallback();
        }

        let Some(value_bytes) = iter.value() else {
            return fallback();
        };
        let chunk = BlockNumberList::decompress(value_bytes)?;

        let (rank, found_block) = compute_history_rank(&chunk, block_number);
        // Ignore indexed blocks beyond the visible tip (not yet canonical here).
        let found_block = found_block.filter(|block| *block <= visible_tip);

        let is_before_first_write = if needs_prev_shard_check(rank, found_block, block_number) {
            // Step back one entry to see whether an earlier shard of the same
            // entity exists; if not, `block_number` predates the first write.
            iter.prev();
            iter.status().map_err(|e| {
                ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
                    message: e.to_string().into(),
                    code: -1,
                }))
            })?;
            let has_prev = iter.valid() && iter.key().is_some_and(&prev_key_matches);

            if found_block.is_none() && !has_prev {
                return fallback()
            }

            !has_prev
        } else {
            false
        };

        Ok(HistoryInfo::from_lookup(
            found_block,
            is_before_first_write,
            lowest_available_block_number,
        ))
    }
1626}
1627
/// Outcome of pruning the history shards of a single entity.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PruneShardOutcome {
    /// At least one whole shard was deleted.
    Deleted,
    /// No shard was deleted, but at least one shard was rewritten.
    Updated,
    /// Nothing needed to change.
    Unchanged,
}

/// Aggregated counters over a batch of per-entity prune operations.
#[derive(Debug, Default, Clone, Copy)]
pub struct PrunedIndices {
    /// Entities for which at least one shard was deleted.
    pub deleted: usize,
    /// Entities whose shards were only rewritten.
    pub updated: usize,
    /// Entities left untouched.
    pub unchanged: usize,
}

/// A buffered set of writes against the RocksDB provider.
///
/// Operations accumulate in a `WriteBatchWithTransaction` and hit the database
/// only on [`RocksDBBatch::commit`] (or earlier, when an auto-commit threshold
/// is configured and exceeded).
#[must_use = "batch must be committed"]
pub struct RocksDBBatch<'a> {
    /// Provider used for column-family resolution and (auto-)commits.
    provider: &'a RocksDBProvider,
    /// The underlying RocksDB write batch.
    inner: WriteBatchWithTransaction<true>,
    /// Scratch buffer reused across value compressions.
    buf: Vec<u8>,
    /// If set, the batch is flushed to the DB whenever its byte size reaches
    /// this threshold.
    auto_commit_threshold: Option<usize>,
}

impl fmt::Debug for RocksDBBatch<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RocksDBBatch")
            .field("provider", &self.provider)
            .field("batch", &"<WriteBatchWithTransaction>")
            .field("length", &self.inner.len())
            .field("size_in_bytes", &self.inner.size_in_bytes())
            .finish()
    }
}
1681
1682impl<'a> RocksDBBatch<'a> {
    /// Encodes `key` and queues a `key -> value` write for table `T`.
    pub fn put<T: Table>(&mut self, key: T::Key, value: &T::Value) -> ProviderResult<()> {
        let encoded_key = key.encode();
        self.put_encoded::<T>(&encoded_key, value)
    }

    /// Queues a write for an already-encoded key in table `T`.
    ///
    /// The value is compressed into the reusable `self.buf` scratch buffer;
    /// `compress_to_buf_or_ref!` presumably returns a borrowed slice when the
    /// value needs no compression — confirm against the macro definition.
    pub fn put_encoded<T: Table>(
        &mut self,
        key: &<T::Key as Encode>::Encoded,
        value: &T::Value,
    ) -> ProviderResult<()> {
        let value_bytes = compress_to_buf_or_ref!(self.buf, value).unwrap_or(&self.buf);
        self.inner.put_cf(self.provider.get_cf_handle::<T>()?, key, value_bytes);
        // May flush the whole batch to the DB if the auto-commit threshold is crossed.
        self.maybe_auto_commit()?;
        Ok(())
    }

    /// Queues a delete of `key` from table `T`.
    pub fn delete<T: Table>(&mut self, key: T::Key) -> ProviderResult<()> {
        self.inner.delete_cf(self.provider.get_cf_handle::<T>()?, key.encode().as_ref());
        self.maybe_auto_commit()?;
        Ok(())
    }
1713
1714 fn maybe_auto_commit(&mut self) -> ProviderResult<()> {
1719 if let Some(threshold) = self.auto_commit_threshold &&
1720 self.inner.size_in_bytes() >= threshold
1721 {
1722 tracing::debug!(
1723 target: "providers::rocksdb",
1724 batch_size = self.inner.size_in_bytes(),
1725 threshold,
1726 "Auto-committing RocksDB batch"
1727 );
1728 let old_batch = std::mem::take(&mut self.inner);
1729 self.provider.0.db_rw().write_opt(old_batch, &WriteOptions::default()).map_err(
1730 |e| {
1731 ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
1732 message: e.to_string().into(),
1733 code: -1,
1734 }))
1735 },
1736 )?;
1737 }
1738 Ok(())
1739 }
1740
    /// Writes the accumulated batch to the read-write database in one shot.
    ///
    /// Consumes the batch; on error nothing further can be retried with it.
    #[instrument(level = "debug", target = "providers::rocksdb", skip_all, fields(batch_len = self.inner.len(), batch_size = self.inner.size_in_bytes()))]
    pub fn commit(self) -> ProviderResult<()> {
        self.provider.0.db_rw().write_opt(self.inner, &WriteOptions::default()).map_err(|e| {
            ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
                message: e.to_string().into(),
                code: -1,
            }))
        })
    }
1756
    /// Number of operations queued in the batch.
    pub fn len(&self) -> usize {
        self.inner.len()
    }

    /// Returns `true` if no operations are queued.
    pub fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }

    /// Serialized size of the queued operations, in bytes.
    pub fn size_in_bytes(&self) -> usize {
        self.inner.size_in_bytes()
    }

    /// The provider this batch writes to.
    pub const fn provider(&self) -> &RocksDBProvider {
        self.provider
    }

    /// Consumes the batch and returns the raw RocksDB write batch without committing.
    pub fn into_inner(self) -> WriteBatchWithTransaction<true> {
        self.inner
    }

    /// Reads `key` from table `T` via the provider.
    ///
    /// NOTE(review): this reads the database directly — writes queued in this
    /// batch but not yet committed are NOT visible to `get`.
    pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
        self.provider.get::<T>(key)
    }
1791
    /// Appends strictly-increasing block `indices` to `address`'s account-history shards.
    ///
    /// Loads the "last" shard (keyed with `u64::MAX`), appends the new indices, and —
    /// if the shard overflows `NUM_OF_INDICES_IN_SHARD` — re-splits it into full shards
    /// keyed by their highest block number, keeping the final chunk as the new
    /// `u64::MAX`-keyed last shard.
    ///
    /// Near-duplicate of [`Self::append_storage_history_shard`]; keep the two in sync.
    pub fn append_account_history_shard(
        &mut self,
        address: Address,
        indices: impl IntoIterator<Item = u64>,
    ) -> ProviderResult<()> {
        let indices: Vec<u64> = indices.into_iter().collect();

        if indices.is_empty() {
            return Ok(());
        }

        // Shard lists are sorted; appending out-of-order indices would corrupt them.
        debug_assert!(
            indices.windows(2).all(|w| w[0] < w[1]),
            "indices must be strictly increasing: {:?}",
            indices
        );

        let last_key = ShardedKey::new(address, u64::MAX);
        let last_shard_opt = self.provider.get::<tables::AccountsHistory>(last_key.clone())?;
        let mut last_shard = last_shard_opt.unwrap_or_else(BlockNumberList::empty);

        last_shard.append(indices).map_err(ProviderError::other)?;

        // Fast path: still fits in a single shard — overwrite in place.
        if last_shard.len() <= NUM_OF_INDICES_IN_SHARD as u64 {
            self.put::<tables::AccountsHistory>(last_key, &last_shard)?;
            return Ok(());
        }

        // Overflow: re-split into full shards; all but the final chunk get keyed by
        // their highest block number, the final chunk becomes the new last shard.
        let chunks = last_shard.iter().chunks(NUM_OF_INDICES_IN_SHARD);
        let mut chunks_peekable = chunks.into_iter().peekable();

        while let Some(chunk) = chunks_peekable.next() {
            let shard = BlockNumberList::new_pre_sorted(chunk);
            let highest_block_number = if chunks_peekable.peek().is_some() {
                shard.iter().next_back().expect("`chunks` does not return empty list")
            } else {
                u64::MAX
            };

            self.put::<tables::AccountsHistory>(
                ShardedKey::new(address, highest_block_number),
                &shard,
            )?;
        }

        Ok(())
    }
1852
    /// Appends strictly-increasing block `indices` to the storage-history shards of
    /// `(address, storage_key)`.
    ///
    /// Same algorithm as [`Self::append_account_history_shard`], but over
    /// `StoragesHistory` with [`StorageShardedKey`] keys; keep the two in sync.
    pub fn append_storage_history_shard(
        &mut self,
        address: Address,
        storage_key: B256,
        indices: impl IntoIterator<Item = u64>,
    ) -> ProviderResult<()> {
        let indices: Vec<u64> = indices.into_iter().collect();

        if indices.is_empty() {
            return Ok(());
        }

        // Shard lists are sorted; appending out-of-order indices would corrupt them.
        debug_assert!(
            indices.windows(2).all(|w| w[0] < w[1]),
            "indices must be strictly increasing: {:?}",
            indices
        );

        let last_key = StorageShardedKey::last(address, storage_key);
        let last_shard_opt = self.provider.get::<tables::StoragesHistory>(last_key.clone())?;
        let mut last_shard = last_shard_opt.unwrap_or_else(BlockNumberList::empty);

        last_shard.append(indices).map_err(ProviderError::other)?;

        // Fast path: still fits in a single shard — overwrite in place.
        if last_shard.len() <= NUM_OF_INDICES_IN_SHARD as u64 {
            self.put::<tables::StoragesHistory>(last_key, &last_shard)?;
            return Ok(());
        }

        // Overflow: re-split into full shards keyed by highest block number; the
        // final chunk becomes the new `u64::MAX`-keyed last shard.
        let chunks = last_shard.iter().chunks(NUM_OF_INDICES_IN_SHARD);
        let mut chunks_peekable = chunks.into_iter().peekable();

        while let Some(chunk) = chunks_peekable.next() {
            let shard = BlockNumberList::new_pre_sorted(chunk);
            let highest_block_number = if chunks_peekable.peek().is_some() {
                shard.iter().next_back().expect("`chunks` does not return empty list")
            } else {
                u64::MAX
            };

            self.put::<tables::StoragesHistory>(
                StorageShardedKey::new(address, storage_key, highest_block_number),
                &shard,
            )?;
        }

        Ok(())
    }
1914
    /// Unwinds `address`'s account history so that only entries with block number
    /// `<= keep_to` remain.
    ///
    /// Drops every shard strictly above the boundary, truncates the boundary shard,
    /// and re-keys whatever shard ends up last to the `u64::MAX` sentinel so future
    /// appends find it.
    pub fn unwind_account_history_to(
        &mut self,
        address: Address,
        keep_to: BlockNumber,
    ) -> ProviderResult<()> {
        let shards = self.provider.account_history_shards(address)?;
        if shards.is_empty() {
            return Ok(());
        }

        // First shard that may contain entries > keep_to (the sentinel shard always may).
        let boundary_idx = shards.iter().position(|(key, _)| {
            key.highest_block_number == u64::MAX || key.highest_block_number > keep_to
        });

        let Some(boundary_idx) = boundary_idx else {
            // Every shard is entirely <= keep_to: nothing to truncate, but the last
            // shard must carry the sentinel key so appends keep working.
            let (last_key, last_value) = shards.last().expect("shards is non-empty");
            if last_key.highest_block_number != u64::MAX {
                self.delete::<tables::AccountsHistory>(last_key.clone())?;
                self.put::<tables::AccountsHistory>(
                    ShardedKey::new(address, u64::MAX),
                    last_value,
                )?;
            }
            return Ok(());
        };

        // Shards strictly above the boundary are fully unwound.
        for (key, _) in shards.iter().skip(boundary_idx + 1) {
            self.delete::<tables::AccountsHistory>(key.clone())?;
        }

        let (boundary_key, boundary_list) = &shards[boundary_idx];

        self.delete::<tables::AccountsHistory>(boundary_key.clone())?;

        // Keep only the boundary shard's entries at or below keep_to.
        let new_last =
            BlockNumberList::new_pre_sorted(boundary_list.iter().take_while(|&b| b <= keep_to));

        if new_last.is_empty() {
            if boundary_idx == 0 {
                // No earlier shard exists: the history is now empty.
                return Ok(());
            }

            // Promote the previous shard to be the sentinel-keyed last shard.
            let (prev_key, prev_value) = &shards[boundary_idx - 1];
            if prev_key.highest_block_number != u64::MAX {
                self.delete::<tables::AccountsHistory>(prev_key.clone())?;
                self.put::<tables::AccountsHistory>(
                    ShardedKey::new(address, u64::MAX),
                    prev_value,
                )?;
            }
            return Ok(());
        }

        self.put::<tables::AccountsHistory>(ShardedKey::new(address, u64::MAX), &new_last)?;

        Ok(())
    }
1988
    /// Generic prune of one entity's history shards: drops every index `<= to_block`.
    ///
    /// Whole shards below the cutoff are deleted; partially-affected shards are
    /// rewritten filtered; finally, whichever shard remains last is re-keyed to the
    /// sentinel via `create_sentinel` so future appends find it. The key-type-specific
    /// behavior (accounts vs storages) is injected through the closure parameters.
    #[expect(clippy::too_many_arguments)]
    fn prune_history_shards_inner<K>(
        &mut self,
        shards: Vec<(K, BlockNumberList)>,
        to_block: BlockNumber,
        get_highest: impl Fn(&K) -> u64,
        is_sentinel: impl Fn(&K) -> bool,
        delete_shard: impl Fn(&mut Self, K) -> ProviderResult<()>,
        put_shard: impl Fn(&mut Self, K, &BlockNumberList) -> ProviderResult<()>,
        create_sentinel: impl Fn() -> K,
    ) -> ProviderResult<PruneShardOutcome>
    where
        K: Clone,
    {
        if shards.is_empty() {
            return Ok(PruneShardOutcome::Unchanged);
        }

        let mut deleted = false;
        let mut updated = false;
        // Tracks the last shard that survives pruning, for sentinel re-keying below.
        let mut last_remaining: Option<(K, BlockNumberList)> = None;

        for (key, block_list) in shards {
            if !is_sentinel(&key) && get_highest(&key) <= to_block {
                // Entire shard is at or below the cutoff.
                delete_shard(self, key)?;
                deleted = true;
            } else {
                let original_len = block_list.len();
                let filtered =
                    BlockNumberList::new_pre_sorted(block_list.iter().filter(|&b| b > to_block));

                if filtered.is_empty() {
                    delete_shard(self, key)?;
                    deleted = true;
                } else if filtered.len() < original_len {
                    put_shard(self, key.clone(), &filtered)?;
                    last_remaining = Some((key, filtered));
                    updated = true;
                } else {
                    // Untouched shard; still a candidate for sentinel re-keying.
                    last_remaining = Some((key, block_list));
                }
            }
        }

        // Ensure the surviving last shard carries the sentinel key. Within the batch
        // the later delete+put supersedes any earlier put of the same key.
        if let Some((last_key, last_value)) = last_remaining &&
            !is_sentinel(&last_key)
        {
            delete_shard(self, last_key)?;
            put_shard(self, create_sentinel(), &last_value)?;
            updated = true;
        }

        if deleted {
            Ok(PruneShardOutcome::Deleted)
        } else if updated {
            Ok(PruneShardOutcome::Updated)
        } else {
            Ok(PruneShardOutcome::Unchanged)
        }
    }
2054
    /// Prunes `address`'s account history, removing every index `<= to_block`.
    ///
    /// Thin wrapper wiring `AccountsHistory`-specific closures into
    /// [`Self::prune_history_shards_inner`].
    pub fn prune_account_history_to(
        &mut self,
        address: Address,
        to_block: BlockNumber,
    ) -> ProviderResult<PruneShardOutcome> {
        let shards = self.provider.account_history_shards(address)?;
        self.prune_history_shards_inner(
            shards,
            to_block,
            |key| key.highest_block_number,
            |key| key.highest_block_number == u64::MAX,
            |batch, key| batch.delete::<tables::AccountsHistory>(key),
            |batch, key, value| batch.put::<tables::AccountsHistory>(key, value),
            || ShardedKey::new(address, u64::MAX),
        )
    }
2075
    /// Prunes account history for many addresses in one pass over `AccountsHistory`.
    ///
    /// `targets` must be sorted by address (debug-asserted): the single raw iterator is
    /// then reused across addresses, seeking only when it sits before the next target's
    /// key range. Returns aggregate per-address outcomes.
    pub fn prune_account_history_batch(
        &mut self,
        targets: &[(Address, BlockNumber)],
    ) -> ProviderResult<PrunedIndices> {
        if targets.is_empty() {
            return Ok(PrunedIndices::default());
        }

        debug_assert!(
            targets.windows(2).all(|w| w[0].0 <= w[1].0),
            "prune_account_history_batch: targets must be sorted by address"
        );

        // Encoded ShardedKey<Address> starts with the 20-byte address.
        const PREFIX_LEN: usize = 20;

        let cf = self.provider.get_cf_handle::<tables::AccountsHistory>()?;
        let mut iter = self.provider.0.raw_iterator_cf(cf);
        let mut outcomes = PrunedIndices::default();

        for (address, to_block) in targets {
            let start_key = ShardedKey::new(*address, 0u64).encode();
            let target_prefix = &start_key[..PREFIX_LEN];

            // Skip the seek when the iterator (left over from the previous target)
            // is already at or past this address's key range.
            let needs_seek = if iter.valid() {
                if let Some(current_key) = iter.key() {
                    current_key.get(..PREFIX_LEN).is_none_or(|p| p < target_prefix)
                } else {
                    true
                }
            } else {
                true
            };

            if needs_seek {
                iter.seek(start_key);
                iter.status().map_err(|e| {
                    ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
                        message: e.to_string().into(),
                        code: -1,
                    }))
                })?;
            }

            // Collect every shard belonging to this address.
            let mut shards = Vec::new();
            while iter.valid() {
                let Some(key_bytes) = iter.key() else { break };

                let current_prefix = key_bytes.get(..PREFIX_LEN);
                if current_prefix != Some(target_prefix) {
                    break;
                }

                let key = ShardedKey::<Address>::decode(key_bytes)
                    .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;

                let Some(value_bytes) = iter.value() else { break };
                let value = BlockNumberList::decompress(value_bytes)
                    .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;

                shards.push((key, value));
                iter.next();
            }

            match self.prune_history_shards_inner(
                shards,
                *to_block,
                |key| key.highest_block_number,
                |key| key.highest_block_number == u64::MAX,
                |batch, key| batch.delete::<tables::AccountsHistory>(key),
                |batch, key, value| batch.put::<tables::AccountsHistory>(key, value),
                || ShardedKey::new(*address, u64::MAX),
            )? {
                PruneShardOutcome::Deleted => outcomes.deleted += 1,
                PruneShardOutcome::Updated => outcomes.updated += 1,
                PruneShardOutcome::Unchanged => outcomes.unchanged += 1,
            }
        }

        Ok(outcomes)
    }
2178
    /// Prunes the storage history of `(address, storage_key)`, removing every index
    /// `<= to_block`.
    ///
    /// Thin wrapper wiring `StoragesHistory`-specific closures into
    /// [`Self::prune_history_shards_inner`].
    pub fn prune_storage_history_to(
        &mut self,
        address: Address,
        storage_key: B256,
        to_block: BlockNumber,
    ) -> ProviderResult<PruneShardOutcome> {
        let shards = self.provider.storage_history_shards(address, storage_key)?;
        self.prune_history_shards_inner(
            shards,
            to_block,
            |key| key.sharded_key.highest_block_number,
            |key| key.sharded_key.highest_block_number == u64::MAX,
            |batch, key| batch.delete::<tables::StoragesHistory>(key),
            |batch, key, value| batch.put::<tables::StoragesHistory>(key, value),
            || StorageShardedKey::last(address, storage_key),
        )
    }
2201
    /// Prunes storage history for many `(address, slot)` pairs in one pass over
    /// `StoragesHistory`.
    ///
    /// `targets` must be sorted by `(address, storage_key)` (debug-asserted) so a single
    /// raw iterator can be reused across targets, seeking only when it sits before the
    /// next target's key range. Returns aggregate per-target outcomes.
    pub fn prune_storage_history_batch(
        &mut self,
        targets: &[((Address, B256), BlockNumber)],
    ) -> ProviderResult<PrunedIndices> {
        if targets.is_empty() {
            return Ok(PrunedIndices::default());
        }

        debug_assert!(
            targets.windows(2).all(|w| w[0].0 <= w[1].0),
            "prune_storage_history_batch: targets must be sorted by (address, storage_key)"
        );

        // Encoded StorageShardedKey starts with address (20 bytes) + slot (32 bytes).
        const PREFIX_LEN: usize = 52;

        let cf = self.provider.get_cf_handle::<tables::StoragesHistory>()?;
        let mut iter = self.provider.0.raw_iterator_cf(cf);
        let mut outcomes = PrunedIndices::default();

        for ((address, storage_key), to_block) in targets {
            let start_key = StorageShardedKey::new(*address, *storage_key, 0u64).encode();
            let target_prefix = &start_key[..PREFIX_LEN];

            // Skip the seek when the iterator (left over from the previous target)
            // is already at or past this target's key range.
            let needs_seek = if iter.valid() {
                if let Some(current_key) = iter.key() {
                    current_key.get(..PREFIX_LEN).is_none_or(|p| p < target_prefix)
                } else {
                    true
                }
            } else {
                true
            };

            if needs_seek {
                iter.seek(start_key);
                iter.status().map_err(|e| {
                    ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
                        message: e.to_string().into(),
                        code: -1,
                    }))
                })?;
            }

            // Collect every shard belonging to this (address, slot) pair.
            let mut shards = Vec::new();
            while iter.valid() {
                let Some(key_bytes) = iter.key() else { break };

                let current_prefix = key_bytes.get(..PREFIX_LEN);
                if current_prefix != Some(target_prefix) {
                    break;
                }

                let key = StorageShardedKey::decode(key_bytes)
                    .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;

                let Some(value_bytes) = iter.value() else { break };
                let value = BlockNumberList::decompress(value_bytes)
                    .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;

                shards.push((key, value));
                iter.next();
            }

            match self.prune_history_shards_inner(
                shards,
                *to_block,
                |key| key.sharded_key.highest_block_number,
                |key| key.sharded_key.highest_block_number == u64::MAX,
                |batch, key| batch.delete::<tables::StoragesHistory>(key),
                |batch, key, value| batch.put::<tables::StoragesHistory>(key, value),
                || StorageShardedKey::last(*address, *storage_key),
            )? {
                PruneShardOutcome::Deleted => outcomes.deleted += 1,
                PruneShardOutcome::Updated => outcomes.updated += 1,
                PruneShardOutcome::Unchanged => outcomes.unchanged += 1,
            }
        }

        Ok(outcomes)
    }
2306
    /// Unwinds the storage history of `(address, storage_key)` so that only entries
    /// with block number `<= keep_to` remain.
    ///
    /// Mirror of [`Self::unwind_account_history_to`] over `StoragesHistory`: drops
    /// shards above the boundary, truncates the boundary shard, and re-keys the
    /// surviving last shard to the `u64::MAX` sentinel.
    pub fn unwind_storage_history_to(
        &mut self,
        address: Address,
        storage_key: B256,
        keep_to: BlockNumber,
    ) -> ProviderResult<()> {
        let shards = self.provider.storage_history_shards(address, storage_key)?;
        if shards.is_empty() {
            return Ok(());
        }

        // First shard that may contain entries > keep_to (the sentinel shard always may).
        let boundary_idx = shards.iter().position(|(key, _)| {
            key.sharded_key.highest_block_number == u64::MAX ||
                key.sharded_key.highest_block_number > keep_to
        });

        let Some(boundary_idx) = boundary_idx else {
            // Every shard is entirely <= keep_to: just ensure the last shard carries
            // the sentinel key so future appends find it.
            let (last_key, last_value) = shards.last().expect("shards is non-empty");
            if last_key.sharded_key.highest_block_number != u64::MAX {
                self.delete::<tables::StoragesHistory>(last_key.clone())?;
                self.put::<tables::StoragesHistory>(
                    StorageShardedKey::last(address, storage_key),
                    last_value,
                )?;
            }
            return Ok(());
        };

        // Shards strictly above the boundary are fully unwound.
        for (key, _) in shards.iter().skip(boundary_idx + 1) {
            self.delete::<tables::StoragesHistory>(key.clone())?;
        }

        let (boundary_key, boundary_list) = &shards[boundary_idx];

        self.delete::<tables::StoragesHistory>(boundary_key.clone())?;

        // Keep only the boundary shard's entries at or below keep_to.
        let new_last =
            BlockNumberList::new_pre_sorted(boundary_list.iter().take_while(|&b| b <= keep_to));

        if new_last.is_empty() {
            if boundary_idx == 0 {
                // No earlier shard exists: the history is now empty.
                return Ok(());
            }

            // Promote the previous shard to be the sentinel-keyed last shard.
            let (prev_key, prev_value) = &shards[boundary_idx - 1];
            if prev_key.sharded_key.highest_block_number != u64::MAX {
                self.delete::<tables::StoragesHistory>(prev_key.clone())?;
                self.put::<tables::StoragesHistory>(
                    StorageShardedKey::last(address, storage_key),
                    prev_value,
                )?;
            }
            return Ok(());
        }

        self.put::<tables::StoragesHistory>(
            StorageShardedKey::last(address, storage_key),
            &new_last,
        )?;

        Ok(())
    }
2387
2388 pub fn clear_account_history(&mut self, address: Address) -> ProviderResult<()> {
2392 let shards = self.provider.account_history_shards(address)?;
2393 for (key, _) in shards {
2394 self.delete::<tables::AccountsHistory>(key)?;
2395 }
2396 Ok(())
2397 }
2398
2399 pub fn clear_storage_history(
2403 &mut self,
2404 address: Address,
2405 storage_key: B256,
2406 ) -> ProviderResult<()> {
2407 let shards = self.provider.storage_history_shards(address, storage_key)?;
2408 for (key, _) in shards {
2409 self.delete::<tables::StoragesHistory>(key)?;
2410 }
2411 Ok(())
2412 }
2413}
2414
/// An optimistic RocksDB transaction scoped to the provider's read-write database.
///
/// Reads see the transaction's own uncommitted writes; nothing is persisted until
/// [`RocksTx::commit`] succeeds.
pub struct RocksTx<'db> {
    /// The underlying optimistic transaction.
    inner: Transaction<'db, OptimisticTransactionDB>,
    /// Provider used to resolve column-family handles.
    provider: &'db RocksDBProvider,
}

impl fmt::Debug for RocksTx<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RocksTx").field("provider", &self.provider).finish_non_exhaustive()
    }
}
2434
2435impl<'db> RocksTx<'db> {
    /// Encodes `key` and reads it from table `T` within this transaction.
    pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
        let encoded_key = key.encode();
        self.get_encoded::<T>(&encoded_key)
    }
2441
2442 pub fn get_encoded<T: Table>(
2444 &self,
2445 key: &<T::Key as Encode>::Encoded,
2446 ) -> ProviderResult<Option<T::Value>> {
2447 let cf = self.provider.get_cf_handle::<T>()?;
2448 let result = self.inner.get_cf(cf, key.as_ref()).map_err(|e| {
2449 ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
2450 message: e.to_string().into(),
2451 code: -1,
2452 }))
2453 })?;
2454
2455 Ok(result.and_then(|value| T::Value::decompress(&value).ok()))
2456 }
2457
    /// Encodes `key` and writes `key -> value` into table `T` within this transaction.
    pub fn put<T: Table>(&self, key: T::Key, value: &T::Value) -> ProviderResult<()> {
        let encoded_key = key.encode();
        self.put_encoded::<T>(&encoded_key, value)
    }

    /// Writes an already-encoded key into table `T` within this transaction.
    ///
    /// Unlike the batch path, this allocates a fresh compression buffer per call.
    pub fn put_encoded<T: Table>(
        &self,
        key: &<T::Key as Encode>::Encoded,
        value: &T::Value,
    ) -> ProviderResult<()> {
        let cf = self.provider.get_cf_handle::<T>()?;
        let mut buf = Vec::new();
        let value_bytes = compress_to_buf_or_ref!(buf, value).unwrap_or(&buf);

        self.inner.put_cf(cf, key.as_ref(), value_bytes).map_err(|e| {
            ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
                info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
                operation: DatabaseWriteOperation::PutUpsert,
                table_name: T::NAME,
                key: key.as_ref().to_vec(),
            })))
        })
    }

    /// Deletes `key` from table `T` within this transaction.
    pub fn delete<T: Table>(&self, key: T::Key) -> ProviderResult<()> {
        let cf = self.provider.get_cf_handle::<T>()?;
        self.inner.delete_cf(cf, key.encode().as_ref()).map_err(|e| {
            ProviderError::Database(DatabaseError::Delete(DatabaseErrorInfo {
                message: e.to_string().into(),
                code: -1,
            }))
        })
    }

    /// Iterates table `T` from the beginning, observing this transaction's own writes.
    pub fn iter<T: Table>(&self) -> ProviderResult<RocksTxIter<'_, T>> {
        let cf = self.provider.get_cf_handle::<T>()?;
        let iter = self.inner.iterator_cf(cf, IteratorMode::Start);
        Ok(RocksTxIter { inner: iter, _marker: std::marker::PhantomData })
    }

    /// Iterates table `T` forward starting at `key` (inclusive).
    pub fn iter_from<T: Table>(&self, key: T::Key) -> ProviderResult<RocksTxIter<'_, T>> {
        let cf = self.provider.get_cf_handle::<T>()?;
        let encoded_key = key.encode();
        let iter = self
            .inner
            .iterator_cf(cf, IteratorMode::From(encoded_key.as_ref(), rocksdb::Direction::Forward));
        Ok(RocksTxIter { inner: iter, _marker: std::marker::PhantomData })
    }

    /// Commits the transaction, persisting all of its writes atomically.
    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
    pub fn commit(self) -> ProviderResult<()> {
        self.inner.commit().map_err(|e| {
            ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
                message: e.to_string().into(),
                code: -1,
            }))
        })
    }

    /// Rolls the transaction back, discarding all of its writes.
    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
    pub fn rollback(self) -> ProviderResult<()> {
        self.inner.rollback().map_err(|e| {
            ProviderError::Database(DatabaseError::Other(format!("rollback failed: {e}")))
        })
    }
2532}
2533
/// Key/value iterator abstracting over the read-write and read-only database handles.
enum RocksDBIterEnum<'db> {
    /// Iterator over the primary (read-write) `OptimisticTransactionDB`.
    ReadWrite(rocksdb::DBIteratorWithThreadMode<'db, OptimisticTransactionDB>),
    /// Iterator over a read-only secondary `DB`.
    ReadOnly(rocksdb::DBIteratorWithThreadMode<'db, DB>),
}

impl Iterator for RocksDBIterEnum<'_> {
    type Item = Result<(Box<[u8]>, Box<[u8]>), rocksdb::Error>;

    fn next(&mut self) -> Option<Self::Item> {
        match self {
            Self::ReadWrite(iter) => iter.next(),
            Self::ReadOnly(iter) => iter.next(),
        }
    }
}

/// Raw (seekable, bidirectional) iterator abstracting over the read-write and
/// read-only database handles; see the dispatch impl below.
enum RocksDBRawIterEnum<'db> {
    /// Raw iterator over the primary (read-write) `OptimisticTransactionDB`.
    ReadWrite(DBRawIteratorWithThreadMode<'db, OptimisticTransactionDB>),
    /// Raw iterator over a read-only secondary `DB`.
    ReadOnly(DBRawIteratorWithThreadMode<'db, DB>),
}
2563
impl RocksDBRawIterEnum<'_> {
    /// Positions the iterator at the first entry with key >= `key`.
    fn seek(&mut self, key: impl AsRef<[u8]>) {
        match self {
            Self::ReadWrite(iter) => iter.seek(key),
            Self::ReadOnly(iter) => iter.seek(key),
        }
    }

    /// Returns whether the iterator points at an entry. `false` can mean either
    /// end-of-range or an error — check [`Self::status`] to tell them apart.
    fn valid(&self) -> bool {
        match self {
            Self::ReadWrite(iter) => iter.valid(),
            Self::ReadOnly(iter) => iter.valid(),
        }
    }

    /// Current entry's key, if the iterator is valid.
    fn key(&self) -> Option<&[u8]> {
        match self {
            Self::ReadWrite(iter) => iter.key(),
            Self::ReadOnly(iter) => iter.key(),
        }
    }

    /// Current entry's value, if the iterator is valid.
    fn value(&self) -> Option<&[u8]> {
        match self {
            Self::ReadWrite(iter) => iter.value(),
            Self::ReadOnly(iter) => iter.value(),
        }
    }

    /// Advances to the next entry.
    fn next(&mut self) {
        match self {
            Self::ReadWrite(iter) => iter.next(),
            Self::ReadOnly(iter) => iter.next(),
        }
    }

    /// Steps back to the previous entry.
    fn prev(&mut self) {
        match self {
            Self::ReadWrite(iter) => iter.prev(),
            Self::ReadOnly(iter) => iter.prev(),
        }
    }

    /// Surfaces any error encountered by the underlying iterator.
    fn status(&self) -> Result<(), rocksdb::Error> {
        match self {
            Self::ReadWrite(iter) => iter.status(),
            Self::ReadOnly(iter) => iter.status(),
        }
    }
}
2621
2622pub struct RocksDBIter<'db, T: Table> {
2626 inner: RocksDBIterEnum<'db>,
2627 _marker: std::marker::PhantomData<T>,
2628}
2629
2630impl<T: Table> fmt::Debug for RocksDBIter<'_, T> {
2631 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2632 f.debug_struct("RocksDBIter").field("table", &T::NAME).finish_non_exhaustive()
2633 }
2634}
2635
2636impl<T: Table> Iterator for RocksDBIter<'_, T> {
2637 type Item = ProviderResult<(T::Key, T::Value)>;
2638
2639 fn next(&mut self) -> Option<Self::Item> {
2640 Some(decode_iter_item::<T>(self.inner.next()?))
2641 }
2642}
2643
/// Untyped iterator yielding raw key/value byte pairs without decoding.
pub struct RocksDBRawIter<'db> {
    // Underlying iterator (read-write or read-only handle).
    inner: RocksDBIterEnum<'db>,
}
2650
impl fmt::Debug for RocksDBRawIter<'_> {
    // The inner iterator is opaque, so only the struct name is printed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RocksDBRawIter").finish_non_exhaustive()
    }
}
2656
2657impl Iterator for RocksDBRawIter<'_> {
2658 type Item = ProviderResult<(Box<[u8]>, Box<[u8]>)>;
2659
2660 fn next(&mut self) -> Option<Self::Item> {
2661 match self.inner.next()? {
2662 Ok(kv) => Some(Ok(kv)),
2663 Err(e) => Some(Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
2664 message: e.to_string().into(),
2665 code: -1,
2666 })))),
2667 }
2668 }
2669}
2670
/// Typed iterator scoped to an open [`Transaction`], so it observes the
/// transaction's own uncommitted writes.
pub struct RocksTxIter<'tx, T: Table> {
    // Iterator created from the transaction handle itself.
    inner: rocksdb::DBIteratorWithThreadMode<'tx, Transaction<'tx, OptimisticTransactionDB>>,
    // Carries the table type without storing a `T`.
    _marker: std::marker::PhantomData<T>,
}
2678
impl<T: Table> fmt::Debug for RocksTxIter<'_, T> {
    // Only the table name is printed; the transaction iterator has no useful Debug.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RocksTxIter").field("table", &T::NAME).finish_non_exhaustive()
    }
}
2684
2685impl<T: Table> Iterator for RocksTxIter<'_, T> {
2686 type Item = ProviderResult<(T::Key, T::Value)>;
2687
2688 fn next(&mut self) -> Option<Self::Item> {
2689 Some(decode_iter_item::<T>(self.inner.next()?))
2690 }
2691}
2692
2693fn decode_iter_item<T: Table>(result: RawKVResult) -> ProviderResult<(T::Key, T::Value)> {
2698 let (key_bytes, value_bytes) = result.map_err(|e| {
2699 ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
2700 message: e.to_string().into(),
2701 code: -1,
2702 }))
2703 })?;
2704
2705 let key = <T::Key as reth_db_api::table::Decode>::decode(&key_bytes)
2706 .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
2707
2708 let value = T::Value::decompress(&value_bytes)
2709 .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
2710
2711 Ok((key, value))
2712}
2713
2714const fn convert_log_level(level: LogLevel) -> rocksdb::LogLevel {
2716 match level {
2717 LogLevel::Fatal => rocksdb::LogLevel::Fatal,
2718 LogLevel::Error => rocksdb::LogLevel::Error,
2719 LogLevel::Warn => rocksdb::LogLevel::Warn,
2720 LogLevel::Notice | LogLevel::Verbose => rocksdb::LogLevel::Info,
2721 LogLevel::Debug | LogLevel::Trace | LogLevel::Extra => rocksdb::LogLevel::Debug,
2722 }
2723}
2724
2725#[cfg(test)]
2726mod tests {
2727 use super::*;
2728 use crate::providers::HistoryInfo;
2729 use alloy_primitives::{Address, TxHash, B256};
2730 use reth_db_api::{
2731 models::{
2732 sharded_key::{ShardedKey, NUM_OF_INDICES_IN_SHARD},
2733 storage_sharded_key::StorageShardedKey,
2734 IntegerList,
2735 },
2736 table::Table,
2737 tables,
2738 };
2739 use tempfile::TempDir;
2740
    /// Opening with the default table set must register the column families
    /// needed for tx-hash lookups and account/storage history.
    #[test]
    fn test_with_default_tables_registers_required_column_families() {
        let temp_dir = TempDir::new().unwrap();

        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        // TransactionHashNumbers round-trip.
        let tx_hash = TxHash::from(B256::from([1u8; 32]));
        provider.put::<tables::TransactionHashNumbers>(tx_hash, &100).unwrap();
        assert_eq!(provider.get::<tables::TransactionHashNumbers>(tx_hash).unwrap(), Some(100));

        // AccountsHistory round-trip.
        let key = ShardedKey::new(Address::ZERO, 100);
        let value = IntegerList::default();
        provider.put::<tables::AccountsHistory>(key.clone(), &value).unwrap();
        assert!(provider.get::<tables::AccountsHistory>(key).unwrap().is_some());

        // StoragesHistory round-trip.
        let key = StorageShardedKey::new(Address::ZERO, B256::ZERO, 100);
        provider.put::<tables::StoragesHistory>(key.clone(), &value).unwrap();
        assert!(provider.get::<tables::StoragesHistory>(key).unwrap().is_some());
    }
2764
    /// Minimal table definition used by the tests below: maps `u64` keys to
    /// raw byte-vector values, no dup-sort.
    #[derive(Debug)]
    struct TestTable;

    impl Table for TestTable {
        const NAME: &'static str = "TestTable";
        const DUPSORT: bool = false;
        type Key = u64;
        type Value = Vec<u8>;
    }
2774
2775 #[test]
2776 fn test_basic_operations() {
2777 let temp_dir = TempDir::new().unwrap();
2778
2779 let provider = RocksDBBuilder::new(temp_dir.path())
2780 .with_table::<TestTable>() .build()
2782 .unwrap();
2783
2784 let key = 42u64;
2785 let value = b"test_value".to_vec();
2786
2787 provider.put::<TestTable>(key, &value).unwrap();
2789
2790 let result = provider.get::<TestTable>(key).unwrap();
2792 assert_eq!(result, Some(value));
2793
2794 provider.delete::<TestTable>(key).unwrap();
2796
2797 assert_eq!(provider.get::<TestTable>(key).unwrap(), None);
2799 }
2800
    /// Writes entries via `write_batch`, verifies them, then deletes them in
    /// a second batch and verifies removal.
    #[test]
    fn test_batch_operations() {
        let temp_dir = TempDir::new().unwrap();
        let provider =
            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();

        // Insert 10 entries in a single batch.
        provider
            .write_batch(|batch| {
                for i in 0..10u64 {
                    let value = format!("value_{i}").into_bytes();
                    batch.put::<TestTable>(i, &value)?;
                }
                Ok(())
            })
            .unwrap();

        for i in 0..10u64 {
            let value = format!("value_{i}").into_bytes();
            assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
        }

        // Delete all entries in a second batch.
        provider
            .write_batch(|batch| {
                for i in 0..10u64 {
                    batch.delete::<TestTable>(i)?;
                }
                Ok(())
            })
            .unwrap();

        for i in 0..10u64 {
            assert_eq!(provider.get::<TestTable>(i).unwrap(), None);
        }
    }
2839
    /// Runs single and batched writes against a real table definition
    /// (`TransactionHashNumbers`) with metrics enabled.
    #[test]
    fn test_with_real_table() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path())
            .with_table::<tables::TransactionHashNumbers>()
            .with_metrics()
            .build()
            .unwrap();

        let tx_hash = TxHash::from(B256::from([1u8; 32]));

        // Single put/get round-trip.
        provider.put::<tables::TransactionHashNumbers>(tx_hash, &100).unwrap();
        assert_eq!(provider.get::<tables::TransactionHashNumbers>(tx_hash).unwrap(), Some(100));

        // Batched writes with distinct hashes.
        provider
            .write_batch(|batch| {
                for i in 0..10u64 {
                    let hash = TxHash::from(B256::from([i as u8; 32]));
                    let value = i * 100;
                    batch.put::<tables::TransactionHashNumbers>(hash, &value)?;
                }
                Ok(())
            })
            .unwrap();

        for i in 0..10u64 {
            let hash = TxHash::from(B256::from([i as u8; 32]));
            assert_eq!(
                provider.get::<tables::TransactionHashNumbers>(hash).unwrap(),
                Some(i * 100)
            );
        }
    }
    /// Verifies basic reads/writes still work with RocksDB statistics
    /// collection enabled via `with_statistics()`.
    #[test]
    fn test_statistics_enabled() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path())
            .with_table::<TestTable>()
            .with_statistics()
            .build()
            .unwrap();

        for i in 0..10 {
            let value = vec![i as u8];
            provider.put::<TestTable>(i, &value).unwrap();
            assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
        }
    }
2894
    /// Writes a moderate volume of data (100 keys x 1 KB) and verifies it is
    /// all readable back.
    #[test]
    fn test_data_persistence() {
        let temp_dir = TempDir::new().unwrap();
        let provider =
            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();

        let value = vec![42u8; 1000];
        for i in 0..100 {
            provider.put::<TestTable>(i, &value).unwrap();
        }

        for i in 0..100 {
            assert!(provider.get::<TestTable>(i).unwrap().is_some(), "Data should be readable");
        }
    }
2912
    /// A transaction must see its own uncommitted writes, which stay
    /// invisible to other readers until commit.
    #[test]
    fn test_transaction_read_your_writes() {
        let temp_dir = TempDir::new().unwrap();
        let provider =
            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();

        let tx = provider.tx();

        let key = 42u64;
        let value = b"test_value".to_vec();
        tx.put::<TestTable>(key, &value).unwrap();

        // Read-your-writes inside the transaction.
        let result = tx.get::<TestTable>(key).unwrap();
        assert_eq!(
            result,
            Some(value.clone()),
            "Transaction should see its own uncommitted writes"
        );

        // Not yet visible through the provider.
        let provider_result = provider.get::<TestTable>(key).unwrap();
        assert_eq!(provider_result, None, "Uncommitted data should not be visible outside tx");

        tx.commit().unwrap();

        // Visible once committed.
        let committed_result = provider.get::<TestTable>(key).unwrap();
        assert_eq!(committed_result, Some(value), "Committed data should be visible");
    }
2946
    /// Rolling back a transaction must leave previously committed data intact.
    #[test]
    fn test_transaction_rollback() {
        let temp_dir = TempDir::new().unwrap();
        let provider =
            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();

        let key = 100u64;
        let initial_value = b"initial".to_vec();
        provider.put::<TestTable>(key, &initial_value).unwrap();

        // Modify inside a transaction, then abandon it.
        let tx = provider.tx();
        let new_value = b"modified".to_vec();
        tx.put::<TestTable>(key, &new_value).unwrap();

        assert_eq!(tx.get::<TestTable>(key).unwrap(), Some(new_value));

        tx.rollback().unwrap();

        // The original committed value survives the rollback.
        let result = provider.get::<TestTable>(key).unwrap();
        assert_eq!(result, Some(initial_value), "Rollback should preserve original data");
    }
2973
    /// A transaction-scoped iterator must observe the transaction's own
    /// uncommitted writes.
    #[test]
    fn test_transaction_iterator() {
        let temp_dir = TempDir::new().unwrap();
        let provider =
            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();

        let tx = provider.tx();

        for i in 0..5u64 {
            let value = format!("value_{i}").into_bytes();
            tx.put::<TestTable>(i, &value).unwrap();
        }

        // Iterate before committing; all 5 writes must be visible.
        let mut count = 0;
        for result in tx.iter::<TestTable>().unwrap() {
            let (key, value) = result.unwrap();
            assert_eq!(value, format!("value_{key}").into_bytes());
            count += 1;
        }
        assert_eq!(count, 5, "Iterator should see all uncommitted writes");

        tx.commit().unwrap();
    }
3001
    /// A manually managed batch buffers writes (invisible until commit) and
    /// reports its length correctly.
    #[test]
    fn test_batch_manual_commit() {
        let temp_dir = TempDir::new().unwrap();
        let provider =
            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();

        let mut batch = provider.batch();

        for i in 0..10u64 {
            let value = format!("batch_value_{i}").into_bytes();
            batch.put::<TestTable>(i, &value).unwrap();
        }

        assert_eq!(batch.len(), 10);
        assert!(!batch.is_empty());

        // Nothing visible before commit.
        assert_eq!(provider.get::<TestTable>(0).unwrap(), None);

        batch.commit().unwrap();

        for i in 0..10u64 {
            let value = format!("batch_value_{i}").into_bytes();
            assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
        }
    }
3033
    /// `first()`/`last()` return `None` on an empty table and the smallest /
    /// largest key once entries exist, regardless of insertion order.
    #[test]
    fn test_first_and_last_entry() {
        let temp_dir = TempDir::new().unwrap();
        let provider =
            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();

        assert_eq!(provider.first::<TestTable>().unwrap(), None);
        assert_eq!(provider.last::<TestTable>().unwrap(), None);

        // Inserted out of order on purpose.
        provider.put::<TestTable>(10, &b"value_10".to_vec()).unwrap();
        provider.put::<TestTable>(20, &b"value_20".to_vec()).unwrap();
        provider.put::<TestTable>(5, &b"value_5".to_vec()).unwrap();

        let first = provider.first::<TestTable>().unwrap();
        assert_eq!(first, Some((5, b"value_5".to_vec())));

        let last = provider.last::<TestTable>().unwrap();
        assert_eq!(last, Some((20, b"value_20".to_vec())));
    }
3057
    /// When the queried block predates the first shard entry and a changeset
    /// block is supplied, the lookup resolves to that changeset.
    #[test]
    fn test_account_history_info_pruned_before_first_entry() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);

        // Sentinel shard holding blocks 100, 200, 300.
        let chunk = IntegerList::new([100, 200, 300]).unwrap();
        let shard_key = ShardedKey::new(address, u64::MAX);
        provider.put::<tables::AccountsHistory>(shard_key, &chunk).unwrap();

        let result =
            provider.snapshot().account_history_info(address, 50, Some(100), u64::MAX).unwrap();
        assert_eq!(result, HistoryInfo::InChangeset(100));
    }
3081
    /// A read-only instance on the same path serves history queries and picks
    /// up new primary writes only after `try_catch_up_with_primary`.
    #[test]
    fn test_account_history_info_read_only_and_catch_up() {
        let temp_dir = TempDir::new().unwrap();
        let address = Address::from([0x42; 20]);
        let chunk = IntegerList::new([100, 200, 300]).unwrap();
        let shard_key = ShardedKey::new(address, u64::MAX);

        // Primary (read-write) instance seeds the data.
        let rw_provider =
            RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
        rw_provider.put::<tables::AccountsHistory>(shard_key, &chunk).unwrap();

        // Secondary opened read-only on the same path.
        let ro_provider = RocksDBBuilder::new(temp_dir.path())
            .with_default_tables()
            .with_read_only(true)
            .build()
            .unwrap();

        let result =
            ro_provider.snapshot().account_history_info(address, 200, None, u64::MAX).unwrap();
        assert_eq!(result, HistoryInfo::InChangeset(200));

        let result =
            ro_provider.snapshot().account_history_info(address, 50, None, u64::MAX).unwrap();
        assert_eq!(result, HistoryInfo::NotYetWritten);

        let result =
            ro_provider.snapshot().account_history_info(address, 400, None, u64::MAX).unwrap();
        assert_eq!(result, HistoryInfo::InPlainState);

        // New write on the primary after the secondary was opened.
        let address2 = Address::from([0x43; 20]);
        let chunk2 = IntegerList::new([500, 600]).unwrap();
        let shard_key2 = ShardedKey::new(address2, u64::MAX);
        rw_provider.put::<tables::AccountsHistory>(shard_key2, &chunk2).unwrap();

        // The stale secondary does not see it yet.
        let result =
            ro_provider.snapshot().account_history_info(address2, 500, None, u64::MAX).unwrap();
        assert_eq!(result, HistoryInfo::NotYetWritten);

        ro_provider.try_catch_up_with_primary().unwrap();

        // Visible after catching up with the primary.
        let result =
            ro_provider.snapshot().account_history_info(address2, 500, None, u64::MAX).unwrap();
        assert_eq!(result, HistoryInfo::InChangeset(500));
    }
3132
    /// Shard entries above the visible tip must be ignored by history lookups.
    #[test]
    fn test_account_history_info_ignores_blocks_above_visible_tip() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);

        provider
            .put::<tables::AccountsHistory>(
                ShardedKey::new(address, 110),
                &IntegerList::new([100, 110]).unwrap(),
            )
            .unwrap();
        provider
            .put::<tables::AccountsHistory>(
                ShardedKey::new(address, u64::MAX),
                &IntegerList::new([200, 210]).unwrap(),
            )
            .unwrap();

        // Blocks 200/210 lie past the tip (150), so block 150 reads plain state.
        let result = provider.snapshot().account_history_info(address, 150, None, 150).unwrap();
        assert_eq!(result, HistoryInfo::InPlainState);
    }
3156
    /// A shard mixing entries below and above the tip must only yield the
    /// entries at or below the tip.
    #[test]
    fn test_account_history_info_mixed_shard_respects_visible_tip() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);
        provider
            .put::<tables::AccountsHistory>(
                ShardedKey::new(address, u64::MAX),
                &IntegerList::new([100, 150, 300]).unwrap(),
            )
            .unwrap();

        // 150 <= tip (200), so it answers the query for block 120.
        let result = provider.snapshot().account_history_info(address, 120, None, 200).unwrap();
        assert_eq!(result, HistoryInfo::InChangeset(150));

        // Past the tip nothing is visible -> plain state.
        let result = provider.snapshot().account_history_info(address, 201, None, 200).unwrap();
        assert_eq!(result, HistoryInfo::InPlainState);
    }
3176
    /// If every shard entry is above the tip, the lookup falls back on the
    /// optional changeset hint.
    #[test]
    fn test_account_history_info_only_stale_entries_use_fallback() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);
        provider
            .put::<tables::AccountsHistory>(
                ShardedKey::new(address, u64::MAX),
                &IntegerList::new([200, 210]).unwrap(),
            )
            .unwrap();

        // No visible entries and no hint -> not yet written.
        let result = provider.snapshot().account_history_info(address, 150, None, 150).unwrap();
        assert_eq!(result, HistoryInfo::NotYetWritten);

        // With a changeset hint the value may live in plain state.
        let result =
            provider.snapshot().account_history_info(address, 150, Some(100), 150).unwrap();
        assert_eq!(result, HistoryInfo::MaybeInPlainState);
    }
3197
    /// Appending one more than `NUM_OF_INDICES_IN_SHARD` indices must split
    /// into a full completed shard plus a one-element sentinel shard.
    #[test]
    fn test_account_history_shard_split_at_boundary() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);
        let limit = NUM_OF_INDICES_IN_SHARD;

        // limit + 1 indices forces a split.
        let indices: Vec<u64> = (0..=(limit as u64)).collect();
        let mut batch = provider.batch();
        batch.append_account_history_shard(address, indices).unwrap();
        batch.commit().unwrap();

        // Completed shard keyed by its highest block; remainder under u64::MAX.
        let completed_key = ShardedKey::new(address, (limit - 1) as u64);
        let sentinel_key = ShardedKey::new(address, u64::MAX);

        let completed_shard = provider.get::<tables::AccountsHistory>(completed_key).unwrap();
        let sentinel_shard = provider.get::<tables::AccountsHistory>(sentinel_key).unwrap();

        assert!(completed_shard.is_some(), "completed shard should exist");
        assert!(sentinel_shard.is_some(), "sentinel shard should exist");

        let completed_shard = completed_shard.unwrap();
        let sentinel_shard = sentinel_shard.unwrap();

        assert_eq!(completed_shard.len(), limit as u64, "completed shard should be full");
        assert_eq!(sentinel_shard.len(), 1, "sentinel shard should have 1 element");
    }
3228
    /// Consecutive appends that cross the shard limit must produce multiple
    /// completed shards plus the trailing sentinel shard.
    #[test]
    fn test_account_history_multiple_shard_splits() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x43; 20]);
        let limit = NUM_OF_INDICES_IN_SHARD;

        // Exactly `limit` indices: stays in the sentinel shard (no split yet).
        let first_batch_indices: Vec<u64> = (0..limit as u64).collect();
        let mut batch = provider.batch();
        batch.append_account_history_shard(address, first_batch_indices).unwrap();
        batch.commit().unwrap();

        let sentinel_key = ShardedKey::new(address, u64::MAX);
        let shard = provider.get::<tables::AccountsHistory>(sentinel_key.clone()).unwrap();
        assert!(shard.is_some());
        assert_eq!(shard.unwrap().len(), limit as u64);

        // Appending another limit + 1 indices yields two completed shards.
        let second_batch_indices: Vec<u64> = (limit as u64..=(2 * limit) as u64).collect();
        let mut batch = provider.batch();
        batch.append_account_history_shard(address, second_batch_indices).unwrap();
        batch.commit().unwrap();

        let first_completed = ShardedKey::new(address, (limit - 1) as u64);
        let second_completed = ShardedKey::new(address, (2 * limit - 1) as u64);

        assert!(
            provider.get::<tables::AccountsHistory>(first_completed).unwrap().is_some(),
            "first completed shard should exist"
        );
        assert!(
            provider.get::<tables::AccountsHistory>(second_completed).unwrap().is_some(),
            "second completed shard should exist"
        );
        assert!(
            provider.get::<tables::AccountsHistory>(sentinel_key).unwrap().is_some(),
            "sentinel shard should exist"
        );
    }
3272
    /// Storage-history counterpart of the account shard-split test:
    /// limit + 1 indices split into a full shard plus a one-element sentinel.
    #[test]
    fn test_storage_history_shard_split_at_boundary() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x44; 20]);
        let slot = B256::from([0x55; 32]);
        let limit = NUM_OF_INDICES_IN_SHARD;

        // limit + 1 indices forces a split.
        let indices: Vec<u64> = (0..=(limit as u64)).collect();
        let mut batch = provider.batch();
        batch.append_storage_history_shard(address, slot, indices).unwrap();
        batch.commit().unwrap();

        let completed_key = StorageShardedKey::new(address, slot, (limit - 1) as u64);
        let sentinel_key = StorageShardedKey::new(address, slot, u64::MAX);

        let completed_shard = provider.get::<tables::StoragesHistory>(completed_key).unwrap();
        let sentinel_shard = provider.get::<tables::StoragesHistory>(sentinel_key).unwrap();

        assert!(completed_shard.is_some(), "completed shard should exist");
        assert!(sentinel_shard.is_some(), "sentinel shard should exist");

        let completed_shard = completed_shard.unwrap();
        let sentinel_shard = sentinel_shard.unwrap();

        assert_eq!(completed_shard.len(), limit as u64, "completed shard should be full");
        assert_eq!(sentinel_shard.len(), 1, "sentinel shard should have 1 element");
    }
3304
    /// Storage-history counterpart of the multi-split test: two appends
    /// across the limit produce two completed shards plus the sentinel.
    #[test]
    fn test_storage_history_multiple_shard_splits() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x46; 20]);
        let slot = B256::from([0x57; 32]);
        let limit = NUM_OF_INDICES_IN_SHARD;

        // Exactly `limit` indices: no split yet.
        let first_batch_indices: Vec<u64> = (0..limit as u64).collect();
        let mut batch = provider.batch();
        batch.append_storage_history_shard(address, slot, first_batch_indices).unwrap();
        batch.commit().unwrap();

        let sentinel_key = StorageShardedKey::new(address, slot, u64::MAX);
        let shard = provider.get::<tables::StoragesHistory>(sentinel_key.clone()).unwrap();
        assert!(shard.is_some());
        assert_eq!(shard.unwrap().len(), limit as u64);

        // Appending another limit + 1 indices yields two completed shards.
        let second_batch_indices: Vec<u64> = (limit as u64..=(2 * limit) as u64).collect();
        let mut batch = provider.batch();
        batch.append_storage_history_shard(address, slot, second_batch_indices).unwrap();
        batch.commit().unwrap();

        let first_completed = StorageShardedKey::new(address, slot, (limit - 1) as u64);
        let second_completed = StorageShardedKey::new(address, slot, (2 * limit - 1) as u64);

        assert!(
            provider.get::<tables::StoragesHistory>(first_completed).unwrap().is_some(),
            "first completed shard should exist"
        );
        assert!(
            provider.get::<tables::StoragesHistory>(second_completed).unwrap().is_some(),
            "second completed shard should exist"
        );
        assert!(
            provider.get::<tables::StoragesHistory>(sentinel_key).unwrap().is_some(),
            "sentinel shard should exist"
        );
    }
3349
    /// `clear` must remove every row of a table.
    #[test]
    fn test_clear_table() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);
        let key = ShardedKey::new(address, u64::MAX);
        let blocks = BlockNumberList::new_pre_sorted([1, 2, 3]);

        provider.put::<tables::AccountsHistory>(key.clone(), &blocks).unwrap();
        assert!(provider.get::<tables::AccountsHistory>(key.clone()).unwrap().is_some());

        provider.clear::<tables::AccountsHistory>().unwrap();

        assert!(
            provider.get::<tables::AccountsHistory>(key).unwrap().is_none(),
            "table should be empty after clear"
        );
        assert!(
            provider.first::<tables::AccountsHistory>().unwrap().is_none(),
            "first() should return None after clear"
        );
    }
3373
    /// Clearing an already-empty table is a harmless no-op.
    #[test]
    fn test_clear_empty_table() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        assert!(provider.first::<tables::AccountsHistory>().unwrap().is_none());

        provider.clear::<tables::AccountsHistory>().unwrap();

        assert!(provider.first::<tables::AccountsHistory>().unwrap().is_none());
    }
3385
    /// Unwinding truncates the sentinel shard down to the target block.
    #[test]
    fn test_unwind_account_history_to_basic() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);

        let mut batch = provider.batch();
        batch.append_account_history_shard(address, 0..=10).unwrap();
        batch.commit().unwrap();

        // Full range 0..=10 present initially.
        let key = ShardedKey::new(address, u64::MAX);
        let result = provider.get::<tables::AccountsHistory>(key.clone()).unwrap();
        assert!(result.is_some());
        let blocks: Vec<u64> = result.unwrap().iter().collect();
        assert_eq!(blocks, (0..=10).collect::<Vec<_>>());

        // Unwind to block 5.
        let mut batch = provider.batch();
        batch.unwind_account_history_to(address, 5).unwrap();
        batch.commit().unwrap();

        // Only 0..=5 survives.
        let result = provider.get::<tables::AccountsHistory>(key).unwrap();
        assert!(result.is_some());
        let blocks: Vec<u64> = result.unwrap().iter().collect();
        assert_eq!(blocks, (0..=5).collect::<Vec<_>>());
    }
3416
    /// Unwinding below the first recorded block deletes the shard entirely.
    #[test]
    fn test_unwind_account_history_to_removes_all() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);

        let mut batch = provider.batch();
        batch.append_account_history_shard(address, 5..=10).unwrap();
        batch.commit().unwrap();

        // Target (4) precedes every stored block (5..=10).
        let mut batch = provider.batch();
        batch.unwind_account_history_to(address, 4).unwrap();
        batch.commit().unwrap();

        let key = ShardedKey::new(address, u64::MAX);
        let result = provider.get::<tables::AccountsHistory>(key).unwrap();
        assert!(result.is_none(), "Should have no data after full unwind");
    }
3439
    /// Unwinding to a block past the stored range leaves everything intact.
    #[test]
    fn test_unwind_account_history_to_no_op() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);

        let mut batch = provider.batch();
        batch.append_account_history_shard(address, 0..=5).unwrap();
        batch.commit().unwrap();

        // Target (10) is beyond the stored range (0..=5): nothing to remove.
        let mut batch = provider.batch();
        batch.unwind_account_history_to(address, 10).unwrap();
        batch.commit().unwrap();

        let key = ShardedKey::new(address, u64::MAX);
        let result = provider.get::<tables::AccountsHistory>(key).unwrap();
        assert!(result.is_some());
        let blocks: Vec<u64> = result.unwrap().iter().collect();
        assert_eq!(blocks, (0..=5).collect::<Vec<_>>());
    }
3464
    /// Unwinding to block 0 keeps only block 0 itself.
    #[test]
    fn test_unwind_account_history_to_block_zero() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);

        let mut batch = provider.batch();
        batch.append_account_history_shard(address, 0..=5).unwrap();
        batch.commit().unwrap();

        // Unwind keeps blocks <= 0, i.e. just block 0.
        let mut batch = provider.batch();
        batch.unwind_account_history_to(address, 0).unwrap();
        batch.commit().unwrap();

        let key = ShardedKey::new(address, u64::MAX);
        let result = provider.get::<tables::AccountsHistory>(key).unwrap();
        assert!(result.is_some());
        let blocks: Vec<u64> = result.unwrap().iter().collect();
        assert_eq!(blocks, vec![0]);
    }
3490
    /// With two shards, unwinding into the later (sentinel) shard truncates
    /// it while leaving the earlier shard untouched.
    #[test]
    fn test_unwind_account_history_to_multi_shard() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);

        let mut batch = provider.batch();

        // Completed shard covering blocks 1..=50.
        let shard1 = BlockNumberList::new_pre_sorted(1..=50);
        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, 50), &shard1).unwrap();

        // Sentinel shard covering blocks 51..=100.
        let shard2 = BlockNumberList::new_pre_sorted(51..=100);
        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, u64::MAX), &shard2).unwrap();

        batch.commit().unwrap();

        let shards = provider.account_history_shards(address).unwrap();
        assert_eq!(shards.len(), 2);

        // Unwind into the sentinel shard.
        let mut batch = provider.batch();
        batch.unwind_account_history_to(address, 75).unwrap();
        batch.commit().unwrap();

        let shards = provider.account_history_shards(address).unwrap();
        assert_eq!(shards.len(), 2);

        // First shard untouched.
        assert_eq!(shards[0].0.highest_block_number, 50);
        assert_eq!(shards[0].1.iter().collect::<Vec<_>>(), (1..=50).collect::<Vec<_>>());

        // Sentinel truncated to 51..=75.
        assert_eq!(shards[1].0.highest_block_number, u64::MAX);
        assert_eq!(shards[1].1.iter().collect::<Vec<_>>(), (51..=75).collect::<Vec<_>>());
    }
3533
    /// When the unwind empties the sentinel shard entirely, the surviving
    /// earlier shard ends up stored under the sentinel key.
    #[test]
    fn test_unwind_account_history_to_multi_shard_boundary_empty() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);

        let mut batch = provider.batch();

        // Completed shard with blocks 1..=50.
        let shard1 = BlockNumberList::new_pre_sorted(1..=50);
        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, 50), &shard1).unwrap();

        // Sentinel shard with blocks 75..=100 (gap between 50 and 75).
        let shard2 = BlockNumberList::new_pre_sorted(75..=100);
        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, u64::MAX), &shard2).unwrap();

        batch.commit().unwrap();

        // Target 60 wipes the sentinel (75..=100) but keeps shard1 whole.
        let mut batch = provider.batch();
        batch.unwind_account_history_to(address, 60).unwrap();
        batch.commit().unwrap();

        let shards = provider.account_history_shards(address).unwrap();
        assert_eq!(shards.len(), 1);
        assert_eq!(shards[0].0.highest_block_number, u64::MAX);
        assert_eq!(shards[0].1.iter().collect::<Vec<_>>(), (1..=50).collect::<Vec<_>>());
    }
3565
    /// `account_history_shards` returns only the shards belonging to the
    /// queried address.
    #[test]
    fn test_account_history_shards_iterator() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);
        let other_address = Address::from([0x43; 20]);

        let mut batch = provider.batch();
        batch.append_account_history_shard(address, 0..=5).unwrap();
        batch.append_account_history_shard(other_address, 10..=15).unwrap();
        batch.commit().unwrap();

        let shards = provider.account_history_shards(address).unwrap();
        assert_eq!(shards.len(), 1);
        assert_eq!(shards[0].0.key, address);

        let shards = provider.account_history_shards(other_address).unwrap();
        assert_eq!(shards.len(), 1);
        assert_eq!(shards[0].0.key, other_address);

        // Unknown address yields nothing.
        let non_existent = Address::from([0x99; 20]);
        let shards = provider.account_history_shards(non_existent).unwrap();
        assert!(shards.is_empty());
    }
3595
    /// `clear_account_history` removes every shard for the address.
    #[test]
    fn test_clear_account_history() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);

        let mut batch = provider.batch();
        batch.append_account_history_shard(address, 0..=10).unwrap();
        batch.commit().unwrap();

        let mut batch = provider.batch();
        batch.clear_account_history(address).unwrap();
        batch.commit().unwrap();

        let shards = provider.account_history_shards(address).unwrap();
        assert!(shards.is_empty(), "All shards should be deleted");
    }
3617
    /// Unwinding into a middle (non-sentinel) shard drops later shards and
    /// re-keys the truncated shard under the sentinel key.
    #[test]
    fn test_unwind_non_sentinel_boundary() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);

        let mut batch = provider.batch();

        // Three shards: 1..=50, 51..=100, and sentinel 101..=150.
        let shard1 = BlockNumberList::new_pre_sorted(1..=50);
        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, 50), &shard1).unwrap();

        let shard2 = BlockNumberList::new_pre_sorted(51..=100);
        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, 100), &shard2).unwrap();

        let shard3 = BlockNumberList::new_pre_sorted(101..=150);
        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, u64::MAX), &shard3).unwrap();

        batch.commit().unwrap();

        let shards = provider.account_history_shards(address).unwrap();
        assert_eq!(shards.len(), 3);

        // Unwind into the middle (non-sentinel) shard.
        let mut batch = provider.batch();
        batch.unwind_account_history_to(address, 75).unwrap();
        batch.commit().unwrap();

        let shards = provider.account_history_shards(address).unwrap();
        assert_eq!(shards.len(), 2);

        assert_eq!(shards[0].0.highest_block_number, 50);
        assert_eq!(shards[0].1.iter().collect::<Vec<_>>(), (1..=50).collect::<Vec<_>>());

        // Truncated middle shard now lives under the sentinel key.
        assert_eq!(shards[1].0.highest_block_number, u64::MAX);
        assert_eq!(shards[1].1.iter().collect::<Vec<_>>(), (51..=75).collect::<Vec<_>>());
    }
3663
3664 #[test]
3665 fn test_batch_auto_commit_on_threshold() {
3666 let temp_dir = TempDir::new().unwrap();
3667 let provider =
3668 RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
3669
3670 let mut batch = RocksDBBatch {
3672 provider: &provider,
3673 inner: WriteBatchWithTransaction::<true>::default(),
3674 buf: Vec::new(),
3675 auto_commit_threshold: Some(1024), };
3677
3678 for i in 0..100u64 {
3681 let value = format!("value_{i:04}").into_bytes();
3682 batch.put::<TestTable>(i, &value).unwrap();
3683 }
3684
3685 let first_visible = provider.get::<TestTable>(0).unwrap();
3688 assert!(first_visible.is_some(), "Auto-committed data should be visible");
3689
3690 batch.commit().unwrap();
3692
3693 for i in 0..100u64 {
3695 let value = format!("value_{i:04}").into_bytes();
3696 assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
3697 }
3698 }
3699
    /// Table-driven test case for [`test_prune_account_history_cases`].
    struct AccountPruneCase {
        /// Case identifier used in assertion failure messages.
        name: &'static str,
        /// Shards present before pruning: `(highest_block_number key, block numbers)`.
        initial_shards: &'static [(u64, &'static [u64])],
        /// Prune target passed to `prune_account_history_to`; entries with block
        /// numbers `<=` this value are expected to be removed (per the cases below).
        prune_to: u64,
        /// Outcome the prune call is expected to report.
        expected_outcome: PruneShardOutcome,
        /// Shards expected to remain after pruning, in key order.
        expected_shards: &'static [(u64, &'static [u64])],
    }
3710
    /// Table-driven test case for [`test_prune_storage_history_cases`].
    struct StoragePruneCase {
        /// Case identifier used in assertion failure messages.
        name: &'static str,
        /// Shards present before pruning: `(highest_block_number key, block numbers)`.
        initial_shards: &'static [(u64, &'static [u64])],
        /// Prune target passed to `prune_storage_history_to`; entries with block
        /// numbers `<=` this value are expected to be removed (per the cases below).
        prune_to: u64,
        /// Outcome the prune call is expected to report.
        expected_outcome: PruneShardOutcome,
        /// Shards expected to remain after pruning, in key order.
        expected_shards: &'static [(u64, &'static [u64])],
    }
3719
3720 #[test]
3721 fn test_prune_account_history_cases() {
3722 const MAX: u64 = u64::MAX;
3723 const CASES: &[AccountPruneCase] = &[
3724 AccountPruneCase {
3725 name: "single_shard_truncate",
3726 initial_shards: &[(MAX, &[10, 20, 30, 40])],
3727 prune_to: 25,
3728 expected_outcome: PruneShardOutcome::Updated,
3729 expected_shards: &[(MAX, &[30, 40])],
3730 },
3731 AccountPruneCase {
3732 name: "single_shard_delete_all",
3733 initial_shards: &[(MAX, &[10, 20])],
3734 prune_to: 20,
3735 expected_outcome: PruneShardOutcome::Deleted,
3736 expected_shards: &[],
3737 },
3738 AccountPruneCase {
3739 name: "single_shard_noop",
3740 initial_shards: &[(MAX, &[10, 20])],
3741 prune_to: 5,
3742 expected_outcome: PruneShardOutcome::Unchanged,
3743 expected_shards: &[(MAX, &[10, 20])],
3744 },
3745 AccountPruneCase {
3746 name: "no_shards",
3747 initial_shards: &[],
3748 prune_to: 100,
3749 expected_outcome: PruneShardOutcome::Unchanged,
3750 expected_shards: &[],
3751 },
3752 AccountPruneCase {
3753 name: "multi_shard_truncate_first",
3754 initial_shards: &[(30, &[10, 20, 30]), (MAX, &[40, 50, 60])],
3755 prune_to: 25,
3756 expected_outcome: PruneShardOutcome::Updated,
3757 expected_shards: &[(30, &[30]), (MAX, &[40, 50, 60])],
3758 },
3759 AccountPruneCase {
3760 name: "delete_first_shard_sentinel_unchanged",
3761 initial_shards: &[(20, &[10, 20]), (MAX, &[30, 40])],
3762 prune_to: 20,
3763 expected_outcome: PruneShardOutcome::Deleted,
3764 expected_shards: &[(MAX, &[30, 40])],
3765 },
3766 AccountPruneCase {
3767 name: "multi_shard_delete_all_but_last",
3768 initial_shards: &[(10, &[5, 10]), (20, &[15, 20]), (MAX, &[25, 30])],
3769 prune_to: 22,
3770 expected_outcome: PruneShardOutcome::Deleted,
3771 expected_shards: &[(MAX, &[25, 30])],
3772 },
3773 AccountPruneCase {
3774 name: "mid_shard_preserves_key",
3775 initial_shards: &[(50, &[10, 20, 30, 40, 50]), (MAX, &[60, 70])],
3776 prune_to: 25,
3777 expected_outcome: PruneShardOutcome::Updated,
3778 expected_shards: &[(50, &[30, 40, 50]), (MAX, &[60, 70])],
3779 },
3780 AccountPruneCase {
3782 name: "equiv_delete_early_shards_keep_sentinel",
3783 initial_shards: &[(20, &[10, 15, 20]), (50, &[30, 40, 50]), (MAX, &[60, 70])],
3784 prune_to: 55,
3785 expected_outcome: PruneShardOutcome::Deleted,
3786 expected_shards: &[(MAX, &[60, 70])],
3787 },
3788 AccountPruneCase {
3789 name: "equiv_sentinel_becomes_empty_with_prev",
3790 initial_shards: &[(50, &[30, 40, 50]), (MAX, &[35])],
3791 prune_to: 40,
3792 expected_outcome: PruneShardOutcome::Deleted,
3793 expected_shards: &[(MAX, &[50])],
3794 },
3795 AccountPruneCase {
3796 name: "equiv_all_shards_become_empty",
3797 initial_shards: &[(50, &[30, 40, 50]), (MAX, &[51])],
3798 prune_to: 51,
3799 expected_outcome: PruneShardOutcome::Deleted,
3800 expected_shards: &[],
3801 },
3802 AccountPruneCase {
3803 name: "equiv_non_sentinel_last_shard_promoted",
3804 initial_shards: &[(100, &[50, 75, 100])],
3805 prune_to: 60,
3806 expected_outcome: PruneShardOutcome::Updated,
3807 expected_shards: &[(MAX, &[75, 100])],
3808 },
3809 AccountPruneCase {
3810 name: "equiv_filter_within_shard",
3811 initial_shards: &[(MAX, &[10, 20, 30, 40])],
3812 prune_to: 25,
3813 expected_outcome: PruneShardOutcome::Updated,
3814 expected_shards: &[(MAX, &[30, 40])],
3815 },
3816 AccountPruneCase {
3817 name: "equiv_multi_shard_partial_delete",
3818 initial_shards: &[(20, &[10, 20]), (50, &[30, 40, 50]), (MAX, &[60, 70])],
3819 prune_to: 35,
3820 expected_outcome: PruneShardOutcome::Deleted,
3821 expected_shards: &[(50, &[40, 50]), (MAX, &[60, 70])],
3822 },
3823 ];
3824
3825 let address = Address::from([0x42; 20]);
3826
3827 for case in CASES {
3828 let temp_dir = TempDir::new().unwrap();
3829 let provider =
3830 RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3831
3832 let mut batch = provider.batch();
3834 for (highest, blocks) in case.initial_shards {
3835 let shard = BlockNumberList::new_pre_sorted(blocks.iter().copied());
3836 batch
3837 .put::<tables::AccountsHistory>(ShardedKey::new(address, *highest), &shard)
3838 .unwrap();
3839 }
3840 batch.commit().unwrap();
3841
3842 let mut batch = provider.batch();
3844 let outcome = batch.prune_account_history_to(address, case.prune_to).unwrap();
3845 batch.commit().unwrap();
3846
3847 assert_eq!(outcome, case.expected_outcome, "case '{}': wrong outcome", case.name);
3849
3850 let shards = provider.account_history_shards(address).unwrap();
3852 assert_eq!(
3853 shards.len(),
3854 case.expected_shards.len(),
3855 "case '{}': wrong shard count",
3856 case.name
3857 );
3858 for (i, ((key, blocks), (exp_key, exp_blocks))) in
3859 shards.iter().zip(case.expected_shards.iter()).enumerate()
3860 {
3861 assert_eq!(
3862 key.highest_block_number, *exp_key,
3863 "case '{}': shard {} wrong key",
3864 case.name, i
3865 );
3866 assert_eq!(
3867 blocks.iter().collect::<Vec<_>>(),
3868 *exp_blocks,
3869 "case '{}': shard {} wrong blocks",
3870 case.name,
3871 i
3872 );
3873 }
3874 }
3875 }
3876
3877 #[test]
3878 fn test_prune_storage_history_cases() {
3879 const MAX: u64 = u64::MAX;
3880 const CASES: &[StoragePruneCase] = &[
3881 StoragePruneCase {
3882 name: "single_shard_truncate",
3883 initial_shards: &[(MAX, &[10, 20, 30, 40])],
3884 prune_to: 25,
3885 expected_outcome: PruneShardOutcome::Updated,
3886 expected_shards: &[(MAX, &[30, 40])],
3887 },
3888 StoragePruneCase {
3889 name: "single_shard_delete_all",
3890 initial_shards: &[(MAX, &[10, 20])],
3891 prune_to: 20,
3892 expected_outcome: PruneShardOutcome::Deleted,
3893 expected_shards: &[],
3894 },
3895 StoragePruneCase {
3896 name: "noop",
3897 initial_shards: &[(MAX, &[10, 20])],
3898 prune_to: 5,
3899 expected_outcome: PruneShardOutcome::Unchanged,
3900 expected_shards: &[(MAX, &[10, 20])],
3901 },
3902 StoragePruneCase {
3903 name: "no_shards",
3904 initial_shards: &[],
3905 prune_to: 100,
3906 expected_outcome: PruneShardOutcome::Unchanged,
3907 expected_shards: &[],
3908 },
3909 StoragePruneCase {
3910 name: "mid_shard_preserves_key",
3911 initial_shards: &[(50, &[10, 20, 30, 40, 50]), (MAX, &[60, 70])],
3912 prune_to: 25,
3913 expected_outcome: PruneShardOutcome::Updated,
3914 expected_shards: &[(50, &[30, 40, 50]), (MAX, &[60, 70])],
3915 },
3916 StoragePruneCase {
3918 name: "equiv_sentinel_promotion",
3919 initial_shards: &[(100, &[50, 75, 100])],
3920 prune_to: 60,
3921 expected_outcome: PruneShardOutcome::Updated,
3922 expected_shards: &[(MAX, &[75, 100])],
3923 },
3924 StoragePruneCase {
3925 name: "equiv_delete_early_shards_keep_sentinel",
3926 initial_shards: &[(20, &[10, 15, 20]), (50, &[30, 40, 50]), (MAX, &[60, 70])],
3927 prune_to: 55,
3928 expected_outcome: PruneShardOutcome::Deleted,
3929 expected_shards: &[(MAX, &[60, 70])],
3930 },
3931 StoragePruneCase {
3932 name: "equiv_sentinel_becomes_empty_with_prev",
3933 initial_shards: &[(50, &[30, 40, 50]), (MAX, &[35])],
3934 prune_to: 40,
3935 expected_outcome: PruneShardOutcome::Deleted,
3936 expected_shards: &[(MAX, &[50])],
3937 },
3938 StoragePruneCase {
3939 name: "equiv_all_shards_become_empty",
3940 initial_shards: &[(50, &[30, 40, 50]), (MAX, &[51])],
3941 prune_to: 51,
3942 expected_outcome: PruneShardOutcome::Deleted,
3943 expected_shards: &[],
3944 },
3945 StoragePruneCase {
3946 name: "equiv_filter_within_shard",
3947 initial_shards: &[(MAX, &[10, 20, 30, 40])],
3948 prune_to: 25,
3949 expected_outcome: PruneShardOutcome::Updated,
3950 expected_shards: &[(MAX, &[30, 40])],
3951 },
3952 StoragePruneCase {
3953 name: "equiv_multi_shard_partial_delete",
3954 initial_shards: &[(20, &[10, 20]), (50, &[30, 40, 50]), (MAX, &[60, 70])],
3955 prune_to: 35,
3956 expected_outcome: PruneShardOutcome::Deleted,
3957 expected_shards: &[(50, &[40, 50]), (MAX, &[60, 70])],
3958 },
3959 ];
3960
3961 let address = Address::from([0x42; 20]);
3962 let storage_key = B256::from([0x01; 32]);
3963
3964 for case in CASES {
3965 let temp_dir = TempDir::new().unwrap();
3966 let provider =
3967 RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3968
3969 let mut batch = provider.batch();
3971 for (highest, blocks) in case.initial_shards {
3972 let shard = BlockNumberList::new_pre_sorted(blocks.iter().copied());
3973 let key = if *highest == MAX {
3974 StorageShardedKey::last(address, storage_key)
3975 } else {
3976 StorageShardedKey::new(address, storage_key, *highest)
3977 };
3978 batch.put::<tables::StoragesHistory>(key, &shard).unwrap();
3979 }
3980 batch.commit().unwrap();
3981
3982 let mut batch = provider.batch();
3984 let outcome =
3985 batch.prune_storage_history_to(address, storage_key, case.prune_to).unwrap();
3986 batch.commit().unwrap();
3987
3988 assert_eq!(outcome, case.expected_outcome, "case '{}': wrong outcome", case.name);
3990
3991 let shards = provider.storage_history_shards(address, storage_key).unwrap();
3993 assert_eq!(
3994 shards.len(),
3995 case.expected_shards.len(),
3996 "case '{}': wrong shard count",
3997 case.name
3998 );
3999 for (i, ((key, blocks), (exp_key, exp_blocks))) in
4000 shards.iter().zip(case.expected_shards.iter()).enumerate()
4001 {
4002 assert_eq!(
4003 key.sharded_key.highest_block_number, *exp_key,
4004 "case '{}': shard {} wrong key",
4005 case.name, i
4006 );
4007 assert_eq!(
4008 blocks.iter().collect::<Vec<_>>(),
4009 *exp_blocks,
4010 "case '{}': shard {} wrong blocks",
4011 case.name,
4012 i
4013 );
4014 }
4015 }
4016 }
4017
4018 #[test]
4019 fn test_prune_storage_history_does_not_affect_other_slots() {
4020 let temp_dir = TempDir::new().unwrap();
4021 let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
4022
4023 let address = Address::from([0x42; 20]);
4024 let slot1 = B256::from([0x01; 32]);
4025 let slot2 = B256::from([0x02; 32]);
4026
4027 let mut batch = provider.batch();
4029 batch
4030 .put::<tables::StoragesHistory>(
4031 StorageShardedKey::last(address, slot1),
4032 &BlockNumberList::new_pre_sorted([10u64, 20]),
4033 )
4034 .unwrap();
4035 batch
4036 .put::<tables::StoragesHistory>(
4037 StorageShardedKey::last(address, slot2),
4038 &BlockNumberList::new_pre_sorted([30u64, 40]),
4039 )
4040 .unwrap();
4041 batch.commit().unwrap();
4042
4043 let mut batch = provider.batch();
4045 let outcome = batch.prune_storage_history_to(address, slot1, 20).unwrap();
4046 batch.commit().unwrap();
4047
4048 assert_eq!(outcome, PruneShardOutcome::Deleted);
4049
4050 let shards1 = provider.storage_history_shards(address, slot1).unwrap();
4052 assert!(shards1.is_empty());
4053
4054 let shards2 = provider.storage_history_shards(address, slot2).unwrap();
4056 assert_eq!(shards2.len(), 1);
4057 assert_eq!(shards2[0].1.iter().collect::<Vec<_>>(), vec![30, 40]);
4058 }
4059
4060 #[test]
4061 fn test_prune_invariants() {
4062 let address = Address::from([0x42; 20]);
4064 let storage_key = B256::from([0x01; 32]);
4065
4066 #[expect(clippy::type_complexity)]
4068 let invariant_cases: &[(&[(u64, &[u64])], u64)] = &[
4069 (&[(10, &[5, 10]), (20, &[15, 20]), (u64::MAX, &[25, 30])], 20),
4071 (&[(100, &[50, 100])], 60),
4073 ];
4074
4075 for (initial_shards, prune_to) in invariant_cases {
4076 {
4078 let temp_dir = TempDir::new().unwrap();
4079 let provider =
4080 RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
4081
4082 let mut batch = provider.batch();
4083 for (highest, blocks) in *initial_shards {
4084 let shard = BlockNumberList::new_pre_sorted(blocks.iter().copied());
4085 batch
4086 .put::<tables::AccountsHistory>(ShardedKey::new(address, *highest), &shard)
4087 .unwrap();
4088 }
4089 batch.commit().unwrap();
4090
4091 let mut batch = provider.batch();
4092 batch.prune_account_history_to(address, *prune_to).unwrap();
4093 batch.commit().unwrap();
4094
4095 let shards = provider.account_history_shards(address).unwrap();
4096
4097 for (key, blocks) in &shards {
4099 assert!(
4100 !blocks.is_empty(),
4101 "Account: empty shard at key {}",
4102 key.highest_block_number
4103 );
4104 }
4105
4106 if !shards.is_empty() {
4108 let last = shards.last().unwrap();
4109 assert_eq!(
4110 last.0.highest_block_number,
4111 u64::MAX,
4112 "Account: last shard must be sentinel"
4113 );
4114 }
4115 }
4116
4117 {
4119 let temp_dir = TempDir::new().unwrap();
4120 let provider =
4121 RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
4122
4123 let mut batch = provider.batch();
4124 for (highest, blocks) in *initial_shards {
4125 let shard = BlockNumberList::new_pre_sorted(blocks.iter().copied());
4126 let key = if *highest == u64::MAX {
4127 StorageShardedKey::last(address, storage_key)
4128 } else {
4129 StorageShardedKey::new(address, storage_key, *highest)
4130 };
4131 batch.put::<tables::StoragesHistory>(key, &shard).unwrap();
4132 }
4133 batch.commit().unwrap();
4134
4135 let mut batch = provider.batch();
4136 batch.prune_storage_history_to(address, storage_key, *prune_to).unwrap();
4137 batch.commit().unwrap();
4138
4139 let shards = provider.storage_history_shards(address, storage_key).unwrap();
4140
4141 for (key, blocks) in &shards {
4143 assert!(
4144 !blocks.is_empty(),
4145 "Storage: empty shard at key {}",
4146 key.sharded_key.highest_block_number
4147 );
4148 }
4149
4150 if !shards.is_empty() {
4152 let last = shards.last().unwrap();
4153 assert_eq!(
4154 last.0.sharded_key.highest_block_number,
4155 u64::MAX,
4156 "Storage: last shard must be sentinel"
4157 );
4158 }
4159 }
4160 }
4161 }
4162
4163 #[test]
4164 fn test_prune_account_history_batch_multiple_sorted_targets() {
4165 let temp_dir = TempDir::new().unwrap();
4166 let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
4167
4168 let addr1 = Address::from([0x01; 20]);
4169 let addr2 = Address::from([0x02; 20]);
4170 let addr3 = Address::from([0x03; 20]);
4171
4172 let mut batch = provider.batch();
4174 batch
4175 .put::<tables::AccountsHistory>(
4176 ShardedKey::new(addr1, u64::MAX),
4177 &BlockNumberList::new_pre_sorted([10, 20, 30]),
4178 )
4179 .unwrap();
4180 batch
4181 .put::<tables::AccountsHistory>(
4182 ShardedKey::new(addr2, u64::MAX),
4183 &BlockNumberList::new_pre_sorted([5, 10, 15]),
4184 )
4185 .unwrap();
4186 batch
4187 .put::<tables::AccountsHistory>(
4188 ShardedKey::new(addr3, u64::MAX),
4189 &BlockNumberList::new_pre_sorted([100, 200]),
4190 )
4191 .unwrap();
4192 batch.commit().unwrap();
4193
4194 let mut targets = vec![(addr1, 15), (addr2, 10), (addr3, 50)];
4196 targets.sort_by_key(|(addr, _)| *addr);
4197
4198 let mut batch = provider.batch();
4199 let outcomes = batch.prune_account_history_batch(&targets).unwrap();
4200 batch.commit().unwrap();
4201
4202 assert_eq!(outcomes.updated, 2);
4206 assert_eq!(outcomes.unchanged, 1);
4207
4208 let shards1 = provider.account_history_shards(addr1).unwrap();
4209 assert_eq!(shards1[0].1.iter().collect::<Vec<_>>(), vec![20, 30]);
4210
4211 let shards2 = provider.account_history_shards(addr2).unwrap();
4212 assert_eq!(shards2[0].1.iter().collect::<Vec<_>>(), vec![15]);
4213
4214 let shards3 = provider.account_history_shards(addr3).unwrap();
4215 assert_eq!(shards3[0].1.iter().collect::<Vec<_>>(), vec![100, 200]);
4216 }
4217
4218 #[test]
4219 fn test_prune_account_history_batch_target_with_no_shards() {
4220 let temp_dir = TempDir::new().unwrap();
4221 let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
4222
4223 let addr1 = Address::from([0x01; 20]);
4224 let addr2 = Address::from([0x02; 20]); let addr3 = Address::from([0x03; 20]);
4226
4227 let mut batch = provider.batch();
4229 batch
4230 .put::<tables::AccountsHistory>(
4231 ShardedKey::new(addr1, u64::MAX),
4232 &BlockNumberList::new_pre_sorted([10, 20]),
4233 )
4234 .unwrap();
4235 batch
4236 .put::<tables::AccountsHistory>(
4237 ShardedKey::new(addr3, u64::MAX),
4238 &BlockNumberList::new_pre_sorted([30, 40]),
4239 )
4240 .unwrap();
4241 batch.commit().unwrap();
4242
4243 let mut targets = vec![(addr1, 15), (addr2, 100), (addr3, 35)];
4245 targets.sort_by_key(|(addr, _)| *addr);
4246
4247 let mut batch = provider.batch();
4248 let outcomes = batch.prune_account_history_batch(&targets).unwrap();
4249 batch.commit().unwrap();
4250
4251 assert_eq!(outcomes.updated, 2);
4255 assert_eq!(outcomes.unchanged, 1);
4256
4257 let shards1 = provider.account_history_shards(addr1).unwrap();
4258 assert_eq!(shards1[0].1.iter().collect::<Vec<_>>(), vec![20]);
4259
4260 let shards3 = provider.account_history_shards(addr3).unwrap();
4261 assert_eq!(shards3[0].1.iter().collect::<Vec<_>>(), vec![40]);
4262 }
4263
4264 #[test]
4265 fn test_prune_storage_history_batch_multiple_sorted_targets() {
4266 let temp_dir = TempDir::new().unwrap();
4267 let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
4268
4269 let addr = Address::from([0x42; 20]);
4270 let slot1 = B256::from([0x01; 32]);
4271 let slot2 = B256::from([0x02; 32]);
4272
4273 let mut batch = provider.batch();
4275 batch
4276 .put::<tables::StoragesHistory>(
4277 StorageShardedKey::new(addr, slot1, u64::MAX),
4278 &BlockNumberList::new_pre_sorted([10, 20, 30]),
4279 )
4280 .unwrap();
4281 batch
4282 .put::<tables::StoragesHistory>(
4283 StorageShardedKey::new(addr, slot2, u64::MAX),
4284 &BlockNumberList::new_pre_sorted([5, 15, 25]),
4285 )
4286 .unwrap();
4287 batch.commit().unwrap();
4288
4289 let mut targets = vec![((addr, slot1), 15), ((addr, slot2), 10)];
4291 targets.sort_by_key(|((a, s), _)| (*a, *s));
4292
4293 let mut batch = provider.batch();
4294 let outcomes = batch.prune_storage_history_batch(&targets).unwrap();
4295 batch.commit().unwrap();
4296
4297 assert_eq!(outcomes.updated, 2);
4298
4299 let shards1 = provider.storage_history_shards(addr, slot1).unwrap();
4300 assert_eq!(shards1[0].1.iter().collect::<Vec<_>>(), vec![20, 30]);
4301
4302 let shards2 = provider.storage_history_shards(addr, slot2).unwrap();
4303 assert_eq!(shards2[0].1.iter().collect::<Vec<_>>(), vec![15, 25]);
4304 }
4305}