1use super::metrics::{RocksDBMetrics, RocksDBOperation, ROCKSDB_TABLES};
2use crate::providers::{compute_history_rank, needs_prev_shard_check, HistoryInfo};
3use alloy_consensus::transaction::TxHashRef;
4use alloy_primitives::{
5 map::{AddressMap, HashMap},
6 Address, BlockNumber, TxNumber, B256,
7};
8use itertools::Itertools;
9use metrics::Label;
10use parking_lot::Mutex;
11use reth_chain_state::ExecutedBlock;
12use reth_db_api::{
13 database_metrics::DatabaseMetrics,
14 models::{
15 sharded_key::NUM_OF_INDICES_IN_SHARD, storage_sharded_key::StorageShardedKey, ShardedKey,
16 StorageSettings,
17 },
18 table::{Compress, Decode, Decompress, Encode, Table},
19 tables, BlockNumberList, DatabaseError,
20};
21use reth_primitives_traits::{BlockBody as _, FastInstant as Instant};
22use reth_prune_types::PruneMode;
23use reth_storage_errors::{
24 db::{DatabaseErrorInfo, DatabaseWriteError, DatabaseWriteOperation, LogLevel},
25 provider::{ProviderError, ProviderResult},
26};
27use rocksdb::{
28 BlockBasedOptions, Cache, ColumnFamilyDescriptor, CompactionPri, DBCompressionType,
29 DBRawIteratorWithThreadMode, IteratorMode, OptimisticTransactionDB,
30 OptimisticTransactionOptions, Options, SnapshotWithThreadMode, Transaction,
31 WriteBatchWithTransaction, WriteOptions, DB,
32};
33use std::{
34 collections::BTreeMap,
35 fmt,
36 path::{Path, PathBuf},
37 sync::Arc,
38};
39use tracing::instrument;
40
/// Shared queue of prepared RocksDB write batches awaiting commit by the caller.
pub(crate) type PendingRocksDBBatches = Arc<Mutex<Vec<WriteBatchWithTransaction<true>>>>;

/// Raw key/value pair result, matching the item type of RocksDB's iterators.
type RawKVResult = Result<(Box<[u8]>, Box<[u8]>), rocksdb::Error>;
46
/// Point-in-time statistics for a single RocksDB column family.
///
/// Values come from RocksDB estimate properties and are approximate.
#[derive(Debug, Clone)]
pub struct RocksDBTableStats {
    /// Total size of live SST files, in bytes.
    pub sst_size_bytes: u64,
    /// Approximate size of all memtables, in bytes.
    pub memtable_size_bytes: u64,
    /// Column family (table) name.
    pub name: String,
    /// Estimated number of keys in the column family.
    pub estimated_num_keys: u64,
    /// Estimated total size (live SST files + memtables), in bytes.
    pub estimated_size_bytes: u64,
    /// Estimated bytes still pending compaction.
    pub pending_compaction_bytes: u64,
}
63
/// Aggregated statistics for the whole RocksDB instance.
#[derive(Debug, Clone)]
pub struct RocksDBStats {
    /// Per-column-family statistics.
    pub tables: Vec<RocksDBTableStats>,
    /// Combined size of WAL (`*.log`) files in the database directory, in bytes.
    pub wal_size_bytes: u64,
}
76
/// Context handed to the RocksDB block-data writers.
#[derive(Clone)]
pub(crate) struct RocksDBWriteCtx {
    /// Block number of the first block in the range being written.
    pub first_block_number: BlockNumber,
    /// Prune mode for transaction lookups; a full prune mode skips
    /// tx-hash-number writes entirely.
    pub prune_tx_lookup: Option<PruneMode>,
    /// Storage feature flags; writes are no-ops unless `storage_v2` is set.
    pub storage_settings: StorageSettings,
    /// Sink where writers queue their produced batches (committed elsewhere).
    pub pending_batches: PendingRocksDBBatches,
}
89
90impl fmt::Debug for RocksDBWriteCtx {
91 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
92 f.debug_struct("RocksDBWriteCtx")
93 .field("first_block_number", &self.first_block_number)
94 .field("prune_tx_lookup", &self.prune_tx_lookup)
95 .field("storage_settings", &self.storage_settings)
96 .field("pending_batches", &"<pending batches>")
97 .finish()
98 }
99}
100
/// Shared LRU block-cache capacity: 128 MiB.
const DEFAULT_CACHE_SIZE: usize = 128 << 20;

/// Block-based table block size: 16 KiB.
const DEFAULT_BLOCK_SIZE: usize = 16 * 1024;

/// Maximum number of concurrent background compaction/flush jobs.
const DEFAULT_MAX_BACKGROUND_JOBS: i32 = 6;

/// Maximum number of files RocksDB keeps open at once.
const DEFAULT_MAX_OPEN_FILES: i32 = 512;

/// Bytes written between incremental background syncs: 1 MiB.
const DEFAULT_BYTES_PER_SYNC: u64 = 1_048_576;

/// Per-column-family memtable write buffer size: 128 MiB.
const DEFAULT_WRITE_BUFFER_SIZE: usize = 128 << 20;

/// Initial capacity of the scratch buffer used when compressing values.
const DEFAULT_COMPRESS_BUF_CAPACITY: usize = 4096;

/// Accumulated-batch size at which auto-commit batches flush: 4 GiB.
const DEFAULT_AUTO_COMMIT_THRESHOLD: usize = 4 * 1024 * 1024 * 1024;
140
/// Builder for [`RocksDBProvider`] instances.
pub struct RocksDBBuilder {
    // Filesystem path of the database directory.
    path: PathBuf,
    // Column families (tables) to open/create.
    column_families: Vec<String>,
    // Whether to record per-operation latency metrics.
    enable_metrics: bool,
    // Whether to enable RocksDB's internal statistics collection.
    enable_statistics: bool,
    // RocksDB info-log verbosity.
    log_level: rocksdb::LogLevel,
    // Block cache shared across all column families.
    block_cache: Cache,
    // Open the database in read-only mode.
    read_only: bool,
}
151
152impl fmt::Debug for RocksDBBuilder {
153 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
154 f.debug_struct("RocksDBBuilder")
155 .field("path", &self.path)
156 .field("column_families", &self.column_families)
157 .field("enable_metrics", &self.enable_metrics)
158 .finish()
159 }
160}
161
162impl RocksDBBuilder {
163 pub fn new(path: impl AsRef<Path>) -> Self {
165 let cache = Cache::new_lru_cache(DEFAULT_CACHE_SIZE);
166 Self {
167 path: path.as_ref().to_path_buf(),
168 column_families: Vec::new(),
169 enable_metrics: false,
170 enable_statistics: false,
171 log_level: rocksdb::LogLevel::Info,
172 block_cache: cache,
173 read_only: false,
174 }
175 }
176
177 fn default_table_options(cache: &Cache) -> BlockBasedOptions {
179 let mut table_options = BlockBasedOptions::default();
180 table_options.set_block_size(DEFAULT_BLOCK_SIZE);
181 table_options.set_cache_index_and_filter_blocks(true);
182 table_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
183 table_options.set_block_cache(cache);
185 table_options
186 }
187
188 fn default_options(
190 log_level: rocksdb::LogLevel,
191 cache: &Cache,
192 enable_statistics: bool,
193 ) -> Options {
194 let table_options = Self::default_table_options(cache);
196
197 let mut options = Options::default();
198 options.set_block_based_table_factory(&table_options);
199 options.create_if_missing(true);
200 options.create_missing_column_families(true);
201 options.set_max_background_jobs(DEFAULT_MAX_BACKGROUND_JOBS);
202 options.set_bytes_per_sync(DEFAULT_BYTES_PER_SYNC);
203
204 options.set_bottommost_compression_type(DBCompressionType::Zstd);
205 options.set_bottommost_zstd_max_train_bytes(0, true);
206 options.set_compression_type(DBCompressionType::Lz4);
207 options.set_compaction_pri(CompactionPri::MinOverlappingRatio);
208
209 options.set_log_level(log_level);
210
211 options.set_max_open_files(DEFAULT_MAX_OPEN_FILES);
212
213 options.set_wal_ttl_seconds(0);
216 options.set_wal_size_limit_mb(0);
217
218 if enable_statistics {
220 options.enable_statistics();
221 }
222
223 options
224 }
225
226 fn default_column_family_options(cache: &Cache) -> Options {
228 let table_options = Self::default_table_options(cache);
230
231 let mut cf_options = Options::default();
232 cf_options.set_block_based_table_factory(&table_options);
233 cf_options.set_level_compaction_dynamic_level_bytes(true);
234 cf_options.set_compression_type(DBCompressionType::Lz4);
236 cf_options.set_bottommost_compression_type(DBCompressionType::Zstd);
237 cf_options.set_bottommost_zstd_max_train_bytes(0, true);
239 cf_options.set_write_buffer_size(DEFAULT_WRITE_BUFFER_SIZE);
240
241 cf_options
242 }
243
244 fn tx_hash_numbers_column_family_options(cache: &Cache) -> Options {
251 let mut table_options = BlockBasedOptions::default();
252 table_options.set_block_size(DEFAULT_BLOCK_SIZE);
253 table_options.set_cache_index_and_filter_blocks(true);
254 table_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
255 table_options.set_block_cache(cache);
256 let mut cf_options = Options::default();
260 cf_options.set_block_based_table_factory(&table_options);
261 cf_options.set_level_compaction_dynamic_level_bytes(true);
262 cf_options.set_compression_type(DBCompressionType::None);
265 cf_options.set_bottommost_compression_type(DBCompressionType::None);
266
267 cf_options
268 }
269
270 pub fn with_table<T: Table>(mut self) -> Self {
272 self.column_families.push(T::NAME.to_string());
273 self
274 }
275
276 pub fn with_default_tables(self) -> Self {
283 self.with_table::<tables::TransactionHashNumbers>()
284 .with_table::<tables::AccountsHistory>()
285 .with_table::<tables::StoragesHistory>()
286 }
287
288 pub const fn with_metrics(mut self) -> Self {
290 self.enable_metrics = true;
291 self
292 }
293
294 pub const fn with_statistics(mut self) -> Self {
296 self.enable_statistics = true;
297 self
298 }
299
300 pub const fn with_database_log_level(mut self, log_level: Option<LogLevel>) -> Self {
302 if let Some(level) = log_level {
303 self.log_level = convert_log_level(level);
304 }
305 self
306 }
307
308 pub fn with_block_cache_size(mut self, capacity_bytes: usize) -> Self {
310 self.block_cache = Cache::new_lru_cache(capacity_bytes);
311 self
312 }
313
314 pub const fn with_read_only(mut self, read_only: bool) -> Self {
318 self.read_only = read_only;
319 self
320 }
321
322 pub fn build(self) -> ProviderResult<RocksDBProvider> {
324 let options =
325 Self::default_options(self.log_level, &self.block_cache, self.enable_statistics);
326
327 let cf_descriptors: Vec<ColumnFamilyDescriptor> = self
328 .column_families
329 .iter()
330 .map(|name| {
331 let cf_options = if name == tables::TransactionHashNumbers::NAME {
332 Self::tx_hash_numbers_column_family_options(&self.block_cache)
333 } else {
334 Self::default_column_family_options(&self.block_cache)
335 };
336 ColumnFamilyDescriptor::new(name.clone(), cf_options)
337 })
338 .collect();
339
340 let metrics = self.enable_metrics.then(RocksDBMetrics::default);
341
342 if self.read_only {
343 let db = DB::open_cf_descriptors_read_only(&options, &self.path, cf_descriptors, false)
344 .map_err(|e| {
345 ProviderError::Database(DatabaseError::Open(DatabaseErrorInfo {
346 message: e.to_string().into(),
347 code: -1,
348 }))
349 })?;
350 Ok(RocksDBProvider(Arc::new(RocksDBProviderInner::ReadOnly { db, metrics })))
351 } else {
352 let db =
357 OptimisticTransactionDB::open_cf_descriptors(&options, &self.path, cf_descriptors)
358 .map_err(|e| {
359 ProviderError::Database(DatabaseError::Open(DatabaseErrorInfo {
360 message: e.to_string().into(),
361 code: -1,
362 }))
363 })?;
364 Ok(RocksDBProvider(Arc::new(RocksDBProviderInner::ReadWrite { db, metrics })))
365 }
366 }
367}
368
/// Obtains the serialized bytes of `$value` without copying when possible.
///
/// Evaluates to `Some(slice)` when the value exposes an uncompressable
/// reference that can be written as-is; otherwise clears `$buf`, compresses
/// the value into it, and evaluates to `None` (caller reads `$buf`).
macro_rules! compress_to_buf_or_ref {
    ($buf:expr, $value:expr) => {
        if let Some(value) = $value.uncompressable_ref() {
            Some(value)
        } else {
            $buf.clear();
            $value.compress_to_buf(&mut $buf);
            None
        }
    };
}
382
/// Thread-safe handle to a RocksDB database; cheaply cloneable via its inner
/// [`Arc`].
#[derive(Debug)]
pub struct RocksDBProvider(Arc<RocksDBProviderInner>);
386
/// Underlying database handle: transactional read-write, or plain read-only.
enum RocksDBProviderInner {
    /// Read-write handle backed by an optimistic transaction DB.
    ReadWrite {
        // The transactional database handle.
        db: OptimisticTransactionDB,
        // Optional per-operation metrics recorder.
        metrics: Option<RocksDBMetrics>,
    },
    /// Read-only handle opened against an existing database.
    ReadOnly {
        // The read-only database handle.
        db: DB,
        // Optional per-operation metrics recorder.
        metrics: Option<RocksDBMetrics>,
    },
}
405
impl RocksDBProviderInner {
    /// Returns the metrics recorder if metrics were enabled at build time.
    const fn metrics(&self) -> Option<&RocksDBMetrics> {
        match self {
            Self::ReadWrite { metrics, .. } | Self::ReadOnly { metrics, .. } => metrics.as_ref(),
        }
    }

    /// Returns the transactional (read-write) database handle.
    ///
    /// # Panics
    ///
    /// Panics if the provider was opened read-only.
    fn db_rw(&self) -> &OptimisticTransactionDB {
        match self {
            Self::ReadWrite { db, .. } => db,
            Self::ReadOnly { .. } => {
                panic!("Cannot perform write operation on read-only RocksDB provider")
            }
        }
    }

    /// Looks up the column family handle for table `T`.
    fn cf_handle<T: Table>(&self) -> Result<&rocksdb::ColumnFamily, DatabaseError> {
        let cf = match self {
            Self::ReadWrite { db, .. } => db.cf_handle(T::NAME),
            Self::ReadOnly { db, .. } => db.cf_handle(T::NAME),
        };
        cf.ok_or_else(|| DatabaseError::Other(format!("Column family '{}' not found", T::NAME)))
    }

    /// Point lookup of `key` within `cf`.
    fn get_cf(
        &self,
        cf: &rocksdb::ColumnFamily,
        key: impl AsRef<[u8]>,
    ) -> Result<Option<Vec<u8>>, rocksdb::Error> {
        match self {
            Self::ReadWrite { db, .. } => db.get_cf(cf, key),
            Self::ReadOnly { db, .. } => db.get_cf(cf, key),
        }
    }

    /// Writes `key` → `value` into `cf`. Panics on read-only handles.
    fn put_cf(
        &self,
        cf: &rocksdb::ColumnFamily,
        key: impl AsRef<[u8]>,
        value: impl AsRef<[u8]>,
    ) -> Result<(), rocksdb::Error> {
        self.db_rw().put_cf(cf, key, value)
    }

    /// Deletes `key` from `cf`. Panics on read-only handles.
    fn delete_cf(
        &self,
        cf: &rocksdb::ColumnFamily,
        key: impl AsRef<[u8]>,
    ) -> Result<(), rocksdb::Error> {
        self.db_rw().delete_cf(cf, key)
    }

    /// Deletes every key in `[from, to)` from `cf`. Panics on read-only
    /// handles.
    fn delete_range_cf<K: AsRef<[u8]>>(
        &self,
        cf: &rocksdb::ColumnFamily,
        from: K,
        to: K,
    ) -> Result<(), rocksdb::Error> {
        self.db_rw().delete_range_cf(cf, from, to)
    }

    /// Returns an iterator over `cf`, positioned according to `mode`.
    fn iterator_cf(
        &self,
        cf: &rocksdb::ColumnFamily,
        mode: IteratorMode<'_>,
    ) -> RocksDBIterEnum<'_> {
        match self {
            Self::ReadWrite { db, .. } => RocksDBIterEnum::ReadWrite(db.iterator_cf(cf, mode)),
            Self::ReadOnly { db, .. } => RocksDBIterEnum::ReadOnly(db.iterator_cf(cf, mode)),
        }
    }

    /// Returns a raw (cursor-style) iterator over `cf`.
    fn raw_iterator_cf(&self, cf: &rocksdb::ColumnFamily) -> RocksDBRawIterEnum<'_> {
        match self {
            Self::ReadWrite { db, .. } => RocksDBRawIterEnum::ReadWrite(db.raw_iterator_cf(cf)),
            Self::ReadOnly { db, .. } => RocksDBRawIterEnum::ReadOnly(db.raw_iterator_cf(cf)),
        }
    }

    /// Takes a consistent point-in-time snapshot of the database.
    fn snapshot(&self) -> RocksReadSnapshotInner<'_> {
        match self {
            Self::ReadWrite { db, .. } => RocksReadSnapshotInner::ReadWrite(db.snapshot()),
            Self::ReadOnly { db, .. } => RocksReadSnapshotInner::ReadOnly(db.snapshot()),
        }
    }

    /// Filesystem path of the database directory.
    fn path(&self) -> &Path {
        match self {
            Self::ReadWrite { db, .. } => db.path(),
            Self::ReadOnly { db, .. } => db.path(),
        }
    }

    /// Sums the sizes of the WAL (`*.log`) files in the database directory.
    /// Returns 0 if the directory cannot be read.
    fn wal_size_bytes(&self) -> u64 {
        let path = self.path();

        match std::fs::read_dir(path) {
            Ok(entries) => entries
                .filter_map(|e| e.ok())
                .filter(|e| e.path().extension().is_some_and(|ext| ext == "log"))
                .filter_map(|e| e.metadata().ok())
                .map(|m| m.len())
                .sum(),
            Err(_) => 0,
        }
    }

    /// Collects per-column-family statistics from RocksDB's estimate
    /// properties. Missing/unreadable properties are reported as 0.
    fn table_stats(&self) -> Vec<RocksDBTableStats> {
        let mut stats = Vec::new();

        // A macro (rather than a generic fn) so the same body works for both
        // DB handle types.
        macro_rules! collect_stats {
            ($db:expr) => {
                for cf_name in ROCKSDB_TABLES {
                    if let Some(cf) = $db.cf_handle(cf_name) {
                        let estimated_num_keys = $db
                            .property_int_value_cf(cf, rocksdb::properties::ESTIMATE_NUM_KEYS)
                            .ok()
                            .flatten()
                            .unwrap_or(0);

                        let sst_size = $db
                            .property_int_value_cf(cf, rocksdb::properties::LIVE_SST_FILES_SIZE)
                            .ok()
                            .flatten()
                            .unwrap_or(0);

                        let memtable_size = $db
                            .property_int_value_cf(cf, rocksdb::properties::SIZE_ALL_MEM_TABLES)
                            .ok()
                            .flatten()
                            .unwrap_or(0);

                        let estimated_size_bytes = sst_size + memtable_size;

                        let pending_compaction_bytes = $db
                            .property_int_value_cf(
                                cf,
                                rocksdb::properties::ESTIMATE_PENDING_COMPACTION_BYTES,
                            )
                            .ok()
                            .flatten()
                            .unwrap_or(0);

                        stats.push(RocksDBTableStats {
                            sst_size_bytes: sst_size,
                            memtable_size_bytes: memtable_size,
                            name: cf_name.to_string(),
                            estimated_num_keys,
                            estimated_size_bytes,
                            pending_compaction_bytes,
                        });
                    }
                }
            };
        }

        match self {
            Self::ReadWrite { db, .. } => collect_stats!(db),
            Self::ReadOnly { db, .. } => collect_stats!(db),
        }

        stats
    }

    /// Full statistics snapshot: per-table stats plus WAL size.
    fn db_stats(&self) -> RocksDBStats {
        RocksDBStats { tables: self.table_stats(), wal_size_bytes: self.wal_size_bytes() }
    }
}
594
595impl fmt::Debug for RocksDBProviderInner {
596 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
597 match self {
598 Self::ReadWrite { metrics, .. } => f
599 .debug_struct("RocksDBProviderInner::ReadWrite")
600 .field("db", &"<OptimisticTransactionDB>")
601 .field("metrics", metrics)
602 .finish(),
603 Self::ReadOnly { metrics, .. } => f
604 .debug_struct("RocksDBProviderInner::ReadOnly")
605 .field("db", &"<DB (read-only)>")
606 .field("metrics", metrics)
607 .finish(),
608 }
609 }
610}
611
impl Drop for RocksDBProviderInner {
    fn drop(&mut self) {
        match self {
            Self::ReadWrite { db, .. } => {
                // Best-effort persistence on shutdown: flush the WAL and every
                // column family's memtables. Failures are logged, not fatal.
                if let Err(e) = db.flush_wal(true) {
                    tracing::warn!(target: "providers::rocksdb", ?e, "Failed to flush WAL on drop");
                }
                for cf_name in ROCKSDB_TABLES {
                    if let Some(cf) = db.cf_handle(cf_name) &&
                        let Err(e) = db.flush_cf(&cf)
                    {
                        tracing::warn!(target: "providers::rocksdb", cf = cf_name, ?e, "Failed to flush CF on drop");
                    }
                }
                // Stop compactions/flushes so the handle closes promptly.
                db.cancel_all_background_work(true);
            }
            Self::ReadOnly { db, .. } => db.cancel_all_background_work(true),
        }
    }
}
634
635impl Clone for RocksDBProvider {
636 fn clone(&self) -> Self {
637 Self(self.0.clone())
638 }
639}
640
641impl DatabaseMetrics for RocksDBProvider {
642 fn gauge_metrics(&self) -> Vec<(&'static str, f64, Vec<Label>)> {
643 let mut metrics = Vec::new();
644
645 for stat in self.table_stats() {
646 metrics.push((
647 "rocksdb.table_size",
648 stat.estimated_size_bytes as f64,
649 vec![Label::new("table", stat.name.clone())],
650 ));
651 metrics.push((
652 "rocksdb.table_entries",
653 stat.estimated_num_keys as f64,
654 vec![Label::new("table", stat.name.clone())],
655 ));
656 metrics.push((
657 "rocksdb.pending_compaction_bytes",
658 stat.pending_compaction_bytes as f64,
659 vec![Label::new("table", stat.name.clone())],
660 ));
661 metrics.push((
662 "rocksdb.sst_size",
663 stat.sst_size_bytes as f64,
664 vec![Label::new("table", stat.name.clone())],
665 ));
666 metrics.push((
667 "rocksdb.memtable_size",
668 stat.memtable_size_bytes as f64,
669 vec![Label::new("table", stat.name)],
670 ));
671 }
672
673 metrics.push(("rocksdb.wal_size", self.wal_size_bytes() as f64, vec![]));
675
676 metrics
677 }
678}
679
680impl RocksDBProvider {
681 pub fn new(path: impl AsRef<Path>) -> ProviderResult<Self> {
683 RocksDBBuilder::new(path).build()
684 }
685
    /// Returns a [`RocksDBBuilder`] for configuring a provider at `path`.
    pub fn builder(path: impl AsRef<Path>) -> RocksDBBuilder {
        RocksDBBuilder::new(path)
    }
690
691 pub fn exists(path: impl AsRef<Path>) -> bool {
696 path.as_ref().join("CURRENT").exists()
697 }
698
    /// Returns `true` if this provider was opened in read-only mode.
    pub fn is_read_only(&self) -> bool {
        matches!(self.0.as_ref(), RocksDBProviderInner::ReadOnly { .. })
    }
703
    /// Takes a consistent point-in-time read snapshot of the database.
    pub fn snapshot(&self) -> RocksReadSnapshot<'_> {
        RocksReadSnapshot { inner: self.0.snapshot(), provider: self }
    }
710
711 pub fn tx(&self) -> RocksTx<'_> {
719 let write_options = WriteOptions::default();
720 let txn_options = OptimisticTransactionOptions::default();
721 let inner = self.0.db_rw().transaction_opt(&write_options, &txn_options);
722 RocksTx { inner, provider: self }
723 }
724
    /// Creates a manually-committed write batch with a preallocated
    /// compression scratch buffer.
    pub fn batch(&self) -> RocksDBBatch<'_> {
        RocksDBBatch {
            provider: self,
            inner: WriteBatchWithTransaction::<true>::default(),
            buf: Vec::with_capacity(DEFAULT_COMPRESS_BUF_CAPACITY),
            auto_commit_threshold: None,
        }
    }
740
741 pub fn batch_with_auto_commit(&self) -> RocksDBBatch<'_> {
747 RocksDBBatch {
748 provider: self,
749 inner: WriteBatchWithTransaction::<true>::default(),
750 buf: Vec::with_capacity(DEFAULT_COMPRESS_BUF_CAPACITY),
751 auto_commit_threshold: Some(DEFAULT_AUTO_COMMIT_THRESHOLD),
752 }
753 }
754
    /// Resolves the column family handle for table `T`.
    fn get_cf_handle<T: Table>(&self) -> Result<&rocksdb::ColumnFamily, DatabaseError> {
        self.0.cf_handle::<T>()
    }
759
760 fn execute_with_operation_metric<R>(
762 &self,
763 operation: RocksDBOperation,
764 table: &'static str,
765 f: impl FnOnce(&Self) -> R,
766 ) -> R {
767 let start = self.0.metrics().map(|_| Instant::now());
768 let res = f(self);
769
770 if let (Some(start), Some(metrics)) = (start, self.0.metrics()) {
771 metrics.record_operation(operation, table, start.elapsed());
772 }
773
774 res
775 }
776
    /// Fetches and decompresses the value stored under `key` in table `T`.
    pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
        self.get_encoded::<T>(&key.encode())
    }
781
    /// Fetches the value for an already-encoded key in table `T`.
    ///
    /// NOTE(review): decompression failures are mapped to `None` via `.ok()`,
    /// making a corrupt value indistinguishable from a missing one — confirm
    /// this is intended.
    pub fn get_encoded<T: Table>(
        &self,
        key: &<T::Key as Encode>::Encoded,
    ) -> ProviderResult<Option<T::Value>> {
        self.execute_with_operation_metric(RocksDBOperation::Get, T::NAME, |this| {
            let result = this.0.get_cf(this.get_cf_handle::<T>()?, key.as_ref()).map_err(|e| {
                ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
                    message: e.to_string().into(),
                    code: -1,
                }))
            })?;

            Ok(result.and_then(|value| T::Value::decompress(&value).ok()))
        })
    }
798
    /// Encodes `key` and writes `value` into table `T` (upsert semantics).
    pub fn put<T: Table>(&self, key: T::Key, value: &T::Value) -> ProviderResult<()> {
        let encoded_key = key.encode();
        self.put_encoded::<T>(&encoded_key, value)
    }
807
    /// Writes `value` under an already-encoded key in table `T`, compressing
    /// the value unless it can be written by reference.
    pub fn put_encoded<T: Table>(
        &self,
        key: &<T::Key as Encode>::Encoded,
        value: &T::Value,
    ) -> ProviderResult<()> {
        self.execute_with_operation_metric(RocksDBOperation::Put, T::NAME, |this| {
            // `None` from the macro means the value was compressed into `buf`.
            let mut buf = Vec::new();
            let value_bytes = compress_to_buf_or_ref!(buf, value).unwrap_or(&buf);

            this.0.put_cf(this.get_cf_handle::<T>()?, key, value_bytes).map_err(|e| {
                ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
                    info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
                    operation: DatabaseWriteOperation::PutUpsert,
                    table_name: T::NAME,
                    key: key.as_ref().to_vec(),
                })))
            })
        })
    }
834
    /// Deletes the entry under `key` from table `T`.
    pub fn delete<T: Table>(&self, key: T::Key) -> ProviderResult<()> {
        self.execute_with_operation_metric(RocksDBOperation::Delete, T::NAME, |this| {
            this.0.delete_cf(this.get_cf_handle::<T>()?, key.encode().as_ref()).map_err(|e| {
                ProviderError::Database(DatabaseError::Delete(DatabaseErrorInfo {
                    message: e.to_string().into(),
                    code: -1,
                }))
            })
        })
    }
849
    /// Deletes all entries of table `T` via a single range delete.
    ///
    /// NOTE(review): the exclusive upper bound is a 256-byte `0xFF` sentinel,
    /// so keys comparing greater than it (e.g. longer all-`0xFF` keys) would
    /// survive — verify this bound against the key widths of any new table.
    pub fn clear<T: Table>(&self) -> ProviderResult<()> {
        let cf = self.get_cf_handle::<T>()?;

        self.0.delete_range_cf(cf, &[] as &[u8], &[0xFF; 256]).map_err(|e| {
            ProviderError::Database(DatabaseError::Delete(DatabaseErrorInfo {
                message: e.to_string().into(),
                code: -1,
            }))
        })?;

        Ok(())
    }
867
    /// Returns the first decoded entry yielded when iterating table `T` in
    /// `mode` — used to implement [`Self::first`] and [`Self::last`].
    fn get_boundary<T: Table>(
        &self,
        mode: IteratorMode<'_>,
    ) -> ProviderResult<Option<(T::Key, T::Value)>> {
        self.execute_with_operation_metric(RocksDBOperation::Get, T::NAME, |this| {
            let cf = this.get_cf_handle::<T>()?;
            let mut iter = this.0.iterator_cf(cf, mode);

            match iter.next() {
                Some(Ok((key_bytes, value_bytes))) => {
                    let key = <T::Key as reth_db_api::table::Decode>::decode(&key_bytes)
                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
                    let value = T::Value::decompress(&value_bytes)
                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
                    Ok(Some((key, value)))
                }
                Some(Err(e)) => {
                    Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
                        message: e.to_string().into(),
                        code: -1,
                    })))
                }
                // Empty table.
                None => Ok(None),
            }
        })
    }
895
    /// Returns the entry with the smallest key in table `T`, if any.
    #[inline]
    pub fn first<T: Table>(&self) -> ProviderResult<Option<(T::Key, T::Value)>> {
        self.get_boundary::<T>(IteratorMode::Start)
    }
901
    /// Returns the entry with the largest key in table `T`, if any.
    #[inline]
    pub fn last<T: Table>(&self) -> ProviderResult<Option<(T::Key, T::Value)>> {
        self.get_boundary::<T>(IteratorMode::End)
    }
907
    /// Returns a decoded iterator over all entries of table `T`, starting at
    /// the first key.
    pub fn iter<T: Table>(&self) -> ProviderResult<RocksDBIter<'_, T>> {
        let cf = self.get_cf_handle::<T>()?;
        let iter = self.0.iterator_cf(cf, IteratorMode::Start);
        Ok(RocksDBIter { inner: iter, _marker: std::marker::PhantomData })
    }
916
    /// Per-column-family statistics (approximate, from RocksDB properties).
    pub fn table_stats(&self) -> Vec<RocksDBTableStats> {
        self.0.table_stats()
    }
923
    /// Combined size of WAL (`*.log`) files in the database directory, in
    /// bytes (0 if the directory cannot be read).
    pub fn wal_size_bytes(&self) -> u64 {
        self.0.wal_size_bytes()
    }
932
    /// Full statistics snapshot: per-table stats plus WAL size.
    pub fn db_stats(&self) -> RocksDBStats {
        self.0.db_stats()
    }
939
    /// Flushes the given column families' memtables to SST files, then syncs
    /// the WAL. Unknown table names are skipped silently.
    ///
    /// Panics if the provider is read-only (see `db_rw`).
    #[instrument(level = "debug", target = "providers::rocksdb", skip_all, fields(tables = ?tables))]
    pub fn flush(&self, tables: &[&'static str]) -> ProviderResult<()> {
        let db = self.0.db_rw();

        for cf_name in tables {
            if let Some(cf) = db.cf_handle(cf_name) {
                db.flush_cf(&cf).map_err(|e| {
                    ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
                        info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
                        operation: DatabaseWriteOperation::Flush,
                        table_name: cf_name,
                        key: Vec::new(),
                    })))
                })?;
            }
        }

        db.flush_wal(true).map_err(|e| {
            ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
                info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
                operation: DatabaseWriteOperation::Flush,
                table_name: "WAL",
                key: Vec::new(),
            })))
        })?;

        Ok(())
    }
978
    /// Flushes all known tables, then triggers a full-range manual compaction
    /// of each. Compaction is fire-and-forget (it reports no errors).
    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
    pub fn flush_and_compact(&self) -> ProviderResult<()> {
        self.flush(ROCKSDB_TABLES)?;

        let db = self.0.db_rw();

        for cf_name in ROCKSDB_TABLES {
            if let Some(cf) = db.cf_handle(cf_name) {
                // `None`/`None` bounds compact the entire key range.
                db.compact_range_cf(&cf, None::<&[u8]>, None::<&[u8]>);
            }
        }

        Ok(())
    }
1004
    /// Iterator over table `T` yielding raw (undecoded) key/value bytes.
    ///
    /// NOTE(review): despite the name, this wraps `iterator_cf`, not
    /// `raw_iterator_cf` — "raw" here refers to undecoded bytes, not the
    /// cursor-style API.
    pub fn raw_iter<T: Table>(&self) -> ProviderResult<RocksDBRawIter<'_>> {
        let cf = self.get_cf_handle::<T>()?;
        let iter = self.0.iterator_cf(cf, IteratorMode::Start);
        Ok(RocksDBRawIter { inner: iter })
    }
1013
    /// Collects every account-history shard stored for `address`, in key
    /// order.
    pub fn account_history_shards(
        &self,
        address: Address,
    ) -> ProviderResult<Vec<(ShardedKey<Address>, BlockNumberList)>> {
        let cf = self.get_cf_handle::<tables::AccountsHistory>()?;

        // Seek to `(address, 0)`: the lowest possible shard key for this
        // address, so forward iteration starts at its first shard.
        let start_key = ShardedKey::new(address, 0u64);
        let start_bytes = start_key.encode();

        let iter = self
            .0
            .iterator_cf(cf, IteratorMode::From(start_bytes.as_ref(), rocksdb::Direction::Forward));

        let mut result = Vec::new();
        for item in iter {
            match item {
                Ok((key_bytes, value_bytes)) => {
                    let key = ShardedKey::<Address>::decode(&key_bytes)
                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;

                    // Stop once iteration crosses into a different address.
                    if key.key != address {
                        break;
                    }

                    let value = BlockNumberList::decompress(&value_bytes)
                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;

                    result.push((key, value));
                }
                Err(e) => {
                    return Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
                        message: e.to_string().into(),
                        code: -1,
                    })));
                }
            }
        }

        Ok(result)
    }
1065
    /// Collects every storage-history shard stored for the given
    /// (`address`, `storage_key`) pair, in key order.
    pub fn storage_history_shards(
        &self,
        address: Address,
        storage_key: B256,
    ) -> ProviderResult<Vec<(StorageShardedKey, BlockNumberList)>> {
        let cf = self.get_cf_handle::<tables::StoragesHistory>()?;

        // Seek to the lowest possible shard key for this (address, slot).
        let start_key = StorageShardedKey::new(address, storage_key, 0u64);
        let start_bytes = start_key.encode();

        let iter = self
            .0
            .iterator_cf(cf, IteratorMode::From(start_bytes.as_ref(), rocksdb::Direction::Forward));

        let mut result = Vec::new();
        for item in iter {
            match item {
                Ok((key_bytes, value_bytes)) => {
                    let key = StorageShardedKey::decode(&key_bytes)
                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;

                    // Stop once iteration leaves this (address, slot) prefix.
                    if key.address != address || key.sharded_key.key != storage_key {
                        break;
                    }

                    let value = BlockNumberList::decompress(&value_bytes)
                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;

                    result.push((key, value));
                }
                Err(e) => {
                    return Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
                        message: e.to_string().into(),
                        code: -1,
                    })));
                }
            }
        }

        Ok(result)
    }
1111
    /// Prepares a batch that unwinds account-history indices so no index
    /// references a block at or above the given per-address block numbers.
    /// The caller commits the returned batch.
    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
    pub fn unwind_account_history_indices(
        &self,
        last_indices: &[(Address, BlockNumber)],
    ) -> ProviderResult<WriteBatchWithTransaction<true>> {
        // Only the smallest unwound block per address matters.
        let mut address_min_block: AddressMap<BlockNumber> =
            AddressMap::with_capacity_and_hasher(last_indices.len(), Default::default());
        for &(address, block_number) in last_indices {
            address_min_block
                .entry(address)
                .and_modify(|min| *min = (*min).min(block_number))
                .or_insert(block_number);
        }

        let mut batch = self.batch();
        for (address, min_block) in address_min_block {
            match min_block.checked_sub(1) {
                // Keep history strictly below the unwound range.
                Some(keep_to) => batch.unwind_account_history_to(address, keep_to)?,
                // Unwinding from block 0 clears the address's history.
                None => batch.clear_account_history(address)?,
            }
        }

        Ok(batch.into_inner())
    }
1143
    /// Prepares a batch that unwinds storage-history indices so no index
    /// references a block at or above the given per-(address, slot) block
    /// numbers. The caller commits the returned batch.
    pub fn unwind_storage_history_indices(
        &self,
        storage_changesets: &[(Address, B256, BlockNumber)],
    ) -> ProviderResult<WriteBatchWithTransaction<true>> {
        // Only the smallest unwound block per (address, slot) matters.
        let mut key_min_block: HashMap<(Address, B256), BlockNumber> =
            HashMap::with_capacity_and_hasher(storage_changesets.len(), Default::default());
        for &(address, storage_key, block_number) in storage_changesets {
            key_min_block
                .entry((address, storage_key))
                .and_modify(|min| *min = (*min).min(block_number))
                .or_insert(block_number);
        }

        let mut batch = self.batch();
        for ((address, storage_key), min_block) in key_min_block {
            match min_block.checked_sub(1) {
                // Keep history strictly below the unwound range.
                Some(keep_to) => batch.unwind_storage_history_to(address, storage_key, keep_to)?,
                // Unwinding from block 0 clears the slot's history.
                None => batch.clear_storage_history(address, storage_key)?,
            }
        }

        Ok(batch.into_inner())
    }
1174
    /// Runs `f` against a fresh write batch and commits it, recording the
    /// whole operation under the batch-write metric.
    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
    pub fn write_batch<F>(&self, f: F) -> ProviderResult<()>
    where
        F: FnOnce(&mut RocksDBBatch<'_>) -> ProviderResult<()>,
    {
        self.execute_with_operation_metric(RocksDBOperation::BatchWrite, "Batch", |this| {
            let mut batch_handle = this.batch();
            f(&mut batch_handle)?;
            batch_handle.commit()
        })
    }
1187
    /// Commits a previously prepared write batch using default write options.
    ///
    /// Panics if the provider is read-only (see `db_rw`).
    #[instrument(level = "debug", target = "providers::rocksdb", skip_all, fields(batch_len = batch.len(), batch_size = batch.size_in_bytes()))]
    pub fn commit_batch(&self, batch: WriteBatchWithTransaction<true>) -> ProviderResult<()> {
        self.0.db_rw().write_opt(batch, &WriteOptions::default()).map_err(|e| {
            ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
                message: e.to_string().into(),
                code: -1,
            }))
        })
    }
1204
    /// Writes tx-hash→number mappings and account/storage history for
    /// `blocks` in parallel on the storage thread pool, queueing the produced
    /// batches in `ctx.pending_batches`. No-op unless `storage_v2` is set.
    #[instrument(level = "debug", target = "providers::rocksdb", skip_all, fields(num_blocks = blocks.len(), first_block = ctx.first_block_number))]
    pub(crate) fn write_blocks_data<N: reth_node_types::NodePrimitives>(
        &self,
        blocks: &[ExecutedBlock<N>],
        tx_nums: &[TxNumber],
        ctx: RocksDBWriteCtx,
        runtime: &reth_tasks::Runtime,
    ) -> ProviderResult<()> {
        if !ctx.storage_settings.storage_v2 {
            return Ok(());
        }

        // Results of the spawned writers; still `None` after the scope means
        // the corresponding worker panicked before storing a result.
        let mut r_tx_hash = None;
        let mut r_account_history = None;
        let mut r_storage_history = None;

        // NOTE(review): `storage_v2` is always true past the early return, so
        // these flags effectively only vary on `prune_tx_lookup`.
        let write_tx_hash =
            ctx.storage_settings.storage_v2 && ctx.prune_tx_lookup.is_none_or(|m| !m.is_full());
        let write_account_history = ctx.storage_settings.storage_v2;
        let write_storage_history = ctx.storage_settings.storage_v2;

        // Propagate the current tracing span into the worker threads.
        let span = tracing::Span::current();
        runtime.storage_pool().in_place_scope(|s| {
            if write_tx_hash {
                s.spawn(|_| {
                    let _guard = span.enter();
                    r_tx_hash = Some(self.write_tx_hash_numbers(blocks, tx_nums, &ctx));
                });
            }

            if write_account_history {
                s.spawn(|_| {
                    let _guard = span.enter();
                    r_account_history = Some(self.write_account_history(blocks, &ctx));
                });
            }

            if write_storage_history {
                s.spawn(|_| {
                    let _guard = span.enter();
                    r_storage_history = Some(self.write_storage_history(blocks, &ctx));
                });
            }
        });

        // Surface worker panics as database errors and propagate each
        // worker's own write error.
        if write_tx_hash {
            r_tx_hash.ok_or_else(|| {
                ProviderError::Database(DatabaseError::Other(
                    "rocksdb tx-hash write thread panicked".into(),
                ))
            })??;
        }
        if write_account_history {
            r_account_history.ok_or_else(|| {
                ProviderError::Database(DatabaseError::Other(
                    "rocksdb account-history write thread panicked".into(),
                ))
            })??;
        }
        if write_storage_history {
            r_storage_history.ok_or_else(|| {
                ProviderError::Database(DatabaseError::Other(
                    "rocksdb storage-history write thread panicked".into(),
                ))
            })??;
        }

        Ok(())
    }
1281
    /// Maps every transaction hash in `blocks` to its absolute transaction
    /// number and queues the resulting batch in `ctx.pending_batches`.
    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
    fn write_tx_hash_numbers<N: reth_node_types::NodePrimitives>(
        &self,
        blocks: &[ExecutedBlock<N>],
        tx_nums: &[TxNumber],
        ctx: &RocksDBWriteCtx,
    ) -> ProviderResult<()> {
        let mut batch = self.batch();
        for (block, &first_tx_num) in blocks.iter().zip(tx_nums) {
            let body = block.recovered_block().body();
            // Tx numbers are consecutive within a block, starting at the
            // block's `first_tx_num`.
            for (tx_num, transaction) in (first_tx_num..).zip(body.transactions_iter()) {
                batch.put::<tables::TransactionHashNumbers>(*transaction.tx_hash(), &tx_num)?;
            }
        }
        ctx.pending_batches.lock().push(batch.into_inner());
        Ok(())
    }
1300
    /// Derives, from each block's execution reverts, which blocks touched
    /// each account, appends the indices to account-history shards, and
    /// queues the batch in `ctx.pending_batches`.
    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
    fn write_account_history<N: reth_node_types::NodePrimitives>(
        &self,
        blocks: &[ExecutedBlock<N>],
        ctx: &RocksDBWriteCtx,
    ) -> ProviderResult<()> {
        let mut batch = self.batch();
        // BTreeMap keeps addresses sorted, so shards are written in key order.
        let mut account_history: BTreeMap<Address, Vec<u64>> = BTreeMap::new();

        for (block_idx, block) in blocks.iter().enumerate() {
            // Blocks are contiguous starting at `first_block_number`.
            let block_number = ctx.first_block_number + block_idx as u64;
            let reverts = block.execution_outcome().state.reverts.to_plain_state_reverts();

            for account_block_reverts in reverts.accounts {
                for (address, _) in account_block_reverts {
                    account_history.entry(address).or_default().push(block_number);
                }
            }
        }

        for (address, indices) in account_history {
            batch.append_account_history_shard(address, indices)?;
        }
        ctx.pending_batches.lock().push(batch.into_inner());
        Ok(())
    }
1333
1334 #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
1338 fn write_storage_history<N: reth_node_types::NodePrimitives>(
1339 &self,
1340 blocks: &[ExecutedBlock<N>],
1341 ctx: &RocksDBWriteCtx,
1342 ) -> ProviderResult<()> {
1343 let mut batch = self.batch();
1344 let mut storage_history: BTreeMap<(Address, B256), Vec<u64>> = BTreeMap::new();
1345
1346 for (block_idx, block) in blocks.iter().enumerate() {
1347 let block_number = ctx.first_block_number + block_idx as u64;
1348 let reverts = block.execution_outcome().state.reverts.to_plain_state_reverts();
1349
1350 for storage_block_reverts in reverts.storage {
1353 for revert in storage_block_reverts {
1354 for (slot, _) in revert.storage_revert {
1355 let plain_key = B256::new(slot.to_be_bytes());
1356 storage_history
1357 .entry((revert.address, plain_key))
1358 .or_default()
1359 .push(block_number);
1360 }
1361 }
1362 }
1363 }
1364
1365 for ((address, slot), indices) in storage_history {
1367 batch.append_storage_history_shard(address, slot, indices)?;
1368 }
1369 ctx.pending_batches.lock().push(batch.into_inner());
1370 Ok(())
1371 }
1372}
1373
/// A consistent point-in-time read view over the RocksDB tables.
pub struct RocksReadSnapshot<'db> {
    // Snapshot of either the read-write or read-only database handle.
    inner: RocksReadSnapshotInner<'db>,
    // Provider used to resolve column-family handles for typed reads.
    provider: &'db RocksDBProvider,
}

// Snapshot handle for whichever access mode the provider was opened with.
enum RocksReadSnapshotInner<'db> {
    // Snapshot over the read-write (optimistic transaction) database.
    ReadWrite(SnapshotWithThreadMode<'db, OptimisticTransactionDB>),
    // Snapshot over the read-only database handle.
    ReadOnly(SnapshotWithThreadMode<'db, DB>),
}

impl<'db> RocksReadSnapshotInner<'db> {
    // Opens a raw iterator on `cf`, dispatching on the access mode.
    fn raw_iterator_cf(&self, cf: &rocksdb::ColumnFamily) -> RocksDBRawIterEnum<'_> {
        match self {
            Self::ReadWrite(snap) => RocksDBRawIterEnum::ReadWrite(snap.raw_iterator_cf(cf)),
            Self::ReadOnly(snap) => RocksDBRawIterEnum::ReadOnly(snap.raw_iterator_cf(cf)),
        }
    }
}

impl fmt::Debug for RocksReadSnapshot<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RocksReadSnapshot")
            .field("provider", &self.provider)
            .finish_non_exhaustive()
    }
}
1411
1412impl<'db> RocksReadSnapshot<'db> {
1413 fn cf_handle<T: Table>(&self) -> Result<&'db rocksdb::ColumnFamily, DatabaseError> {
1415 self.provider.get_cf_handle::<T>()
1416 }
1417
1418 pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
1420 let encoded_key = key.encode();
1421 let cf = self.cf_handle::<T>()?;
1422 let result = match &self.inner {
1423 RocksReadSnapshotInner::ReadWrite(snap) => snap.get_cf(cf, encoded_key.as_ref()),
1424 RocksReadSnapshotInner::ReadOnly(snap) => snap.get_cf(cf, encoded_key.as_ref()),
1425 }
1426 .map_err(|e| {
1427 ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
1428 message: e.to_string().into(),
1429 code: -1,
1430 }))
1431 })?;
1432
1433 Ok(result.and_then(|value| T::Value::decompress(&value).ok()))
1434 }
1435
1436 pub fn account_history_info(
1438 &self,
1439 address: Address,
1440 block_number: BlockNumber,
1441 lowest_available_block_number: Option<BlockNumber>,
1442 ) -> ProviderResult<HistoryInfo> {
1443 let key = ShardedKey::new(address, block_number);
1444 self.history_info::<tables::AccountsHistory>(
1445 key.encode().as_ref(),
1446 block_number,
1447 lowest_available_block_number,
1448 |key_bytes| Ok(<ShardedKey<Address> as Decode>::decode(key_bytes)?.key == address),
1449 |prev_bytes| {
1450 <ShardedKey<Address> as Decode>::decode(prev_bytes)
1451 .map(|k| k.key == address)
1452 .unwrap_or(false)
1453 },
1454 )
1455 }
1456
1457 pub fn storage_history_info(
1459 &self,
1460 address: Address,
1461 storage_key: B256,
1462 block_number: BlockNumber,
1463 lowest_available_block_number: Option<BlockNumber>,
1464 ) -> ProviderResult<HistoryInfo> {
1465 let key = StorageShardedKey::new(address, storage_key, block_number);
1466 self.history_info::<tables::StoragesHistory>(
1467 key.encode().as_ref(),
1468 block_number,
1469 lowest_available_block_number,
1470 |key_bytes| {
1471 let k = <StorageShardedKey as Decode>::decode(key_bytes)?;
1472 Ok(k.address == address && k.sharded_key.key == storage_key)
1473 },
1474 |prev_bytes| {
1475 <StorageShardedKey as Decode>::decode(prev_bytes)
1476 .map(|k| k.address == address && k.sharded_key.key == storage_key)
1477 .unwrap_or(false)
1478 },
1479 )
1480 }
1481
1482 fn history_info<T>(
1484 &self,
1485 encoded_key: &[u8],
1486 block_number: BlockNumber,
1487 lowest_available_block_number: Option<BlockNumber>,
1488 key_matches: impl FnOnce(&[u8]) -> Result<bool, reth_db_api::DatabaseError>,
1489 prev_key_matches: impl Fn(&[u8]) -> bool,
1490 ) -> ProviderResult<HistoryInfo>
1491 where
1492 T: Table<Value = BlockNumberList>,
1493 {
1494 let is_maybe_pruned = lowest_available_block_number.is_some();
1495 let fallback = || {
1496 Ok(if is_maybe_pruned {
1497 HistoryInfo::MaybeInPlainState
1498 } else {
1499 HistoryInfo::NotYetWritten
1500 })
1501 };
1502
1503 let cf = self.cf_handle::<T>()?;
1504 let mut iter = self.inner.raw_iterator_cf(cf);
1505
1506 iter.seek(encoded_key);
1507 iter.status().map_err(|e| {
1508 ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
1509 message: e.to_string().into(),
1510 code: -1,
1511 }))
1512 })?;
1513
1514 if !iter.valid() {
1515 return fallback();
1516 }
1517
1518 let Some(key_bytes) = iter.key() else {
1519 return fallback();
1520 };
1521 if !key_matches(key_bytes)? {
1522 return fallback();
1523 }
1524
1525 let Some(value_bytes) = iter.value() else {
1526 return fallback();
1527 };
1528 let chunk = BlockNumberList::decompress(value_bytes)?;
1529 let (rank, found_block) = compute_history_rank(&chunk, block_number);
1530
1531 let is_before_first_write = if needs_prev_shard_check(rank, found_block, block_number) {
1532 iter.prev();
1533 iter.status().map_err(|e| {
1534 ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
1535 message: e.to_string().into(),
1536 code: -1,
1537 }))
1538 })?;
1539 let has_prev = iter.valid() && iter.key().is_some_and(&prev_key_matches);
1540 !has_prev
1541 } else {
1542 false
1543 };
1544
1545 Ok(HistoryInfo::from_lookup(
1546 found_block,
1547 is_before_first_write,
1548 lowest_available_block_number,
1549 ))
1550 }
1551}
1552
/// Result of pruning the history shards of one key.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PruneShardOutcome {
    /// At least one shard was deleted outright (takes precedence over
    /// `Updated`).
    Deleted,
    /// At least one shard was rewritten with some indices removed.
    Updated,
    /// No shard required any change.
    Unchanged,
}

/// Aggregate counters for a batched prune run, one increment per target key.
#[derive(Debug, Default, Clone, Copy)]
pub struct PrunedIndices {
    /// Targets whose prune resulted in [`PruneShardOutcome::Deleted`].
    pub deleted: usize,
    /// Targets whose prune resulted in [`PruneShardOutcome::Updated`].
    pub updated: usize,
    /// Targets whose prune resulted in [`PruneShardOutcome::Unchanged`].
    pub unchanged: usize,
}

/// Accumulates writes destined for RocksDB and flushes them as one batch.
///
/// When `auto_commit_threshold` is set, staged writes may be flushed eagerly
/// once the batch's byte size crosses the threshold (see `maybe_auto_commit`).
#[must_use = "batch must be committed"]
pub struct RocksDBBatch<'a> {
    // Provider owning the column-family handles and the underlying DB.
    provider: &'a RocksDBProvider,
    // The raw RocksDB write batch being accumulated.
    inner: WriteBatchWithTransaction<true>,
    // Scratch buffer reused for value compression across puts.
    buf: Vec<u8>,
    // Byte-size threshold that triggers an early flush; `None` disables it.
    auto_commit_threshold: Option<usize>,
}

impl fmt::Debug for RocksDBBatch<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RocksDBBatch")
            .field("provider", &self.provider)
            .field("batch", &"<WriteBatchWithTransaction>")
            .field("length", &self.inner.len())
            .field("size_in_bytes", &self.inner.size_in_bytes())
            .finish()
    }
}
1606
1607impl<'a> RocksDBBatch<'a> {
1608 pub fn put<T: Table>(&mut self, key: T::Key, value: &T::Value) -> ProviderResult<()> {
1612 let encoded_key = key.encode();
1613 self.put_encoded::<T>(&encoded_key, value)
1614 }
1615
    /// Stages a put of `value` under an already-encoded `key` into table `T`.
    ///
    /// May trigger an eager flush of the batch afterwards (see
    /// [`Self::maybe_auto_commit`]).
    pub fn put_encoded<T: Table>(
        &mut self,
        key: &<T::Key as Encode>::Encoded,
        value: &T::Value,
    ) -> ProviderResult<()> {
        // Compress via the shared scratch buffer; when the macro yields `None`
        // the compressed bytes live in `self.buf` — presumably `Some` means the
        // value needed no buffer (TODO confirm macro contract).
        let value_bytes = compress_to_buf_or_ref!(self.buf, value).unwrap_or(&self.buf);
        self.inner.put_cf(self.provider.get_cf_handle::<T>()?, key, value_bytes);
        self.maybe_auto_commit()?;
        Ok(())
    }
1629
1630 pub fn delete<T: Table>(&mut self, key: T::Key) -> ProviderResult<()> {
1634 self.inner.delete_cf(self.provider.get_cf_handle::<T>()?, key.encode().as_ref());
1635 self.maybe_auto_commit()?;
1636 Ok(())
1637 }
1638
    /// Flushes the accumulated writes straight to the DB once the batch's byte
    /// size reaches `auto_commit_threshold`; a no-op when no threshold is
    /// configured or the batch is still below it.
    fn maybe_auto_commit(&mut self) -> ProviderResult<()> {
        if let Some(threshold) = self.auto_commit_threshold &&
            self.inner.size_in_bytes() >= threshold
        {
            tracing::debug!(
                target: "providers::rocksdb",
                batch_size = self.inner.size_in_bytes(),
                threshold,
                "Auto-committing RocksDB batch"
            );
            // Swap in a fresh empty batch so subsequent writes keep
            // accumulating while the full one is written out.
            let old_batch = std::mem::take(&mut self.inner);
            self.provider.0.db_rw().write_opt(old_batch, &WriteOptions::default()).map_err(
                |e| {
                    ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
                        message: e.to_string().into(),
                        code: -1,
                    }))
                },
            )?;
        }
        Ok(())
    }
1665
    /// Writes the entire staged batch to the DB in a single `write_opt` call,
    /// consuming the batch.
    #[instrument(level = "debug", target = "providers::rocksdb", skip_all, fields(batch_len = self.inner.len(), batch_size = self.inner.size_in_bytes()))]
    pub fn commit(self) -> ProviderResult<()> {
        self.provider.0.db_rw().write_opt(self.inner, &WriteOptions::default()).map_err(|e| {
            ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
                message: e.to_string().into(),
                code: -1,
            }))
        })
    }
1681
    /// Number of operations currently staged in the batch.
    pub fn len(&self) -> usize {
        self.inner.len()
    }

    /// Returns `true` when no operations are staged.
    pub fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }

    /// Serialized size of the staged batch in bytes.
    pub fn size_in_bytes(&self) -> usize {
        self.inner.size_in_bytes()
    }

    /// The provider this batch writes through.
    pub const fn provider(&self) -> &RocksDBProvider {
        self.provider
    }

    /// Consumes the batch, returning the raw RocksDB write batch (e.g. to
    /// queue it for a later combined commit).
    pub fn into_inner(self) -> WriteBatchWithTransaction<true> {
        self.inner
    }

    /// Reads `key` via the provider. NOTE: writes staged in this batch are
    /// not consulted by this read.
    pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
        self.provider.get::<T>(key)
    }
1716
    /// Appends `indices` to the account-history shards of `address`.
    ///
    /// `indices` must be strictly increasing (debug-asserted); `append` on the
    /// existing shard surfaces an error if the combined list would not stay
    /// sorted. The terminal shard is keyed by the `u64::MAX` sentinel; when it
    /// overflows [`NUM_OF_INDICES_IN_SHARD`] it is re-chunked, with every full
    /// chunk keyed by its highest block number and the remainder re-keyed as
    /// the new terminal shard.
    pub fn append_account_history_shard(
        &mut self,
        address: Address,
        indices: impl IntoIterator<Item = u64>,
    ) -> ProviderResult<()> {
        let indices: Vec<u64> = indices.into_iter().collect();

        if indices.is_empty() {
            return Ok(());
        }

        debug_assert!(
            indices.windows(2).all(|w| w[0] < w[1]),
            "indices must be strictly increasing: {:?}",
            indices
        );

        // Load the current terminal ("last") shard, if any.
        let last_key = ShardedKey::new(address, u64::MAX);
        let last_shard_opt = self.provider.get::<tables::AccountsHistory>(last_key.clone())?;
        let mut last_shard = last_shard_opt.unwrap_or_else(BlockNumberList::empty);

        last_shard.append(indices).map_err(ProviderError::other)?;

        // Fast path: still fits into one shard, write it back in place.
        if last_shard.len() <= NUM_OF_INDICES_IN_SHARD as u64 {
            self.put::<tables::AccountsHistory>(last_key, &last_shard)?;
            return Ok(());
        }

        // Overflow: split into fixed-size chunks; only the final chunk keeps
        // the `u64::MAX` sentinel key (overwriting the old terminal shard).
        let chunks = last_shard.iter().chunks(NUM_OF_INDICES_IN_SHARD);
        let mut chunks_peekable = chunks.into_iter().peekable();

        while let Some(chunk) = chunks_peekable.next() {
            let shard = BlockNumberList::new_pre_sorted(chunk);
            let highest_block_number = if chunks_peekable.peek().is_some() {
                shard.iter().next_back().expect("`chunks` does not return empty list")
            } else {
                u64::MAX
            };

            self.put::<tables::AccountsHistory>(
                ShardedKey::new(address, highest_block_number),
                &shard,
            )?;
        }

        Ok(())
    }
1777
    /// Appends `indices` to the storage-history shards of
    /// `(address, storage_key)`; storage analogue of
    /// [`Self::append_account_history_shard`].
    ///
    /// `indices` must be strictly increasing (debug-asserted). The terminal
    /// shard uses the sentinel key from [`StorageShardedKey::last`]; on
    /// overflow it is re-chunked, full chunks keyed by their highest block
    /// number and the remainder re-keyed as the new terminal shard.
    pub fn append_storage_history_shard(
        &mut self,
        address: Address,
        storage_key: B256,
        indices: impl IntoIterator<Item = u64>,
    ) -> ProviderResult<()> {
        let indices: Vec<u64> = indices.into_iter().collect();

        if indices.is_empty() {
            return Ok(());
        }

        debug_assert!(
            indices.windows(2).all(|w| w[0] < w[1]),
            "indices must be strictly increasing: {:?}",
            indices
        );

        // Load the current terminal ("last") shard, if any.
        let last_key = StorageShardedKey::last(address, storage_key);
        let last_shard_opt = self.provider.get::<tables::StoragesHistory>(last_key.clone())?;
        let mut last_shard = last_shard_opt.unwrap_or_else(BlockNumberList::empty);

        last_shard.append(indices).map_err(ProviderError::other)?;

        // Fast path: still fits into one shard, write it back in place.
        if last_shard.len() <= NUM_OF_INDICES_IN_SHARD as u64 {
            self.put::<tables::StoragesHistory>(last_key, &last_shard)?;
            return Ok(());
        }

        // Overflow: split into fixed-size chunks; only the final chunk keeps
        // the sentinel key (overwriting the old terminal shard).
        let chunks = last_shard.iter().chunks(NUM_OF_INDICES_IN_SHARD);
        let mut chunks_peekable = chunks.into_iter().peekable();

        while let Some(chunk) = chunks_peekable.next() {
            let shard = BlockNumberList::new_pre_sorted(chunk);
            let highest_block_number = if chunks_peekable.peek().is_some() {
                shard.iter().next_back().expect("`chunks` does not return empty list")
            } else {
                u64::MAX
            };

            self.put::<tables::StoragesHistory>(
                StorageShardedKey::new(address, storage_key, highest_block_number),
                &shard,
            )?;
        }

        Ok(())
    }
1839
    /// Unwinds account history for `address` so that only block indices
    /// `<= keep_to` remain, restoring the `u64::MAX` terminal-shard invariant.
    pub fn unwind_account_history_to(
        &mut self,
        address: Address,
        keep_to: BlockNumber,
    ) -> ProviderResult<()> {
        let shards = self.provider.account_history_shards(address)?;
        if shards.is_empty() {
            return Ok(());
        }

        // First shard that may contain indices above `keep_to` (the sentinel
        // shard always qualifies).
        let boundary_idx = shards.iter().position(|(key, _)| {
            key.highest_block_number == u64::MAX || key.highest_block_number > keep_to
        });

        let Some(boundary_idx) = boundary_idx else {
            // Every shard is entirely <= keep_to: nothing to trim, but make
            // sure the last shard carries the terminal sentinel key.
            let (last_key, last_value) = shards.last().expect("shards is non-empty");
            if last_key.highest_block_number != u64::MAX {
                self.delete::<tables::AccountsHistory>(last_key.clone())?;
                self.put::<tables::AccountsHistory>(
                    ShardedKey::new(address, u64::MAX),
                    last_value,
                )?;
            }
            return Ok(());
        };

        // Shards past the boundary are entirely above `keep_to`: drop them.
        for (key, _) in shards.iter().skip(boundary_idx + 1) {
            self.delete::<tables::AccountsHistory>(key.clone())?;
        }

        let (boundary_key, boundary_list) = &shards[boundary_idx];

        self.delete::<tables::AccountsHistory>(boundary_key.clone())?;

        // Keep only the prefix of the boundary shard that is <= keep_to.
        let new_last =
            BlockNumberList::new_pre_sorted(boundary_list.iter().take_while(|&b| b <= keep_to));

        if new_last.is_empty() {
            if boundary_idx == 0 {
                // No history left at all for this address.
                return Ok(());
            }

            // Promote the previous shard to be the terminal sentinel shard.
            let (prev_key, prev_value) = &shards[boundary_idx - 1];
            if prev_key.highest_block_number != u64::MAX {
                self.delete::<tables::AccountsHistory>(prev_key.clone())?;
                self.put::<tables::AccountsHistory>(
                    ShardedKey::new(address, u64::MAX),
                    prev_value,
                )?;
            }
            return Ok(());
        }

        self.put::<tables::AccountsHistory>(ShardedKey::new(address, u64::MAX), &new_last)?;

        Ok(())
    }
1913
    /// Generic prune over one key's shard list, shared by the account and
    /// storage history tables; table-specific operations come in as closures.
    ///
    /// Deletes shards fully at or below `to_block`, rewrites partially covered
    /// shards without the pruned indices, and finally re-keys the last
    /// surviving shard to the sentinel key if it is not already it.
    ///
    /// Outcome precedence: any deletion reports `Deleted`, otherwise any
    /// rewrite reports `Updated`, else `Unchanged`.
    #[allow(clippy::too_many_arguments)]
    fn prune_history_shards_inner<K>(
        &mut self,
        shards: Vec<(K, BlockNumberList)>,
        to_block: BlockNumber,
        get_highest: impl Fn(&K) -> u64,
        is_sentinel: impl Fn(&K) -> bool,
        delete_shard: impl Fn(&mut Self, K) -> ProviderResult<()>,
        put_shard: impl Fn(&mut Self, K, &BlockNumberList) -> ProviderResult<()>,
        create_sentinel: impl Fn() -> K,
    ) -> ProviderResult<PruneShardOutcome>
    where
        K: Clone,
    {
        if shards.is_empty() {
            return Ok(PruneShardOutcome::Unchanged);
        }

        let mut deleted = false;
        let mut updated = false;
        let mut last_remaining: Option<(K, BlockNumberList)> = None;

        for (key, block_list) in shards {
            if !is_sentinel(&key) && get_highest(&key) <= to_block {
                // Entire shard is at or below the prune point.
                delete_shard(self, key)?;
                deleted = true;
            } else {
                let original_len = block_list.len();
                let filtered =
                    BlockNumberList::new_pre_sorted(block_list.iter().filter(|&b| b > to_block));

                if filtered.is_empty() {
                    delete_shard(self, key)?;
                    deleted = true;
                } else if filtered.len() < original_len {
                    put_shard(self, key.clone(), &filtered)?;
                    last_remaining = Some((key, filtered));
                    updated = true;
                } else {
                    last_remaining = Some((key, block_list));
                }
            }
        }

        // Restore the invariant that the final shard carries the sentinel key.
        if let Some((last_key, last_value)) = last_remaining &&
            !is_sentinel(&last_key)
        {
            delete_shard(self, last_key)?;
            put_shard(self, create_sentinel(), &last_value)?;
            updated = true;
        }

        if deleted {
            Ok(PruneShardOutcome::Deleted)
        } else if updated {
            Ok(PruneShardOutcome::Updated)
        } else {
            Ok(PruneShardOutcome::Unchanged)
        }
    }
1979
    /// Prunes all account-history indices of `address` that are `<= to_block`.
    ///
    /// Thin wrapper around [`Self::prune_history_shards_inner`] wired with
    /// `AccountsHistory` table accessors.
    pub fn prune_account_history_to(
        &mut self,
        address: Address,
        to_block: BlockNumber,
    ) -> ProviderResult<PruneShardOutcome> {
        let shards = self.provider.account_history_shards(address)?;
        self.prune_history_shards_inner(
            shards,
            to_block,
            |key| key.highest_block_number,
            |key| key.highest_block_number == u64::MAX,
            |batch, key| batch.delete::<tables::AccountsHistory>(key),
            |batch, key, value| batch.put::<tables::AccountsHistory>(key, value),
            || ShardedKey::new(address, u64::MAX),
        )
    }
2000
    /// Prunes account history for multiple `(address, to_block)` targets in a
    /// single pass over the `AccountsHistory` column family.
    ///
    /// `targets` must be sorted by address (debug-asserted): one raw iterator
    /// is reused across all targets and only re-seeks when it sits before the
    /// next target's key prefix.
    pub fn prune_account_history_batch(
        &mut self,
        targets: &[(Address, BlockNumber)],
    ) -> ProviderResult<PrunedIndices> {
        if targets.is_empty() {
            return Ok(PrunedIndices::default());
        }

        debug_assert!(
            targets.windows(2).all(|w| w[0].0 <= w[1].0),
            "prune_account_history_batch: targets must be sorted by address"
        );

        // The first 20 encoded bytes are the address, so this prefix groups
        // all shards of one account (see `start_key` construction below).
        const PREFIX_LEN: usize = 20;

        let cf = self.provider.get_cf_handle::<tables::AccountsHistory>()?;
        let mut iter = self.provider.0.raw_iterator_cf(cf);
        let mut outcomes = PrunedIndices::default();

        for (address, to_block) in targets {
            let start_key = ShardedKey::new(*address, 0u64).encode();
            let target_prefix = &start_key[..PREFIX_LEN];

            // Skip the seek when the iterator already sits at or past the
            // target prefix (possible because targets are sorted).
            let needs_seek = if iter.valid() {
                if let Some(current_key) = iter.key() {
                    current_key.get(..PREFIX_LEN).is_none_or(|p| p < target_prefix)
                } else {
                    true
                }
            } else {
                true
            };

            if needs_seek {
                iter.seek(start_key);
                iter.status().map_err(|e| {
                    ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
                        message: e.to_string().into(),
                        code: -1,
                    }))
                })?;
            }

            // Collect every shard belonging to this address.
            let mut shards = Vec::new();
            while iter.valid() {
                let Some(key_bytes) = iter.key() else { break };

                let current_prefix = key_bytes.get(..PREFIX_LEN);
                if current_prefix != Some(target_prefix) {
                    break;
                }

                let key = ShardedKey::<Address>::decode(key_bytes)
                    .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;

                let Some(value_bytes) = iter.value() else { break };
                let value = BlockNumberList::decompress(value_bytes)
                    .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;

                shards.push((key, value));
                iter.next();
            }

            match self.prune_history_shards_inner(
                shards,
                *to_block,
                |key| key.highest_block_number,
                |key| key.highest_block_number == u64::MAX,
                |batch, key| batch.delete::<tables::AccountsHistory>(key),
                |batch, key, value| batch.put::<tables::AccountsHistory>(key, value),
                || ShardedKey::new(*address, u64::MAX),
            )? {
                PruneShardOutcome::Deleted => outcomes.deleted += 1,
                PruneShardOutcome::Updated => outcomes.updated += 1,
                PruneShardOutcome::Unchanged => outcomes.unchanged += 1,
            }
        }

        Ok(outcomes)
    }
2103
    /// Prunes all storage-history indices of `(address, storage_key)` that are
    /// `<= to_block`.
    ///
    /// Thin wrapper around [`Self::prune_history_shards_inner`] wired with
    /// `StoragesHistory` table accessors.
    pub fn prune_storage_history_to(
        &mut self,
        address: Address,
        storage_key: B256,
        to_block: BlockNumber,
    ) -> ProviderResult<PruneShardOutcome> {
        let shards = self.provider.storage_history_shards(address, storage_key)?;
        self.prune_history_shards_inner(
            shards,
            to_block,
            |key| key.sharded_key.highest_block_number,
            |key| key.sharded_key.highest_block_number == u64::MAX,
            |batch, key| batch.delete::<tables::StoragesHistory>(key),
            |batch, key, value| batch.put::<tables::StoragesHistory>(key, value),
            || StorageShardedKey::last(address, storage_key),
        )
    }
2126
    /// Prunes storage history for multiple `((address, slot), to_block)`
    /// targets in a single pass over the `StoragesHistory` column family.
    ///
    /// `targets` must be sorted by `(address, storage_key)` (debug-asserted):
    /// one raw iterator is reused across all targets and only re-seeks when it
    /// sits before the next target's key prefix.
    pub fn prune_storage_history_batch(
        &mut self,
        targets: &[((Address, B256), BlockNumber)],
    ) -> ProviderResult<PrunedIndices> {
        if targets.is_empty() {
            return Ok(PrunedIndices::default());
        }

        debug_assert!(
            targets.windows(2).all(|w| w[0].0 <= w[1].0),
            "prune_storage_history_batch: targets must be sorted by (address, storage_key)"
        );

        // 20-byte address + 32-byte storage key: this prefix groups all shards
        // of one slot (see `start_key` construction below).
        const PREFIX_LEN: usize = 52;

        let cf = self.provider.get_cf_handle::<tables::StoragesHistory>()?;
        let mut iter = self.provider.0.raw_iterator_cf(cf);
        let mut outcomes = PrunedIndices::default();

        for ((address, storage_key), to_block) in targets {
            let start_key = StorageShardedKey::new(*address, *storage_key, 0u64).encode();
            let target_prefix = &start_key[..PREFIX_LEN];

            // Skip the seek when the iterator already sits at or past the
            // target prefix (possible because targets are sorted).
            let needs_seek = if iter.valid() {
                if let Some(current_key) = iter.key() {
                    current_key.get(..PREFIX_LEN).is_none_or(|p| p < target_prefix)
                } else {
                    true
                }
            } else {
                true
            };

            if needs_seek {
                iter.seek(start_key);
                iter.status().map_err(|e| {
                    ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
                        message: e.to_string().into(),
                        code: -1,
                    }))
                })?;
            }

            // Collect every shard belonging to this (address, slot) pair.
            let mut shards = Vec::new();
            while iter.valid() {
                let Some(key_bytes) = iter.key() else { break };

                let current_prefix = key_bytes.get(..PREFIX_LEN);
                if current_prefix != Some(target_prefix) {
                    break;
                }

                let key = StorageShardedKey::decode(key_bytes)
                    .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;

                let Some(value_bytes) = iter.value() else { break };
                let value = BlockNumberList::decompress(value_bytes)
                    .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;

                shards.push((key, value));
                iter.next();
            }

            match self.prune_history_shards_inner(
                shards,
                *to_block,
                |key| key.sharded_key.highest_block_number,
                |key| key.sharded_key.highest_block_number == u64::MAX,
                |batch, key| batch.delete::<tables::StoragesHistory>(key),
                |batch, key, value| batch.put::<tables::StoragesHistory>(key, value),
                || StorageShardedKey::last(*address, *storage_key),
            )? {
                PruneShardOutcome::Deleted => outcomes.deleted += 1,
                PruneShardOutcome::Updated => outcomes.updated += 1,
                PruneShardOutcome::Unchanged => outcomes.unchanged += 1,
            }
        }

        Ok(outcomes)
    }
2231
    /// Unwinds storage history for `(address, storage_key)` so that only block
    /// indices `<= keep_to` remain, restoring the sentinel terminal-shard
    /// invariant; storage analogue of [`Self::unwind_account_history_to`].
    pub fn unwind_storage_history_to(
        &mut self,
        address: Address,
        storage_key: B256,
        keep_to: BlockNumber,
    ) -> ProviderResult<()> {
        let shards = self.provider.storage_history_shards(address, storage_key)?;
        if shards.is_empty() {
            return Ok(());
        }

        // First shard that may contain indices above `keep_to` (the sentinel
        // shard always qualifies).
        let boundary_idx = shards.iter().position(|(key, _)| {
            key.sharded_key.highest_block_number == u64::MAX ||
                key.sharded_key.highest_block_number > keep_to
        });

        let Some(boundary_idx) = boundary_idx else {
            // Every shard is entirely <= keep_to: nothing to trim, but make
            // sure the last shard carries the terminal sentinel key.
            let (last_key, last_value) = shards.last().expect("shards is non-empty");
            if last_key.sharded_key.highest_block_number != u64::MAX {
                self.delete::<tables::StoragesHistory>(last_key.clone())?;
                self.put::<tables::StoragesHistory>(
                    StorageShardedKey::last(address, storage_key),
                    last_value,
                )?;
            }
            return Ok(());
        };

        // Shards past the boundary are entirely above `keep_to`: drop them.
        for (key, _) in shards.iter().skip(boundary_idx + 1) {
            self.delete::<tables::StoragesHistory>(key.clone())?;
        }

        let (boundary_key, boundary_list) = &shards[boundary_idx];

        self.delete::<tables::StoragesHistory>(boundary_key.clone())?;

        // Keep only the prefix of the boundary shard that is <= keep_to.
        let new_last =
            BlockNumberList::new_pre_sorted(boundary_list.iter().take_while(|&b| b <= keep_to));

        if new_last.is_empty() {
            if boundary_idx == 0 {
                // No history left at all for this slot.
                return Ok(());
            }

            // Promote the previous shard to be the terminal sentinel shard.
            let (prev_key, prev_value) = &shards[boundary_idx - 1];
            if prev_key.sharded_key.highest_block_number != u64::MAX {
                self.delete::<tables::StoragesHistory>(prev_key.clone())?;
                self.put::<tables::StoragesHistory>(
                    StorageShardedKey::last(address, storage_key),
                    prev_value,
                )?;
            }
            return Ok(());
        }

        self.put::<tables::StoragesHistory>(
            StorageShardedKey::last(address, storage_key),
            &new_last,
        )?;

        Ok(())
    }
2312
2313 pub fn clear_account_history(&mut self, address: Address) -> ProviderResult<()> {
2317 let shards = self.provider.account_history_shards(address)?;
2318 for (key, _) in shards {
2319 self.delete::<tables::AccountsHistory>(key)?;
2320 }
2321 Ok(())
2322 }
2323
2324 pub fn clear_storage_history(
2328 &mut self,
2329 address: Address,
2330 storage_key: B256,
2331 ) -> ProviderResult<()> {
2332 let shards = self.provider.storage_history_shards(address, storage_key)?;
2333 for (key, _) in shards {
2334 self.delete::<tables::StoragesHistory>(key)?;
2335 }
2336 Ok(())
2337 }
2338}
2339
/// An optimistic RocksDB transaction scoped to a [`RocksDBProvider`].
///
/// Writes are staged on the transaction and applied on [`RocksTx::commit`]
/// (or discarded via [`RocksTx::rollback`]).
pub struct RocksTx<'db> {
    // Underlying optimistic transaction handle.
    inner: Transaction<'db, OptimisticTransactionDB>,
    // Provider used to resolve column-family handles.
    provider: &'db RocksDBProvider,
}

impl fmt::Debug for RocksTx<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RocksTx").field("provider", &self.provider).finish_non_exhaustive()
    }
}
2359
2360impl<'db> RocksTx<'db> {
2361 pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
2363 let encoded_key = key.encode();
2364 self.get_encoded::<T>(&encoded_key)
2365 }
2366
2367 pub fn get_encoded<T: Table>(
2369 &self,
2370 key: &<T::Key as Encode>::Encoded,
2371 ) -> ProviderResult<Option<T::Value>> {
2372 let cf = self.provider.get_cf_handle::<T>()?;
2373 let result = self.inner.get_cf(cf, key.as_ref()).map_err(|e| {
2374 ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
2375 message: e.to_string().into(),
2376 code: -1,
2377 }))
2378 })?;
2379
2380 Ok(result.and_then(|value| T::Value::decompress(&value).ok()))
2381 }
2382
2383 pub fn put<T: Table>(&self, key: T::Key, value: &T::Value) -> ProviderResult<()> {
2385 let encoded_key = key.encode();
2386 self.put_encoded::<T>(&encoded_key, value)
2387 }
2388
2389 pub fn put_encoded<T: Table>(
2391 &self,
2392 key: &<T::Key as Encode>::Encoded,
2393 value: &T::Value,
2394 ) -> ProviderResult<()> {
2395 let cf = self.provider.get_cf_handle::<T>()?;
2396 let mut buf = Vec::new();
2397 let value_bytes = compress_to_buf_or_ref!(buf, value).unwrap_or(&buf);
2398
2399 self.inner.put_cf(cf, key.as_ref(), value_bytes).map_err(|e| {
2400 ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
2401 info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
2402 operation: DatabaseWriteOperation::PutUpsert,
2403 table_name: T::NAME,
2404 key: key.as_ref().to_vec(),
2405 })))
2406 })
2407 }
2408
2409 pub fn delete<T: Table>(&self, key: T::Key) -> ProviderResult<()> {
2411 let cf = self.provider.get_cf_handle::<T>()?;
2412 self.inner.delete_cf(cf, key.encode().as_ref()).map_err(|e| {
2413 ProviderError::Database(DatabaseError::Delete(DatabaseErrorInfo {
2414 message: e.to_string().into(),
2415 code: -1,
2416 }))
2417 })
2418 }
2419
2420 pub fn iter<T: Table>(&self) -> ProviderResult<RocksTxIter<'_, T>> {
2424 let cf = self.provider.get_cf_handle::<T>()?;
2425 let iter = self.inner.iterator_cf(cf, IteratorMode::Start);
2426 Ok(RocksTxIter { inner: iter, _marker: std::marker::PhantomData })
2427 }
2428
2429 pub fn iter_from<T: Table>(&self, key: T::Key) -> ProviderResult<RocksTxIter<'_, T>> {
2431 let cf = self.provider.get_cf_handle::<T>()?;
2432 let encoded_key = key.encode();
2433 let iter = self
2434 .inner
2435 .iterator_cf(cf, IteratorMode::From(encoded_key.as_ref(), rocksdb::Direction::Forward));
2436 Ok(RocksTxIter { inner: iter, _marker: std::marker::PhantomData })
2437 }
2438
2439 #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
2441 pub fn commit(self) -> ProviderResult<()> {
2442 self.inner.commit().map_err(|e| {
2443 ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
2444 message: e.to_string().into(),
2445 code: -1,
2446 }))
2447 })
2448 }
2449
2450 #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
2452 pub fn rollback(self) -> ProviderResult<()> {
2453 self.inner.rollback().map_err(|e| {
2454 ProviderError::Database(DatabaseError::Other(format!("rollback failed: {e}")))
2455 })
2456 }
2457}
2458
// Dispatch enum over key/value iterators for the two DB access modes.
enum RocksDBIterEnum<'db> {
    // Iterator over the read-write (optimistic transaction) database.
    ReadWrite(rocksdb::DBIteratorWithThreadMode<'db, OptimisticTransactionDB>),
    // Iterator over the read-only database handle.
    ReadOnly(rocksdb::DBIteratorWithThreadMode<'db, DB>),
}

impl Iterator for RocksDBIterEnum<'_> {
    type Item = Result<(Box<[u8]>, Box<[u8]>), rocksdb::Error>;

    // Delegates to whichever underlying iterator is active.
    fn next(&mut self) -> Option<Self::Item> {
        match self {
            Self::ReadWrite(iter) => iter.next(),
            Self::ReadOnly(iter) => iter.next(),
        }
    }
}
2477
// Dispatch enum over raw (cursor-style) iterators for the two DB access
// modes; every method simply forwards to the active variant.
enum RocksDBRawIterEnum<'db> {
    // Raw iterator over the read-write (optimistic transaction) database.
    ReadWrite(DBRawIteratorWithThreadMode<'db, OptimisticTransactionDB>),
    // Raw iterator over the read-only database handle.
    ReadOnly(DBRawIteratorWithThreadMode<'db, DB>),
}

impl RocksDBRawIterEnum<'_> {
    // Positions the cursor at the first key `>= key`.
    fn seek(&mut self, key: impl AsRef<[u8]>) {
        match self {
            Self::ReadWrite(iter) => iter.seek(key),
            Self::ReadOnly(iter) => iter.seek(key),
        }
    }

    // `true` while the cursor points at an entry.
    fn valid(&self) -> bool {
        match self {
            Self::ReadWrite(iter) => iter.valid(),
            Self::ReadOnly(iter) => iter.valid(),
        }
    }

    // Raw key bytes at the current position, if any.
    fn key(&self) -> Option<&[u8]> {
        match self {
            Self::ReadWrite(iter) => iter.key(),
            Self::ReadOnly(iter) => iter.key(),
        }
    }

    // Raw value bytes at the current position, if any.
    fn value(&self) -> Option<&[u8]> {
        match self {
            Self::ReadWrite(iter) => iter.value(),
            Self::ReadOnly(iter) => iter.value(),
        }
    }

    // Advances the cursor to the next entry.
    fn next(&mut self) {
        match self {
            Self::ReadWrite(iter) => iter.next(),
            Self::ReadOnly(iter) => iter.next(),
        }
    }

    // Steps the cursor back to the previous entry.
    fn prev(&mut self) {
        match self {
            Self::ReadWrite(iter) => iter.prev(),
            Self::ReadOnly(iter) => iter.prev(),
        }
    }

    // Surfaces any error the underlying iterator has encountered.
    fn status(&self) -> Result<(), rocksdb::Error> {
        match self {
            Self::ReadWrite(iter) => iter.status(),
            Self::ReadOnly(iter) => iter.status(),
        }
    }
}
2546
/// Typed iterator over the `(key, value)` pairs of table `T`.
pub struct RocksDBIter<'db, T: Table> {
    // Untyped iterator over raw key/value bytes.
    inner: RocksDBIterEnum<'db>,
    // Binds the iterator to table `T` for decoding.
    _marker: std::marker::PhantomData<T>,
}

impl<T: Table> fmt::Debug for RocksDBIter<'_, T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RocksDBIter").field("table", &T::NAME).finish_non_exhaustive()
    }
}

impl<T: Table> Iterator for RocksDBIter<'_, T> {
    type Item = ProviderResult<(T::Key, T::Value)>;

    // Decodes each raw pair into typed table entries.
    fn next(&mut self) -> Option<Self::Item> {
        Some(decode_iter_item::<T>(self.inner.next()?))
    }
}
2568
/// Untyped iterator yielding raw `(key, value)` byte pairs.
pub struct RocksDBRawIter<'db> {
    // Underlying mode-dispatching iterator.
    inner: RocksDBIterEnum<'db>,
}

impl fmt::Debug for RocksDBRawIter<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RocksDBRawIter").finish_non_exhaustive()
    }
}
2581
2582impl Iterator for RocksDBRawIter<'_> {
2583 type Item = ProviderResult<(Box<[u8]>, Box<[u8]>)>;
2584
2585 fn next(&mut self) -> Option<Self::Item> {
2586 match self.inner.next()? {
2587 Ok(kv) => Some(Ok(kv)),
2588 Err(e) => Some(Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
2589 message: e.to_string().into(),
2590 code: -1,
2591 })))),
2592 }
2593 }
2594}
2595
/// Typed iterator over table `T` backed by a [`RocksTx`] transaction.
pub struct RocksTxIter<'tx, T: Table> {
    // Iterator borrowed from the underlying optimistic transaction.
    inner: rocksdb::DBIteratorWithThreadMode<'tx, Transaction<'tx, OptimisticTransactionDB>>,
    // Binds the iterator to table `T` for decoding.
    _marker: std::marker::PhantomData<T>,
}

impl<T: Table> fmt::Debug for RocksTxIter<'_, T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RocksTxIter").field("table", &T::NAME).finish_non_exhaustive()
    }
}

impl<T: Table> Iterator for RocksTxIter<'_, T> {
    type Item = ProviderResult<(T::Key, T::Value)>;

    // Decodes each raw pair into typed table entries.
    fn next(&mut self) -> Option<Self::Item> {
        Some(decode_iter_item::<T>(self.inner.next()?))
    }
}
2617
2618fn decode_iter_item<T: Table>(result: RawKVResult) -> ProviderResult<(T::Key, T::Value)> {
2623 let (key_bytes, value_bytes) = result.map_err(|e| {
2624 ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
2625 message: e.to_string().into(),
2626 code: -1,
2627 }))
2628 })?;
2629
2630 let key = <T::Key as reth_db_api::table::Decode>::decode(&key_bytes)
2631 .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
2632
2633 let value = T::Value::decompress(&value_bytes)
2634 .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
2635
2636 Ok((key, value))
2637}
2638
2639const fn convert_log_level(level: LogLevel) -> rocksdb::LogLevel {
2641 match level {
2642 LogLevel::Fatal => rocksdb::LogLevel::Fatal,
2643 LogLevel::Error => rocksdb::LogLevel::Error,
2644 LogLevel::Warn => rocksdb::LogLevel::Warn,
2645 LogLevel::Notice | LogLevel::Verbose => rocksdb::LogLevel::Info,
2646 LogLevel::Debug | LogLevel::Trace | LogLevel::Extra => rocksdb::LogLevel::Debug,
2647 }
2648}
2649
2650#[cfg(test)]
2651mod tests {
2652 use super::*;
2653 use crate::providers::HistoryInfo;
2654 use alloy_primitives::{Address, TxHash, B256};
2655 use reth_db_api::{
2656 models::{
2657 sharded_key::{ShardedKey, NUM_OF_INDICES_IN_SHARD},
2658 storage_sharded_key::StorageShardedKey,
2659 IntegerList,
2660 },
2661 table::Table,
2662 tables,
2663 };
2664 use tempfile::TempDir;
2665
2666 #[test]
2667 fn test_with_default_tables_registers_required_column_families() {
2668 let temp_dir = TempDir::new().unwrap();
2669
2670 let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
2672
2673 let tx_hash = TxHash::from(B256::from([1u8; 32]));
2675 provider.put::<tables::TransactionHashNumbers>(tx_hash, &100).unwrap();
2676 assert_eq!(provider.get::<tables::TransactionHashNumbers>(tx_hash).unwrap(), Some(100));
2677
2678 let key = ShardedKey::new(Address::ZERO, 100);
2680 let value = IntegerList::default();
2681 provider.put::<tables::AccountsHistory>(key.clone(), &value).unwrap();
2682 assert!(provider.get::<tables::AccountsHistory>(key).unwrap().is_some());
2683
2684 let key = StorageShardedKey::new(Address::ZERO, B256::ZERO, 100);
2686 provider.put::<tables::StoragesHistory>(key.clone(), &value).unwrap();
2687 assert!(provider.get::<tables::StoragesHistory>(key).unwrap().is_some());
2688 }
2689
    // Minimal table definition used by the tests below: maps `u64` keys to
    // raw byte values, no dup-sort.
    #[derive(Debug)]
    struct TestTable;

    impl Table for TestTable {
        const NAME: &'static str = "TestTable";
        const DUPSORT: bool = false;
        type Key = u64;
        type Value = Vec<u8>;
    }
2699
2700 #[test]
2701 fn test_basic_operations() {
2702 let temp_dir = TempDir::new().unwrap();
2703
2704 let provider = RocksDBBuilder::new(temp_dir.path())
2705 .with_table::<TestTable>() .build()
2707 .unwrap();
2708
2709 let key = 42u64;
2710 let value = b"test_value".to_vec();
2711
2712 provider.put::<TestTable>(key, &value).unwrap();
2714
2715 let result = provider.get::<TestTable>(key).unwrap();
2717 assert_eq!(result, Some(value));
2718
2719 provider.delete::<TestTable>(key).unwrap();
2721
2722 assert_eq!(provider.get::<TestTable>(key).unwrap(), None);
2724 }
2725
2726 #[test]
2727 fn test_batch_operations() {
2728 let temp_dir = TempDir::new().unwrap();
2729 let provider =
2730 RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
2731
2732 provider
2734 .write_batch(|batch| {
2735 for i in 0..10u64 {
2736 let value = format!("value_{i}").into_bytes();
2737 batch.put::<TestTable>(i, &value)?;
2738 }
2739 Ok(())
2740 })
2741 .unwrap();
2742
2743 for i in 0..10u64 {
2745 let value = format!("value_{i}").into_bytes();
2746 assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
2747 }
2748
2749 provider
2751 .write_batch(|batch| {
2752 for i in 0..10u64 {
2753 batch.delete::<TestTable>(i)?;
2754 }
2755 Ok(())
2756 })
2757 .unwrap();
2758
2759 for i in 0..10u64 {
2761 assert_eq!(provider.get::<TestTable>(i).unwrap(), None);
2762 }
2763 }
2764
2765 #[test]
2766 fn test_with_real_table() {
2767 let temp_dir = TempDir::new().unwrap();
2768 let provider = RocksDBBuilder::new(temp_dir.path())
2769 .with_table::<tables::TransactionHashNumbers>()
2770 .with_metrics()
2771 .build()
2772 .unwrap();
2773
2774 let tx_hash = TxHash::from(B256::from([1u8; 32]));
2775
2776 provider.put::<tables::TransactionHashNumbers>(tx_hash, &100).unwrap();
2778 assert_eq!(provider.get::<tables::TransactionHashNumbers>(tx_hash).unwrap(), Some(100));
2779
2780 provider
2782 .write_batch(|batch| {
2783 for i in 0..10u64 {
2784 let hash = TxHash::from(B256::from([i as u8; 32]));
2785 let value = i * 100;
2786 batch.put::<tables::TransactionHashNumbers>(hash, &value)?;
2787 }
2788 Ok(())
2789 })
2790 .unwrap();
2791
2792 for i in 0..10u64 {
2794 let hash = TxHash::from(B256::from([i as u8; 32]));
2795 assert_eq!(
2796 provider.get::<tables::TransactionHashNumbers>(hash).unwrap(),
2797 Some(i * 100)
2798 );
2799 }
2800 }
2801 #[test]
2802 fn test_statistics_enabled() {
2803 let temp_dir = TempDir::new().unwrap();
2804 let provider = RocksDBBuilder::new(temp_dir.path())
2806 .with_table::<TestTable>()
2807 .with_statistics()
2808 .build()
2809 .unwrap();
2810
2811 for i in 0..10 {
2813 let value = vec![i as u8];
2814 provider.put::<TestTable>(i, &value).unwrap();
2815 assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
2817 }
2818 }
2819
2820 #[test]
2821 fn test_data_persistence() {
2822 let temp_dir = TempDir::new().unwrap();
2823 let provider =
2824 RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
2825
2826 let value = vec![42u8; 1000];
2828 for i in 0..100 {
2829 provider.put::<TestTable>(i, &value).unwrap();
2830 }
2831
2832 for i in 0..100 {
2834 assert!(provider.get::<TestTable>(i).unwrap().is_some(), "Data should be readable");
2835 }
2836 }
2837
2838 #[test]
2839 fn test_transaction_read_your_writes() {
2840 let temp_dir = TempDir::new().unwrap();
2841 let provider =
2842 RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
2843
2844 let tx = provider.tx();
2846
2847 let key = 42u64;
2849 let value = b"test_value".to_vec();
2850 tx.put::<TestTable>(key, &value).unwrap();
2851
2852 let result = tx.get::<TestTable>(key).unwrap();
2854 assert_eq!(
2855 result,
2856 Some(value.clone()),
2857 "Transaction should see its own uncommitted writes"
2858 );
2859
2860 let provider_result = provider.get::<TestTable>(key).unwrap();
2862 assert_eq!(provider_result, None, "Uncommitted data should not be visible outside tx");
2863
2864 tx.commit().unwrap();
2866
2867 let committed_result = provider.get::<TestTable>(key).unwrap();
2869 assert_eq!(committed_result, Some(value), "Committed data should be visible");
2870 }
2871
2872 #[test]
2873 fn test_transaction_rollback() {
2874 let temp_dir = TempDir::new().unwrap();
2875 let provider =
2876 RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
2877
2878 let key = 100u64;
2880 let initial_value = b"initial".to_vec();
2881 provider.put::<TestTable>(key, &initial_value).unwrap();
2882
2883 let tx = provider.tx();
2885 let new_value = b"modified".to_vec();
2886 tx.put::<TestTable>(key, &new_value).unwrap();
2887
2888 assert_eq!(tx.get::<TestTable>(key).unwrap(), Some(new_value));
2890
2891 tx.rollback().unwrap();
2893
2894 let result = provider.get::<TestTable>(key).unwrap();
2896 assert_eq!(result, Some(initial_value), "Rollback should preserve original data");
2897 }
2898
2899 #[test]
2900 fn test_transaction_iterator() {
2901 let temp_dir = TempDir::new().unwrap();
2902 let provider =
2903 RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
2904
2905 let tx = provider.tx();
2907
2908 for i in 0..5u64 {
2910 let value = format!("value_{i}").into_bytes();
2911 tx.put::<TestTable>(i, &value).unwrap();
2912 }
2913
2914 let mut count = 0;
2916 for result in tx.iter::<TestTable>().unwrap() {
2917 let (key, value) = result.unwrap();
2918 assert_eq!(value, format!("value_{key}").into_bytes());
2919 count += 1;
2920 }
2921 assert_eq!(count, 5, "Iterator should see all uncommitted writes");
2922
2923 tx.commit().unwrap();
2925 }
2926
2927 #[test]
2928 fn test_batch_manual_commit() {
2929 let temp_dir = TempDir::new().unwrap();
2930 let provider =
2931 RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
2932
2933 let mut batch = provider.batch();
2935
2936 for i in 0..10u64 {
2938 let value = format!("batch_value_{i}").into_bytes();
2939 batch.put::<TestTable>(i, &value).unwrap();
2940 }
2941
2942 assert_eq!(batch.len(), 10);
2944 assert!(!batch.is_empty());
2945
2946 assert_eq!(provider.get::<TestTable>(0).unwrap(), None);
2948
2949 batch.commit().unwrap();
2951
2952 for i in 0..10u64 {
2954 let value = format!("batch_value_{i}").into_bytes();
2955 assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
2956 }
2957 }
2958
2959 #[test]
2960 fn test_first_and_last_entry() {
2961 let temp_dir = TempDir::new().unwrap();
2962 let provider =
2963 RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
2964
2965 assert_eq!(provider.first::<TestTable>().unwrap(), None);
2967 assert_eq!(provider.last::<TestTable>().unwrap(), None);
2968
2969 provider.put::<TestTable>(10, &b"value_10".to_vec()).unwrap();
2971 provider.put::<TestTable>(20, &b"value_20".to_vec()).unwrap();
2972 provider.put::<TestTable>(5, &b"value_5".to_vec()).unwrap();
2973
2974 let first = provider.first::<TestTable>().unwrap();
2976 assert_eq!(first, Some((5, b"value_5".to_vec())));
2977
2978 let last = provider.last::<TestTable>().unwrap();
2980 assert_eq!(last, Some((20, b"value_20".to_vec())));
2981 }
2982
2983 #[test]
2987 fn test_account_history_info_pruned_before_first_entry() {
2988 let temp_dir = TempDir::new().unwrap();
2989 let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
2990
2991 let address = Address::from([0x42; 20]);
2992
2993 let chunk = IntegerList::new([100, 200, 300]).unwrap();
2995 let shard_key = ShardedKey::new(address, u64::MAX);
2996 provider.put::<tables::AccountsHistory>(shard_key, &chunk).unwrap();
2997
2998 let result = provider.snapshot().account_history_info(address, 50, Some(100)).unwrap();
3003 assert_eq!(result, HistoryInfo::InChangeset(100));
3004 }
3005
    /// Account-history lookups work through a read-only re-open of the same
    /// database directory.
    #[test]
    fn test_account_history_info_read_only() {
        let temp_dir = TempDir::new().unwrap();
        let address = Address::from([0x42; 20]);
        let chunk = IntegerList::new([100, 200, 300]).unwrap();
        let shard_key = ShardedKey::new(address, u64::MAX);

        {
            // Populate with a writable provider; the scope drops it before the
            // read-only instance opens the same path.
            let provider =
                RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
            provider.put::<tables::AccountsHistory>(shard_key, &chunk).unwrap();
        }

        // Re-open the same directory read-only.
        let ro_provider = RocksDBBuilder::new(temp_dir.path())
            .with_default_tables()
            .with_read_only(true)
            .build()
            .unwrap();

        // Block 200 is in the shard -> served from the changeset at 200.
        let result = ro_provider.snapshot().account_history_info(address, 200, None).unwrap();
        assert_eq!(result, HistoryInfo::InChangeset(200));

        // Block 50 predates the first entry (no lower bound given).
        let result = ro_provider.snapshot().account_history_info(address, 50, None).unwrap();
        assert_eq!(result, HistoryInfo::NotYetWritten);

        // Block 400 is past the last entry -> current value is in plain state.
        let result = ro_provider.snapshot().account_history_info(address, 400, None).unwrap();
        assert_eq!(result, HistoryInfo::InPlainState);
    }
3038
3039 #[test]
3040 fn test_account_history_shard_split_at_boundary() {
3041 let temp_dir = TempDir::new().unwrap();
3042 let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3043
3044 let address = Address::from([0x42; 20]);
3045 let limit = NUM_OF_INDICES_IN_SHARD;
3046
3047 let indices: Vec<u64> = (0..=(limit as u64)).collect();
3049 let mut batch = provider.batch();
3050 batch.append_account_history_shard(address, indices).unwrap();
3051 batch.commit().unwrap();
3052
3053 let completed_key = ShardedKey::new(address, (limit - 1) as u64);
3055 let sentinel_key = ShardedKey::new(address, u64::MAX);
3056
3057 let completed_shard = provider.get::<tables::AccountsHistory>(completed_key).unwrap();
3058 let sentinel_shard = provider.get::<tables::AccountsHistory>(sentinel_key).unwrap();
3059
3060 assert!(completed_shard.is_some(), "completed shard should exist");
3061 assert!(sentinel_shard.is_some(), "sentinel shard should exist");
3062
3063 let completed_shard = completed_shard.unwrap();
3064 let sentinel_shard = sentinel_shard.unwrap();
3065
3066 assert_eq!(completed_shard.len(), limit as u64, "completed shard should be full");
3067 assert_eq!(sentinel_shard.len(), 1, "sentinel shard should have 1 element");
3068 }
3069
    /// Appending two shard-sized batches back to back produces two completed
    /// shards plus the sentinel shard.
    #[test]
    fn test_account_history_multiple_shard_splits() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x43; 20]);
        let limit = NUM_OF_INDICES_IN_SHARD;

        // First batch exactly fills one shard; it stays in the sentinel slot.
        let first_batch_indices: Vec<u64> = (0..limit as u64).collect();
        let mut batch = provider.batch();
        batch.append_account_history_shard(address, first_batch_indices).unwrap();
        batch.commit().unwrap();

        let sentinel_key = ShardedKey::new(address, u64::MAX);
        let shard = provider.get::<tables::AccountsHistory>(sentinel_key.clone()).unwrap();
        assert!(shard.is_some());
        assert_eq!(shard.unwrap().len(), limit as u64);

        // Second batch overflows twice, forcing two completed shards out.
        let second_batch_indices: Vec<u64> = (limit as u64..=(2 * limit) as u64).collect();
        let mut batch = provider.batch();
        batch.append_account_history_shard(address, second_batch_indices).unwrap();
        batch.commit().unwrap();

        // Completed shards are keyed by their highest block number.
        let first_completed = ShardedKey::new(address, (limit - 1) as u64);
        let second_completed = ShardedKey::new(address, (2 * limit - 1) as u64);

        assert!(
            provider.get::<tables::AccountsHistory>(first_completed).unwrap().is_some(),
            "first completed shard should exist"
        );
        assert!(
            provider.get::<tables::AccountsHistory>(second_completed).unwrap().is_some(),
            "second completed shard should exist"
        );
        assert!(
            provider.get::<tables::AccountsHistory>(sentinel_key).unwrap().is_some(),
            "sentinel shard should exist"
        );
    }
3113
3114 #[test]
3115 fn test_storage_history_shard_split_at_boundary() {
3116 let temp_dir = TempDir::new().unwrap();
3117 let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3118
3119 let address = Address::from([0x44; 20]);
3120 let slot = B256::from([0x55; 32]);
3121 let limit = NUM_OF_INDICES_IN_SHARD;
3122
3123 let indices: Vec<u64> = (0..=(limit as u64)).collect();
3125 let mut batch = provider.batch();
3126 batch.append_storage_history_shard(address, slot, indices).unwrap();
3127 batch.commit().unwrap();
3128
3129 let completed_key = StorageShardedKey::new(address, slot, (limit - 1) as u64);
3131 let sentinel_key = StorageShardedKey::new(address, slot, u64::MAX);
3132
3133 let completed_shard = provider.get::<tables::StoragesHistory>(completed_key).unwrap();
3134 let sentinel_shard = provider.get::<tables::StoragesHistory>(sentinel_key).unwrap();
3135
3136 assert!(completed_shard.is_some(), "completed shard should exist");
3137 assert!(sentinel_shard.is_some(), "sentinel shard should exist");
3138
3139 let completed_shard = completed_shard.unwrap();
3140 let sentinel_shard = sentinel_shard.unwrap();
3141
3142 assert_eq!(completed_shard.len(), limit as u64, "completed shard should be full");
3143 assert_eq!(sentinel_shard.len(), 1, "sentinel shard should have 1 element");
3144 }
3145
    /// Storage-history analogue of the double shard split: two batches
    /// produce two completed shards plus the sentinel shard.
    #[test]
    fn test_storage_history_multiple_shard_splits() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x46; 20]);
        let slot = B256::from([0x57; 32]);
        let limit = NUM_OF_INDICES_IN_SHARD;

        // First batch exactly fills one shard; it stays in the sentinel slot.
        let first_batch_indices: Vec<u64> = (0..limit as u64).collect();
        let mut batch = provider.batch();
        batch.append_storage_history_shard(address, slot, first_batch_indices).unwrap();
        batch.commit().unwrap();

        let sentinel_key = StorageShardedKey::new(address, slot, u64::MAX);
        let shard = provider.get::<tables::StoragesHistory>(sentinel_key.clone()).unwrap();
        assert!(shard.is_some());
        assert_eq!(shard.unwrap().len(), limit as u64);

        // Second batch overflows twice, forcing two completed shards out.
        let second_batch_indices: Vec<u64> = (limit as u64..=(2 * limit) as u64).collect();
        let mut batch = provider.batch();
        batch.append_storage_history_shard(address, slot, second_batch_indices).unwrap();
        batch.commit().unwrap();

        // Completed shards are keyed by their highest block number.
        let first_completed = StorageShardedKey::new(address, slot, (limit - 1) as u64);
        let second_completed = StorageShardedKey::new(address, slot, (2 * limit - 1) as u64);

        assert!(
            provider.get::<tables::StoragesHistory>(first_completed).unwrap().is_some(),
            "first completed shard should exist"
        );
        assert!(
            provider.get::<tables::StoragesHistory>(second_completed).unwrap().is_some(),
            "second completed shard should exist"
        );
        assert!(
            provider.get::<tables::StoragesHistory>(sentinel_key).unwrap().is_some(),
            "sentinel shard should exist"
        );
    }
3190
3191 #[test]
3192 fn test_clear_table() {
3193 let temp_dir = TempDir::new().unwrap();
3194 let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3195
3196 let address = Address::from([0x42; 20]);
3197 let key = ShardedKey::new(address, u64::MAX);
3198 let blocks = BlockNumberList::new_pre_sorted([1, 2, 3]);
3199
3200 provider.put::<tables::AccountsHistory>(key.clone(), &blocks).unwrap();
3201 assert!(provider.get::<tables::AccountsHistory>(key.clone()).unwrap().is_some());
3202
3203 provider.clear::<tables::AccountsHistory>().unwrap();
3204
3205 assert!(
3206 provider.get::<tables::AccountsHistory>(key).unwrap().is_none(),
3207 "table should be empty after clear"
3208 );
3209 assert!(
3210 provider.first::<tables::AccountsHistory>().unwrap().is_none(),
3211 "first() should return None after clear"
3212 );
3213 }
3214
3215 #[test]
3216 fn test_clear_empty_table() {
3217 let temp_dir = TempDir::new().unwrap();
3218 let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3219
3220 assert!(provider.first::<tables::AccountsHistory>().unwrap().is_none());
3221
3222 provider.clear::<tables::AccountsHistory>().unwrap();
3223
3224 assert!(provider.first::<tables::AccountsHistory>().unwrap().is_none());
3225 }
3226
3227 #[test]
3228 fn test_unwind_account_history_to_basic() {
3229 let temp_dir = TempDir::new().unwrap();
3230 let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3231
3232 let address = Address::from([0x42; 20]);
3233
3234 let mut batch = provider.batch();
3236 batch.append_account_history_shard(address, 0..=10).unwrap();
3237 batch.commit().unwrap();
3238
3239 let key = ShardedKey::new(address, u64::MAX);
3241 let result = provider.get::<tables::AccountsHistory>(key.clone()).unwrap();
3242 assert!(result.is_some());
3243 let blocks: Vec<u64> = result.unwrap().iter().collect();
3244 assert_eq!(blocks, (0..=10).collect::<Vec<_>>());
3245
3246 let mut batch = provider.batch();
3248 batch.unwind_account_history_to(address, 5).unwrap();
3249 batch.commit().unwrap();
3250
3251 let result = provider.get::<tables::AccountsHistory>(key).unwrap();
3253 assert!(result.is_some());
3254 let blocks: Vec<u64> = result.unwrap().iter().collect();
3255 assert_eq!(blocks, (0..=5).collect::<Vec<_>>());
3256 }
3257
3258 #[test]
3259 fn test_unwind_account_history_to_removes_all() {
3260 let temp_dir = TempDir::new().unwrap();
3261 let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3262
3263 let address = Address::from([0x42; 20]);
3264
3265 let mut batch = provider.batch();
3267 batch.append_account_history_shard(address, 5..=10).unwrap();
3268 batch.commit().unwrap();
3269
3270 let mut batch = provider.batch();
3272 batch.unwind_account_history_to(address, 4).unwrap();
3273 batch.commit().unwrap();
3274
3275 let key = ShardedKey::new(address, u64::MAX);
3277 let result = provider.get::<tables::AccountsHistory>(key).unwrap();
3278 assert!(result.is_none(), "Should have no data after full unwind");
3279 }
3280
3281 #[test]
3282 fn test_unwind_account_history_to_no_op() {
3283 let temp_dir = TempDir::new().unwrap();
3284 let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3285
3286 let address = Address::from([0x42; 20]);
3287
3288 let mut batch = provider.batch();
3290 batch.append_account_history_shard(address, 0..=5).unwrap();
3291 batch.commit().unwrap();
3292
3293 let mut batch = provider.batch();
3295 batch.unwind_account_history_to(address, 10).unwrap();
3296 batch.commit().unwrap();
3297
3298 let key = ShardedKey::new(address, u64::MAX);
3300 let result = provider.get::<tables::AccountsHistory>(key).unwrap();
3301 assert!(result.is_some());
3302 let blocks: Vec<u64> = result.unwrap().iter().collect();
3303 assert_eq!(blocks, (0..=5).collect::<Vec<_>>());
3304 }
3305
3306 #[test]
3307 fn test_unwind_account_history_to_block_zero() {
3308 let temp_dir = TempDir::new().unwrap();
3309 let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3310
3311 let address = Address::from([0x42; 20]);
3312
3313 let mut batch = provider.batch();
3315 batch.append_account_history_shard(address, 0..=5).unwrap();
3316 batch.commit().unwrap();
3317
3318 let mut batch = provider.batch();
3321 batch.unwind_account_history_to(address, 0).unwrap();
3322 batch.commit().unwrap();
3323
3324 let key = ShardedKey::new(address, u64::MAX);
3326 let result = provider.get::<tables::AccountsHistory>(key).unwrap();
3327 assert!(result.is_some());
3328 let blocks: Vec<u64> = result.unwrap().iter().collect();
3329 assert_eq!(blocks, vec![0]);
3330 }
3331
    /// Unwinding into the sentinel shard truncates it while leaving an
    /// earlier completed shard untouched.
    #[test]
    fn test_unwind_account_history_to_multi_shard() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);

        let mut batch = provider.batch();

        // Completed shard: blocks 1..=50, keyed by highest block 50.
        let shard1 = BlockNumberList::new_pre_sorted(1..=50);
        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, 50), &shard1).unwrap();

        // Sentinel shard: blocks 51..=100.
        let shard2 = BlockNumberList::new_pre_sorted(51..=100);
        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, u64::MAX), &shard2).unwrap();

        batch.commit().unwrap();

        let shards = provider.account_history_shards(address).unwrap();
        assert_eq!(shards.len(), 2);

        // Unwind into the middle of the sentinel shard.
        let mut batch = provider.batch();
        batch.unwind_account_history_to(address, 75).unwrap();
        batch.commit().unwrap();

        let shards = provider.account_history_shards(address).unwrap();
        assert_eq!(shards.len(), 2);

        // First shard untouched.
        assert_eq!(shards[0].0.highest_block_number, 50);
        assert_eq!(shards[0].1.iter().collect::<Vec<_>>(), (1..=50).collect::<Vec<_>>());

        // Sentinel truncated to blocks <= 75.
        assert_eq!(shards[1].0.highest_block_number, u64::MAX);
        assert_eq!(shards[1].1.iter().collect::<Vec<_>>(), (51..=75).collect::<Vec<_>>());
    }
3374
    /// When the unwind empties the sentinel shard entirely, the remaining
    /// completed shard is promoted into the sentinel slot.
    #[test]
    fn test_unwind_account_history_to_multi_shard_boundary_empty() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);

        let mut batch = provider.batch();

        // Completed shard: blocks 1..=50, keyed by highest block 50.
        let shard1 = BlockNumberList::new_pre_sorted(1..=50);
        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, 50), &shard1).unwrap();

        // Sentinel shard: blocks 75..=100 (gap between 50 and 75).
        let shard2 = BlockNumberList::new_pre_sorted(75..=100);
        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, u64::MAX), &shard2).unwrap();

        batch.commit().unwrap();

        // Unwind to 60: strictly between the two shards, so the sentinel
        // loses all of its blocks.
        let mut batch = provider.batch();
        batch.unwind_account_history_to(address, 60).unwrap();
        batch.commit().unwrap();

        // Only one shard remains, now holding 1..=50 under the sentinel key.
        let shards = provider.account_history_shards(address).unwrap();
        assert_eq!(shards.len(), 1);
        assert_eq!(shards[0].0.highest_block_number, u64::MAX);
        assert_eq!(shards[0].1.iter().collect::<Vec<_>>(), (1..=50).collect::<Vec<_>>());
    }
3406
3407 #[test]
3408 fn test_account_history_shards_iterator() {
3409 let temp_dir = TempDir::new().unwrap();
3410 let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3411
3412 let address = Address::from([0x42; 20]);
3413 let other_address = Address::from([0x43; 20]);
3414
3415 let mut batch = provider.batch();
3417 batch.append_account_history_shard(address, 0..=5).unwrap();
3418 batch.append_account_history_shard(other_address, 10..=15).unwrap();
3419 batch.commit().unwrap();
3420
3421 let shards = provider.account_history_shards(address).unwrap();
3423 assert_eq!(shards.len(), 1);
3424 assert_eq!(shards[0].0.key, address);
3425
3426 let shards = provider.account_history_shards(other_address).unwrap();
3428 assert_eq!(shards.len(), 1);
3429 assert_eq!(shards[0].0.key, other_address);
3430
3431 let non_existent = Address::from([0x99; 20]);
3433 let shards = provider.account_history_shards(non_existent).unwrap();
3434 assert!(shards.is_empty());
3435 }
3436
3437 #[test]
3438 fn test_clear_account_history() {
3439 let temp_dir = TempDir::new().unwrap();
3440 let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3441
3442 let address = Address::from([0x42; 20]);
3443
3444 let mut batch = provider.batch();
3446 batch.append_account_history_shard(address, 0..=10).unwrap();
3447 batch.commit().unwrap();
3448
3449 let mut batch = provider.batch();
3451 batch.clear_account_history(address).unwrap();
3452 batch.commit().unwrap();
3453
3454 let shards = provider.account_history_shards(address).unwrap();
3456 assert!(shards.is_empty(), "All shards should be deleted");
3457 }
3458
    /// Unwinding into a middle (non-sentinel) shard: everything above the
    /// target is dropped, and the surviving middle shard ends up under the
    /// sentinel key.
    #[test]
    fn test_unwind_non_sentinel_boundary() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);

        let mut batch = provider.batch();

        // Three shards: two completed (keyed 50 and 100) plus the sentinel.
        let shard1 = BlockNumberList::new_pre_sorted(1..=50);
        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, 50), &shard1).unwrap();

        let shard2 = BlockNumberList::new_pre_sorted(51..=100);
        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, 100), &shard2).unwrap();

        let shard3 = BlockNumberList::new_pre_sorted(101..=150);
        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, u64::MAX), &shard3).unwrap();

        batch.commit().unwrap();

        let shards = provider.account_history_shards(address).unwrap();
        assert_eq!(shards.len(), 3);

        // Unwind to 75: inside the middle shard.
        let mut batch = provider.batch();
        batch.unwind_account_history_to(address, 75).unwrap();
        batch.commit().unwrap();

        let shards = provider.account_history_shards(address).unwrap();
        assert_eq!(shards.len(), 2);

        // First shard untouched.
        assert_eq!(shards[0].0.highest_block_number, 50);
        assert_eq!(shards[0].1.iter().collect::<Vec<_>>(), (1..=50).collect::<Vec<_>>());

        // Middle shard truncated to <= 75 and re-keyed as the sentinel; the
        // old sentinel shard (101..=150) is gone.
        assert_eq!(shards[1].0.highest_block_number, u64::MAX);
        assert_eq!(shards[1].1.iter().collect::<Vec<_>>(), (51..=75).collect::<Vec<_>>());
    }
3504
    /// With a small auto-commit threshold, a batch flushes itself while keys
    /// are still being added, so early writes become visible before the final
    /// explicit `commit`.
    #[test]
    fn test_batch_auto_commit_on_threshold() {
        let temp_dir = TempDir::new().unwrap();
        let provider =
            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();

        // Construct the batch directly to set a tiny (1 KiB) threshold.
        let mut batch = RocksDBBatch {
            provider: &provider,
            inner: WriteBatchWithTransaction::<true>::default(),
            buf: Vec::new(),
            auto_commit_threshold: Some(1024), };

        // 100 entries comfortably exceed 1024 bytes, forcing auto-commits.
        for i in 0..100u64 {
            let value = format!("value_{i:04}").into_bytes();
            batch.put::<TestTable>(i, &value).unwrap();
        }

        // At least the earliest keys must have been flushed already.
        let first_visible = provider.get::<TestTable>(0).unwrap();
        assert!(first_visible.is_some(), "Auto-committed data should be visible");

        // Flush whatever remains buffered.
        batch.commit().unwrap();

        for i in 0..100u64 {
            let value = format!("value_{i:04}").into_bytes();
            assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
        }
    }
3540
    // One table-driven scenario for account-history pruning: the shard layout
    // seeded before the prune, the prune target, and the expected outcome and
    // resulting layout.
    struct AccountPruneCase {
        // Label used in assertion messages.
        name: &'static str,
        // (highest_block_number, blocks) pairs written before pruning.
        initial_shards: &'static [(u64, &'static [u64])],
        // Target block passed to the prune call. NOTE(review): treated as
        // inclusive by the cases below — confirm against the implementation.
        prune_to: u64,
        expected_outcome: PruneShardOutcome,
        // Expected (highest_block_number, blocks) layout after pruning.
        expected_shards: &'static [(u64, &'static [u64])],
    }

    // Storage-history twin of `AccountPruneCase`; same field semantics.
    struct StoragePruneCase {
        name: &'static str,
        initial_shards: &'static [(u64, &'static [u64])],
        prune_to: u64,
        expected_outcome: PruneShardOutcome,
        expected_shards: &'static [(u64, &'static [u64])],
    }
3560
3561 #[test]
3562 fn test_prune_account_history_cases() {
3563 const MAX: u64 = u64::MAX;
3564 const CASES: &[AccountPruneCase] = &[
3565 AccountPruneCase {
3566 name: "single_shard_truncate",
3567 initial_shards: &[(MAX, &[10, 20, 30, 40])],
3568 prune_to: 25,
3569 expected_outcome: PruneShardOutcome::Updated,
3570 expected_shards: &[(MAX, &[30, 40])],
3571 },
3572 AccountPruneCase {
3573 name: "single_shard_delete_all",
3574 initial_shards: &[(MAX, &[10, 20])],
3575 prune_to: 20,
3576 expected_outcome: PruneShardOutcome::Deleted,
3577 expected_shards: &[],
3578 },
3579 AccountPruneCase {
3580 name: "single_shard_noop",
3581 initial_shards: &[(MAX, &[10, 20])],
3582 prune_to: 5,
3583 expected_outcome: PruneShardOutcome::Unchanged,
3584 expected_shards: &[(MAX, &[10, 20])],
3585 },
3586 AccountPruneCase {
3587 name: "no_shards",
3588 initial_shards: &[],
3589 prune_to: 100,
3590 expected_outcome: PruneShardOutcome::Unchanged,
3591 expected_shards: &[],
3592 },
3593 AccountPruneCase {
3594 name: "multi_shard_truncate_first",
3595 initial_shards: &[(30, &[10, 20, 30]), (MAX, &[40, 50, 60])],
3596 prune_to: 25,
3597 expected_outcome: PruneShardOutcome::Updated,
3598 expected_shards: &[(30, &[30]), (MAX, &[40, 50, 60])],
3599 },
3600 AccountPruneCase {
3601 name: "delete_first_shard_sentinel_unchanged",
3602 initial_shards: &[(20, &[10, 20]), (MAX, &[30, 40])],
3603 prune_to: 20,
3604 expected_outcome: PruneShardOutcome::Deleted,
3605 expected_shards: &[(MAX, &[30, 40])],
3606 },
3607 AccountPruneCase {
3608 name: "multi_shard_delete_all_but_last",
3609 initial_shards: &[(10, &[5, 10]), (20, &[15, 20]), (MAX, &[25, 30])],
3610 prune_to: 22,
3611 expected_outcome: PruneShardOutcome::Deleted,
3612 expected_shards: &[(MAX, &[25, 30])],
3613 },
3614 AccountPruneCase {
3615 name: "mid_shard_preserves_key",
3616 initial_shards: &[(50, &[10, 20, 30, 40, 50]), (MAX, &[60, 70])],
3617 prune_to: 25,
3618 expected_outcome: PruneShardOutcome::Updated,
3619 expected_shards: &[(50, &[30, 40, 50]), (MAX, &[60, 70])],
3620 },
3621 AccountPruneCase {
3623 name: "equiv_delete_early_shards_keep_sentinel",
3624 initial_shards: &[(20, &[10, 15, 20]), (50, &[30, 40, 50]), (MAX, &[60, 70])],
3625 prune_to: 55,
3626 expected_outcome: PruneShardOutcome::Deleted,
3627 expected_shards: &[(MAX, &[60, 70])],
3628 },
3629 AccountPruneCase {
3630 name: "equiv_sentinel_becomes_empty_with_prev",
3631 initial_shards: &[(50, &[30, 40, 50]), (MAX, &[35])],
3632 prune_to: 40,
3633 expected_outcome: PruneShardOutcome::Deleted,
3634 expected_shards: &[(MAX, &[50])],
3635 },
3636 AccountPruneCase {
3637 name: "equiv_all_shards_become_empty",
3638 initial_shards: &[(50, &[30, 40, 50]), (MAX, &[51])],
3639 prune_to: 51,
3640 expected_outcome: PruneShardOutcome::Deleted,
3641 expected_shards: &[],
3642 },
3643 AccountPruneCase {
3644 name: "equiv_non_sentinel_last_shard_promoted",
3645 initial_shards: &[(100, &[50, 75, 100])],
3646 prune_to: 60,
3647 expected_outcome: PruneShardOutcome::Updated,
3648 expected_shards: &[(MAX, &[75, 100])],
3649 },
3650 AccountPruneCase {
3651 name: "equiv_filter_within_shard",
3652 initial_shards: &[(MAX, &[10, 20, 30, 40])],
3653 prune_to: 25,
3654 expected_outcome: PruneShardOutcome::Updated,
3655 expected_shards: &[(MAX, &[30, 40])],
3656 },
3657 AccountPruneCase {
3658 name: "equiv_multi_shard_partial_delete",
3659 initial_shards: &[(20, &[10, 20]), (50, &[30, 40, 50]), (MAX, &[60, 70])],
3660 prune_to: 35,
3661 expected_outcome: PruneShardOutcome::Deleted,
3662 expected_shards: &[(50, &[40, 50]), (MAX, &[60, 70])],
3663 },
3664 ];
3665
3666 let address = Address::from([0x42; 20]);
3667
3668 for case in CASES {
3669 let temp_dir = TempDir::new().unwrap();
3670 let provider =
3671 RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3672
3673 let mut batch = provider.batch();
3675 for (highest, blocks) in case.initial_shards {
3676 let shard = BlockNumberList::new_pre_sorted(blocks.iter().copied());
3677 batch
3678 .put::<tables::AccountsHistory>(ShardedKey::new(address, *highest), &shard)
3679 .unwrap();
3680 }
3681 batch.commit().unwrap();
3682
3683 let mut batch = provider.batch();
3685 let outcome = batch.prune_account_history_to(address, case.prune_to).unwrap();
3686 batch.commit().unwrap();
3687
3688 assert_eq!(outcome, case.expected_outcome, "case '{}': wrong outcome", case.name);
3690
3691 let shards = provider.account_history_shards(address).unwrap();
3693 assert_eq!(
3694 shards.len(),
3695 case.expected_shards.len(),
3696 "case '{}': wrong shard count",
3697 case.name
3698 );
3699 for (i, ((key, blocks), (exp_key, exp_blocks))) in
3700 shards.iter().zip(case.expected_shards.iter()).enumerate()
3701 {
3702 assert_eq!(
3703 key.highest_block_number, *exp_key,
3704 "case '{}': shard {} wrong key",
3705 case.name, i
3706 );
3707 assert_eq!(
3708 blocks.iter().collect::<Vec<_>>(),
3709 *exp_blocks,
3710 "case '{}': shard {} wrong blocks",
3711 case.name,
3712 i
3713 );
3714 }
3715 }
3716 }
3717
    /// Table-driven pruning cases for storage-history shards.
    ///
    /// Each case seeds the `StoragesHistory` table for a single
    /// `(address, slot)` pair with the given shards, calls
    /// `prune_storage_history_to(prune_to)`, and then verifies both the
    /// reported `PruneShardOutcome` and the exact shards that remain.
    #[test]
    fn test_prune_storage_history_cases() {
        // Shorthand for the sentinel highest-block-number of the last shard.
        const MAX: u64 = u64::MAX;
        const CASES: &[StoragePruneCase] = &[
            StoragePruneCase {
                name: "single_shard_truncate",
                initial_shards: &[(MAX, &[10, 20, 30, 40])],
                prune_to: 25,
                expected_outcome: PruneShardOutcome::Updated,
                expected_shards: &[(MAX, &[30, 40])],
            },
            StoragePruneCase {
                name: "single_shard_delete_all",
                initial_shards: &[(MAX, &[10, 20])],
                prune_to: 20,
                expected_outcome: PruneShardOutcome::Deleted,
                expected_shards: &[],
            },
            StoragePruneCase {
                name: "noop",
                initial_shards: &[(MAX, &[10, 20])],
                prune_to: 5,
                expected_outcome: PruneShardOutcome::Unchanged,
                expected_shards: &[(MAX, &[10, 20])],
            },
            StoragePruneCase {
                name: "no_shards",
                initial_shards: &[],
                prune_to: 100,
                expected_outcome: PruneShardOutcome::Unchanged,
                expected_shards: &[],
            },
            StoragePruneCase {
                // Pruning inside the first shard keeps its (non-sentinel) key.
                name: "mid_shard_preserves_key",
                initial_shards: &[(50, &[10, 20, 30, 40, 50]), (MAX, &[60, 70])],
                prune_to: 25,
                expected_outcome: PruneShardOutcome::Updated,
                expected_shards: &[(50, &[30, 40, 50]), (MAX, &[60, 70])],
            },
            // "equiv_*" cases mirror the account-history equivalents so both
            // history tables are exercised by the same scenarios.
            StoragePruneCase {
                // A surviving non-sentinel last shard is promoted to the sentinel key.
                name: "equiv_sentinel_promotion",
                initial_shards: &[(100, &[50, 75, 100])],
                prune_to: 60,
                expected_outcome: PruneShardOutcome::Updated,
                expected_shards: &[(MAX, &[75, 100])],
            },
            StoragePruneCase {
                name: "equiv_delete_early_shards_keep_sentinel",
                initial_shards: &[(20, &[10, 15, 20]), (50, &[30, 40, 50]), (MAX, &[60, 70])],
                prune_to: 55,
                expected_outcome: PruneShardOutcome::Deleted,
                expected_shards: &[(MAX, &[60, 70])],
            },
            StoragePruneCase {
                // The sentinel shard empties; the previous shard's survivors
                // take over the sentinel key.
                name: "equiv_sentinel_becomes_empty_with_prev",
                initial_shards: &[(50, &[30, 40, 50]), (MAX, &[35])],
                prune_to: 40,
                expected_outcome: PruneShardOutcome::Deleted,
                expected_shards: &[(MAX, &[50])],
            },
            StoragePruneCase {
                name: "equiv_all_shards_become_empty",
                initial_shards: &[(50, &[30, 40, 50]), (MAX, &[51])],
                prune_to: 51,
                expected_outcome: PruneShardOutcome::Deleted,
                expected_shards: &[],
            },
            StoragePruneCase {
                name: "equiv_filter_within_shard",
                initial_shards: &[(MAX, &[10, 20, 30, 40])],
                prune_to: 25,
                expected_outcome: PruneShardOutcome::Updated,
                expected_shards: &[(MAX, &[30, 40])],
            },
            StoragePruneCase {
                name: "equiv_multi_shard_partial_delete",
                initial_shards: &[(20, &[10, 20]), (50, &[30, 40, 50]), (MAX, &[60, 70])],
                prune_to: 35,
                expected_outcome: PruneShardOutcome::Deleted,
                expected_shards: &[(50, &[40, 50]), (MAX, &[60, 70])],
            },
        ];

        let address = Address::from([0x42; 20]);
        let storage_key = B256::from([0x01; 32]);

        for case in CASES {
            // Fresh database per case so cases cannot interfere.
            let temp_dir = TempDir::new().unwrap();
            let provider =
                RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

            // Seed the initial shards; MAX selects the sentinel key form.
            let mut batch = provider.batch();
            for (highest, blocks) in case.initial_shards {
                let shard = BlockNumberList::new_pre_sorted(blocks.iter().copied());
                let key = if *highest == MAX {
                    StorageShardedKey::last(address, storage_key)
                } else {
                    StorageShardedKey::new(address, storage_key, *highest)
                };
                batch.put::<tables::StoragesHistory>(key, &shard).unwrap();
            }
            batch.commit().unwrap();

            // Prune in a separate batch, mirroring production usage.
            let mut batch = provider.batch();
            let outcome =
                batch.prune_storage_history_to(address, storage_key, case.prune_to).unwrap();
            batch.commit().unwrap();

            assert_eq!(outcome, case.expected_outcome, "case '{}': wrong outcome", case.name);

            // Verify the remaining shards match the expectation exactly,
            // both keys and block lists, in order.
            let shards = provider.storage_history_shards(address, storage_key).unwrap();
            assert_eq!(
                shards.len(),
                case.expected_shards.len(),
                "case '{}': wrong shard count",
                case.name
            );
            for (i, ((key, blocks), (exp_key, exp_blocks))) in
                shards.iter().zip(case.expected_shards.iter()).enumerate()
            {
                assert_eq!(
                    key.sharded_key.highest_block_number, *exp_key,
                    "case '{}': shard {} wrong key",
                    case.name, i
                );
                assert_eq!(
                    blocks.iter().collect::<Vec<_>>(),
                    *exp_blocks,
                    "case '{}': shard {} wrong blocks",
                    case.name,
                    i
                );
            }
        }
    }
3858
3859 #[test]
3860 fn test_prune_storage_history_does_not_affect_other_slots() {
3861 let temp_dir = TempDir::new().unwrap();
3862 let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3863
3864 let address = Address::from([0x42; 20]);
3865 let slot1 = B256::from([0x01; 32]);
3866 let slot2 = B256::from([0x02; 32]);
3867
3868 let mut batch = provider.batch();
3870 batch
3871 .put::<tables::StoragesHistory>(
3872 StorageShardedKey::last(address, slot1),
3873 &BlockNumberList::new_pre_sorted([10u64, 20]),
3874 )
3875 .unwrap();
3876 batch
3877 .put::<tables::StoragesHistory>(
3878 StorageShardedKey::last(address, slot2),
3879 &BlockNumberList::new_pre_sorted([30u64, 40]),
3880 )
3881 .unwrap();
3882 batch.commit().unwrap();
3883
3884 let mut batch = provider.batch();
3886 let outcome = batch.prune_storage_history_to(address, slot1, 20).unwrap();
3887 batch.commit().unwrap();
3888
3889 assert_eq!(outcome, PruneShardOutcome::Deleted);
3890
3891 let shards1 = provider.storage_history_shards(address, slot1).unwrap();
3893 assert!(shards1.is_empty());
3894
3895 let shards2 = provider.storage_history_shards(address, slot2).unwrap();
3897 assert_eq!(shards2.len(), 1);
3898 assert_eq!(shards2[0].1.iter().collect::<Vec<_>>(), vec![30, 40]);
3899 }
3900
3901 #[test]
3902 fn test_prune_invariants() {
3903 let address = Address::from([0x42; 20]);
3905 let storage_key = B256::from([0x01; 32]);
3906
3907 #[allow(clippy::type_complexity)]
3909 let invariant_cases: &[(&[(u64, &[u64])], u64)] = &[
3910 (&[(10, &[5, 10]), (20, &[15, 20]), (u64::MAX, &[25, 30])], 20),
3912 (&[(100, &[50, 100])], 60),
3914 ];
3915
3916 for (initial_shards, prune_to) in invariant_cases {
3917 {
3919 let temp_dir = TempDir::new().unwrap();
3920 let provider =
3921 RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3922
3923 let mut batch = provider.batch();
3924 for (highest, blocks) in *initial_shards {
3925 let shard = BlockNumberList::new_pre_sorted(blocks.iter().copied());
3926 batch
3927 .put::<tables::AccountsHistory>(ShardedKey::new(address, *highest), &shard)
3928 .unwrap();
3929 }
3930 batch.commit().unwrap();
3931
3932 let mut batch = provider.batch();
3933 batch.prune_account_history_to(address, *prune_to).unwrap();
3934 batch.commit().unwrap();
3935
3936 let shards = provider.account_history_shards(address).unwrap();
3937
3938 for (key, blocks) in &shards {
3940 assert!(
3941 !blocks.is_empty(),
3942 "Account: empty shard at key {}",
3943 key.highest_block_number
3944 );
3945 }
3946
3947 if !shards.is_empty() {
3949 let last = shards.last().unwrap();
3950 assert_eq!(
3951 last.0.highest_block_number,
3952 u64::MAX,
3953 "Account: last shard must be sentinel"
3954 );
3955 }
3956 }
3957
3958 {
3960 let temp_dir = TempDir::new().unwrap();
3961 let provider =
3962 RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3963
3964 let mut batch = provider.batch();
3965 for (highest, blocks) in *initial_shards {
3966 let shard = BlockNumberList::new_pre_sorted(blocks.iter().copied());
3967 let key = if *highest == u64::MAX {
3968 StorageShardedKey::last(address, storage_key)
3969 } else {
3970 StorageShardedKey::new(address, storage_key, *highest)
3971 };
3972 batch.put::<tables::StoragesHistory>(key, &shard).unwrap();
3973 }
3974 batch.commit().unwrap();
3975
3976 let mut batch = provider.batch();
3977 batch.prune_storage_history_to(address, storage_key, *prune_to).unwrap();
3978 batch.commit().unwrap();
3979
3980 let shards = provider.storage_history_shards(address, storage_key).unwrap();
3981
3982 for (key, blocks) in &shards {
3984 assert!(
3985 !blocks.is_empty(),
3986 "Storage: empty shard at key {}",
3987 key.sharded_key.highest_block_number
3988 );
3989 }
3990
3991 if !shards.is_empty() {
3993 let last = shards.last().unwrap();
3994 assert_eq!(
3995 last.0.sharded_key.highest_block_number,
3996 u64::MAX,
3997 "Storage: last shard must be sentinel"
3998 );
3999 }
4000 }
4001 }
4002 }
4003
4004 #[test]
4005 fn test_prune_account_history_batch_multiple_sorted_targets() {
4006 let temp_dir = TempDir::new().unwrap();
4007 let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
4008
4009 let addr1 = Address::from([0x01; 20]);
4010 let addr2 = Address::from([0x02; 20]);
4011 let addr3 = Address::from([0x03; 20]);
4012
4013 let mut batch = provider.batch();
4015 batch
4016 .put::<tables::AccountsHistory>(
4017 ShardedKey::new(addr1, u64::MAX),
4018 &BlockNumberList::new_pre_sorted([10, 20, 30]),
4019 )
4020 .unwrap();
4021 batch
4022 .put::<tables::AccountsHistory>(
4023 ShardedKey::new(addr2, u64::MAX),
4024 &BlockNumberList::new_pre_sorted([5, 10, 15]),
4025 )
4026 .unwrap();
4027 batch
4028 .put::<tables::AccountsHistory>(
4029 ShardedKey::new(addr3, u64::MAX),
4030 &BlockNumberList::new_pre_sorted([100, 200]),
4031 )
4032 .unwrap();
4033 batch.commit().unwrap();
4034
4035 let mut targets = vec![(addr1, 15), (addr2, 10), (addr3, 50)];
4037 targets.sort_by_key(|(addr, _)| *addr);
4038
4039 let mut batch = provider.batch();
4040 let outcomes = batch.prune_account_history_batch(&targets).unwrap();
4041 batch.commit().unwrap();
4042
4043 assert_eq!(outcomes.updated, 2);
4047 assert_eq!(outcomes.unchanged, 1);
4048
4049 let shards1 = provider.account_history_shards(addr1).unwrap();
4050 assert_eq!(shards1[0].1.iter().collect::<Vec<_>>(), vec![20, 30]);
4051
4052 let shards2 = provider.account_history_shards(addr2).unwrap();
4053 assert_eq!(shards2[0].1.iter().collect::<Vec<_>>(), vec![15]);
4054
4055 let shards3 = provider.account_history_shards(addr3).unwrap();
4056 assert_eq!(shards3[0].1.iter().collect::<Vec<_>>(), vec![100, 200]);
4057 }
4058
4059 #[test]
4060 fn test_prune_account_history_batch_target_with_no_shards() {
4061 let temp_dir = TempDir::new().unwrap();
4062 let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
4063
4064 let addr1 = Address::from([0x01; 20]);
4065 let addr2 = Address::from([0x02; 20]); let addr3 = Address::from([0x03; 20]);
4067
4068 let mut batch = provider.batch();
4070 batch
4071 .put::<tables::AccountsHistory>(
4072 ShardedKey::new(addr1, u64::MAX),
4073 &BlockNumberList::new_pre_sorted([10, 20]),
4074 )
4075 .unwrap();
4076 batch
4077 .put::<tables::AccountsHistory>(
4078 ShardedKey::new(addr3, u64::MAX),
4079 &BlockNumberList::new_pre_sorted([30, 40]),
4080 )
4081 .unwrap();
4082 batch.commit().unwrap();
4083
4084 let mut targets = vec![(addr1, 15), (addr2, 100), (addr3, 35)];
4086 targets.sort_by_key(|(addr, _)| *addr);
4087
4088 let mut batch = provider.batch();
4089 let outcomes = batch.prune_account_history_batch(&targets).unwrap();
4090 batch.commit().unwrap();
4091
4092 assert_eq!(outcomes.updated, 2);
4096 assert_eq!(outcomes.unchanged, 1);
4097
4098 let shards1 = provider.account_history_shards(addr1).unwrap();
4099 assert_eq!(shards1[0].1.iter().collect::<Vec<_>>(), vec![20]);
4100
4101 let shards3 = provider.account_history_shards(addr3).unwrap();
4102 assert_eq!(shards3[0].1.iter().collect::<Vec<_>>(), vec![40]);
4103 }
4104
4105 #[test]
4106 fn test_prune_storage_history_batch_multiple_sorted_targets() {
4107 let temp_dir = TempDir::new().unwrap();
4108 let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
4109
4110 let addr = Address::from([0x42; 20]);
4111 let slot1 = B256::from([0x01; 32]);
4112 let slot2 = B256::from([0x02; 32]);
4113
4114 let mut batch = provider.batch();
4116 batch
4117 .put::<tables::StoragesHistory>(
4118 StorageShardedKey::new(addr, slot1, u64::MAX),
4119 &BlockNumberList::new_pre_sorted([10, 20, 30]),
4120 )
4121 .unwrap();
4122 batch
4123 .put::<tables::StoragesHistory>(
4124 StorageShardedKey::new(addr, slot2, u64::MAX),
4125 &BlockNumberList::new_pre_sorted([5, 15, 25]),
4126 )
4127 .unwrap();
4128 batch.commit().unwrap();
4129
4130 let mut targets = vec![((addr, slot1), 15), ((addr, slot2), 10)];
4132 targets.sort_by_key(|((a, s), _)| (*a, *s));
4133
4134 let mut batch = provider.batch();
4135 let outcomes = batch.prune_storage_history_batch(&targets).unwrap();
4136 batch.commit().unwrap();
4137
4138 assert_eq!(outcomes.updated, 2);
4139
4140 let shards1 = provider.storage_history_shards(addr, slot1).unwrap();
4141 assert_eq!(shards1[0].1.iter().collect::<Vec<_>>(), vec![20, 30]);
4142
4143 let shards2 = provider.storage_history_shards(addr, slot2).unwrap();
4144 assert_eq!(shards2[0].1.iter().collect::<Vec<_>>(), vec![15, 25]);
4145 }
4146}