1use super::metrics::{RocksDBMetrics, RocksDBOperation, ROCKSDB_TABLES};
2use crate::providers::{compute_history_rank, needs_prev_shard_check, HistoryInfo};
3use alloy_consensus::transaction::TxHashRef;
4use alloy_primitives::{
5 map::{AddressMap, HashMap},
6 Address, BlockNumber, TxNumber, B256,
7};
8use itertools::Itertools;
9use metrics::Label;
10use parking_lot::Mutex;
11use reth_chain_state::ExecutedBlock;
12use reth_db_api::{
13 database_metrics::DatabaseMetrics,
14 models::{
15 sharded_key::NUM_OF_INDICES_IN_SHARD, storage_sharded_key::StorageShardedKey, ShardedKey,
16 StorageSettings,
17 },
18 table::{Compress, Decode, Decompress, Encode, Table},
19 tables, BlockNumberList, DatabaseError,
20};
21use reth_primitives_traits::{BlockBody as _, FastInstant as Instant};
22use reth_prune_types::PruneMode;
23use reth_storage_errors::{
24 db::{DatabaseErrorInfo, DatabaseWriteError, DatabaseWriteOperation, LogLevel},
25 provider::{ProviderError, ProviderResult},
26};
27use rocksdb::{
28 BlockBasedOptions, Cache, ColumnFamilyDescriptor, CompactionPri, DBCompressionType,
29 DBRawIteratorWithThreadMode, IteratorMode, OptimisticTransactionDB,
30 OptimisticTransactionOptions, Options, SnapshotWithThreadMode, Transaction,
31 WriteBatchWithTransaction, WriteBufferManager, WriteOptions, DB,
32};
33use std::{
34 collections::BTreeMap,
35 fmt,
36 path::{Path, PathBuf},
37 sync::Arc,
38};
39use tracing::instrument;
40
41fn synced_write_options() -> WriteOptions {
43 let mut opts = WriteOptions::default();
44 opts.set_sync(true);
45 opts
46}
47
48pub(crate) type PendingRocksDBBatches = Arc<Mutex<Vec<WriteBatchWithTransaction<true>>>>;
50
51type RawKVResult = Result<(Box<[u8]>, Box<[u8]>), rocksdb::Error>;
53
54#[derive(Debug, Clone)]
56pub struct RocksDBTableStats {
57 pub sst_size_bytes: u64,
59 pub memtable_size_bytes: u64,
61 pub name: String,
63 pub estimated_num_keys: u64,
65 pub estimated_size_bytes: u64,
67 pub pending_compaction_bytes: u64,
69}
70
71#[derive(Debug, Clone)]
75pub struct RocksDBStats {
76 pub tables: Vec<RocksDBTableStats>,
78 pub wal_size_bytes: u64,
82}
83
84#[derive(Clone)]
86pub(crate) struct RocksDBWriteCtx {
87 pub first_block_number: BlockNumber,
89 pub prune_tx_lookup: Option<PruneMode>,
91 pub storage_settings: StorageSettings,
93 pub pending_batches: PendingRocksDBBatches,
95}
96
97impl fmt::Debug for RocksDBWriteCtx {
98 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
99 f.debug_struct("RocksDBWriteCtx")
100 .field("first_block_number", &self.first_block_number)
101 .field("prune_tx_lookup", &self.prune_tx_lookup)
102 .field("storage_settings", &self.storage_settings)
103 .field("pending_batches", &"<pending batches>")
104 .finish()
105 }
106}
107
108const DEFAULT_CACHE_SIZE: usize = 128 << 20;
110
111const DEFAULT_BLOCK_SIZE: usize = 16 * 1024;
113
114const DEFAULT_MAX_BACKGROUND_JOBS: i32 = 6;
116
117const DEFAULT_MAX_OPEN_FILES: i32 = 512;
125
126const DEFAULT_BYTES_PER_SYNC: u64 = 1_048_576;
128
129const DEFAULT_WRITE_BUFFER_SIZE: usize = 128 << 20;
135
136const DEFAULT_WRITE_BUFFER_MANAGER_SIZE: usize = 4 * 1024 * 1024 * 1024;
141
142const DEFAULT_COMPRESS_BUF_CAPACITY: usize = 4096;
146
147const DEFAULT_AUTO_COMMIT_THRESHOLD: usize = 512 * 1024 * 1024;
154
155pub struct RocksDBBuilder {
157 path: PathBuf,
158 column_families: Vec<String>,
159 enable_metrics: bool,
160 enable_statistics: bool,
161 log_level: rocksdb::LogLevel,
162 block_cache: Cache,
163 read_only: bool,
164}
165
166impl fmt::Debug for RocksDBBuilder {
167 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
168 f.debug_struct("RocksDBBuilder")
169 .field("path", &self.path)
170 .field("column_families", &self.column_families)
171 .field("enable_metrics", &self.enable_metrics)
172 .finish()
173 }
174}
175
176impl RocksDBBuilder {
177 pub fn new(path: impl AsRef<Path>) -> Self {
179 let cache = Cache::new_lru_cache(DEFAULT_CACHE_SIZE);
180 Self {
181 path: path.as_ref().to_path_buf(),
182 column_families: Vec::new(),
183 enable_metrics: false,
184 enable_statistics: false,
185 log_level: rocksdb::LogLevel::Info,
186 block_cache: cache,
187 read_only: false,
188 }
189 }
190
191 fn default_table_options(cache: &Cache) -> BlockBasedOptions {
193 let mut table_options = BlockBasedOptions::default();
194 table_options.set_block_size(DEFAULT_BLOCK_SIZE);
195 table_options.set_cache_index_and_filter_blocks(true);
196 table_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
197 table_options.set_block_cache(cache);
199 table_options
200 }
201
202 fn default_options(
204 log_level: rocksdb::LogLevel,
205 cache: &Cache,
206 enable_statistics: bool,
207 ) -> Options {
208 let table_options = Self::default_table_options(cache);
210
211 let mut options = Options::default();
212 options.set_block_based_table_factory(&table_options);
213 options.create_if_missing(true);
214 options.create_missing_column_families(true);
215 options.set_max_background_jobs(DEFAULT_MAX_BACKGROUND_JOBS);
216 options.set_bytes_per_sync(DEFAULT_BYTES_PER_SYNC);
217 let write_buffer_manager =
218 WriteBufferManager::new_write_buffer_manager(DEFAULT_WRITE_BUFFER_MANAGER_SIZE, true);
219 options.set_write_buffer_manager(&write_buffer_manager);
220
221 options.set_bottommost_compression_type(DBCompressionType::Zstd);
222 options.set_bottommost_zstd_max_train_bytes(0, true);
223 options.set_compression_type(DBCompressionType::Lz4);
224 options.set_compaction_pri(CompactionPri::MinOverlappingRatio);
225
226 options.set_log_level(log_level);
227
228 options.set_max_open_files(DEFAULT_MAX_OPEN_FILES);
229
230 options.set_wal_ttl_seconds(0);
233 options.set_wal_size_limit_mb(0);
234
235 if enable_statistics {
237 options.enable_statistics();
238 }
239
240 options
241 }
242
243 fn default_column_family_options(cache: &Cache) -> Options {
245 let table_options = Self::default_table_options(cache);
247
248 let mut cf_options = Options::default();
249 cf_options.set_block_based_table_factory(&table_options);
250 cf_options.set_level_compaction_dynamic_level_bytes(true);
251 cf_options.set_compression_type(DBCompressionType::Lz4);
253 cf_options.set_bottommost_compression_type(DBCompressionType::Zstd);
254 cf_options.set_bottommost_zstd_max_train_bytes(0, true);
256 cf_options.set_write_buffer_size(DEFAULT_WRITE_BUFFER_SIZE);
257
258 cf_options
259 }
260
261 fn tx_hash_numbers_column_family_options(cache: &Cache) -> Options {
268 let mut table_options = BlockBasedOptions::default();
269 table_options.set_block_size(DEFAULT_BLOCK_SIZE);
270 table_options.set_cache_index_and_filter_blocks(true);
271 table_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
272 table_options.set_block_cache(cache);
273 let mut cf_options = Options::default();
277 cf_options.set_block_based_table_factory(&table_options);
278 cf_options.set_level_compaction_dynamic_level_bytes(true);
279 cf_options.set_compression_type(DBCompressionType::None);
282 cf_options.set_bottommost_compression_type(DBCompressionType::None);
283
284 cf_options
285 }
286
287 pub fn with_table<T: Table>(mut self) -> Self {
289 self.column_families.push(T::NAME.to_string());
290 self
291 }
292
293 pub fn with_default_tables(self) -> Self {
300 self.with_table::<tables::TransactionHashNumbers>()
301 .with_table::<tables::AccountsHistory>()
302 .with_table::<tables::StoragesHistory>()
303 }
304
305 pub const fn with_metrics(mut self) -> Self {
307 self.enable_metrics = true;
308 self
309 }
310
311 pub const fn with_statistics(mut self) -> Self {
313 self.enable_statistics = true;
314 self
315 }
316
317 pub const fn with_database_log_level(mut self, log_level: Option<LogLevel>) -> Self {
319 if let Some(level) = log_level {
320 self.log_level = convert_log_level(level);
321 }
322 self
323 }
324
325 pub fn with_block_cache_size(mut self, capacity_bytes: usize) -> Self {
327 self.block_cache = Cache::new_lru_cache(capacity_bytes);
328 self
329 }
330
331 pub const fn with_read_only(mut self, read_only: bool) -> Self {
339 self.read_only = read_only;
340 self
341 }
342
343 pub fn build(self) -> ProviderResult<RocksDBProvider> {
345 let options =
346 Self::default_options(self.log_level, &self.block_cache, self.enable_statistics);
347
348 let cf_descriptors: Vec<ColumnFamilyDescriptor> = self
349 .column_families
350 .iter()
351 .map(|name| {
352 let cf_options = if name == tables::TransactionHashNumbers::NAME {
353 Self::tx_hash_numbers_column_family_options(&self.block_cache)
354 } else {
355 Self::default_column_family_options(&self.block_cache)
356 };
357 ColumnFamilyDescriptor::new(name.clone(), cf_options)
358 })
359 .collect();
360
361 let metrics = self.enable_metrics.then(RocksDBMetrics::default);
362
363 if self.read_only {
364 let mut options = options;
367 options.set_max_open_files(-1);
368
369 let secondary_path = self
370 .path
371 .parent()
372 .unwrap_or(&self.path)
373 .join(format!("rocksdb-secondary-tmp-{}", std::process::id()));
374 reth_fs_util::create_dir_all(&secondary_path).map_err(ProviderError::other)?;
375
376 let db = DB::open_cf_descriptors_as_secondary(
377 &options,
378 &self.path,
379 &secondary_path,
380 cf_descriptors,
381 )
382 .map_err(|e| {
383 ProviderError::Database(DatabaseError::Open(DatabaseErrorInfo {
384 message: e.to_string().into(),
385 code: -1,
386 }))
387 })?;
388 Ok(RocksDBProvider(Arc::new(RocksDBProviderInner::Secondary {
389 db,
390 metrics,
391 secondary_path,
392 })))
393 } else {
394 let db =
399 OptimisticTransactionDB::open_cf_descriptors(&options, &self.path, cf_descriptors)
400 .map_err(|e| {
401 ProviderError::Database(DatabaseError::Open(DatabaseErrorInfo {
402 message: e.to_string().into(),
403 code: -1,
404 }))
405 })?;
406 Ok(RocksDBProvider(Arc::new(RocksDBProviderInner::ReadWrite { db, metrics })))
407 }
408 }
409}
410
411macro_rules! compress_to_buf_or_ref {
414 ($buf:expr, $value:expr) => {
415 if let Some(value) = $value.uncompressable_ref() {
416 Some(value)
417 } else {
418 $buf.clear();
419 $value.compress_to_buf(&mut $buf);
420 None
421 }
422 };
423}
424
425#[derive(Debug)]
427pub struct RocksDBProvider(Arc<RocksDBProviderInner>);
428
429enum RocksDBProviderInner {
431 ReadWrite {
433 db: OptimisticTransactionDB,
435 metrics: Option<RocksDBMetrics>,
437 },
438 Secondary {
442 db: DB,
444 metrics: Option<RocksDBMetrics>,
446 secondary_path: PathBuf,
448 },
449}
450
451impl RocksDBProviderInner {
452 const fn metrics(&self) -> Option<&RocksDBMetrics> {
454 match self {
455 Self::ReadWrite { metrics, .. } | Self::Secondary { metrics, .. } => metrics.as_ref(),
456 }
457 }
458
459 fn db_rw(&self) -> &OptimisticTransactionDB {
461 match self {
462 Self::ReadWrite { db, .. } => db,
463 Self::Secondary { .. } => {
464 panic!("Cannot perform write operation on secondary RocksDB provider")
465 }
466 }
467 }
468
469 fn cf_handle<T: Table>(&self) -> Result<&rocksdb::ColumnFamily, DatabaseError> {
471 let cf = match self {
472 Self::ReadWrite { db, .. } => db.cf_handle(T::NAME),
473 Self::Secondary { db, .. } => db.cf_handle(T::NAME),
474 };
475 cf.ok_or_else(|| DatabaseError::Other(format!("Column family '{}' not found", T::NAME)))
476 }
477
478 fn get_cf(
480 &self,
481 cf: &rocksdb::ColumnFamily,
482 key: impl AsRef<[u8]>,
483 ) -> Result<Option<Vec<u8>>, rocksdb::Error> {
484 match self {
485 Self::ReadWrite { db, .. } => db.get_cf(cf, key),
486 Self::Secondary { db, .. } => db.get_cf(cf, key),
487 }
488 }
489
490 fn put_cf(
492 &self,
493 cf: &rocksdb::ColumnFamily,
494 key: impl AsRef<[u8]>,
495 value: impl AsRef<[u8]>,
496 ) -> Result<(), rocksdb::Error> {
497 self.db_rw().put_cf(cf, key, value)
498 }
499
500 fn delete_cf(
502 &self,
503 cf: &rocksdb::ColumnFamily,
504 key: impl AsRef<[u8]>,
505 ) -> Result<(), rocksdb::Error> {
506 self.db_rw().delete_cf(cf, key)
507 }
508
509 fn delete_range_cf<K: AsRef<[u8]>>(
511 &self,
512 cf: &rocksdb::ColumnFamily,
513 from: K,
514 to: K,
515 ) -> Result<(), rocksdb::Error> {
516 self.db_rw().delete_range_cf(cf, from, to)
517 }
518
519 fn iterator_cf(
521 &self,
522 cf: &rocksdb::ColumnFamily,
523 mode: IteratorMode<'_>,
524 ) -> RocksDBIterEnum<'_> {
525 match self {
526 Self::ReadWrite { db, .. } => RocksDBIterEnum::ReadWrite(db.iterator_cf(cf, mode)),
527 Self::Secondary { db, .. } => RocksDBIterEnum::ReadOnly(db.iterator_cf(cf, mode)),
528 }
529 }
530
531 fn raw_iterator_cf(&self, cf: &rocksdb::ColumnFamily) -> RocksDBRawIterEnum<'_> {
536 match self {
537 Self::ReadWrite { db, .. } => RocksDBRawIterEnum::ReadWrite(db.raw_iterator_cf(cf)),
538 Self::Secondary { db, .. } => RocksDBRawIterEnum::ReadOnly(db.raw_iterator_cf(cf)),
539 }
540 }
541
542 fn snapshot(&self) -> RocksReadSnapshotInner<'_> {
544 match self {
545 Self::ReadWrite { db, .. } => RocksReadSnapshotInner::ReadWrite(db.snapshot()),
546 Self::Secondary { db, .. } => RocksReadSnapshotInner::Secondary(db),
547 }
548 }
549
550 fn path(&self) -> &Path {
552 match self {
553 Self::ReadWrite { db, .. } => db.path(),
554 Self::Secondary { db, .. } => db.path(),
555 }
556 }
557
558 fn wal_size_bytes(&self) -> u64 {
562 let path = self.path();
563
564 match std::fs::read_dir(path) {
565 Ok(entries) => entries
566 .filter_map(|e| e.ok())
567 .filter(|e| e.path().extension().is_some_and(|ext| ext == "log"))
568 .filter_map(|e| e.metadata().ok())
569 .map(|m| m.len())
570 .sum(),
571 Err(_) => 0,
572 }
573 }
574
575 fn table_stats(&self) -> Vec<RocksDBTableStats> {
577 let mut stats = Vec::new();
578
579 macro_rules! collect_stats {
580 ($db:expr) => {
581 for cf_name in ROCKSDB_TABLES {
582 if let Some(cf) = $db.cf_handle(cf_name) {
583 let estimated_num_keys = $db
584 .property_int_value_cf(cf, rocksdb::properties::ESTIMATE_NUM_KEYS)
585 .ok()
586 .flatten()
587 .unwrap_or(0);
588
589 let sst_size = $db
591 .property_int_value_cf(cf, rocksdb::properties::LIVE_SST_FILES_SIZE)
592 .ok()
593 .flatten()
594 .unwrap_or(0);
595
596 let memtable_size = $db
597 .property_int_value_cf(cf, rocksdb::properties::SIZE_ALL_MEM_TABLES)
598 .ok()
599 .flatten()
600 .unwrap_or(0);
601
602 let estimated_size_bytes = sst_size + memtable_size;
603
604 let pending_compaction_bytes = $db
605 .property_int_value_cf(
606 cf,
607 rocksdb::properties::ESTIMATE_PENDING_COMPACTION_BYTES,
608 )
609 .ok()
610 .flatten()
611 .unwrap_or(0);
612
613 stats.push(RocksDBTableStats {
614 sst_size_bytes: sst_size,
615 memtable_size_bytes: memtable_size,
616 name: cf_name.to_string(),
617 estimated_num_keys,
618 estimated_size_bytes,
619 pending_compaction_bytes,
620 });
621 }
622 }
623 };
624 }
625
626 match self {
627 Self::ReadWrite { db, .. } => collect_stats!(db),
628 Self::Secondary { db, .. } => collect_stats!(db),
629 }
630
631 stats
632 }
633
634 fn db_stats(&self) -> RocksDBStats {
636 RocksDBStats { tables: self.table_stats(), wal_size_bytes: self.wal_size_bytes() }
637 }
638}
639
640impl fmt::Debug for RocksDBProviderInner {
641 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
642 match self {
643 Self::ReadWrite { metrics, .. } => f
644 .debug_struct("RocksDBProviderInner::ReadWrite")
645 .field("db", &"<OptimisticTransactionDB>")
646 .field("metrics", metrics)
647 .finish(),
648 Self::Secondary { metrics, .. } => f
649 .debug_struct("RocksDBProviderInner::Secondary")
650 .field("db", &"<DB (secondary)>")
651 .field("metrics", metrics)
652 .finish(),
653 }
654 }
655}
656
657impl Drop for RocksDBProviderInner {
658 fn drop(&mut self) {
659 match self {
660 Self::ReadWrite { db, .. } => {
661 if let Err(e) = db.flush_wal(true) {
664 tracing::warn!(target: "providers::rocksdb", ?e, "Failed to flush WAL on drop");
665 }
666 for cf_name in ROCKSDB_TABLES {
667 if let Some(cf) = db.cf_handle(cf_name) &&
668 let Err(e) = db.flush_cf(&cf)
669 {
670 tracing::warn!(target: "providers::rocksdb", cf = cf_name, ?e, "Failed to flush CF on drop");
671 }
672 }
673 db.cancel_all_background_work(true);
674 }
675 Self::Secondary { db, secondary_path, .. } => {
676 db.cancel_all_background_work(true);
677 let _ = std::fs::remove_dir_all(secondary_path);
678 }
679 }
680 }
681}
682
683impl Clone for RocksDBProvider {
684 fn clone(&self) -> Self {
685 Self(self.0.clone())
686 }
687}
688
689impl DatabaseMetrics for RocksDBProvider {
690 fn gauge_metrics(&self) -> Vec<(&'static str, f64, Vec<Label>)> {
691 let mut metrics = Vec::new();
692
693 for stat in self.table_stats() {
694 metrics.push((
695 "rocksdb.table_size",
696 stat.estimated_size_bytes as f64,
697 vec![Label::new("table", stat.name.clone())],
698 ));
699 metrics.push((
700 "rocksdb.table_entries",
701 stat.estimated_num_keys as f64,
702 vec![Label::new("table", stat.name.clone())],
703 ));
704 metrics.push((
705 "rocksdb.pending_compaction_bytes",
706 stat.pending_compaction_bytes as f64,
707 vec![Label::new("table", stat.name.clone())],
708 ));
709 metrics.push((
710 "rocksdb.sst_size",
711 stat.sst_size_bytes as f64,
712 vec![Label::new("table", stat.name.clone())],
713 ));
714 metrics.push((
715 "rocksdb.memtable_size",
716 stat.memtable_size_bytes as f64,
717 vec![Label::new("table", stat.name)],
718 ));
719 }
720
721 metrics.push(("rocksdb.wal_size", self.wal_size_bytes() as f64, vec![]));
723
724 metrics
725 }
726}
727
728impl RocksDBProvider {
729 pub fn new(path: impl AsRef<Path>) -> ProviderResult<Self> {
731 RocksDBBuilder::new(path).build()
732 }
733
734 pub fn builder(path: impl AsRef<Path>) -> RocksDBBuilder {
736 RocksDBBuilder::new(path)
737 }
738
739 pub fn exists(path: impl AsRef<Path>) -> bool {
744 path.as_ref().join("CURRENT").exists()
745 }
746
747 pub fn is_read_only(&self) -> bool {
749 matches!(self.0.as_ref(), RocksDBProviderInner::Secondary { .. })
750 }
751
752 pub fn try_catch_up_with_primary(&self) -> ProviderResult<()> {
757 match self.0.as_ref() {
758 RocksDBProviderInner::Secondary { db, .. } => {
759 db.try_catch_up_with_primary().map_err(|e| {
760 ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
761 message: e.to_string().into(),
762 code: -1,
763 }))
764 })
765 }
766 _ => Ok(()),
767 }
768 }
769
770 pub fn snapshot(&self) -> RocksReadSnapshot<'_> {
774 RocksReadSnapshot { inner: self.0.snapshot(), provider: self }
775 }
776
777 pub fn tx(&self) -> RocksTx<'_> {
785 let write_options = synced_write_options();
786 let txn_options = OptimisticTransactionOptions::default();
787 let inner = self.0.db_rw().transaction_opt(&write_options, &txn_options);
788 RocksTx { inner, provider: self }
789 }
790
791 pub fn batch(&self) -> RocksDBBatch<'_> {
799 RocksDBBatch {
800 provider: self,
801 inner: WriteBatchWithTransaction::<true>::default(),
802 buf: Vec::with_capacity(DEFAULT_COMPRESS_BUF_CAPACITY),
803 auto_commit_threshold: None,
804 }
805 }
806
807 pub fn batch_with_auto_commit(&self) -> RocksDBBatch<'_> {
813 RocksDBBatch {
814 provider: self,
815 inner: WriteBatchWithTransaction::<true>::default(),
816 buf: Vec::with_capacity(DEFAULT_COMPRESS_BUF_CAPACITY),
817 auto_commit_threshold: Some(DEFAULT_AUTO_COMMIT_THRESHOLD),
818 }
819 }
820
821 fn get_cf_handle<T: Table>(&self) -> Result<&rocksdb::ColumnFamily, DatabaseError> {
823 self.0.cf_handle::<T>()
824 }
825
826 fn execute_with_operation_metric<R>(
828 &self,
829 operation: RocksDBOperation,
830 table: &'static str,
831 f: impl FnOnce(&Self) -> R,
832 ) -> R {
833 let start = self.0.metrics().map(|_| Instant::now());
834 let res = f(self);
835
836 if let (Some(start), Some(metrics)) = (start, self.0.metrics()) {
837 metrics.record_operation(operation, table, start.elapsed());
838 }
839
840 res
841 }
842
843 pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
845 self.get_encoded::<T>(&key.encode())
846 }
847
848 pub fn get_encoded<T: Table>(
850 &self,
851 key: &<T::Key as Encode>::Encoded,
852 ) -> ProviderResult<Option<T::Value>> {
853 self.execute_with_operation_metric(RocksDBOperation::Get, T::NAME, |this| {
854 let result = this.0.get_cf(this.get_cf_handle::<T>()?, key.as_ref()).map_err(|e| {
855 ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
856 message: e.to_string().into(),
857 code: -1,
858 }))
859 })?;
860
861 Ok(result.and_then(|value| T::Value::decompress(&value).ok()))
862 })
863 }
864
865 pub fn get_raw<T: Table>(&self, key: T::Key) -> ProviderResult<Option<Vec<u8>>> {
867 let encoded = key.encode();
868 self.execute_with_operation_metric(RocksDBOperation::Get, T::NAME, |this| {
869 this.0.get_cf(this.get_cf_handle::<T>()?, encoded.as_ref()).map_err(|e| {
870 ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
871 message: e.to_string().into(),
872 code: -1,
873 }))
874 })
875 })
876 }
877
878 pub fn put<T: Table>(&self, key: T::Key, value: &T::Value) -> ProviderResult<()> {
883 let encoded_key = key.encode();
884 self.put_encoded::<T>(&encoded_key, value)
885 }
886
887 pub fn put_encoded<T: Table>(
892 &self,
893 key: &<T::Key as Encode>::Encoded,
894 value: &T::Value,
895 ) -> ProviderResult<()> {
896 self.execute_with_operation_metric(RocksDBOperation::Put, T::NAME, |this| {
897 let mut buf = Vec::new();
901 let value_bytes = compress_to_buf_or_ref!(buf, value).unwrap_or(&buf);
902
903 this.0.put_cf(this.get_cf_handle::<T>()?, key, value_bytes).map_err(|e| {
904 ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
905 info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
906 operation: DatabaseWriteOperation::PutUpsert,
907 table_name: T::NAME,
908 key: key.as_ref().to_vec(),
909 })))
910 })
911 })
912 }
913
914 pub fn delete<T: Table>(&self, key: T::Key) -> ProviderResult<()> {
919 self.execute_with_operation_metric(RocksDBOperation::Delete, T::NAME, |this| {
920 this.0.delete_cf(this.get_cf_handle::<T>()?, key.encode().as_ref()).map_err(|e| {
921 ProviderError::Database(DatabaseError::Delete(DatabaseErrorInfo {
922 message: e.to_string().into(),
923 code: -1,
924 }))
925 })
926 })
927 }
928
929 pub fn clear<T: Table>(&self) -> ProviderResult<()> {
935 let cf = self.get_cf_handle::<T>()?;
936
937 self.0.delete_range_cf(cf, &[] as &[u8], &[0xFF; 256]).map_err(|e| {
938 ProviderError::Database(DatabaseError::Delete(DatabaseErrorInfo {
939 message: e.to_string().into(),
940 code: -1,
941 }))
942 })?;
943
944 Ok(())
945 }
946
947 fn get_boundary<T: Table>(
949 &self,
950 mode: IteratorMode<'_>,
951 ) -> ProviderResult<Option<(T::Key, T::Value)>> {
952 self.execute_with_operation_metric(RocksDBOperation::Get, T::NAME, |this| {
953 let cf = this.get_cf_handle::<T>()?;
954 let mut iter = this.0.iterator_cf(cf, mode);
955
956 match iter.next() {
957 Some(Ok((key_bytes, value_bytes))) => {
958 let key = <T::Key as reth_db_api::table::Decode>::decode(&key_bytes)
959 .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
960 let value = T::Value::decompress(&value_bytes)
961 .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
962 Ok(Some((key, value)))
963 }
964 Some(Err(e)) => {
965 Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
966 message: e.to_string().into(),
967 code: -1,
968 })))
969 }
970 None => Ok(None),
971 }
972 })
973 }
974
975 #[inline]
977 pub fn first<T: Table>(&self) -> ProviderResult<Option<(T::Key, T::Value)>> {
978 self.get_boundary::<T>(IteratorMode::Start)
979 }
980
981 #[inline]
983 pub fn last<T: Table>(&self) -> ProviderResult<Option<(T::Key, T::Value)>> {
984 self.get_boundary::<T>(IteratorMode::End)
985 }
986
987 pub fn iter<T: Table>(&self) -> ProviderResult<RocksDBIter<'_, T>> {
991 let cf = self.get_cf_handle::<T>()?;
992 let iter = self.0.iterator_cf(cf, IteratorMode::Start);
993 Ok(RocksDBIter { inner: iter, _marker: std::marker::PhantomData })
994 }
995
996 pub fn iter_from<T: Table>(&self, key: T::Key) -> ProviderResult<RocksDBIter<'_, T>> {
1000 let cf = self.get_cf_handle::<T>()?;
1001 let encoded_key = key.encode();
1002 let iter = self
1003 .0
1004 .iterator_cf(cf, IteratorMode::From(encoded_key.as_ref(), rocksdb::Direction::Forward));
1005 Ok(RocksDBIter { inner: iter, _marker: std::marker::PhantomData })
1006 }
1007
1008 pub fn table_stats(&self) -> Vec<RocksDBTableStats> {
1012 self.0.table_stats()
1013 }
1014
1015 pub fn wal_size_bytes(&self) -> u64 {
1021 self.0.wal_size_bytes()
1022 }
1023
1024 pub fn db_stats(&self) -> RocksDBStats {
1028 self.0.db_stats()
1029 }
1030
1031 #[instrument(level = "debug", target = "providers::rocksdb", skip_all, fields(tables = ?tables))]
1042 pub fn flush(&self, tables: &[&'static str]) -> ProviderResult<()> {
1043 let db = self.0.db_rw();
1044
1045 for cf_name in tables {
1046 if let Some(cf) = db.cf_handle(cf_name) {
1047 db.flush_cf(&cf).map_err(|e| {
1048 ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
1049 info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
1050 operation: DatabaseWriteOperation::Flush,
1051 table_name: cf_name,
1052 key: Vec::new(),
1053 })))
1054 })?;
1055 }
1056 }
1057
1058 db.flush_wal(true).map_err(|e| {
1059 ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
1060 info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
1061 operation: DatabaseWriteOperation::Flush,
1062 table_name: "WAL",
1063 key: Vec::new(),
1064 })))
1065 })?;
1066
1067 Ok(())
1068 }
1069
1070 #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
1082 pub fn flush_and_compact(&self) -> ProviderResult<()> {
1083 self.flush(ROCKSDB_TABLES)?;
1084
1085 let db = self.0.db_rw();
1086
1087 for cf_name in ROCKSDB_TABLES {
1088 if let Some(cf) = db.cf_handle(cf_name) {
1089 db.compact_range_cf(&cf, None::<&[u8]>, None::<&[u8]>);
1090 }
1091 }
1092
1093 Ok(())
1094 }
1095
1096 pub fn raw_iter<T: Table>(&self) -> ProviderResult<RocksDBRawIter<'_>> {
1100 let cf = self.get_cf_handle::<T>()?;
1101 let iter = self.0.iterator_cf(cf, IteratorMode::Start);
1102 Ok(RocksDBRawIter { inner: iter })
1103 }
1104
1105 pub fn account_history_shards(
1110 &self,
1111 address: Address,
1112 ) -> ProviderResult<Vec<(ShardedKey<Address>, BlockNumberList)>> {
1113 let cf = self.get_cf_handle::<tables::AccountsHistory>()?;
1115
1116 let start_key = ShardedKey::new(address, 0u64);
1119 let start_bytes = start_key.encode();
1120
1121 let iter = self
1123 .0
1124 .iterator_cf(cf, IteratorMode::From(start_bytes.as_ref(), rocksdb::Direction::Forward));
1125
1126 let mut result = Vec::new();
1127 for item in iter {
1128 match item {
1129 Ok((key_bytes, value_bytes)) => {
1130 let key = ShardedKey::<Address>::decode(&key_bytes)
1132 .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
1133
1134 if key.key != address {
1136 break;
1137 }
1138
1139 let value = BlockNumberList::decompress(&value_bytes)
1141 .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
1142
1143 result.push((key, value));
1144 }
1145 Err(e) => {
1146 return Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
1147 message: e.to_string().into(),
1148 code: -1,
1149 })));
1150 }
1151 }
1152 }
1153
1154 Ok(result)
1155 }
1156
1157 pub fn storage_history_shards(
1162 &self,
1163 address: Address,
1164 storage_key: B256,
1165 ) -> ProviderResult<Vec<(StorageShardedKey, BlockNumberList)>> {
1166 let cf = self.get_cf_handle::<tables::StoragesHistory>()?;
1167
1168 let start_key = StorageShardedKey::new(address, storage_key, 0u64);
1169 let start_bytes = start_key.encode();
1170
1171 let iter = self
1172 .0
1173 .iterator_cf(cf, IteratorMode::From(start_bytes.as_ref(), rocksdb::Direction::Forward));
1174
1175 let mut result = Vec::new();
1176 for item in iter {
1177 match item {
1178 Ok((key_bytes, value_bytes)) => {
1179 let key = StorageShardedKey::decode(&key_bytes)
1180 .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
1181
1182 if key.address != address || key.sharded_key.key != storage_key {
1183 break;
1184 }
1185
1186 let value = BlockNumberList::decompress(&value_bytes)
1187 .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
1188
1189 result.push((key, value));
1190 }
1191 Err(e) => {
1192 return Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
1193 message: e.to_string().into(),
1194 code: -1,
1195 })));
1196 }
1197 }
1198 }
1199
1200 Ok(result)
1201 }
1202
1203 #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
1211 pub fn unwind_account_history_indices(
1212 &self,
1213 last_indices: &[(Address, BlockNumber)],
1214 ) -> ProviderResult<WriteBatchWithTransaction<true>> {
1215 let mut address_min_block: AddressMap<BlockNumber> =
1216 AddressMap::with_capacity_and_hasher(last_indices.len(), Default::default());
1217 for &(address, block_number) in last_indices {
1218 address_min_block
1219 .entry(address)
1220 .and_modify(|min| *min = (*min).min(block_number))
1221 .or_insert(block_number);
1222 }
1223
1224 let mut batch = self.batch();
1225 for (address, min_block) in address_min_block {
1226 match min_block.checked_sub(1) {
1227 Some(keep_to) => batch.unwind_account_history_to(address, keep_to)?,
1228 None => batch.clear_account_history(address)?,
1229 }
1230 }
1231
1232 Ok(batch.into_inner())
1233 }
1234
1235 pub fn unwind_storage_history_indices(
1243 &self,
1244 storage_changesets: &[(Address, B256, BlockNumber)],
1245 ) -> ProviderResult<WriteBatchWithTransaction<true>> {
1246 let mut key_min_block: HashMap<(Address, B256), BlockNumber> =
1247 HashMap::with_capacity_and_hasher(storage_changesets.len(), Default::default());
1248 for &(address, storage_key, block_number) in storage_changesets {
1249 key_min_block
1250 .entry((address, storage_key))
1251 .and_modify(|min| *min = (*min).min(block_number))
1252 .or_insert(block_number);
1253 }
1254
1255 let mut batch = self.batch();
1256 for ((address, storage_key), min_block) in key_min_block {
1257 match min_block.checked_sub(1) {
1258 Some(keep_to) => batch.unwind_storage_history_to(address, storage_key, keep_to)?,
1259 None => batch.clear_storage_history(address, storage_key)?,
1260 }
1261 }
1262
1263 Ok(batch.into_inner())
1264 }
1265
1266 #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
1268 pub fn write_batch<F>(&self, f: F) -> ProviderResult<()>
1269 where
1270 F: FnOnce(&mut RocksDBBatch<'_>) -> ProviderResult<()>,
1271 {
1272 self.execute_with_operation_metric(RocksDBOperation::BatchWrite, "Batch", |this| {
1273 let mut batch_handle = this.batch();
1274 f(&mut batch_handle)?;
1275 batch_handle.commit()
1276 })
1277 }
1278
1279 #[instrument(level = "debug", target = "providers::rocksdb", skip_all, fields(batch_len = batch.len(), batch_size = batch.size_in_bytes()))]
1287 pub fn commit_batch(&self, batch: WriteBatchWithTransaction<true>) -> ProviderResult<()> {
1288 self.0.db_rw().write_opt(batch, &synced_write_options()).map_err(|e| {
1289 ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
1290 message: e.to_string().into(),
1291 code: -1,
1292 }))
1293 })
1294 }
1295
1296 #[instrument(level = "debug", target = "providers::rocksdb", skip_all, fields(num_blocks = blocks.len(), first_block = ctx.first_block_number))]
1302 pub(crate) fn write_blocks_data<N: reth_node_types::NodePrimitives>(
1303 &self,
1304 blocks: &[ExecutedBlock<N>],
1305 tx_nums: &[TxNumber],
1306 ctx: RocksDBWriteCtx,
1307 runtime: &reth_tasks::Runtime,
1308 ) -> ProviderResult<()> {
1309 if !ctx.storage_settings.storage_v2 {
1310 return Ok(());
1311 }
1312
1313 let mut r_tx_hash = None;
1314 let mut r_account_history = None;
1315 let mut r_storage_history = None;
1316
1317 let write_tx_hash =
1318 ctx.storage_settings.storage_v2 && ctx.prune_tx_lookup.is_none_or(|m| !m.is_full());
1319 let write_account_history = ctx.storage_settings.storage_v2;
1320 let write_storage_history = ctx.storage_settings.storage_v2;
1321
1322 let span = tracing::Span::current();
1325 runtime.storage_pool().in_place_scope(|s| {
1326 if write_tx_hash {
1327 s.spawn(|_| {
1328 let _guard = span.enter();
1329 r_tx_hash = Some(self.write_tx_hash_numbers(blocks, tx_nums, &ctx));
1330 });
1331 }
1332
1333 if write_account_history {
1334 s.spawn(|_| {
1335 let _guard = span.enter();
1336 r_account_history = Some(self.write_account_history(blocks, &ctx));
1337 });
1338 }
1339
1340 if write_storage_history {
1341 s.spawn(|_| {
1342 let _guard = span.enter();
1343 r_storage_history = Some(self.write_storage_history(blocks, &ctx));
1344 });
1345 }
1346 });
1347
1348 if write_tx_hash {
1349 r_tx_hash.ok_or_else(|| {
1350 ProviderError::Database(DatabaseError::Other(
1351 "rocksdb tx-hash write thread panicked".into(),
1352 ))
1353 })??;
1354 }
1355 if write_account_history {
1356 r_account_history.ok_or_else(|| {
1357 ProviderError::Database(DatabaseError::Other(
1358 "rocksdb account-history write thread panicked".into(),
1359 ))
1360 })??;
1361 }
1362 if write_storage_history {
1363 r_storage_history.ok_or_else(|| {
1364 ProviderError::Database(DatabaseError::Other(
1365 "rocksdb storage-history write thread panicked".into(),
1366 ))
1367 })??;
1368 }
1369
1370 Ok(())
1371 }
1372
1373 #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
1375 fn write_tx_hash_numbers<N: reth_node_types::NodePrimitives>(
1376 &self,
1377 blocks: &[ExecutedBlock<N>],
1378 tx_nums: &[TxNumber],
1379 ctx: &RocksDBWriteCtx,
1380 ) -> ProviderResult<()> {
1381 let mut batch = self.batch();
1382 for (block, &first_tx_num) in blocks.iter().zip(tx_nums) {
1383 let body = block.recovered_block().body();
1384 for (tx_num, transaction) in (first_tx_num..).zip(body.transactions_iter()) {
1385 batch.put::<tables::TransactionHashNumbers>(*transaction.tx_hash(), &tx_num)?;
1386 }
1387 }
1388 ctx.pending_batches.lock().push(batch.into_inner());
1389 Ok(())
1390 }
1391
1392 #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
1396 fn write_account_history<N: reth_node_types::NodePrimitives>(
1397 &self,
1398 blocks: &[ExecutedBlock<N>],
1399 ctx: &RocksDBWriteCtx,
1400 ) -> ProviderResult<()> {
1401 let mut batch = self.batch();
1402 let mut account_history: BTreeMap<Address, Vec<u64>> = BTreeMap::new();
1403
1404 for (block_idx, block) in blocks.iter().enumerate() {
1405 let block_number = ctx.first_block_number + block_idx as u64;
1406 let reverts = block.execution_outcome().state.reverts.to_plain_state_reverts();
1407
1408 for account_block_reverts in reverts.accounts {
1411 for (address, _) in account_block_reverts {
1412 account_history.entry(address).or_default().push(block_number);
1413 }
1414 }
1415 }
1416
1417 for (address, indices) in account_history {
1419 batch.append_account_history_shard(address, indices)?;
1420 }
1421 ctx.pending_batches.lock().push(batch.into_inner());
1422 Ok(())
1423 }
1424
1425 #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
1429 fn write_storage_history<N: reth_node_types::NodePrimitives>(
1430 &self,
1431 blocks: &[ExecutedBlock<N>],
1432 ctx: &RocksDBWriteCtx,
1433 ) -> ProviderResult<()> {
1434 let mut batch = self.batch();
1435 let mut storage_history: BTreeMap<(Address, B256), Vec<u64>> = BTreeMap::new();
1436
1437 for (block_idx, block) in blocks.iter().enumerate() {
1438 let block_number = ctx.first_block_number + block_idx as u64;
1439 let reverts = block.execution_outcome().state.reverts.to_plain_state_reverts();
1440
1441 for storage_block_reverts in reverts.storage {
1444 for revert in storage_block_reverts {
1445 for (slot, _) in revert.storage_revert {
1446 let plain_key = B256::new(slot.to_be_bytes());
1447 storage_history
1448 .entry((revert.address, plain_key))
1449 .or_default()
1450 .push(block_number);
1451 }
1452 }
1453 }
1454 }
1455
1456 for ((address, slot), indices) in storage_history {
1458 batch.append_storage_history_shard(address, slot, indices)?;
1459 }
1460 ctx.pending_batches.lock().push(batch.into_inner());
1461 Ok(())
1462 }
1463}
1464
1465pub struct RocksReadSnapshot<'db> {
1473 inner: RocksReadSnapshotInner<'db>,
1474 provider: &'db RocksDBProvider,
1475}
1476
1477enum RocksReadSnapshotInner<'db> {
1479 ReadWrite(SnapshotWithThreadMode<'db, OptimisticTransactionDB>),
1481 Secondary(&'db DB),
1483}
1484
1485impl<'db> RocksReadSnapshotInner<'db> {
1486 fn raw_iterator_cf(&self, cf: &rocksdb::ColumnFamily) -> RocksDBRawIterEnum<'_> {
1488 match self {
1489 Self::ReadWrite(snap) => RocksDBRawIterEnum::ReadWrite(snap.raw_iterator_cf(cf)),
1490 Self::Secondary(db) => RocksDBRawIterEnum::ReadOnly(db.raw_iterator_cf(cf)),
1491 }
1492 }
1493}
1494
1495impl fmt::Debug for RocksReadSnapshot<'_> {
1496 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1497 f.debug_struct("RocksReadSnapshot")
1498 .field("provider", &self.provider)
1499 .finish_non_exhaustive()
1500 }
1501}
1502
1503impl<'db> RocksReadSnapshot<'db> {
1504 fn cf_handle<T: Table>(&self) -> Result<&'db rocksdb::ColumnFamily, DatabaseError> {
1506 self.provider.get_cf_handle::<T>()
1507 }
1508
1509 pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
1511 let encoded_key = key.encode();
1512 let cf = self.cf_handle::<T>()?;
1513 let result = match &self.inner {
1514 RocksReadSnapshotInner::ReadWrite(snap) => snap.get_cf(cf, encoded_key.as_ref()),
1515 RocksReadSnapshotInner::Secondary(db) => db.get_cf(cf, encoded_key.as_ref()),
1516 }
1517 .map_err(|e| {
1518 ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
1519 message: e.to_string().into(),
1520 code: -1,
1521 }))
1522 })?;
1523
1524 Ok(result.and_then(|value| T::Value::decompress(&value).ok()))
1525 }
1526
1527 pub fn account_history_info(
1532 &self,
1533 address: Address,
1534 block_number: BlockNumber,
1535 lowest_available_block_number: Option<BlockNumber>,
1536 visible_tip: BlockNumber,
1537 ) -> ProviderResult<HistoryInfo> {
1538 let key = ShardedKey::new(address, block_number);
1539 self.history_info::<tables::AccountsHistory>(
1540 key.encode().as_ref(),
1541 block_number,
1542 lowest_available_block_number,
1543 visible_tip,
1544 |key_bytes| Ok(<ShardedKey<Address> as Decode>::decode(key_bytes)?.key == address),
1545 |prev_bytes| {
1546 <ShardedKey<Address> as Decode>::decode(prev_bytes)
1547 .map(|k| k.key == address)
1548 .unwrap_or(false)
1549 },
1550 )
1551 }
1552
1553 pub fn storage_history_info(
1558 &self,
1559 address: Address,
1560 storage_key: B256,
1561 block_number: BlockNumber,
1562 lowest_available_block_number: Option<BlockNumber>,
1563 visible_tip: BlockNumber,
1564 ) -> ProviderResult<HistoryInfo> {
1565 let key = StorageShardedKey::new(address, storage_key, block_number);
1566 self.history_info::<tables::StoragesHistory>(
1567 key.encode().as_ref(),
1568 block_number,
1569 lowest_available_block_number,
1570 visible_tip,
1571 |key_bytes| {
1572 let k = <StorageShardedKey as Decode>::decode(key_bytes)?;
1573 Ok(k.address == address && k.sharded_key.key == storage_key)
1574 },
1575 |prev_bytes| {
1576 <StorageShardedKey as Decode>::decode(prev_bytes)
1577 .map(|k| k.address == address && k.sharded_key.key == storage_key)
1578 .unwrap_or(false)
1579 },
1580 )
1581 }
1582
1583 fn history_info<T>(
1589 &self,
1590 encoded_key: &[u8],
1591 block_number: BlockNumber,
1592 lowest_available_block_number: Option<BlockNumber>,
1593 visible_tip: BlockNumber,
1594 key_matches: impl FnOnce(&[u8]) -> Result<bool, reth_db_api::DatabaseError>,
1595 prev_key_matches: impl Fn(&[u8]) -> bool,
1596 ) -> ProviderResult<HistoryInfo>
1597 where
1598 T: Table<Value = BlockNumberList>,
1599 {
1600 let is_maybe_pruned = lowest_available_block_number.is_some();
1601 let fallback = || {
1602 Ok(if is_maybe_pruned {
1603 HistoryInfo::MaybeInPlainState
1604 } else {
1605 HistoryInfo::NotYetWritten
1606 })
1607 };
1608
1609 let cf = self.cf_handle::<T>()?;
1610 let mut iter = self.inner.raw_iterator_cf(cf);
1611
1612 iter.seek(encoded_key);
1613 iter.status().map_err(|e| {
1614 ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
1615 message: e.to_string().into(),
1616 code: -1,
1617 }))
1618 })?;
1619
1620 if !iter.valid() {
1621 return fallback();
1622 }
1623
1624 let Some(key_bytes) = iter.key() else {
1625 return fallback();
1626 };
1627 if !key_matches(key_bytes)? {
1628 return fallback();
1629 }
1630
1631 let Some(value_bytes) = iter.value() else {
1632 return fallback();
1633 };
1634 let chunk = BlockNumberList::decompress(value_bytes)?;
1635
1636 let (rank, found_block) = compute_history_rank(&chunk, block_number);
1637 let found_block = found_block.filter(|block| *block <= visible_tip);
1639
1640 let is_before_first_write = if needs_prev_shard_check(rank, found_block, block_number) {
1641 iter.prev();
1642 iter.status().map_err(|e| {
1643 ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
1644 message: e.to_string().into(),
1645 code: -1,
1646 }))
1647 })?;
1648 let has_prev = iter.valid() && iter.key().is_some_and(&prev_key_matches);
1649
1650 if found_block.is_none() && !has_prev {
1654 return fallback()
1655 }
1656
1657 !has_prev
1658 } else {
1659 false
1660 };
1661
1662 Ok(HistoryInfo::from_lookup(
1663 found_block,
1664 is_before_first_write,
1665 lowest_available_block_number,
1666 ))
1667 }
1668}
1669
1670#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1672pub enum PruneShardOutcome {
1673 Deleted,
1675 Updated,
1677 Unchanged,
1679}
1680
1681#[derive(Debug, Default, Clone, Copy)]
1683pub struct PrunedIndices {
1684 pub deleted: usize,
1686 pub updated: usize,
1688 pub unchanged: usize,
1690}
1691
1692#[must_use = "batch must be committed"]
1702pub struct RocksDBBatch<'a> {
1703 provider: &'a RocksDBProvider,
1704 inner: WriteBatchWithTransaction<true>,
1705 buf: Vec<u8>,
1706 auto_commit_threshold: Option<usize>,
1708}
1709
1710impl fmt::Debug for RocksDBBatch<'_> {
1711 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1712 f.debug_struct("RocksDBBatch")
1713 .field("provider", &self.provider)
1714 .field("batch", &"<WriteBatchWithTransaction>")
1715 .field("length", &self.inner.len())
1717 .field("size_in_bytes", &self.inner.size_in_bytes())
1720 .finish()
1721 }
1722}
1723
1724impl<'a> RocksDBBatch<'a> {
1725 pub fn put<T: Table>(&mut self, key: T::Key, value: &T::Value) -> ProviderResult<()> {
1729 let encoded_key = key.encode();
1730 self.put_encoded::<T>(&encoded_key, value)
1731 }
1732
1733 pub fn put_encoded<T: Table>(
1737 &mut self,
1738 key: &<T::Key as Encode>::Encoded,
1739 value: &T::Value,
1740 ) -> ProviderResult<()> {
1741 let value_bytes = compress_to_buf_or_ref!(self.buf, value).unwrap_or(&self.buf);
1742 self.inner.put_cf(self.provider.get_cf_handle::<T>()?, key, value_bytes);
1743 self.maybe_auto_commit()?;
1744 Ok(())
1745 }
1746
1747 pub fn delete<T: Table>(&mut self, key: T::Key) -> ProviderResult<()> {
1751 self.inner.delete_cf(self.provider.get_cf_handle::<T>()?, key.encode().as_ref());
1752 self.maybe_auto_commit()?;
1753 Ok(())
1754 }
1755
1756 fn maybe_auto_commit(&mut self) -> ProviderResult<()> {
1761 if let Some(threshold) = self.auto_commit_threshold &&
1762 self.inner.size_in_bytes() >= threshold
1763 {
1764 tracing::debug!(
1765 target: "providers::rocksdb",
1766 batch_size = self.inner.size_in_bytes(),
1767 threshold,
1768 "Auto-committing RocksDB batch"
1769 );
1770 let old_batch = std::mem::take(&mut self.inner);
1771 self.provider.0.db_rw().write_opt(old_batch, &synced_write_options()).map_err(|e| {
1772 ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
1773 message: e.to_string().into(),
1774 code: -1,
1775 }))
1776 })?;
1777 }
1778 Ok(())
1779 }
1780
1781 #[instrument(level = "debug", target = "providers::rocksdb", skip_all, fields(batch_len = self.inner.len(), batch_size = self.inner.size_in_bytes()))]
1788 pub fn commit(self) -> ProviderResult<()> {
1789 self.provider.0.db_rw().write_opt(self.inner, &synced_write_options()).map_err(|e| {
1790 ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
1791 message: e.to_string().into(),
1792 code: -1,
1793 }))
1794 })
1795 }
1796
1797 pub fn len(&self) -> usize {
1799 self.inner.len()
1800 }
1801
1802 pub fn is_empty(&self) -> bool {
1804 self.inner.is_empty()
1805 }
1806
1807 pub fn size_in_bytes(&self) -> usize {
1809 self.inner.size_in_bytes()
1810 }
1811
1812 pub const fn provider(&self) -> &RocksDBProvider {
1814 self.provider
1815 }
1816
1817 pub fn into_inner(self) -> WriteBatchWithTransaction<true> {
1821 self.inner
1822 }
1823
1824 pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
1829 self.provider.get::<T>(key)
1830 }
1831
1832 pub fn append_account_history_shard(
1844 &mut self,
1845 address: Address,
1846 indices: impl IntoIterator<Item = u64>,
1847 ) -> ProviderResult<()> {
1848 let indices: Vec<u64> = indices.into_iter().collect();
1849
1850 if indices.is_empty() {
1851 return Ok(());
1852 }
1853
1854 debug_assert!(
1855 indices.windows(2).all(|w| w[0] < w[1]),
1856 "indices must be strictly increasing: {:?}",
1857 indices
1858 );
1859
1860 let last_key = ShardedKey::new(address, u64::MAX);
1861 let last_shard_opt = self.provider.get::<tables::AccountsHistory>(last_key.clone())?;
1862 let mut last_shard = last_shard_opt.unwrap_or_else(BlockNumberList::empty);
1863
1864 last_shard.append(indices).map_err(ProviderError::other)?;
1865
1866 if last_shard.len() <= NUM_OF_INDICES_IN_SHARD as u64 {
1868 self.put::<tables::AccountsHistory>(last_key, &last_shard)?;
1869 return Ok(());
1870 }
1871
1872 let chunks = last_shard.iter().chunks(NUM_OF_INDICES_IN_SHARD);
1874 let mut chunks_peekable = chunks.into_iter().peekable();
1875
1876 while let Some(chunk) = chunks_peekable.next() {
1877 let shard = BlockNumberList::new_pre_sorted(chunk);
1878 let highest_block_number = if chunks_peekable.peek().is_some() {
1879 shard.iter().next_back().expect("`chunks` does not return empty list")
1880 } else {
1881 u64::MAX
1882 };
1883
1884 self.put::<tables::AccountsHistory>(
1885 ShardedKey::new(address, highest_block_number),
1886 &shard,
1887 )?;
1888 }
1889
1890 Ok(())
1891 }
1892
1893 pub fn append_storage_history_shard(
1905 &mut self,
1906 address: Address,
1907 storage_key: B256,
1908 indices: impl IntoIterator<Item = u64>,
1909 ) -> ProviderResult<()> {
1910 let indices: Vec<u64> = indices.into_iter().collect();
1911
1912 if indices.is_empty() {
1913 return Ok(());
1914 }
1915
1916 debug_assert!(
1917 indices.windows(2).all(|w| w[0] < w[1]),
1918 "indices must be strictly increasing: {:?}",
1919 indices
1920 );
1921
1922 let last_key = StorageShardedKey::last(address, storage_key);
1923 let last_shard_opt = self.provider.get::<tables::StoragesHistory>(last_key.clone())?;
1924 let mut last_shard = last_shard_opt.unwrap_or_else(BlockNumberList::empty);
1925
1926 last_shard.append(indices).map_err(ProviderError::other)?;
1927
1928 if last_shard.len() <= NUM_OF_INDICES_IN_SHARD as u64 {
1930 self.put::<tables::StoragesHistory>(last_key, &last_shard)?;
1931 return Ok(());
1932 }
1933
1934 let chunks = last_shard.iter().chunks(NUM_OF_INDICES_IN_SHARD);
1936 let mut chunks_peekable = chunks.into_iter().peekable();
1937
1938 while let Some(chunk) = chunks_peekable.next() {
1939 let shard = BlockNumberList::new_pre_sorted(chunk);
1940 let highest_block_number = if chunks_peekable.peek().is_some() {
1941 shard.iter().next_back().expect("`chunks` does not return empty list")
1942 } else {
1943 u64::MAX
1944 };
1945
1946 self.put::<tables::StoragesHistory>(
1947 StorageShardedKey::new(address, storage_key, highest_block_number),
1948 &shard,
1949 )?;
1950 }
1951
1952 Ok(())
1953 }
1954
1955 pub fn unwind_account_history_to(
1962 &mut self,
1963 address: Address,
1964 keep_to: BlockNumber,
1965 ) -> ProviderResult<()> {
1966 let shards = self.provider.account_history_shards(address)?;
1967 if shards.is_empty() {
1968 return Ok(());
1969 }
1970
1971 let boundary_idx = shards.iter().position(|(key, _)| {
1974 key.highest_block_number == u64::MAX || key.highest_block_number > keep_to
1975 });
1976
1977 let Some(boundary_idx) = boundary_idx else {
1979 let (last_key, last_value) = shards.last().expect("shards is non-empty");
1980 if last_key.highest_block_number != u64::MAX {
1981 self.delete::<tables::AccountsHistory>(last_key.clone())?;
1982 self.put::<tables::AccountsHistory>(
1983 ShardedKey::new(address, u64::MAX),
1984 last_value,
1985 )?;
1986 }
1987 return Ok(());
1988 };
1989
1990 for (key, _) in shards.iter().skip(boundary_idx + 1) {
1992 self.delete::<tables::AccountsHistory>(key.clone())?;
1993 }
1994
1995 let (boundary_key, boundary_list) = &shards[boundary_idx];
1997
1998 self.delete::<tables::AccountsHistory>(boundary_key.clone())?;
2000
2001 let new_last =
2003 BlockNumberList::new_pre_sorted(boundary_list.iter().take_while(|&b| b <= keep_to));
2004
2005 if new_last.is_empty() {
2006 if boundary_idx == 0 {
2009 return Ok(());
2011 }
2012
2013 let (prev_key, prev_value) = &shards[boundary_idx - 1];
2014 if prev_key.highest_block_number != u64::MAX {
2015 self.delete::<tables::AccountsHistory>(prev_key.clone())?;
2016 self.put::<tables::AccountsHistory>(
2017 ShardedKey::new(address, u64::MAX),
2018 prev_value,
2019 )?;
2020 }
2021 return Ok(());
2022 }
2023
2024 self.put::<tables::AccountsHistory>(ShardedKey::new(address, u64::MAX), &new_last)?;
2025
2026 Ok(())
2027 }
2028
2029 #[expect(clippy::too_many_arguments)]
2035 fn prune_history_shards_inner<K>(
2036 &mut self,
2037 shards: Vec<(K, BlockNumberList)>,
2038 to_block: BlockNumber,
2039 get_highest: impl Fn(&K) -> u64,
2040 is_sentinel: impl Fn(&K) -> bool,
2041 delete_shard: impl Fn(&mut Self, K) -> ProviderResult<()>,
2042 put_shard: impl Fn(&mut Self, K, &BlockNumberList) -> ProviderResult<()>,
2043 create_sentinel: impl Fn() -> K,
2044 ) -> ProviderResult<PruneShardOutcome>
2045 where
2046 K: Clone,
2047 {
2048 if shards.is_empty() {
2049 return Ok(PruneShardOutcome::Unchanged);
2050 }
2051
2052 let mut deleted = false;
2053 let mut updated = false;
2054 let mut last_remaining: Option<(K, BlockNumberList)> = None;
2055
2056 for (key, block_list) in shards {
2057 if !is_sentinel(&key) && get_highest(&key) <= to_block {
2058 delete_shard(self, key)?;
2059 deleted = true;
2060 } else {
2061 let original_len = block_list.len();
2062 let filtered =
2063 BlockNumberList::new_pre_sorted(block_list.iter().filter(|&b| b > to_block));
2064
2065 if filtered.is_empty() {
2066 delete_shard(self, key)?;
2067 deleted = true;
2068 } else if filtered.len() < original_len {
2069 put_shard(self, key.clone(), &filtered)?;
2070 last_remaining = Some((key, filtered));
2071 updated = true;
2072 } else {
2073 last_remaining = Some((key, block_list));
2074 }
2075 }
2076 }
2077
2078 if let Some((last_key, last_value)) = last_remaining &&
2079 !is_sentinel(&last_key)
2080 {
2081 delete_shard(self, last_key)?;
2082 put_shard(self, create_sentinel(), &last_value)?;
2083 updated = true;
2084 }
2085
2086 if deleted {
2087 Ok(PruneShardOutcome::Deleted)
2088 } else if updated {
2089 Ok(PruneShardOutcome::Updated)
2090 } else {
2091 Ok(PruneShardOutcome::Unchanged)
2092 }
2093 }
2094
2095 pub fn prune_account_history_to(
2100 &mut self,
2101 address: Address,
2102 to_block: BlockNumber,
2103 ) -> ProviderResult<PruneShardOutcome> {
2104 let shards = self.provider.account_history_shards(address)?;
2105 self.prune_history_shards_inner(
2106 shards,
2107 to_block,
2108 |key| key.highest_block_number,
2109 |key| key.highest_block_number == u64::MAX,
2110 |batch, key| batch.delete::<tables::AccountsHistory>(key),
2111 |batch, key, value| batch.put::<tables::AccountsHistory>(key, value),
2112 || ShardedKey::new(address, u64::MAX),
2113 )
2114 }
2115
2116 pub fn prune_account_history_batch(
2125 &mut self,
2126 targets: &[(Address, BlockNumber)],
2127 ) -> ProviderResult<PrunedIndices> {
2128 if targets.is_empty() {
2129 return Ok(PrunedIndices::default());
2130 }
2131
2132 debug_assert!(
2133 targets.windows(2).all(|w| w[0].0 <= w[1].0),
2134 "prune_account_history_batch: targets must be sorted by address"
2135 );
2136
2137 const PREFIX_LEN: usize = 20;
2140
2141 let cf = self.provider.get_cf_handle::<tables::AccountsHistory>()?;
2142 let mut iter = self.provider.0.raw_iterator_cf(cf);
2143 let mut outcomes = PrunedIndices::default();
2144
2145 for (address, to_block) in targets {
2146 let start_key = ShardedKey::new(*address, 0u64).encode();
2148 let target_prefix = &start_key[..PREFIX_LEN];
2149
2150 let needs_seek = if iter.valid() {
2156 if let Some(current_key) = iter.key() {
2157 current_key.get(..PREFIX_LEN).is_none_or(|p| p < target_prefix)
2161 } else {
2162 true
2163 }
2164 } else {
2165 true
2166 };
2167
2168 if needs_seek {
2169 iter.seek(start_key);
2170 iter.status().map_err(|e| {
2171 ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
2172 message: e.to_string().into(),
2173 code: -1,
2174 }))
2175 })?;
2176 }
2177
2178 let mut shards = Vec::new();
2180 while iter.valid() {
2181 let Some(key_bytes) = iter.key() else { break };
2182
2183 let current_prefix = key_bytes.get(..PREFIX_LEN);
2185 if current_prefix != Some(target_prefix) {
2186 break;
2187 }
2188
2189 let key = ShardedKey::<Address>::decode(key_bytes)
2191 .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
2192
2193 let Some(value_bytes) = iter.value() else { break };
2194 let value = BlockNumberList::decompress(value_bytes)
2195 .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
2196
2197 shards.push((key, value));
2198 iter.next();
2199 }
2200
2201 match self.prune_history_shards_inner(
2202 shards,
2203 *to_block,
2204 |key| key.highest_block_number,
2205 |key| key.highest_block_number == u64::MAX,
2206 |batch, key| batch.delete::<tables::AccountsHistory>(key),
2207 |batch, key, value| batch.put::<tables::AccountsHistory>(key, value),
2208 || ShardedKey::new(*address, u64::MAX),
2209 )? {
2210 PruneShardOutcome::Deleted => outcomes.deleted += 1,
2211 PruneShardOutcome::Updated => outcomes.updated += 1,
2212 PruneShardOutcome::Unchanged => outcomes.unchanged += 1,
2213 }
2214 }
2215
2216 Ok(outcomes)
2217 }
2218
2219 pub fn prune_storage_history_to(
2225 &mut self,
2226 address: Address,
2227 storage_key: B256,
2228 to_block: BlockNumber,
2229 ) -> ProviderResult<PruneShardOutcome> {
2230 let shards = self.provider.storage_history_shards(address, storage_key)?;
2231 self.prune_history_shards_inner(
2232 shards,
2233 to_block,
2234 |key| key.sharded_key.highest_block_number,
2235 |key| key.sharded_key.highest_block_number == u64::MAX,
2236 |batch, key| batch.delete::<tables::StoragesHistory>(key),
2237 |batch, key, value| batch.put::<tables::StoragesHistory>(key, value),
2238 || StorageShardedKey::last(address, storage_key),
2239 )
2240 }
2241
2242 pub fn prune_storage_history_batch(
2252 &mut self,
2253 targets: &[((Address, B256), BlockNumber)],
2254 ) -> ProviderResult<PrunedIndices> {
2255 if targets.is_empty() {
2256 return Ok(PrunedIndices::default());
2257 }
2258
2259 debug_assert!(
2260 targets.windows(2).all(|w| w[0].0 <= w[1].0),
2261 "prune_storage_history_batch: targets must be sorted by (address, storage_key)"
2262 );
2263
2264 const PREFIX_LEN: usize = 52;
2267
2268 let cf = self.provider.get_cf_handle::<tables::StoragesHistory>()?;
2269 let mut iter = self.provider.0.raw_iterator_cf(cf);
2270 let mut outcomes = PrunedIndices::default();
2271
2272 for ((address, storage_key), to_block) in targets {
2273 let start_key = StorageShardedKey::new(*address, *storage_key, 0u64).encode();
2275 let target_prefix = &start_key[..PREFIX_LEN];
2276
2277 let needs_seek = if iter.valid() {
2283 if let Some(current_key) = iter.key() {
2284 current_key.get(..PREFIX_LEN).is_none_or(|p| p < target_prefix)
2288 } else {
2289 true
2290 }
2291 } else {
2292 true
2293 };
2294
2295 if needs_seek {
2296 iter.seek(start_key);
2297 iter.status().map_err(|e| {
2298 ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
2299 message: e.to_string().into(),
2300 code: -1,
2301 }))
2302 })?;
2303 }
2304
2305 let mut shards = Vec::new();
2307 while iter.valid() {
2308 let Some(key_bytes) = iter.key() else { break };
2309
2310 let current_prefix = key_bytes.get(..PREFIX_LEN);
2312 if current_prefix != Some(target_prefix) {
2313 break;
2314 }
2315
2316 let key = StorageShardedKey::decode(key_bytes)
2318 .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
2319
2320 let Some(value_bytes) = iter.value() else { break };
2321 let value = BlockNumberList::decompress(value_bytes)
2322 .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
2323
2324 shards.push((key, value));
2325 iter.next();
2326 }
2327
2328 match self.prune_history_shards_inner(
2330 shards,
2331 *to_block,
2332 |key| key.sharded_key.highest_block_number,
2333 |key| key.sharded_key.highest_block_number == u64::MAX,
2334 |batch, key| batch.delete::<tables::StoragesHistory>(key),
2335 |batch, key, value| batch.put::<tables::StoragesHistory>(key, value),
2336 || StorageShardedKey::last(*address, *storage_key),
2337 )? {
2338 PruneShardOutcome::Deleted => outcomes.deleted += 1,
2339 PruneShardOutcome::Updated => outcomes.updated += 1,
2340 PruneShardOutcome::Unchanged => outcomes.unchanged += 1,
2341 }
2342 }
2343
2344 Ok(outcomes)
2345 }
2346
2347 pub fn unwind_storage_history_to(
2356 &mut self,
2357 address: Address,
2358 storage_key: B256,
2359 keep_to: BlockNumber,
2360 ) -> ProviderResult<()> {
2361 let shards = self.provider.storage_history_shards(address, storage_key)?;
2362 if shards.is_empty() {
2363 return Ok(());
2364 }
2365
2366 let boundary_idx = shards.iter().position(|(key, _)| {
2369 key.sharded_key.highest_block_number == u64::MAX ||
2370 key.sharded_key.highest_block_number > keep_to
2371 });
2372
2373 let Some(boundary_idx) = boundary_idx else {
2375 let (last_key, last_value) = shards.last().expect("shards is non-empty");
2376 if last_key.sharded_key.highest_block_number != u64::MAX {
2377 self.delete::<tables::StoragesHistory>(last_key.clone())?;
2378 self.put::<tables::StoragesHistory>(
2379 StorageShardedKey::last(address, storage_key),
2380 last_value,
2381 )?;
2382 }
2383 return Ok(());
2384 };
2385
2386 for (key, _) in shards.iter().skip(boundary_idx + 1) {
2388 self.delete::<tables::StoragesHistory>(key.clone())?;
2389 }
2390
2391 let (boundary_key, boundary_list) = &shards[boundary_idx];
2393
2394 self.delete::<tables::StoragesHistory>(boundary_key.clone())?;
2396
2397 let new_last =
2399 BlockNumberList::new_pre_sorted(boundary_list.iter().take_while(|&b| b <= keep_to));
2400
2401 if new_last.is_empty() {
2402 if boundary_idx == 0 {
2405 return Ok(());
2407 }
2408
2409 let (prev_key, prev_value) = &shards[boundary_idx - 1];
2410 if prev_key.sharded_key.highest_block_number != u64::MAX {
2411 self.delete::<tables::StoragesHistory>(prev_key.clone())?;
2412 self.put::<tables::StoragesHistory>(
2413 StorageShardedKey::last(address, storage_key),
2414 prev_value,
2415 )?;
2416 }
2417 return Ok(());
2418 }
2419
2420 self.put::<tables::StoragesHistory>(
2421 StorageShardedKey::last(address, storage_key),
2422 &new_last,
2423 )?;
2424
2425 Ok(())
2426 }
2427
2428 pub fn clear_account_history(&mut self, address: Address) -> ProviderResult<()> {
2432 let shards = self.provider.account_history_shards(address)?;
2433 for (key, _) in shards {
2434 self.delete::<tables::AccountsHistory>(key)?;
2435 }
2436 Ok(())
2437 }
2438
2439 pub fn clear_storage_history(
2443 &mut self,
2444 address: Address,
2445 storage_key: B256,
2446 ) -> ProviderResult<()> {
2447 let shards = self.provider.storage_history_shards(address, storage_key)?;
2448 for (key, _) in shards {
2449 self.delete::<tables::StoragesHistory>(key)?;
2450 }
2451 Ok(())
2452 }
2453}
2454
2455pub struct RocksTx<'db> {
2465 inner: Transaction<'db, OptimisticTransactionDB>,
2466 provider: &'db RocksDBProvider,
2467}
2468
2469impl fmt::Debug for RocksTx<'_> {
2470 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2471 f.debug_struct("RocksTx").field("provider", &self.provider).finish_non_exhaustive()
2472 }
2473}
2474
2475impl<'db> RocksTx<'db> {
2476 pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
2478 let encoded_key = key.encode();
2479 self.get_encoded::<T>(&encoded_key)
2480 }
2481
2482 pub fn get_encoded<T: Table>(
2484 &self,
2485 key: &<T::Key as Encode>::Encoded,
2486 ) -> ProviderResult<Option<T::Value>> {
2487 let cf = self.provider.get_cf_handle::<T>()?;
2488 let result = self.inner.get_cf(cf, key.as_ref()).map_err(|e| {
2489 ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
2490 message: e.to_string().into(),
2491 code: -1,
2492 }))
2493 })?;
2494
2495 Ok(result.and_then(|value| T::Value::decompress(&value).ok()))
2496 }
2497
2498 pub fn put<T: Table>(&self, key: T::Key, value: &T::Value) -> ProviderResult<()> {
2500 let encoded_key = key.encode();
2501 self.put_encoded::<T>(&encoded_key, value)
2502 }
2503
2504 pub fn put_encoded<T: Table>(
2506 &self,
2507 key: &<T::Key as Encode>::Encoded,
2508 value: &T::Value,
2509 ) -> ProviderResult<()> {
2510 let cf = self.provider.get_cf_handle::<T>()?;
2511 let mut buf = Vec::new();
2512 let value_bytes = compress_to_buf_or_ref!(buf, value).unwrap_or(&buf);
2513
2514 self.inner.put_cf(cf, key.as_ref(), value_bytes).map_err(|e| {
2515 ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
2516 info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
2517 operation: DatabaseWriteOperation::PutUpsert,
2518 table_name: T::NAME,
2519 key: key.as_ref().to_vec(),
2520 })))
2521 })
2522 }
2523
2524 pub fn delete<T: Table>(&self, key: T::Key) -> ProviderResult<()> {
2526 let cf = self.provider.get_cf_handle::<T>()?;
2527 self.inner.delete_cf(cf, key.encode().as_ref()).map_err(|e| {
2528 ProviderError::Database(DatabaseError::Delete(DatabaseErrorInfo {
2529 message: e.to_string().into(),
2530 code: -1,
2531 }))
2532 })
2533 }
2534
2535 pub fn iter<T: Table>(&self) -> ProviderResult<RocksTxIter<'_, T>> {
2539 let cf = self.provider.get_cf_handle::<T>()?;
2540 let iter = self.inner.iterator_cf(cf, IteratorMode::Start);
2541 Ok(RocksTxIter { inner: iter, _marker: std::marker::PhantomData })
2542 }
2543
2544 pub fn iter_from<T: Table>(&self, key: T::Key) -> ProviderResult<RocksTxIter<'_, T>> {
2546 let cf = self.provider.get_cf_handle::<T>()?;
2547 let encoded_key = key.encode();
2548 let iter = self
2549 .inner
2550 .iterator_cf(cf, IteratorMode::From(encoded_key.as_ref(), rocksdb::Direction::Forward));
2551 Ok(RocksTxIter { inner: iter, _marker: std::marker::PhantomData })
2552 }
2553
2554 #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
2556 pub fn commit(self) -> ProviderResult<()> {
2557 self.inner.commit().map_err(|e| {
2558 ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
2559 message: e.to_string().into(),
2560 code: -1,
2561 }))
2562 })
2563 }
2564
2565 #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
2567 pub fn rollback(self) -> ProviderResult<()> {
2568 self.inner.rollback().map_err(|e| {
2569 ProviderError::Database(DatabaseError::Other(format!("rollback failed: {e}")))
2570 })
2571 }
2572}
2573
2574enum RocksDBIterEnum<'db> {
2576 ReadWrite(rocksdb::DBIteratorWithThreadMode<'db, OptimisticTransactionDB>),
2578 ReadOnly(rocksdb::DBIteratorWithThreadMode<'db, DB>),
2580}
2581
2582impl Iterator for RocksDBIterEnum<'_> {
2583 type Item = Result<(Box<[u8]>, Box<[u8]>), rocksdb::Error>;
2584
2585 fn next(&mut self) -> Option<Self::Item> {
2586 match self {
2587 Self::ReadWrite(iter) => iter.next(),
2588 Self::ReadOnly(iter) => iter.next(),
2589 }
2590 }
2591}
2592
2593enum RocksDBRawIterEnum<'db> {
2598 ReadWrite(DBRawIteratorWithThreadMode<'db, OptimisticTransactionDB>),
2600 ReadOnly(DBRawIteratorWithThreadMode<'db, DB>),
2602}
2603
2604impl RocksDBRawIterEnum<'_> {
2605 fn seek(&mut self, key: impl AsRef<[u8]>) {
2607 match self {
2608 Self::ReadWrite(iter) => iter.seek(key),
2609 Self::ReadOnly(iter) => iter.seek(key),
2610 }
2611 }
2612
2613 fn valid(&self) -> bool {
2615 match self {
2616 Self::ReadWrite(iter) => iter.valid(),
2617 Self::ReadOnly(iter) => iter.valid(),
2618 }
2619 }
2620
2621 fn key(&self) -> Option<&[u8]> {
2623 match self {
2624 Self::ReadWrite(iter) => iter.key(),
2625 Self::ReadOnly(iter) => iter.key(),
2626 }
2627 }
2628
2629 fn value(&self) -> Option<&[u8]> {
2631 match self {
2632 Self::ReadWrite(iter) => iter.value(),
2633 Self::ReadOnly(iter) => iter.value(),
2634 }
2635 }
2636
2637 fn next(&mut self) {
2639 match self {
2640 Self::ReadWrite(iter) => iter.next(),
2641 Self::ReadOnly(iter) => iter.next(),
2642 }
2643 }
2644
2645 fn prev(&mut self) {
2647 match self {
2648 Self::ReadWrite(iter) => iter.prev(),
2649 Self::ReadOnly(iter) => iter.prev(),
2650 }
2651 }
2652
2653 fn status(&self) -> Result<(), rocksdb::Error> {
2655 match self {
2656 Self::ReadWrite(iter) => iter.status(),
2657 Self::ReadOnly(iter) => iter.status(),
2658 }
2659 }
2660}
2661
2662pub struct RocksDBIter<'db, T: Table> {
2666 inner: RocksDBIterEnum<'db>,
2667 _marker: std::marker::PhantomData<T>,
2668}
2669
2670impl<T: Table> fmt::Debug for RocksDBIter<'_, T> {
2671 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2672 f.debug_struct("RocksDBIter").field("table", &T::NAME).finish_non_exhaustive()
2673 }
2674}
2675
2676impl<T: Table> Iterator for RocksDBIter<'_, T> {
2677 type Item = ProviderResult<(T::Key, T::Value)>;
2678
2679 fn next(&mut self) -> Option<Self::Item> {
2680 Some(decode_iter_item::<T>(self.inner.next()?))
2681 }
2682}
2683
2684pub struct RocksDBRawIter<'db> {
2688 inner: RocksDBIterEnum<'db>,
2689}
2690
2691impl fmt::Debug for RocksDBRawIter<'_> {
2692 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2693 f.debug_struct("RocksDBRawIter").finish_non_exhaustive()
2694 }
2695}
2696
2697impl Iterator for RocksDBRawIter<'_> {
2698 type Item = ProviderResult<(Box<[u8]>, Box<[u8]>)>;
2699
2700 fn next(&mut self) -> Option<Self::Item> {
2701 match self.inner.next()? {
2702 Ok(kv) => Some(Ok(kv)),
2703 Err(e) => Some(Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
2704 message: e.to_string().into(),
2705 code: -1,
2706 })))),
2707 }
2708 }
2709}
2710
2711pub struct RocksTxIter<'tx, T: Table> {
2715 inner: rocksdb::DBIteratorWithThreadMode<'tx, Transaction<'tx, OptimisticTransactionDB>>,
2716 _marker: std::marker::PhantomData<T>,
2717}
2718
2719impl<T: Table> fmt::Debug for RocksTxIter<'_, T> {
2720 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2721 f.debug_struct("RocksTxIter").field("table", &T::NAME).finish_non_exhaustive()
2722 }
2723}
2724
2725impl<T: Table> Iterator for RocksTxIter<'_, T> {
2726 type Item = ProviderResult<(T::Key, T::Value)>;
2727
2728 fn next(&mut self) -> Option<Self::Item> {
2729 Some(decode_iter_item::<T>(self.inner.next()?))
2730 }
2731}
2732
2733fn decode_iter_item<T: Table>(result: RawKVResult) -> ProviderResult<(T::Key, T::Value)> {
2738 let (key_bytes, value_bytes) = result.map_err(|e| {
2739 ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
2740 message: e.to_string().into(),
2741 code: -1,
2742 }))
2743 })?;
2744
2745 let key = <T::Key as reth_db_api::table::Decode>::decode(&key_bytes)
2746 .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
2747
2748 let value = T::Value::decompress(&value_bytes)
2749 .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
2750
2751 Ok((key, value))
2752}
2753
2754const fn convert_log_level(level: LogLevel) -> rocksdb::LogLevel {
2756 match level {
2757 LogLevel::Fatal => rocksdb::LogLevel::Fatal,
2758 LogLevel::Error => rocksdb::LogLevel::Error,
2759 LogLevel::Warn => rocksdb::LogLevel::Warn,
2760 LogLevel::Notice | LogLevel::Verbose => rocksdb::LogLevel::Info,
2761 LogLevel::Debug | LogLevel::Trace | LogLevel::Extra => rocksdb::LogLevel::Debug,
2762 }
2763}
2764
2765#[cfg(test)]
2766mod tests {
2767 use super::*;
2768 use crate::providers::HistoryInfo;
2769 use alloy_primitives::{Address, TxHash, B256};
2770 use reth_db_api::{
2771 models::{
2772 sharded_key::{ShardedKey, NUM_OF_INDICES_IN_SHARD},
2773 storage_sharded_key::StorageShardedKey,
2774 IntegerList,
2775 },
2776 table::Table,
2777 tables,
2778 };
2779 use tempfile::TempDir;
2780
2781 #[test]
2782 fn test_with_default_tables_registers_required_column_families() {
2783 let temp_dir = TempDir::new().unwrap();
2784
2785 let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
2787
2788 let tx_hash = TxHash::from(B256::from([1u8; 32]));
2790 provider.put::<tables::TransactionHashNumbers>(tx_hash, &100).unwrap();
2791 assert_eq!(provider.get::<tables::TransactionHashNumbers>(tx_hash).unwrap(), Some(100));
2792
2793 let key = ShardedKey::new(Address::ZERO, 100);
2795 let value = IntegerList::default();
2796 provider.put::<tables::AccountsHistory>(key.clone(), &value).unwrap();
2797 assert!(provider.get::<tables::AccountsHistory>(key).unwrap().is_some());
2798
2799 let key = StorageShardedKey::new(Address::ZERO, B256::ZERO, 100);
2801 provider.put::<tables::StoragesHistory>(key.clone(), &value).unwrap();
2802 assert!(provider.get::<tables::StoragesHistory>(key).unwrap().is_some());
2803 }
2804
2805 #[derive(Debug)]
2806 struct TestTable;
2807
2808 impl Table for TestTable {
2809 const NAME: &'static str = "TestTable";
2810 const DUPSORT: bool = false;
2811 type Key = u64;
2812 type Value = Vec<u8>;
2813 }
2814
2815 #[test]
2816 fn test_basic_operations() {
2817 let temp_dir = TempDir::new().unwrap();
2818
2819 let provider = RocksDBBuilder::new(temp_dir.path())
2820 .with_table::<TestTable>() .build()
2822 .unwrap();
2823
2824 let key = 42u64;
2825 let value = b"test_value".to_vec();
2826
2827 provider.put::<TestTable>(key, &value).unwrap();
2829
2830 let result = provider.get::<TestTable>(key).unwrap();
2832 assert_eq!(result, Some(value));
2833
2834 provider.delete::<TestTable>(key).unwrap();
2836
2837 assert_eq!(provider.get::<TestTable>(key).unwrap(), None);
2839 }
2840
2841 #[test]
2842 fn test_batch_operations() {
2843 let temp_dir = TempDir::new().unwrap();
2844 let provider =
2845 RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
2846
2847 provider
2849 .write_batch(|batch| {
2850 for i in 0..10u64 {
2851 let value = format!("value_{i}").into_bytes();
2852 batch.put::<TestTable>(i, &value)?;
2853 }
2854 Ok(())
2855 })
2856 .unwrap();
2857
2858 for i in 0..10u64 {
2860 let value = format!("value_{i}").into_bytes();
2861 assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
2862 }
2863
2864 provider
2866 .write_batch(|batch| {
2867 for i in 0..10u64 {
2868 batch.delete::<TestTable>(i)?;
2869 }
2870 Ok(())
2871 })
2872 .unwrap();
2873
2874 for i in 0..10u64 {
2876 assert_eq!(provider.get::<TestTable>(i).unwrap(), None);
2877 }
2878 }
2879
2880 #[test]
2881 fn test_with_real_table() {
2882 let temp_dir = TempDir::new().unwrap();
2883 let provider = RocksDBBuilder::new(temp_dir.path())
2884 .with_table::<tables::TransactionHashNumbers>()
2885 .with_metrics()
2886 .build()
2887 .unwrap();
2888
2889 let tx_hash = TxHash::from(B256::from([1u8; 32]));
2890
2891 provider.put::<tables::TransactionHashNumbers>(tx_hash, &100).unwrap();
2893 assert_eq!(provider.get::<tables::TransactionHashNumbers>(tx_hash).unwrap(), Some(100));
2894
2895 provider
2897 .write_batch(|batch| {
2898 for i in 0..10u64 {
2899 let hash = TxHash::from(B256::from([i as u8; 32]));
2900 let value = i * 100;
2901 batch.put::<tables::TransactionHashNumbers>(hash, &value)?;
2902 }
2903 Ok(())
2904 })
2905 .unwrap();
2906
2907 for i in 0..10u64 {
2909 let hash = TxHash::from(B256::from([i as u8; 32]));
2910 assert_eq!(
2911 provider.get::<tables::TransactionHashNumbers>(hash).unwrap(),
2912 Some(i * 100)
2913 );
2914 }
2915 }
2916 #[test]
2917 fn test_statistics_enabled() {
2918 let temp_dir = TempDir::new().unwrap();
2919 let provider = RocksDBBuilder::new(temp_dir.path())
2921 .with_table::<TestTable>()
2922 .with_statistics()
2923 .build()
2924 .unwrap();
2925
2926 for i in 0..10 {
2928 let value = vec![i as u8];
2929 provider.put::<TestTable>(i, &value).unwrap();
2930 assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
2932 }
2933 }
2934
2935 #[test]
2936 fn test_data_persistence() {
2937 let temp_dir = TempDir::new().unwrap();
2938 let provider =
2939 RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
2940
2941 let value = vec![42u8; 1000];
2943 for i in 0..100 {
2944 provider.put::<TestTable>(i, &value).unwrap();
2945 }
2946
2947 for i in 0..100 {
2949 assert!(provider.get::<TestTable>(i).unwrap().is_some(), "Data should be readable");
2950 }
2951 }
2952
2953 #[test]
2954 fn test_transaction_read_your_writes() {
2955 let temp_dir = TempDir::new().unwrap();
2956 let provider =
2957 RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
2958
2959 let tx = provider.tx();
2961
2962 let key = 42u64;
2964 let value = b"test_value".to_vec();
2965 tx.put::<TestTable>(key, &value).unwrap();
2966
2967 let result = tx.get::<TestTable>(key).unwrap();
2969 assert_eq!(
2970 result,
2971 Some(value.clone()),
2972 "Transaction should see its own uncommitted writes"
2973 );
2974
2975 let provider_result = provider.get::<TestTable>(key).unwrap();
2977 assert_eq!(provider_result, None, "Uncommitted data should not be visible outside tx");
2978
2979 tx.commit().unwrap();
2981
2982 let committed_result = provider.get::<TestTable>(key).unwrap();
2984 assert_eq!(committed_result, Some(value), "Committed data should be visible");
2985 }
2986
2987 #[test]
2988 fn test_transaction_rollback() {
2989 let temp_dir = TempDir::new().unwrap();
2990 let provider =
2991 RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
2992
2993 let key = 100u64;
2995 let initial_value = b"initial".to_vec();
2996 provider.put::<TestTable>(key, &initial_value).unwrap();
2997
2998 let tx = provider.tx();
3000 let new_value = b"modified".to_vec();
3001 tx.put::<TestTable>(key, &new_value).unwrap();
3002
3003 assert_eq!(tx.get::<TestTable>(key).unwrap(), Some(new_value));
3005
3006 tx.rollback().unwrap();
3008
3009 let result = provider.get::<TestTable>(key).unwrap();
3011 assert_eq!(result, Some(initial_value), "Rollback should preserve original data");
3012 }
3013
3014 #[test]
3015 fn test_transaction_iterator() {
3016 let temp_dir = TempDir::new().unwrap();
3017 let provider =
3018 RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
3019
3020 let tx = provider.tx();
3022
3023 for i in 0..5u64 {
3025 let value = format!("value_{i}").into_bytes();
3026 tx.put::<TestTable>(i, &value).unwrap();
3027 }
3028
3029 let mut count = 0;
3031 for result in tx.iter::<TestTable>().unwrap() {
3032 let (key, value) = result.unwrap();
3033 assert_eq!(value, format!("value_{key}").into_bytes());
3034 count += 1;
3035 }
3036 assert_eq!(count, 5, "Iterator should see all uncommitted writes");
3037
3038 tx.commit().unwrap();
3040 }
3041
3042 #[test]
3043 fn test_batch_manual_commit() {
3044 let temp_dir = TempDir::new().unwrap();
3045 let provider =
3046 RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
3047
3048 let mut batch = provider.batch();
3050
3051 for i in 0..10u64 {
3053 let value = format!("batch_value_{i}").into_bytes();
3054 batch.put::<TestTable>(i, &value).unwrap();
3055 }
3056
3057 assert_eq!(batch.len(), 10);
3059 assert!(!batch.is_empty());
3060
3061 assert_eq!(provider.get::<TestTable>(0).unwrap(), None);
3063
3064 batch.commit().unwrap();
3066
3067 for i in 0..10u64 {
3069 let value = format!("batch_value_{i}").into_bytes();
3070 assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
3071 }
3072 }
3073
3074 #[test]
3075 fn test_first_and_last_entry() {
3076 let temp_dir = TempDir::new().unwrap();
3077 let provider =
3078 RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
3079
3080 assert_eq!(provider.first::<TestTable>().unwrap(), None);
3082 assert_eq!(provider.last::<TestTable>().unwrap(), None);
3083
3084 provider.put::<TestTable>(10, &b"value_10".to_vec()).unwrap();
3086 provider.put::<TestTable>(20, &b"value_20".to_vec()).unwrap();
3087 provider.put::<TestTable>(5, &b"value_5".to_vec()).unwrap();
3088
3089 let first = provider.first::<TestTable>().unwrap();
3091 assert_eq!(first, Some((5, b"value_5".to_vec())));
3092
3093 let last = provider.last::<TestTable>().unwrap();
3095 assert_eq!(last, Some((20, b"value_20".to_vec())));
3096 }
3097
3098 #[test]
3102 fn test_account_history_info_pruned_before_first_entry() {
3103 let temp_dir = TempDir::new().unwrap();
3104 let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3105
3106 let address = Address::from([0x42; 20]);
3107
3108 let chunk = IntegerList::new([100, 200, 300]).unwrap();
3110 let shard_key = ShardedKey::new(address, u64::MAX);
3111 provider.put::<tables::AccountsHistory>(shard_key, &chunk).unwrap();
3112
3113 let result =
3118 provider.snapshot().account_history_info(address, 50, Some(100), u64::MAX).unwrap();
3119 assert_eq!(result, HistoryInfo::InChangeset(100));
3120 }
3121
3122 #[test]
3124 fn test_account_history_info_read_only_and_catch_up() {
3125 let temp_dir = TempDir::new().unwrap();
3126 let address = Address::from([0x42; 20]);
3127 let chunk = IntegerList::new([100, 200, 300]).unwrap();
3128 let shard_key = ShardedKey::new(address, u64::MAX);
3129
3130 let rw_provider =
3132 RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3133 rw_provider.put::<tables::AccountsHistory>(shard_key, &chunk).unwrap();
3134
3135 let ro_provider = RocksDBBuilder::new(temp_dir.path())
3137 .with_default_tables()
3138 .with_read_only(true)
3139 .build()
3140 .unwrap();
3141
3142 let result =
3143 ro_provider.snapshot().account_history_info(address, 200, None, u64::MAX).unwrap();
3144 assert_eq!(result, HistoryInfo::InChangeset(200));
3145
3146 let result =
3147 ro_provider.snapshot().account_history_info(address, 50, None, u64::MAX).unwrap();
3148 assert_eq!(result, HistoryInfo::NotYetWritten);
3149
3150 let result =
3151 ro_provider.snapshot().account_history_info(address, 400, None, u64::MAX).unwrap();
3152 assert_eq!(result, HistoryInfo::InPlainState);
3153
3154 let address2 = Address::from([0x43; 20]);
3156 let chunk2 = IntegerList::new([500, 600]).unwrap();
3157 let shard_key2 = ShardedKey::new(address2, u64::MAX);
3158 rw_provider.put::<tables::AccountsHistory>(shard_key2, &chunk2).unwrap();
3159
3160 let result =
3162 ro_provider.snapshot().account_history_info(address2, 500, None, u64::MAX).unwrap();
3163 assert_eq!(result, HistoryInfo::NotYetWritten);
3164
3165 ro_provider.try_catch_up_with_primary().unwrap();
3167
3168 let result =
3169 ro_provider.snapshot().account_history_info(address2, 500, None, u64::MAX).unwrap();
3170 assert_eq!(result, HistoryInfo::InChangeset(500));
3171 }
3172
3173 #[test]
3174 fn test_account_history_info_ignores_blocks_above_visible_tip() {
3175 let temp_dir = TempDir::new().unwrap();
3176 let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3177
3178 let address = Address::from([0x42; 20]);
3179
3180 provider
3181 .put::<tables::AccountsHistory>(
3182 ShardedKey::new(address, 110),
3183 &IntegerList::new([100, 110]).unwrap(),
3184 )
3185 .unwrap();
3186 provider
3187 .put::<tables::AccountsHistory>(
3188 ShardedKey::new(address, u64::MAX),
3189 &IntegerList::new([200, 210]).unwrap(),
3190 )
3191 .unwrap();
3192
3193 let result = provider.snapshot().account_history_info(address, 150, None, 150).unwrap();
3194 assert_eq!(result, HistoryInfo::InPlainState);
3195 }
3196
3197 #[test]
3198 fn test_account_history_info_mixed_shard_respects_visible_tip() {
3199 let temp_dir = TempDir::new().unwrap();
3200 let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3201
3202 let address = Address::from([0x42; 20]);
3203 provider
3204 .put::<tables::AccountsHistory>(
3205 ShardedKey::new(address, u64::MAX),
3206 &IntegerList::new([100, 150, 300]).unwrap(),
3207 )
3208 .unwrap();
3209
3210 let result = provider.snapshot().account_history_info(address, 120, None, 200).unwrap();
3211 assert_eq!(result, HistoryInfo::InChangeset(150));
3212
3213 let result = provider.snapshot().account_history_info(address, 201, None, 200).unwrap();
3214 assert_eq!(result, HistoryInfo::InPlainState);
3215 }
3216
3217 #[test]
3218 fn test_account_history_info_only_stale_entries_use_fallback() {
3219 let temp_dir = TempDir::new().unwrap();
3220 let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3221
3222 let address = Address::from([0x42; 20]);
3223 provider
3224 .put::<tables::AccountsHistory>(
3225 ShardedKey::new(address, u64::MAX),
3226 &IntegerList::new([200, 210]).unwrap(),
3227 )
3228 .unwrap();
3229
3230 let result = provider.snapshot().account_history_info(address, 150, None, 150).unwrap();
3231 assert_eq!(result, HistoryInfo::NotYetWritten);
3232
3233 let result =
3234 provider.snapshot().account_history_info(address, 150, Some(100), 150).unwrap();
3235 assert_eq!(result, HistoryInfo::MaybeInPlainState);
3236 }
3237
3238 #[test]
3239 fn test_account_history_shard_split_at_boundary() {
3240 let temp_dir = TempDir::new().unwrap();
3241 let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3242
3243 let address = Address::from([0x42; 20]);
3244 let limit = NUM_OF_INDICES_IN_SHARD;
3245
3246 let indices: Vec<u64> = (0..=(limit as u64)).collect();
3248 let mut batch = provider.batch();
3249 batch.append_account_history_shard(address, indices).unwrap();
3250 batch.commit().unwrap();
3251
3252 let completed_key = ShardedKey::new(address, (limit - 1) as u64);
3254 let sentinel_key = ShardedKey::new(address, u64::MAX);
3255
3256 let completed_shard = provider.get::<tables::AccountsHistory>(completed_key).unwrap();
3257 let sentinel_shard = provider.get::<tables::AccountsHistory>(sentinel_key).unwrap();
3258
3259 assert!(completed_shard.is_some(), "completed shard should exist");
3260 assert!(sentinel_shard.is_some(), "sentinel shard should exist");
3261
3262 let completed_shard = completed_shard.unwrap();
3263 let sentinel_shard = sentinel_shard.unwrap();
3264
3265 assert_eq!(completed_shard.len(), limit as u64, "completed shard should be full");
3266 assert_eq!(sentinel_shard.len(), 1, "sentinel shard should have 1 element");
3267 }
3268
3269 #[test]
3270 fn test_account_history_multiple_shard_splits() {
3271 let temp_dir = TempDir::new().unwrap();
3272 let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3273
3274 let address = Address::from([0x43; 20]);
3275 let limit = NUM_OF_INDICES_IN_SHARD;
3276
3277 let first_batch_indices: Vec<u64> = (0..limit as u64).collect();
3279 let mut batch = provider.batch();
3280 batch.append_account_history_shard(address, first_batch_indices).unwrap();
3281 batch.commit().unwrap();
3282
3283 let sentinel_key = ShardedKey::new(address, u64::MAX);
3285 let shard = provider.get::<tables::AccountsHistory>(sentinel_key.clone()).unwrap();
3286 assert!(shard.is_some());
3287 assert_eq!(shard.unwrap().len(), limit as u64);
3288
3289 let second_batch_indices: Vec<u64> = (limit as u64..=(2 * limit) as u64).collect();
3291 let mut batch = provider.batch();
3292 batch.append_account_history_shard(address, second_batch_indices).unwrap();
3293 batch.commit().unwrap();
3294
3295 let first_completed = ShardedKey::new(address, (limit - 1) as u64);
3297 let second_completed = ShardedKey::new(address, (2 * limit - 1) as u64);
3298
3299 assert!(
3300 provider.get::<tables::AccountsHistory>(first_completed).unwrap().is_some(),
3301 "first completed shard should exist"
3302 );
3303 assert!(
3304 provider.get::<tables::AccountsHistory>(second_completed).unwrap().is_some(),
3305 "second completed shard should exist"
3306 );
3307 assert!(
3308 provider.get::<tables::AccountsHistory>(sentinel_key).unwrap().is_some(),
3309 "sentinel shard should exist"
3310 );
3311 }
3312
3313 #[test]
3314 fn test_storage_history_shard_split_at_boundary() {
3315 let temp_dir = TempDir::new().unwrap();
3316 let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3317
3318 let address = Address::from([0x44; 20]);
3319 let slot = B256::from([0x55; 32]);
3320 let limit = NUM_OF_INDICES_IN_SHARD;
3321
3322 let indices: Vec<u64> = (0..=(limit as u64)).collect();
3324 let mut batch = provider.batch();
3325 batch.append_storage_history_shard(address, slot, indices).unwrap();
3326 batch.commit().unwrap();
3327
3328 let completed_key = StorageShardedKey::new(address, slot, (limit - 1) as u64);
3330 let sentinel_key = StorageShardedKey::new(address, slot, u64::MAX);
3331
3332 let completed_shard = provider.get::<tables::StoragesHistory>(completed_key).unwrap();
3333 let sentinel_shard = provider.get::<tables::StoragesHistory>(sentinel_key).unwrap();
3334
3335 assert!(completed_shard.is_some(), "completed shard should exist");
3336 assert!(sentinel_shard.is_some(), "sentinel shard should exist");
3337
3338 let completed_shard = completed_shard.unwrap();
3339 let sentinel_shard = sentinel_shard.unwrap();
3340
3341 assert_eq!(completed_shard.len(), limit as u64, "completed shard should be full");
3342 assert_eq!(sentinel_shard.len(), 1, "sentinel shard should have 1 element");
3343 }
3344
3345 #[test]
3346 fn test_storage_history_multiple_shard_splits() {
3347 let temp_dir = TempDir::new().unwrap();
3348 let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3349
3350 let address = Address::from([0x46; 20]);
3351 let slot = B256::from([0x57; 32]);
3352 let limit = NUM_OF_INDICES_IN_SHARD;
3353
3354 let first_batch_indices: Vec<u64> = (0..limit as u64).collect();
3356 let mut batch = provider.batch();
3357 batch.append_storage_history_shard(address, slot, first_batch_indices).unwrap();
3358 batch.commit().unwrap();
3359
3360 let sentinel_key = StorageShardedKey::new(address, slot, u64::MAX);
3362 let shard = provider.get::<tables::StoragesHistory>(sentinel_key.clone()).unwrap();
3363 assert!(shard.is_some());
3364 assert_eq!(shard.unwrap().len(), limit as u64);
3365
3366 let second_batch_indices: Vec<u64> = (limit as u64..=(2 * limit) as u64).collect();
3368 let mut batch = provider.batch();
3369 batch.append_storage_history_shard(address, slot, second_batch_indices).unwrap();
3370 batch.commit().unwrap();
3371
3372 let first_completed = StorageShardedKey::new(address, slot, (limit - 1) as u64);
3374 let second_completed = StorageShardedKey::new(address, slot, (2 * limit - 1) as u64);
3375
3376 assert!(
3377 provider.get::<tables::StoragesHistory>(first_completed).unwrap().is_some(),
3378 "first completed shard should exist"
3379 );
3380 assert!(
3381 provider.get::<tables::StoragesHistory>(second_completed).unwrap().is_some(),
3382 "second completed shard should exist"
3383 );
3384 assert!(
3385 provider.get::<tables::StoragesHistory>(sentinel_key).unwrap().is_some(),
3386 "sentinel shard should exist"
3387 );
3388 }
3389
3390 #[test]
3391 fn test_clear_table() {
3392 let temp_dir = TempDir::new().unwrap();
3393 let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3394
3395 let address = Address::from([0x42; 20]);
3396 let key = ShardedKey::new(address, u64::MAX);
3397 let blocks = BlockNumberList::new_pre_sorted([1, 2, 3]);
3398
3399 provider.put::<tables::AccountsHistory>(key.clone(), &blocks).unwrap();
3400 assert!(provider.get::<tables::AccountsHistory>(key.clone()).unwrap().is_some());
3401
3402 provider.clear::<tables::AccountsHistory>().unwrap();
3403
3404 assert!(
3405 provider.get::<tables::AccountsHistory>(key).unwrap().is_none(),
3406 "table should be empty after clear"
3407 );
3408 assert!(
3409 provider.first::<tables::AccountsHistory>().unwrap().is_none(),
3410 "first() should return None after clear"
3411 );
3412 }
3413
3414 #[test]
3415 fn test_clear_empty_table() {
3416 let temp_dir = TempDir::new().unwrap();
3417 let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3418
3419 assert!(provider.first::<tables::AccountsHistory>().unwrap().is_none());
3420
3421 provider.clear::<tables::AccountsHistory>().unwrap();
3422
3423 assert!(provider.first::<tables::AccountsHistory>().unwrap().is_none());
3424 }
3425
3426 #[test]
3427 fn test_unwind_account_history_to_basic() {
3428 let temp_dir = TempDir::new().unwrap();
3429 let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3430
3431 let address = Address::from([0x42; 20]);
3432
3433 let mut batch = provider.batch();
3435 batch.append_account_history_shard(address, 0..=10).unwrap();
3436 batch.commit().unwrap();
3437
3438 let key = ShardedKey::new(address, u64::MAX);
3440 let result = provider.get::<tables::AccountsHistory>(key.clone()).unwrap();
3441 assert!(result.is_some());
3442 let blocks: Vec<u64> = result.unwrap().iter().collect();
3443 assert_eq!(blocks, (0..=10).collect::<Vec<_>>());
3444
3445 let mut batch = provider.batch();
3447 batch.unwind_account_history_to(address, 5).unwrap();
3448 batch.commit().unwrap();
3449
3450 let result = provider.get::<tables::AccountsHistory>(key).unwrap();
3452 assert!(result.is_some());
3453 let blocks: Vec<u64> = result.unwrap().iter().collect();
3454 assert_eq!(blocks, (0..=5).collect::<Vec<_>>());
3455 }
3456
3457 #[test]
3458 fn test_unwind_account_history_to_removes_all() {
3459 let temp_dir = TempDir::new().unwrap();
3460 let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3461
3462 let address = Address::from([0x42; 20]);
3463
3464 let mut batch = provider.batch();
3466 batch.append_account_history_shard(address, 5..=10).unwrap();
3467 batch.commit().unwrap();
3468
3469 let mut batch = provider.batch();
3471 batch.unwind_account_history_to(address, 4).unwrap();
3472 batch.commit().unwrap();
3473
3474 let key = ShardedKey::new(address, u64::MAX);
3476 let result = provider.get::<tables::AccountsHistory>(key).unwrap();
3477 assert!(result.is_none(), "Should have no data after full unwind");
3478 }
3479
3480 #[test]
3481 fn test_unwind_account_history_to_no_op() {
3482 let temp_dir = TempDir::new().unwrap();
3483 let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3484
3485 let address = Address::from([0x42; 20]);
3486
3487 let mut batch = provider.batch();
3489 batch.append_account_history_shard(address, 0..=5).unwrap();
3490 batch.commit().unwrap();
3491
3492 let mut batch = provider.batch();
3494 batch.unwind_account_history_to(address, 10).unwrap();
3495 batch.commit().unwrap();
3496
3497 let key = ShardedKey::new(address, u64::MAX);
3499 let result = provider.get::<tables::AccountsHistory>(key).unwrap();
3500 assert!(result.is_some());
3501 let blocks: Vec<u64> = result.unwrap().iter().collect();
3502 assert_eq!(blocks, (0..=5).collect::<Vec<_>>());
3503 }
3504
3505 #[test]
3506 fn test_unwind_account_history_to_block_zero() {
3507 let temp_dir = TempDir::new().unwrap();
3508 let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3509
3510 let address = Address::from([0x42; 20]);
3511
3512 let mut batch = provider.batch();
3514 batch.append_account_history_shard(address, 0..=5).unwrap();
3515 batch.commit().unwrap();
3516
3517 let mut batch = provider.batch();
3520 batch.unwind_account_history_to(address, 0).unwrap();
3521 batch.commit().unwrap();
3522
3523 let key = ShardedKey::new(address, u64::MAX);
3525 let result = provider.get::<tables::AccountsHistory>(key).unwrap();
3526 assert!(result.is_some());
3527 let blocks: Vec<u64> = result.unwrap().iter().collect();
3528 assert_eq!(blocks, vec![0]);
3529 }
3530
3531 #[test]
3532 fn test_unwind_account_history_to_multi_shard() {
3533 let temp_dir = TempDir::new().unwrap();
3534 let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3535
3536 let address = Address::from([0x42; 20]);
3537
3538 let mut batch = provider.batch();
3541
3542 let shard1 = BlockNumberList::new_pre_sorted(1..=50);
3544 batch.put::<tables::AccountsHistory>(ShardedKey::new(address, 50), &shard1).unwrap();
3545
3546 let shard2 = BlockNumberList::new_pre_sorted(51..=100);
3548 batch.put::<tables::AccountsHistory>(ShardedKey::new(address, u64::MAX), &shard2).unwrap();
3549
3550 batch.commit().unwrap();
3551
3552 let shards = provider.account_history_shards(address).unwrap();
3554 assert_eq!(shards.len(), 2);
3555
3556 let mut batch = provider.batch();
3558 batch.unwind_account_history_to(address, 75).unwrap();
3559 batch.commit().unwrap();
3560
3561 let shards = provider.account_history_shards(address).unwrap();
3563 assert_eq!(shards.len(), 2);
3564
3565 assert_eq!(shards[0].0.highest_block_number, 50);
3567 assert_eq!(shards[0].1.iter().collect::<Vec<_>>(), (1..=50).collect::<Vec<_>>());
3568
3569 assert_eq!(shards[1].0.highest_block_number, u64::MAX);
3571 assert_eq!(shards[1].1.iter().collect::<Vec<_>>(), (51..=75).collect::<Vec<_>>());
3572 }
3573
3574 #[test]
3575 fn test_unwind_account_history_to_multi_shard_boundary_empty() {
3576 let temp_dir = TempDir::new().unwrap();
3577 let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3578
3579 let address = Address::from([0x42; 20]);
3580
3581 let mut batch = provider.batch();
3583
3584 let shard1 = BlockNumberList::new_pre_sorted(1..=50);
3586 batch.put::<tables::AccountsHistory>(ShardedKey::new(address, 50), &shard1).unwrap();
3587
3588 let shard2 = BlockNumberList::new_pre_sorted(75..=100);
3590 batch.put::<tables::AccountsHistory>(ShardedKey::new(address, u64::MAX), &shard2).unwrap();
3591
3592 batch.commit().unwrap();
3593
3594 let mut batch = provider.batch();
3596 batch.unwind_account_history_to(address, 60).unwrap();
3597 batch.commit().unwrap();
3598
3599 let shards = provider.account_history_shards(address).unwrap();
3601 assert_eq!(shards.len(), 1);
3602 assert_eq!(shards[0].0.highest_block_number, u64::MAX);
3603 assert_eq!(shards[0].1.iter().collect::<Vec<_>>(), (1..=50).collect::<Vec<_>>());
3604 }
3605
3606 #[test]
3607 fn test_account_history_shards_iterator() {
3608 let temp_dir = TempDir::new().unwrap();
3609 let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3610
3611 let address = Address::from([0x42; 20]);
3612 let other_address = Address::from([0x43; 20]);
3613
3614 let mut batch = provider.batch();
3616 batch.append_account_history_shard(address, 0..=5).unwrap();
3617 batch.append_account_history_shard(other_address, 10..=15).unwrap();
3618 batch.commit().unwrap();
3619
3620 let shards = provider.account_history_shards(address).unwrap();
3622 assert_eq!(shards.len(), 1);
3623 assert_eq!(shards[0].0.key, address);
3624
3625 let shards = provider.account_history_shards(other_address).unwrap();
3627 assert_eq!(shards.len(), 1);
3628 assert_eq!(shards[0].0.key, other_address);
3629
3630 let non_existent = Address::from([0x99; 20]);
3632 let shards = provider.account_history_shards(non_existent).unwrap();
3633 assert!(shards.is_empty());
3634 }
3635
3636 #[test]
3637 fn test_clear_account_history() {
3638 let temp_dir = TempDir::new().unwrap();
3639 let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3640
3641 let address = Address::from([0x42; 20]);
3642
3643 let mut batch = provider.batch();
3645 batch.append_account_history_shard(address, 0..=10).unwrap();
3646 batch.commit().unwrap();
3647
3648 let mut batch = provider.batch();
3650 batch.clear_account_history(address).unwrap();
3651 batch.commit().unwrap();
3652
3653 let shards = provider.account_history_shards(address).unwrap();
3655 assert!(shards.is_empty(), "All shards should be deleted");
3656 }
3657
3658 #[test]
3659 fn test_unwind_non_sentinel_boundary() {
3660 let temp_dir = TempDir::new().unwrap();
3661 let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3662
3663 let address = Address::from([0x42; 20]);
3664
3665 let mut batch = provider.batch();
3667
3668 let shard1 = BlockNumberList::new_pre_sorted(1..=50);
3670 batch.put::<tables::AccountsHistory>(ShardedKey::new(address, 50), &shard1).unwrap();
3671
3672 let shard2 = BlockNumberList::new_pre_sorted(51..=100);
3674 batch.put::<tables::AccountsHistory>(ShardedKey::new(address, 100), &shard2).unwrap();
3675
3676 let shard3 = BlockNumberList::new_pre_sorted(101..=150);
3678 batch.put::<tables::AccountsHistory>(ShardedKey::new(address, u64::MAX), &shard3).unwrap();
3679
3680 batch.commit().unwrap();
3681
3682 let shards = provider.account_history_shards(address).unwrap();
3684 assert_eq!(shards.len(), 3);
3685
3686 let mut batch = provider.batch();
3688 batch.unwind_account_history_to(address, 75).unwrap();
3689 batch.commit().unwrap();
3690
3691 let shards = provider.account_history_shards(address).unwrap();
3693 assert_eq!(shards.len(), 2);
3694
3695 assert_eq!(shards[0].0.highest_block_number, 50);
3697 assert_eq!(shards[0].1.iter().collect::<Vec<_>>(), (1..=50).collect::<Vec<_>>());
3698
3699 assert_eq!(shards[1].0.highest_block_number, u64::MAX);
3701 assert_eq!(shards[1].1.iter().collect::<Vec<_>>(), (51..=75).collect::<Vec<_>>());
3702 }
3703
3704 #[test]
3705 fn test_batch_auto_commit_on_threshold() {
3706 let temp_dir = TempDir::new().unwrap();
3707 let provider =
3708 RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
3709
3710 let mut batch = RocksDBBatch {
3712 provider: &provider,
3713 inner: WriteBatchWithTransaction::<true>::default(),
3714 buf: Vec::new(),
3715 auto_commit_threshold: Some(1024), };
3717
3718 for i in 0..100u64 {
3721 let value = format!("value_{i:04}").into_bytes();
3722 batch.put::<TestTable>(i, &value).unwrap();
3723 }
3724
3725 let first_visible = provider.get::<TestTable>(0).unwrap();
3728 assert!(first_visible.is_some(), "Auto-committed data should be visible");
3729
3730 batch.commit().unwrap();
3732
3733 for i in 0..100u64 {
3735 let value = format!("value_{i:04}").into_bytes();
3736 assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
3737 }
3738 }
3739
3740 struct AccountPruneCase {
3744 name: &'static str,
3745 initial_shards: &'static [(u64, &'static [u64])],
3746 prune_to: u64,
3747 expected_outcome: PruneShardOutcome,
3748 expected_shards: &'static [(u64, &'static [u64])],
3749 }
3750
3751 struct StoragePruneCase {
3753 name: &'static str,
3754 initial_shards: &'static [(u64, &'static [u64])],
3755 prune_to: u64,
3756 expected_outcome: PruneShardOutcome,
3757 expected_shards: &'static [(u64, &'static [u64])],
3758 }
3759
3760 #[test]
3761 fn test_prune_account_history_cases() {
3762 const MAX: u64 = u64::MAX;
3763 const CASES: &[AccountPruneCase] = &[
3764 AccountPruneCase {
3765 name: "single_shard_truncate",
3766 initial_shards: &[(MAX, &[10, 20, 30, 40])],
3767 prune_to: 25,
3768 expected_outcome: PruneShardOutcome::Updated,
3769 expected_shards: &[(MAX, &[30, 40])],
3770 },
3771 AccountPruneCase {
3772 name: "single_shard_delete_all",
3773 initial_shards: &[(MAX, &[10, 20])],
3774 prune_to: 20,
3775 expected_outcome: PruneShardOutcome::Deleted,
3776 expected_shards: &[],
3777 },
3778 AccountPruneCase {
3779 name: "single_shard_noop",
3780 initial_shards: &[(MAX, &[10, 20])],
3781 prune_to: 5,
3782 expected_outcome: PruneShardOutcome::Unchanged,
3783 expected_shards: &[(MAX, &[10, 20])],
3784 },
3785 AccountPruneCase {
3786 name: "no_shards",
3787 initial_shards: &[],
3788 prune_to: 100,
3789 expected_outcome: PruneShardOutcome::Unchanged,
3790 expected_shards: &[],
3791 },
3792 AccountPruneCase {
3793 name: "multi_shard_truncate_first",
3794 initial_shards: &[(30, &[10, 20, 30]), (MAX, &[40, 50, 60])],
3795 prune_to: 25,
3796 expected_outcome: PruneShardOutcome::Updated,
3797 expected_shards: &[(30, &[30]), (MAX, &[40, 50, 60])],
3798 },
3799 AccountPruneCase {
3800 name: "delete_first_shard_sentinel_unchanged",
3801 initial_shards: &[(20, &[10, 20]), (MAX, &[30, 40])],
3802 prune_to: 20,
3803 expected_outcome: PruneShardOutcome::Deleted,
3804 expected_shards: &[(MAX, &[30, 40])],
3805 },
3806 AccountPruneCase {
3807 name: "multi_shard_delete_all_but_last",
3808 initial_shards: &[(10, &[5, 10]), (20, &[15, 20]), (MAX, &[25, 30])],
3809 prune_to: 22,
3810 expected_outcome: PruneShardOutcome::Deleted,
3811 expected_shards: &[(MAX, &[25, 30])],
3812 },
3813 AccountPruneCase {
3814 name: "mid_shard_preserves_key",
3815 initial_shards: &[(50, &[10, 20, 30, 40, 50]), (MAX, &[60, 70])],
3816 prune_to: 25,
3817 expected_outcome: PruneShardOutcome::Updated,
3818 expected_shards: &[(50, &[30, 40, 50]), (MAX, &[60, 70])],
3819 },
3820 AccountPruneCase {
3822 name: "equiv_delete_early_shards_keep_sentinel",
3823 initial_shards: &[(20, &[10, 15, 20]), (50, &[30, 40, 50]), (MAX, &[60, 70])],
3824 prune_to: 55,
3825 expected_outcome: PruneShardOutcome::Deleted,
3826 expected_shards: &[(MAX, &[60, 70])],
3827 },
3828 AccountPruneCase {
3829 name: "equiv_sentinel_becomes_empty_with_prev",
3830 initial_shards: &[(50, &[30, 40, 50]), (MAX, &[35])],
3831 prune_to: 40,
3832 expected_outcome: PruneShardOutcome::Deleted,
3833 expected_shards: &[(MAX, &[50])],
3834 },
3835 AccountPruneCase {
3836 name: "equiv_all_shards_become_empty",
3837 initial_shards: &[(50, &[30, 40, 50]), (MAX, &[51])],
3838 prune_to: 51,
3839 expected_outcome: PruneShardOutcome::Deleted,
3840 expected_shards: &[],
3841 },
3842 AccountPruneCase {
3843 name: "equiv_non_sentinel_last_shard_promoted",
3844 initial_shards: &[(100, &[50, 75, 100])],
3845 prune_to: 60,
3846 expected_outcome: PruneShardOutcome::Updated,
3847 expected_shards: &[(MAX, &[75, 100])],
3848 },
3849 AccountPruneCase {
3850 name: "equiv_filter_within_shard",
3851 initial_shards: &[(MAX, &[10, 20, 30, 40])],
3852 prune_to: 25,
3853 expected_outcome: PruneShardOutcome::Updated,
3854 expected_shards: &[(MAX, &[30, 40])],
3855 },
3856 AccountPruneCase {
3857 name: "equiv_multi_shard_partial_delete",
3858 initial_shards: &[(20, &[10, 20]), (50, &[30, 40, 50]), (MAX, &[60, 70])],
3859 prune_to: 35,
3860 expected_outcome: PruneShardOutcome::Deleted,
3861 expected_shards: &[(50, &[40, 50]), (MAX, &[60, 70])],
3862 },
3863 ];
3864
3865 let address = Address::from([0x42; 20]);
3866
3867 for case in CASES {
3868 let temp_dir = TempDir::new().unwrap();
3869 let provider =
3870 RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3871
3872 let mut batch = provider.batch();
3874 for (highest, blocks) in case.initial_shards {
3875 let shard = BlockNumberList::new_pre_sorted(blocks.iter().copied());
3876 batch
3877 .put::<tables::AccountsHistory>(ShardedKey::new(address, *highest), &shard)
3878 .unwrap();
3879 }
3880 batch.commit().unwrap();
3881
3882 let mut batch = provider.batch();
3884 let outcome = batch.prune_account_history_to(address, case.prune_to).unwrap();
3885 batch.commit().unwrap();
3886
3887 assert_eq!(outcome, case.expected_outcome, "case '{}': wrong outcome", case.name);
3889
3890 let shards = provider.account_history_shards(address).unwrap();
3892 assert_eq!(
3893 shards.len(),
3894 case.expected_shards.len(),
3895 "case '{}': wrong shard count",
3896 case.name
3897 );
3898 for (i, ((key, blocks), (exp_key, exp_blocks))) in
3899 shards.iter().zip(case.expected_shards.iter()).enumerate()
3900 {
3901 assert_eq!(
3902 key.highest_block_number, *exp_key,
3903 "case '{}': shard {} wrong key",
3904 case.name, i
3905 );
3906 assert_eq!(
3907 blocks.iter().collect::<Vec<_>>(),
3908 *exp_blocks,
3909 "case '{}': shard {} wrong blocks",
3910 case.name,
3911 i
3912 );
3913 }
3914 }
3915 }
3916
3917 #[test]
3918 fn test_prune_storage_history_cases() {
3919 const MAX: u64 = u64::MAX;
3920 const CASES: &[StoragePruneCase] = &[
3921 StoragePruneCase {
3922 name: "single_shard_truncate",
3923 initial_shards: &[(MAX, &[10, 20, 30, 40])],
3924 prune_to: 25,
3925 expected_outcome: PruneShardOutcome::Updated,
3926 expected_shards: &[(MAX, &[30, 40])],
3927 },
3928 StoragePruneCase {
3929 name: "single_shard_delete_all",
3930 initial_shards: &[(MAX, &[10, 20])],
3931 prune_to: 20,
3932 expected_outcome: PruneShardOutcome::Deleted,
3933 expected_shards: &[],
3934 },
3935 StoragePruneCase {
3936 name: "noop",
3937 initial_shards: &[(MAX, &[10, 20])],
3938 prune_to: 5,
3939 expected_outcome: PruneShardOutcome::Unchanged,
3940 expected_shards: &[(MAX, &[10, 20])],
3941 },
3942 StoragePruneCase {
3943 name: "no_shards",
3944 initial_shards: &[],
3945 prune_to: 100,
3946 expected_outcome: PruneShardOutcome::Unchanged,
3947 expected_shards: &[],
3948 },
3949 StoragePruneCase {
3950 name: "mid_shard_preserves_key",
3951 initial_shards: &[(50, &[10, 20, 30, 40, 50]), (MAX, &[60, 70])],
3952 prune_to: 25,
3953 expected_outcome: PruneShardOutcome::Updated,
3954 expected_shards: &[(50, &[30, 40, 50]), (MAX, &[60, 70])],
3955 },
3956 StoragePruneCase {
3958 name: "equiv_sentinel_promotion",
3959 initial_shards: &[(100, &[50, 75, 100])],
3960 prune_to: 60,
3961 expected_outcome: PruneShardOutcome::Updated,
3962 expected_shards: &[(MAX, &[75, 100])],
3963 },
3964 StoragePruneCase {
3965 name: "equiv_delete_early_shards_keep_sentinel",
3966 initial_shards: &[(20, &[10, 15, 20]), (50, &[30, 40, 50]), (MAX, &[60, 70])],
3967 prune_to: 55,
3968 expected_outcome: PruneShardOutcome::Deleted,
3969 expected_shards: &[(MAX, &[60, 70])],
3970 },
3971 StoragePruneCase {
3972 name: "equiv_sentinel_becomes_empty_with_prev",
3973 initial_shards: &[(50, &[30, 40, 50]), (MAX, &[35])],
3974 prune_to: 40,
3975 expected_outcome: PruneShardOutcome::Deleted,
3976 expected_shards: &[(MAX, &[50])],
3977 },
3978 StoragePruneCase {
3979 name: "equiv_all_shards_become_empty",
3980 initial_shards: &[(50, &[30, 40, 50]), (MAX, &[51])],
3981 prune_to: 51,
3982 expected_outcome: PruneShardOutcome::Deleted,
3983 expected_shards: &[],
3984 },
3985 StoragePruneCase {
3986 name: "equiv_filter_within_shard",
3987 initial_shards: &[(MAX, &[10, 20, 30, 40])],
3988 prune_to: 25,
3989 expected_outcome: PruneShardOutcome::Updated,
3990 expected_shards: &[(MAX, &[30, 40])],
3991 },
3992 StoragePruneCase {
3993 name: "equiv_multi_shard_partial_delete",
3994 initial_shards: &[(20, &[10, 20]), (50, &[30, 40, 50]), (MAX, &[60, 70])],
3995 prune_to: 35,
3996 expected_outcome: PruneShardOutcome::Deleted,
3997 expected_shards: &[(50, &[40, 50]), (MAX, &[60, 70])],
3998 },
3999 ];
4000
4001 let address = Address::from([0x42; 20]);
4002 let storage_key = B256::from([0x01; 32]);
4003
4004 for case in CASES {
4005 let temp_dir = TempDir::new().unwrap();
4006 let provider =
4007 RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
4008
4009 let mut batch = provider.batch();
4011 for (highest, blocks) in case.initial_shards {
4012 let shard = BlockNumberList::new_pre_sorted(blocks.iter().copied());
4013 let key = if *highest == MAX {
4014 StorageShardedKey::last(address, storage_key)
4015 } else {
4016 StorageShardedKey::new(address, storage_key, *highest)
4017 };
4018 batch.put::<tables::StoragesHistory>(key, &shard).unwrap();
4019 }
4020 batch.commit().unwrap();
4021
4022 let mut batch = provider.batch();
4024 let outcome =
4025 batch.prune_storage_history_to(address, storage_key, case.prune_to).unwrap();
4026 batch.commit().unwrap();
4027
4028 assert_eq!(outcome, case.expected_outcome, "case '{}': wrong outcome", case.name);
4030
4031 let shards = provider.storage_history_shards(address, storage_key).unwrap();
4033 assert_eq!(
4034 shards.len(),
4035 case.expected_shards.len(),
4036 "case '{}': wrong shard count",
4037 case.name
4038 );
4039 for (i, ((key, blocks), (exp_key, exp_blocks))) in
4040 shards.iter().zip(case.expected_shards.iter()).enumerate()
4041 {
4042 assert_eq!(
4043 key.sharded_key.highest_block_number, *exp_key,
4044 "case '{}': shard {} wrong key",
4045 case.name, i
4046 );
4047 assert_eq!(
4048 blocks.iter().collect::<Vec<_>>(),
4049 *exp_blocks,
4050 "case '{}': shard {} wrong blocks",
4051 case.name,
4052 i
4053 );
4054 }
4055 }
4056 }
4057
4058 #[test]
4059 fn test_prune_storage_history_does_not_affect_other_slots() {
4060 let temp_dir = TempDir::new().unwrap();
4061 let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
4062
4063 let address = Address::from([0x42; 20]);
4064 let slot1 = B256::from([0x01; 32]);
4065 let slot2 = B256::from([0x02; 32]);
4066
4067 let mut batch = provider.batch();
4069 batch
4070 .put::<tables::StoragesHistory>(
4071 StorageShardedKey::last(address, slot1),
4072 &BlockNumberList::new_pre_sorted([10u64, 20]),
4073 )
4074 .unwrap();
4075 batch
4076 .put::<tables::StoragesHistory>(
4077 StorageShardedKey::last(address, slot2),
4078 &BlockNumberList::new_pre_sorted([30u64, 40]),
4079 )
4080 .unwrap();
4081 batch.commit().unwrap();
4082
4083 let mut batch = provider.batch();
4085 let outcome = batch.prune_storage_history_to(address, slot1, 20).unwrap();
4086 batch.commit().unwrap();
4087
4088 assert_eq!(outcome, PruneShardOutcome::Deleted);
4089
4090 let shards1 = provider.storage_history_shards(address, slot1).unwrap();
4092 assert!(shards1.is_empty());
4093
4094 let shards2 = provider.storage_history_shards(address, slot2).unwrap();
4096 assert_eq!(shards2.len(), 1);
4097 assert_eq!(shards2[0].1.iter().collect::<Vec<_>>(), vec![30, 40]);
4098 }
4099
4100 #[test]
4101 fn test_prune_invariants() {
4102 let address = Address::from([0x42; 20]);
4104 let storage_key = B256::from([0x01; 32]);
4105
4106 #[expect(clippy::type_complexity)]
4108 let invariant_cases: &[(&[(u64, &[u64])], u64)] = &[
4109 (&[(10, &[5, 10]), (20, &[15, 20]), (u64::MAX, &[25, 30])], 20),
4111 (&[(100, &[50, 100])], 60),
4113 ];
4114
4115 for (initial_shards, prune_to) in invariant_cases {
4116 {
4118 let temp_dir = TempDir::new().unwrap();
4119 let provider =
4120 RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
4121
4122 let mut batch = provider.batch();
4123 for (highest, blocks) in *initial_shards {
4124 let shard = BlockNumberList::new_pre_sorted(blocks.iter().copied());
4125 batch
4126 .put::<tables::AccountsHistory>(ShardedKey::new(address, *highest), &shard)
4127 .unwrap();
4128 }
4129 batch.commit().unwrap();
4130
4131 let mut batch = provider.batch();
4132 batch.prune_account_history_to(address, *prune_to).unwrap();
4133 batch.commit().unwrap();
4134
4135 let shards = provider.account_history_shards(address).unwrap();
4136
4137 for (key, blocks) in &shards {
4139 assert!(
4140 !blocks.is_empty(),
4141 "Account: empty shard at key {}",
4142 key.highest_block_number
4143 );
4144 }
4145
4146 if !shards.is_empty() {
4148 let last = shards.last().unwrap();
4149 assert_eq!(
4150 last.0.highest_block_number,
4151 u64::MAX,
4152 "Account: last shard must be sentinel"
4153 );
4154 }
4155 }
4156
4157 {
4159 let temp_dir = TempDir::new().unwrap();
4160 let provider =
4161 RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
4162
4163 let mut batch = provider.batch();
4164 for (highest, blocks) in *initial_shards {
4165 let shard = BlockNumberList::new_pre_sorted(blocks.iter().copied());
4166 let key = if *highest == u64::MAX {
4167 StorageShardedKey::last(address, storage_key)
4168 } else {
4169 StorageShardedKey::new(address, storage_key, *highest)
4170 };
4171 batch.put::<tables::StoragesHistory>(key, &shard).unwrap();
4172 }
4173 batch.commit().unwrap();
4174
4175 let mut batch = provider.batch();
4176 batch.prune_storage_history_to(address, storage_key, *prune_to).unwrap();
4177 batch.commit().unwrap();
4178
4179 let shards = provider.storage_history_shards(address, storage_key).unwrap();
4180
4181 for (key, blocks) in &shards {
4183 assert!(
4184 !blocks.is_empty(),
4185 "Storage: empty shard at key {}",
4186 key.sharded_key.highest_block_number
4187 );
4188 }
4189
4190 if !shards.is_empty() {
4192 let last = shards.last().unwrap();
4193 assert_eq!(
4194 last.0.sharded_key.highest_block_number,
4195 u64::MAX,
4196 "Storage: last shard must be sentinel"
4197 );
4198 }
4199 }
4200 }
4201 }
4202
4203 #[test]
4204 fn test_prune_account_history_batch_multiple_sorted_targets() {
4205 let temp_dir = TempDir::new().unwrap();
4206 let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
4207
4208 let addr1 = Address::from([0x01; 20]);
4209 let addr2 = Address::from([0x02; 20]);
4210 let addr3 = Address::from([0x03; 20]);
4211
4212 let mut batch = provider.batch();
4214 batch
4215 .put::<tables::AccountsHistory>(
4216 ShardedKey::new(addr1, u64::MAX),
4217 &BlockNumberList::new_pre_sorted([10, 20, 30]),
4218 )
4219 .unwrap();
4220 batch
4221 .put::<tables::AccountsHistory>(
4222 ShardedKey::new(addr2, u64::MAX),
4223 &BlockNumberList::new_pre_sorted([5, 10, 15]),
4224 )
4225 .unwrap();
4226 batch
4227 .put::<tables::AccountsHistory>(
4228 ShardedKey::new(addr3, u64::MAX),
4229 &BlockNumberList::new_pre_sorted([100, 200]),
4230 )
4231 .unwrap();
4232 batch.commit().unwrap();
4233
4234 let mut targets = vec![(addr1, 15), (addr2, 10), (addr3, 50)];
4236 targets.sort_by_key(|(addr, _)| *addr);
4237
4238 let mut batch = provider.batch();
4239 let outcomes = batch.prune_account_history_batch(&targets).unwrap();
4240 batch.commit().unwrap();
4241
4242 assert_eq!(outcomes.updated, 2);
4246 assert_eq!(outcomes.unchanged, 1);
4247
4248 let shards1 = provider.account_history_shards(addr1).unwrap();
4249 assert_eq!(shards1[0].1.iter().collect::<Vec<_>>(), vec![20, 30]);
4250
4251 let shards2 = provider.account_history_shards(addr2).unwrap();
4252 assert_eq!(shards2[0].1.iter().collect::<Vec<_>>(), vec![15]);
4253
4254 let shards3 = provider.account_history_shards(addr3).unwrap();
4255 assert_eq!(shards3[0].1.iter().collect::<Vec<_>>(), vec![100, 200]);
4256 }
4257
4258 #[test]
4259 fn test_prune_account_history_batch_target_with_no_shards() {
4260 let temp_dir = TempDir::new().unwrap();
4261 let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
4262
4263 let addr1 = Address::from([0x01; 20]);
4264 let addr2 = Address::from([0x02; 20]); let addr3 = Address::from([0x03; 20]);
4266
4267 let mut batch = provider.batch();
4269 batch
4270 .put::<tables::AccountsHistory>(
4271 ShardedKey::new(addr1, u64::MAX),
4272 &BlockNumberList::new_pre_sorted([10, 20]),
4273 )
4274 .unwrap();
4275 batch
4276 .put::<tables::AccountsHistory>(
4277 ShardedKey::new(addr3, u64::MAX),
4278 &BlockNumberList::new_pre_sorted([30, 40]),
4279 )
4280 .unwrap();
4281 batch.commit().unwrap();
4282
4283 let mut targets = vec![(addr1, 15), (addr2, 100), (addr3, 35)];
4285 targets.sort_by_key(|(addr, _)| *addr);
4286
4287 let mut batch = provider.batch();
4288 let outcomes = batch.prune_account_history_batch(&targets).unwrap();
4289 batch.commit().unwrap();
4290
4291 assert_eq!(outcomes.updated, 2);
4295 assert_eq!(outcomes.unchanged, 1);
4296
4297 let shards1 = provider.account_history_shards(addr1).unwrap();
4298 assert_eq!(shards1[0].1.iter().collect::<Vec<_>>(), vec![20]);
4299
4300 let shards3 = provider.account_history_shards(addr3).unwrap();
4301 assert_eq!(shards3[0].1.iter().collect::<Vec<_>>(), vec![40]);
4302 }
4303
4304 #[test]
4305 fn test_prune_storage_history_batch_multiple_sorted_targets() {
4306 let temp_dir = TempDir::new().unwrap();
4307 let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
4308
4309 let addr = Address::from([0x42; 20]);
4310 let slot1 = B256::from([0x01; 32]);
4311 let slot2 = B256::from([0x02; 32]);
4312
4313 let mut batch = provider.batch();
4315 batch
4316 .put::<tables::StoragesHistory>(
4317 StorageShardedKey::new(addr, slot1, u64::MAX),
4318 &BlockNumberList::new_pre_sorted([10, 20, 30]),
4319 )
4320 .unwrap();
4321 batch
4322 .put::<tables::StoragesHistory>(
4323 StorageShardedKey::new(addr, slot2, u64::MAX),
4324 &BlockNumberList::new_pre_sorted([5, 15, 25]),
4325 )
4326 .unwrap();
4327 batch.commit().unwrap();
4328
4329 let mut targets = vec![((addr, slot1), 15), ((addr, slot2), 10)];
4331 targets.sort_by_key(|((a, s), _)| (*a, *s));
4332
4333 let mut batch = provider.batch();
4334 let outcomes = batch.prune_storage_history_batch(&targets).unwrap();
4335 batch.commit().unwrap();
4336
4337 assert_eq!(outcomes.updated, 2);
4338
4339 let shards1 = provider.storage_history_shards(addr, slot1).unwrap();
4340 assert_eq!(shards1[0].1.iter().collect::<Vec<_>>(), vec![20, 30]);
4341
4342 let shards2 = provider.storage_history_shards(addr, slot2).unwrap();
4343 assert_eq!(shards2[0].1.iter().collect::<Vec<_>>(), vec![15, 25]);
4344 }
4345}