1use super::metrics::{RocksDBMetrics, RocksDBOperation, ROCKSDB_TABLES};
2use crate::providers::{compute_history_rank, needs_prev_shard_check, HistoryInfo};
3use alloy_consensus::transaction::TxHashRef;
4use alloy_primitives::{
5 map::{AddressMap, HashMap},
6 Address, BlockNumber, TxNumber, B256,
7};
8use itertools::Itertools;
9use metrics::Label;
10use parking_lot::Mutex;
11use rayon::prelude::*;
12use reth_chain_state::ExecutedBlock;
13use reth_db_api::{
14 database_metrics::DatabaseMetrics,
15 models::{
16 sharded_key::NUM_OF_INDICES_IN_SHARD, storage_sharded_key::StorageShardedKey, ShardedKey,
17 StorageSettings,
18 },
19 table::{Compress, Decode, Decompress, Encode, Table},
20 tables, BlockNumberList, DatabaseError,
21};
22use reth_primitives_traits::{BlockBody as _, FastInstant as Instant};
23use reth_prune_types::PruneMode;
24use reth_storage_errors::{
25 db::{DatabaseErrorInfo, DatabaseWriteError, DatabaseWriteOperation, LogLevel},
26 provider::{ProviderError, ProviderResult},
27};
28use rocksdb::{
29 BlockBasedOptions, Cache, ColumnFamilyDescriptor, CompactionPri, DBCompressionType,
30 DBRawIteratorWithThreadMode, IteratorMode, OptimisticTransactionDB,
31 OptimisticTransactionOptions, Options, SnapshotWithThreadMode, Transaction,
32 WriteBatchWithTransaction, WriteBufferManager, WriteOptions, DB,
33};
34use std::{
35 collections::BTreeMap,
36 fmt,
37 path::{Path, PathBuf},
38 sync::Arc,
39};
40use tracing::instrument;
41
42fn synced_write_options() -> WriteOptions {
44 let mut opts = WriteOptions::default();
45 opts.set_sync(true);
46 opts
47}
48
/// Shared, mutex-guarded list of write batches that have been built but not committed here.
///
/// `write_tx_hash_numbers` pushes its finished batch onto this list instead of committing it;
/// presumably the owner of the list commits the batches later (not visible in this part of the
/// file).
pub(crate) type PendingRocksDBBatches = Arc<Mutex<Vec<WriteBatchWithTransaction<true>>>>;
51
/// A raw `(key, value)` byte pair, or the RocksDB error produced while reading it.
///
/// NOTE(review): presumably the item type of the iterator wrappers defined elsewhere in this
/// file — confirm.
type RawKVResult = Result<(Box<[u8]>, Box<[u8]>), rocksdb::Error>;
54
/// Size and key-count estimates for a single column family (table).
///
/// Every numeric field is read from a RocksDB integer property and falls back to `0` when the
/// property cannot be read.
#[derive(Debug, Clone)]
pub struct RocksDBTableStats {
    /// Value of the `LIVE_SST_FILES_SIZE` property.
    pub sst_size_bytes: u64,
    /// Value of the `SIZE_ALL_MEM_TABLES` property.
    pub memtable_size_bytes: u64,
    /// Column family name.
    pub name: String,
    /// Value of the `ESTIMATE_NUM_KEYS` property.
    pub estimated_num_keys: u64,
    /// `sst_size_bytes + memtable_size_bytes`.
    pub estimated_size_bytes: u64,
    /// Value of the `ESTIMATE_PENDING_COMPACTION_BYTES` property.
    pub pending_compaction_bytes: u64,
}
71
/// Database-wide statistics: per-table stats plus the on-disk WAL size.
#[derive(Debug, Clone)]
pub struct RocksDBStats {
    /// One entry per column family in `ROCKSDB_TABLES` that exists in the database.
    pub tables: Vec<RocksDBTableStats>,
    /// Total size in bytes of the `*.log` files found in the database directory.
    pub wal_size_bytes: u64,
}
84
/// Context passed to [`RocksDBProvider::write_blocks_data`] and its per-table writers.
#[derive(Clone)]
pub(crate) struct RocksDBWriteCtx {
    /// Number of the first block being written (recorded on the tracing span of
    /// `write_blocks_data`).
    pub first_block_number: BlockNumber,
    /// Prune mode for the transaction lookup table; when it is `Some` and `is_full()`,
    /// transaction hash numbers are not written.
    pub prune_tx_lookup: Option<PruneMode>,
    /// Storage settings; nothing is written unless `storage_v2` is set.
    pub storage_settings: StorageSettings,
    /// Destination for batches built by the writers.
    pub pending_batches: PendingRocksDBBatches,
}
97
98impl fmt::Debug for RocksDBWriteCtx {
99 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
100 f.debug_struct("RocksDBWriteCtx")
101 .field("first_block_number", &self.first_block_number)
102 .field("prune_tx_lookup", &self.prune_tx_lookup)
103 .field("storage_settings", &self.storage_settings)
104 .field("pending_batches", &"<pending batches>")
105 .finish()
106 }
107}
108
/// Capacity in bytes of the LRU block cache created by `RocksDBBuilder::new` (128 MiB).
const DEFAULT_CACHE_SIZE: usize = 128 * 1024 * 1024;

/// Block size in bytes for block-based tables (16 KiB).
const DEFAULT_BLOCK_SIZE: usize = 16 << 10;

/// Value passed to `Options::set_max_background_jobs`.
const DEFAULT_MAX_BACKGROUND_JOBS: i32 = 6;

/// Value passed to `Options::set_max_open_files` for read-write databases.
const DEFAULT_MAX_OPEN_FILES: i32 = 512;

/// Value passed to `Options::set_bytes_per_sync` (1 MiB).
const DEFAULT_BYTES_PER_SYNC: u64 = 1 << 20;

/// Write buffer size in bytes for column families using the default options (128 MiB).
const DEFAULT_WRITE_BUFFER_SIZE: usize = 128 * 1024 * 1024;

/// Size in bytes given to the `WriteBufferManager` (4 GiB).
const DEFAULT_WRITE_BUFFER_MANAGER_SIZE: usize = 4 << 30;

/// Initial capacity in bytes of the scratch buffer owned by each `RocksDBBatch` (4 KiB).
const DEFAULT_COMPRESS_BUF_CAPACITY: usize = 4 * 1024;

/// Auto-commit threshold used by `RocksDBProvider::batch_with_auto_commit` (512 MiB).
const DEFAULT_AUTO_COMMIT_THRESHOLD: usize = 512 << 20;
155
/// Builder for [`RocksDBProvider`].
pub struct RocksDBBuilder {
    /// Directory of the (primary) database.
    path: PathBuf,
    /// Names of the column families to open, in registration order.
    column_families: Vec<String>,
    /// Whether the provider is created with a `RocksDBMetrics` instance.
    enable_metrics: bool,
    /// Whether RocksDB statistics are enabled on the database options.
    enable_statistics: bool,
    /// Log level handed to RocksDB.
    log_level: rocksdb::LogLevel,
    /// Block cache shared by the table options of all column families.
    block_cache: Cache,
    /// When `true`, `build` opens the database as a secondary instance instead of read-write.
    read_only: bool,
}
166
167impl fmt::Debug for RocksDBBuilder {
168 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
169 f.debug_struct("RocksDBBuilder")
170 .field("path", &self.path)
171 .field("column_families", &self.column_families)
172 .field("enable_metrics", &self.enable_metrics)
173 .finish()
174 }
175}
176
177impl RocksDBBuilder {
178 pub fn new(path: impl AsRef<Path>) -> Self {
180 let cache = Cache::new_lru_cache(DEFAULT_CACHE_SIZE);
181 Self {
182 path: path.as_ref().to_path_buf(),
183 column_families: Vec::new(),
184 enable_metrics: false,
185 enable_statistics: false,
186 log_level: rocksdb::LogLevel::Info,
187 block_cache: cache,
188 read_only: false,
189 }
190 }
191
192 fn default_table_options(cache: &Cache) -> BlockBasedOptions {
194 let mut table_options = BlockBasedOptions::default();
195 table_options.set_block_size(DEFAULT_BLOCK_SIZE);
196 table_options.set_cache_index_and_filter_blocks(true);
197 table_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
198 table_options.set_block_cache(cache);
200 table_options
201 }
202
203 fn default_options(
205 log_level: rocksdb::LogLevel,
206 cache: &Cache,
207 enable_statistics: bool,
208 ) -> Options {
209 let table_options = Self::default_table_options(cache);
211
212 let mut options = Options::default();
213 options.set_block_based_table_factory(&table_options);
214 options.create_if_missing(true);
215 options.create_missing_column_families(true);
216 options.set_max_background_jobs(DEFAULT_MAX_BACKGROUND_JOBS);
217 options.set_bytes_per_sync(DEFAULT_BYTES_PER_SYNC);
218 let write_buffer_manager =
219 WriteBufferManager::new_write_buffer_manager(DEFAULT_WRITE_BUFFER_MANAGER_SIZE, true);
220 options.set_write_buffer_manager(&write_buffer_manager);
221
222 options.set_bottommost_compression_type(DBCompressionType::Zstd);
223 options.set_bottommost_zstd_max_train_bytes(0, true);
224 options.set_compression_type(DBCompressionType::Lz4);
225 options.set_compaction_pri(CompactionPri::MinOverlappingRatio);
226
227 options.set_log_level(log_level);
228
229 options.set_max_open_files(DEFAULT_MAX_OPEN_FILES);
230
231 options.set_wal_ttl_seconds(0);
234 options.set_wal_size_limit_mb(0);
235
236 if enable_statistics {
238 options.enable_statistics();
239 }
240
241 options
242 }
243
244 fn default_column_family_options(cache: &Cache) -> Options {
246 let table_options = Self::default_table_options(cache);
248
249 let mut cf_options = Options::default();
250 cf_options.set_block_based_table_factory(&table_options);
251 cf_options.set_level_compaction_dynamic_level_bytes(true);
252 cf_options.set_compression_type(DBCompressionType::Lz4);
254 cf_options.set_bottommost_compression_type(DBCompressionType::Zstd);
255 cf_options.set_bottommost_zstd_max_train_bytes(0, true);
257 cf_options.set_write_buffer_size(DEFAULT_WRITE_BUFFER_SIZE);
258
259 cf_options
260 }
261
262 fn tx_hash_numbers_column_family_options(cache: &Cache) -> Options {
269 let mut table_options = BlockBasedOptions::default();
270 table_options.set_block_size(DEFAULT_BLOCK_SIZE);
271 table_options.set_cache_index_and_filter_blocks(true);
272 table_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
273 table_options.set_block_cache(cache);
274 let mut cf_options = Options::default();
278 cf_options.set_block_based_table_factory(&table_options);
279 cf_options.set_level_compaction_dynamic_level_bytes(true);
280 cf_options.set_compression_type(DBCompressionType::None);
283 cf_options.set_bottommost_compression_type(DBCompressionType::None);
284
285 cf_options
286 }
287
288 pub fn with_table<T: Table>(mut self) -> Self {
290 self.column_families.push(T::NAME.to_string());
291 self
292 }
293
294 pub fn with_default_tables(self) -> Self {
301 self.with_table::<tables::TransactionHashNumbers>()
302 .with_table::<tables::AccountsHistory>()
303 .with_table::<tables::StoragesHistory>()
304 }
305
306 pub const fn with_metrics(mut self) -> Self {
308 self.enable_metrics = true;
309 self
310 }
311
312 pub const fn with_statistics(mut self) -> Self {
314 self.enable_statistics = true;
315 self
316 }
317
318 pub const fn with_database_log_level(mut self, log_level: Option<LogLevel>) -> Self {
320 if let Some(level) = log_level {
321 self.log_level = convert_log_level(level);
322 }
323 self
324 }
325
326 pub fn with_block_cache_size(mut self, capacity_bytes: usize) -> Self {
328 self.block_cache = Cache::new_lru_cache(capacity_bytes);
329 self
330 }
331
332 pub const fn with_read_only(mut self, read_only: bool) -> Self {
340 self.read_only = read_only;
341 self
342 }
343
344 pub fn build(self) -> ProviderResult<RocksDBProvider> {
346 let options =
347 Self::default_options(self.log_level, &self.block_cache, self.enable_statistics);
348
349 let cf_descriptors: Vec<ColumnFamilyDescriptor> = self
350 .column_families
351 .iter()
352 .map(|name| {
353 let cf_options = if name == tables::TransactionHashNumbers::NAME {
354 Self::tx_hash_numbers_column_family_options(&self.block_cache)
355 } else {
356 Self::default_column_family_options(&self.block_cache)
357 };
358 ColumnFamilyDescriptor::new(name.clone(), cf_options)
359 })
360 .collect();
361
362 let metrics = self.enable_metrics.then(RocksDBMetrics::default);
363
364 if self.read_only {
365 let mut options = options;
368 options.set_max_open_files(-1);
369
370 let secondary_path = self
371 .path
372 .parent()
373 .unwrap_or(&self.path)
374 .join(format!("rocksdb-secondary-tmp-{}", std::process::id()));
375 reth_fs_util::create_dir_all(&secondary_path).map_err(ProviderError::other)?;
376
377 let db = DB::open_cf_descriptors_as_secondary(
378 &options,
379 &self.path,
380 &secondary_path,
381 cf_descriptors,
382 )
383 .map_err(|e| {
384 ProviderError::Database(DatabaseError::Open(DatabaseErrorInfo {
385 message: e.to_string().into(),
386 code: -1,
387 }))
388 })?;
389 Ok(RocksDBProvider(Arc::new(RocksDBProviderInner::Secondary {
390 db,
391 metrics,
392 secondary_path,
393 })))
394 } else {
395 let db =
400 OptimisticTransactionDB::open_cf_descriptors(&options, &self.path, cf_descriptors)
401 .map_err(|e| {
402 ProviderError::Database(DatabaseError::Open(DatabaseErrorInfo {
403 message: e.to_string().into(),
404 code: -1,
405 }))
406 })?;
407 Ok(RocksDBProvider(Arc::new(RocksDBProviderInner::ReadWrite { db, metrics })))
408 }
409 }
410}
411
/// Produces the bytes to store for `$value`, avoiding a copy when possible.
///
/// Evaluates to `Some(bytes)` when `$value.uncompressable_ref()` returns a borrowed slice.
/// Otherwise it clears `$buf`, compresses `$value` into it and evaluates to `None`; the caller
/// then reads the bytes from `$buf` (see `put_encoded`, which uses `.unwrap_or(&buf)`).
macro_rules! compress_to_buf_or_ref {
    ($buf:expr, $value:expr) => {
        if let Some(value) = $value.uncompressable_ref() {
            Some(value)
        } else {
            // Drop whatever a previous use left in the buffer before compressing into it.
            $buf.clear();
            $value.compress_to_buf(&mut $buf);
            None
        }
    };
}
425
/// Handle to a RocksDB database.
///
/// A thin wrapper around a reference-counted [`RocksDBProviderInner`]; cloning the provider
/// clones the `Arc`, not the database.
#[derive(Debug)]
pub struct RocksDBProvider(Arc<RocksDBProviderInner>);
429
/// The database handle behind a [`RocksDBProvider`], one variant per open mode.
enum RocksDBProviderInner {
    /// Database opened read-write.
    ReadWrite {
        /// Optimistic-transaction database handle.
        db: OptimisticTransactionDB,
        /// Operation metrics; `None` when metrics were not enabled on the builder.
        metrics: Option<RocksDBMetrics>,
    },
    /// Database opened as a RocksDB secondary instance; the write helpers panic on this variant.
    Secondary {
        /// Secondary database handle.
        db: DB,
        /// Operation metrics; `None` when metrics were not enabled on the builder.
        metrics: Option<RocksDBMetrics>,
        /// Directory handed to RocksDB as the secondary path; removed (best effort) on drop.
        secondary_path: PathBuf,
    },
}
451
452impl RocksDBProviderInner {
453 const fn metrics(&self) -> Option<&RocksDBMetrics> {
455 match self {
456 Self::ReadWrite { metrics, .. } | Self::Secondary { metrics, .. } => metrics.as_ref(),
457 }
458 }
459
460 fn db_rw(&self) -> &OptimisticTransactionDB {
462 match self {
463 Self::ReadWrite { db, .. } => db,
464 Self::Secondary { .. } => {
465 panic!("Cannot perform write operation on secondary RocksDB provider")
466 }
467 }
468 }
469
470 fn cf_handle<T: Table>(&self) -> Result<&rocksdb::ColumnFamily, DatabaseError> {
472 let cf = match self {
473 Self::ReadWrite { db, .. } => db.cf_handle(T::NAME),
474 Self::Secondary { db, .. } => db.cf_handle(T::NAME),
475 };
476 cf.ok_or_else(|| DatabaseError::Other(format!("Column family '{}' not found", T::NAME)))
477 }
478
479 fn get_cf(
481 &self,
482 cf: &rocksdb::ColumnFamily,
483 key: impl AsRef<[u8]>,
484 ) -> Result<Option<Vec<u8>>, rocksdb::Error> {
485 match self {
486 Self::ReadWrite { db, .. } => db.get_cf(cf, key),
487 Self::Secondary { db, .. } => db.get_cf(cf, key),
488 }
489 }
490
491 fn put_cf(
493 &self,
494 cf: &rocksdb::ColumnFamily,
495 key: impl AsRef<[u8]>,
496 value: impl AsRef<[u8]>,
497 ) -> Result<(), rocksdb::Error> {
498 self.db_rw().put_cf(cf, key, value)
499 }
500
501 fn delete_cf(
503 &self,
504 cf: &rocksdb::ColumnFamily,
505 key: impl AsRef<[u8]>,
506 ) -> Result<(), rocksdb::Error> {
507 self.db_rw().delete_cf(cf, key)
508 }
509
510 fn delete_range_cf<K: AsRef<[u8]>>(
512 &self,
513 cf: &rocksdb::ColumnFamily,
514 from: K,
515 to: K,
516 ) -> Result<(), rocksdb::Error> {
517 self.db_rw().delete_range_cf(cf, from, to)
518 }
519
520 fn iterator_cf(
522 &self,
523 cf: &rocksdb::ColumnFamily,
524 mode: IteratorMode<'_>,
525 ) -> RocksDBIterEnum<'_> {
526 match self {
527 Self::ReadWrite { db, .. } => RocksDBIterEnum::ReadWrite(db.iterator_cf(cf, mode)),
528 Self::Secondary { db, .. } => RocksDBIterEnum::ReadOnly(db.iterator_cf(cf, mode)),
529 }
530 }
531
532 fn raw_iterator_cf(&self, cf: &rocksdb::ColumnFamily) -> RocksDBRawIterEnum<'_> {
537 match self {
538 Self::ReadWrite { db, .. } => RocksDBRawIterEnum::ReadWrite(db.raw_iterator_cf(cf)),
539 Self::Secondary { db, .. } => RocksDBRawIterEnum::ReadOnly(db.raw_iterator_cf(cf)),
540 }
541 }
542
543 fn snapshot(&self) -> RocksReadSnapshotInner<'_> {
545 match self {
546 Self::ReadWrite { db, .. } => RocksReadSnapshotInner::ReadWrite(db.snapshot()),
547 Self::Secondary { db, .. } => RocksReadSnapshotInner::Secondary(db),
548 }
549 }
550
551 fn path(&self) -> &Path {
553 match self {
554 Self::ReadWrite { db, .. } => db.path(),
555 Self::Secondary { db, .. } => db.path(),
556 }
557 }
558
559 fn wal_size_bytes(&self) -> u64 {
563 let path = self.path();
564
565 match std::fs::read_dir(path) {
566 Ok(entries) => entries
567 .filter_map(|e| e.ok())
568 .filter(|e| e.path().extension().is_some_and(|ext| ext == "log"))
569 .filter_map(|e| e.metadata().ok())
570 .map(|m| m.len())
571 .sum(),
572 Err(_) => 0,
573 }
574 }
575
576 fn table_stats(&self) -> Vec<RocksDBTableStats> {
578 let mut stats = Vec::new();
579
580 macro_rules! collect_stats {
581 ($db:expr) => {
582 for cf_name in ROCKSDB_TABLES {
583 if let Some(cf) = $db.cf_handle(cf_name) {
584 let estimated_num_keys = $db
585 .property_int_value_cf(cf, rocksdb::properties::ESTIMATE_NUM_KEYS)
586 .ok()
587 .flatten()
588 .unwrap_or(0);
589
590 let sst_size = $db
592 .property_int_value_cf(cf, rocksdb::properties::LIVE_SST_FILES_SIZE)
593 .ok()
594 .flatten()
595 .unwrap_or(0);
596
597 let memtable_size = $db
598 .property_int_value_cf(cf, rocksdb::properties::SIZE_ALL_MEM_TABLES)
599 .ok()
600 .flatten()
601 .unwrap_or(0);
602
603 let estimated_size_bytes = sst_size + memtable_size;
604
605 let pending_compaction_bytes = $db
606 .property_int_value_cf(
607 cf,
608 rocksdb::properties::ESTIMATE_PENDING_COMPACTION_BYTES,
609 )
610 .ok()
611 .flatten()
612 .unwrap_or(0);
613
614 stats.push(RocksDBTableStats {
615 sst_size_bytes: sst_size,
616 memtable_size_bytes: memtable_size,
617 name: cf_name.to_string(),
618 estimated_num_keys,
619 estimated_size_bytes,
620 pending_compaction_bytes,
621 });
622 }
623 }
624 };
625 }
626
627 match self {
628 Self::ReadWrite { db, .. } => collect_stats!(db),
629 Self::Secondary { db, .. } => collect_stats!(db),
630 }
631
632 stats
633 }
634
635 fn db_stats(&self) -> RocksDBStats {
637 RocksDBStats { tables: self.table_stats(), wal_size_bytes: self.wal_size_bytes() }
638 }
639}
640
641impl fmt::Debug for RocksDBProviderInner {
642 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
643 match self {
644 Self::ReadWrite { metrics, .. } => f
645 .debug_struct("RocksDBProviderInner::ReadWrite")
646 .field("db", &"<OptimisticTransactionDB>")
647 .field("metrics", metrics)
648 .finish(),
649 Self::Secondary { metrics, .. } => f
650 .debug_struct("RocksDBProviderInner::Secondary")
651 .field("db", &"<DB (secondary)>")
652 .field("metrics", metrics)
653 .finish(),
654 }
655 }
656}
657
658impl Drop for RocksDBProviderInner {
659 fn drop(&mut self) {
660 match self {
661 Self::ReadWrite { db, .. } => {
662 if let Err(e) = db.flush_wal(true) {
665 tracing::warn!(target: "providers::rocksdb", ?e, "Failed to flush WAL on drop");
666 }
667 for cf_name in ROCKSDB_TABLES {
668 if let Some(cf) = db.cf_handle(cf_name) &&
669 let Err(e) = db.flush_cf(&cf)
670 {
671 tracing::warn!(target: "providers::rocksdb", cf = cf_name, ?e, "Failed to flush CF on drop");
672 }
673 }
674 db.cancel_all_background_work(true);
675 }
676 Self::Secondary { db, secondary_path, .. } => {
677 db.cancel_all_background_work(true);
678 let _ = std::fs::remove_dir_all(secondary_path);
679 }
680 }
681 }
682}
683
684impl Clone for RocksDBProvider {
685 fn clone(&self) -> Self {
686 Self(self.0.clone())
687 }
688}
689
690impl DatabaseMetrics for RocksDBProvider {
691 fn gauge_metrics(&self) -> Vec<(&'static str, f64, Vec<Label>)> {
692 let mut metrics = Vec::new();
693
694 for stat in self.table_stats() {
695 metrics.push((
696 "rocksdb.table_size",
697 stat.estimated_size_bytes as f64,
698 vec![Label::new("table", stat.name.clone())],
699 ));
700 metrics.push((
701 "rocksdb.table_entries",
702 stat.estimated_num_keys as f64,
703 vec![Label::new("table", stat.name.clone())],
704 ));
705 metrics.push((
706 "rocksdb.pending_compaction_bytes",
707 stat.pending_compaction_bytes as f64,
708 vec![Label::new("table", stat.name.clone())],
709 ));
710 metrics.push((
711 "rocksdb.sst_size",
712 stat.sst_size_bytes as f64,
713 vec![Label::new("table", stat.name.clone())],
714 ));
715 metrics.push((
716 "rocksdb.memtable_size",
717 stat.memtable_size_bytes as f64,
718 vec![Label::new("table", stat.name)],
719 ));
720 }
721
722 metrics.push(("rocksdb.wal_size", self.wal_size_bytes() as f64, vec![]));
724
725 metrics
726 }
727}
728
729impl RocksDBProvider {
    /// Opens the database at `path` with the builder's defaults.
    ///
    /// Equivalent to `RocksDBBuilder::new(path).build()`; note that no column families are
    /// registered this way.
    pub fn new(path: impl AsRef<Path>) -> ProviderResult<Self> {
        RocksDBBuilder::new(path).build()
    }
734
    /// Returns a [`RocksDBBuilder`] for the database at `path`.
    pub fn builder(path: impl AsRef<Path>) -> RocksDBBuilder {
        RocksDBBuilder::new(path)
    }
739
    /// Returns `true` when a `CURRENT` file exists directly under `path`.
    ///
    /// NOTE(review): presumably `CURRENT` is the marker RocksDB writes for an initialized
    /// database — confirm against the RocksDB documentation.
    pub fn exists(path: impl AsRef<Path>) -> bool {
        path.as_ref().join("CURRENT").exists()
    }
747
    /// Returns `true` when this provider was opened as a secondary (read-only) instance.
    pub fn is_read_only(&self) -> bool {
        matches!(self.0.as_ref(), RocksDBProviderInner::Secondary { .. })
    }
752
753 pub fn try_catch_up_with_primary(&self) -> ProviderResult<()> {
758 match self.0.as_ref() {
759 RocksDBProviderInner::Secondary { db, .. } => {
760 db.try_catch_up_with_primary().map_err(|e| {
761 ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
762 message: e.to_string().into(),
763 code: -1,
764 }))
765 })
766 }
767 _ => Ok(()),
768 }
769 }
770
    /// Creates a read snapshot bound to this provider.
    ///
    /// For a read-write provider this wraps a RocksDB snapshot; for a secondary provider it
    /// wraps the database handle itself.
    pub fn snapshot(&self) -> RocksReadSnapshot<'_> {
        RocksReadSnapshot { inner: self.0.snapshot(), provider: self }
    }
777
778 pub fn tx(&self) -> RocksTx<'_> {
786 let write_options = synced_write_options();
787 let txn_options = OptimisticTransactionOptions::default();
788 let inner = self.0.db_rw().transaction_opt(&write_options, &txn_options);
789 RocksTx { inner, provider: self }
790 }
791
792 pub fn batch(&self) -> RocksDBBatch<'_> {
800 RocksDBBatch {
801 provider: self,
802 inner: WriteBatchWithTransaction::<true>::default(),
803 buf: Vec::with_capacity(DEFAULT_COMPRESS_BUF_CAPACITY),
804 auto_commit_threshold: None,
805 }
806 }
807
808 pub fn batch_with_auto_commit(&self) -> RocksDBBatch<'_> {
814 RocksDBBatch {
815 provider: self,
816 inner: WriteBatchWithTransaction::<true>::default(),
817 buf: Vec::with_capacity(DEFAULT_COMPRESS_BUF_CAPACITY),
818 auto_commit_threshold: Some(DEFAULT_AUTO_COMMIT_THRESHOLD),
819 }
820 }
821
    /// Returns the column family handle of table `T`, or `DatabaseError::Other` when the column
    /// family does not exist.
    fn get_cf_handle<T: Table>(&self) -> Result<&rocksdb::ColumnFamily, DatabaseError> {
        self.0.cf_handle::<T>()
    }
826
827 fn execute_with_operation_metric<R>(
829 &self,
830 operation: RocksDBOperation,
831 table: &'static str,
832 f: impl FnOnce(&Self) -> R,
833 ) -> R {
834 let start = self.0.metrics().map(|_| Instant::now());
835 let res = f(self);
836
837 if let (Some(start), Some(metrics)) = (start, self.0.metrics()) {
838 metrics.record_operation(operation, table, start.elapsed());
839 }
840
841 res
842 }
843
    /// Reads the value stored under `key` in table `T`.
    ///
    /// Encodes the key and delegates to [`Self::get_encoded`].
    pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
        self.get_encoded::<T>(&key.encode())
    }
848
849 pub fn get_encoded<T: Table>(
851 &self,
852 key: &<T::Key as Encode>::Encoded,
853 ) -> ProviderResult<Option<T::Value>> {
854 self.execute_with_operation_metric(RocksDBOperation::Get, T::NAME, |this| {
855 let result = this.0.get_cf(this.get_cf_handle::<T>()?, key.as_ref()).map_err(|e| {
856 ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
857 message: e.to_string().into(),
858 code: -1,
859 }))
860 })?;
861
862 Ok(result.and_then(|value| T::Value::decompress(&value).ok()))
863 })
864 }
865
866 pub fn get_raw<T: Table>(&self, key: T::Key) -> ProviderResult<Option<Vec<u8>>> {
868 let encoded = key.encode();
869 self.execute_with_operation_metric(RocksDBOperation::Get, T::NAME, |this| {
870 this.0.get_cf(this.get_cf_handle::<T>()?, encoded.as_ref()).map_err(|e| {
871 ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
872 message: e.to_string().into(),
873 code: -1,
874 }))
875 })
876 })
877 }
878
    /// Writes `value` under `key` in table `T`.
    ///
    /// Encodes the key and delegates to [`Self::put_encoded`], which panics on a secondary
    /// (read-only) provider.
    pub fn put<T: Table>(&self, key: T::Key, value: &T::Value) -> ProviderResult<()> {
        let encoded_key = key.encode();
        self.put_encoded::<T>(&encoded_key, value)
    }
887
888 pub fn put_encoded<T: Table>(
893 &self,
894 key: &<T::Key as Encode>::Encoded,
895 value: &T::Value,
896 ) -> ProviderResult<()> {
897 self.execute_with_operation_metric(RocksDBOperation::Put, T::NAME, |this| {
898 let mut buf = Vec::new();
902 let value_bytes = compress_to_buf_or_ref!(buf, value).unwrap_or(&buf);
903
904 this.0.put_cf(this.get_cf_handle::<T>()?, key, value_bytes).map_err(|e| {
905 ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
906 info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
907 operation: DatabaseWriteOperation::PutUpsert,
908 table_name: T::NAME,
909 key: key.as_ref().to_vec(),
910 })))
911 })
912 })
913 }
914
915 pub fn delete<T: Table>(&self, key: T::Key) -> ProviderResult<()> {
920 self.execute_with_operation_metric(RocksDBOperation::Delete, T::NAME, |this| {
921 this.0.delete_cf(this.get_cf_handle::<T>()?, key.encode().as_ref()).map_err(|e| {
922 ProviderError::Database(DatabaseError::Delete(DatabaseErrorInfo {
923 message: e.to_string().into(),
924 code: -1,
925 }))
926 })
927 })
928 }
929
930 pub fn clear<T: Table>(&self) -> ProviderResult<()> {
936 let cf = self.get_cf_handle::<T>()?;
937
938 self.0.delete_range_cf(cf, &[] as &[u8], &[0xFF; 256]).map_err(|e| {
939 ProviderError::Database(DatabaseError::Delete(DatabaseErrorInfo {
940 message: e.to_string().into(),
941 code: -1,
942 }))
943 })?;
944
945 Ok(())
946 }
947
948 fn get_boundary<T: Table>(
950 &self,
951 mode: IteratorMode<'_>,
952 ) -> ProviderResult<Option<(T::Key, T::Value)>> {
953 self.execute_with_operation_metric(RocksDBOperation::Get, T::NAME, |this| {
954 let cf = this.get_cf_handle::<T>()?;
955 let mut iter = this.0.iterator_cf(cf, mode);
956
957 match iter.next() {
958 Some(Ok((key_bytes, value_bytes))) => {
959 let key = <T::Key as reth_db_api::table::Decode>::decode(&key_bytes)
960 .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
961 let value = T::Value::decompress(&value_bytes)
962 .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
963 Ok(Some((key, value)))
964 }
965 Some(Err(e)) => {
966 Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
967 message: e.to_string().into(),
968 code: -1,
969 })))
970 }
971 None => Ok(None),
972 }
973 })
974 }
975
    /// Returns the first entry of table `T` in iteration order, or `None` when it is empty.
    #[inline]
    pub fn first<T: Table>(&self) -> ProviderResult<Option<(T::Key, T::Value)>> {
        self.get_boundary::<T>(IteratorMode::Start)
    }
981
    /// Returns the last entry of table `T` in iteration order, or `None` when it is empty.
    #[inline]
    pub fn last<T: Table>(&self) -> ProviderResult<Option<(T::Key, T::Value)>> {
        self.get_boundary::<T>(IteratorMode::End)
    }
987
    /// Creates a typed iterator over table `T`, starting at the first entry.
    ///
    /// Fails with `DatabaseError::Other` when the column family does not exist.
    pub fn iter<T: Table>(&self) -> ProviderResult<RocksDBIter<'_, T>> {
        let cf = self.get_cf_handle::<T>()?;
        let iter = self.0.iterator_cf(cf, IteratorMode::Start);
        Ok(RocksDBIter { inner: iter, _marker: std::marker::PhantomData })
    }
996
997 pub fn iter_from<T: Table>(&self, key: T::Key) -> ProviderResult<RocksDBIter<'_, T>> {
1001 let cf = self.get_cf_handle::<T>()?;
1002 let encoded_key = key.encode();
1003 let iter = self
1004 .0
1005 .iterator_cf(cf, IteratorMode::From(encoded_key.as_ref(), rocksdb::Direction::Forward));
1006 Ok(RocksDBIter { inner: iter, _marker: std::marker::PhantomData })
1007 }
1008
    /// Returns statistics for every column family in `ROCKSDB_TABLES` that exists in this
    /// database.
    pub fn table_stats(&self) -> Vec<RocksDBTableStats> {
        self.0.table_stats()
    }
1015
    /// Returns the total size in bytes of the `*.log` files in the database directory, or `0`
    /// when the directory cannot be read.
    pub fn wal_size_bytes(&self) -> u64 {
        self.0.wal_size_bytes()
    }
1024
    /// Returns the per-table statistics together with the WAL size.
    pub fn db_stats(&self) -> RocksDBStats {
        self.0.db_stats()
    }
1031
1032 #[instrument(level = "debug", target = "providers::rocksdb", skip_all, fields(tables = ?tables))]
1043 pub fn flush(&self, tables: &[&'static str]) -> ProviderResult<()> {
1044 let db = self.0.db_rw();
1045
1046 for cf_name in tables {
1047 if let Some(cf) = db.cf_handle(cf_name) {
1048 db.flush_cf(&cf).map_err(|e| {
1049 ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
1050 info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
1051 operation: DatabaseWriteOperation::Flush,
1052 table_name: cf_name,
1053 key: Vec::new(),
1054 })))
1055 })?;
1056 }
1057 }
1058
1059 db.flush_wal(true).map_err(|e| {
1060 ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
1061 info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
1062 operation: DatabaseWriteOperation::Flush,
1063 table_name: "WAL",
1064 key: Vec::new(),
1065 })))
1066 })?;
1067
1068 Ok(())
1069 }
1070
1071 #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
1083 pub fn flush_and_compact(&self) -> ProviderResult<()> {
1084 self.flush(ROCKSDB_TABLES)?;
1085
1086 let db = self.0.db_rw();
1087
1088 for cf_name in ROCKSDB_TABLES {
1089 if let Some(cf) = db.cf_handle(cf_name) {
1090 db.compact_range_cf(&cf, None::<&[u8]>, None::<&[u8]>);
1091 }
1092 }
1093
1094 Ok(())
1095 }
1096
    /// Creates an iterator over table `T`, starting at the first entry, wrapped in a
    /// [`RocksDBRawIter`].
    ///
    /// Fails with `DatabaseError::Other` when the column family does not exist.
    pub fn raw_iter<T: Table>(&self) -> ProviderResult<RocksDBRawIter<'_>> {
        let cf = self.get_cf_handle::<T>()?;
        let iter = self.0.iterator_cf(cf, IteratorMode::Start);
        Ok(RocksDBRawIter { inner: iter })
    }
1105
1106 pub fn account_history_shards(
1111 &self,
1112 address: Address,
1113 ) -> ProviderResult<Vec<(ShardedKey<Address>, BlockNumberList)>> {
1114 let cf = self.get_cf_handle::<tables::AccountsHistory>()?;
1116
1117 let start_key = ShardedKey::new(address, 0u64);
1120 let start_bytes = start_key.encode();
1121
1122 let iter = self
1124 .0
1125 .iterator_cf(cf, IteratorMode::From(start_bytes.as_ref(), rocksdb::Direction::Forward));
1126
1127 let mut result = Vec::new();
1128 for item in iter {
1129 match item {
1130 Ok((key_bytes, value_bytes)) => {
1131 let key = ShardedKey::<Address>::decode(&key_bytes)
1133 .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
1134
1135 if key.key != address {
1137 break;
1138 }
1139
1140 let value = BlockNumberList::decompress(&value_bytes)
1142 .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
1143
1144 result.push((key, value));
1145 }
1146 Err(e) => {
1147 return Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
1148 message: e.to_string().into(),
1149 code: -1,
1150 })));
1151 }
1152 }
1153 }
1154
1155 Ok(result)
1156 }
1157
1158 pub fn storage_history_shards(
1163 &self,
1164 address: Address,
1165 storage_key: B256,
1166 ) -> ProviderResult<Vec<(StorageShardedKey, BlockNumberList)>> {
1167 let cf = self.get_cf_handle::<tables::StoragesHistory>()?;
1168
1169 let start_key = StorageShardedKey::new(address, storage_key, 0u64);
1170 let start_bytes = start_key.encode();
1171
1172 let iter = self
1173 .0
1174 .iterator_cf(cf, IteratorMode::From(start_bytes.as_ref(), rocksdb::Direction::Forward));
1175
1176 let mut result = Vec::new();
1177 for item in iter {
1178 match item {
1179 Ok((key_bytes, value_bytes)) => {
1180 let key = StorageShardedKey::decode(&key_bytes)
1181 .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
1182
1183 if key.address != address || key.sharded_key.key != storage_key {
1184 break;
1185 }
1186
1187 let value = BlockNumberList::decompress(&value_bytes)
1188 .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
1189
1190 result.push((key, value));
1191 }
1192 Err(e) => {
1193 return Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
1194 message: e.to_string().into(),
1195 code: -1,
1196 })));
1197 }
1198 }
1199 }
1200
1201 Ok(result)
1202 }
1203
1204 #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
1212 pub fn unwind_account_history_indices(
1213 &self,
1214 last_indices: &[(Address, BlockNumber)],
1215 ) -> ProviderResult<WriteBatchWithTransaction<true>> {
1216 let mut address_min_block: AddressMap<BlockNumber> =
1217 AddressMap::with_capacity_and_hasher(last_indices.len(), Default::default());
1218 for &(address, block_number) in last_indices {
1219 address_min_block
1220 .entry(address)
1221 .and_modify(|min| *min = (*min).min(block_number))
1222 .or_insert(block_number);
1223 }
1224
1225 let mut batch = self.batch();
1226 for (address, min_block) in address_min_block {
1227 match min_block.checked_sub(1) {
1228 Some(keep_to) => batch.unwind_account_history_to(address, keep_to)?,
1229 None => batch.clear_account_history(address)?,
1230 }
1231 }
1232
1233 Ok(batch.into_inner())
1234 }
1235
1236 pub fn unwind_storage_history_indices(
1244 &self,
1245 storage_changesets: &[(Address, B256, BlockNumber)],
1246 ) -> ProviderResult<WriteBatchWithTransaction<true>> {
1247 let mut key_min_block: HashMap<(Address, B256), BlockNumber> =
1248 HashMap::with_capacity_and_hasher(storage_changesets.len(), Default::default());
1249 for &(address, storage_key, block_number) in storage_changesets {
1250 key_min_block
1251 .entry((address, storage_key))
1252 .and_modify(|min| *min = (*min).min(block_number))
1253 .or_insert(block_number);
1254 }
1255
1256 let mut batch = self.batch();
1257 for ((address, storage_key), min_block) in key_min_block {
1258 match min_block.checked_sub(1) {
1259 Some(keep_to) => batch.unwind_storage_history_to(address, storage_key, keep_to)?,
1260 None => batch.clear_storage_history(address, storage_key)?,
1261 }
1262 }
1263
1264 Ok(batch.into_inner())
1265 }
1266
1267 #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
1269 pub fn write_batch<F>(&self, f: F) -> ProviderResult<()>
1270 where
1271 F: FnOnce(&mut RocksDBBatch<'_>) -> ProviderResult<()>,
1272 {
1273 self.execute_with_operation_metric(RocksDBOperation::BatchWrite, "Batch", |this| {
1274 let mut batch_handle = this.batch();
1275 f(&mut batch_handle)?;
1276 batch_handle.commit()
1277 })
1278 }
1279
1280 #[instrument(level = "debug", target = "providers::rocksdb", skip_all, fields(batch_len = batch.len(), batch_size = batch.size_in_bytes()))]
1288 pub fn commit_batch(&self, batch: WriteBatchWithTransaction<true>) -> ProviderResult<()> {
1289 self.0.db_rw().write_opt(batch, &synced_write_options()).map_err(|e| {
1290 ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
1291 message: e.to_string().into(),
1292 code: -1,
1293 }))
1294 })
1295 }
1296
1297 #[instrument(level = "debug", target = "providers::rocksdb", skip_all, fields(num_blocks = blocks.len(), first_block = ctx.first_block_number))]
1303 pub(crate) fn write_blocks_data<N: reth_node_types::NodePrimitives>(
1304 &self,
1305 blocks: &[ExecutedBlock<N>],
1306 tx_nums: &[TxNumber],
1307 ctx: RocksDBWriteCtx,
1308 runtime: &reth_tasks::Runtime,
1309 ) -> ProviderResult<()> {
1310 if !ctx.storage_settings.storage_v2 {
1311 return Ok(());
1312 }
1313
1314 let mut r_tx_hash = None;
1315 let mut r_account_history = None;
1316 let mut r_storage_history = None;
1317
1318 let write_tx_hash =
1319 ctx.storage_settings.storage_v2 && ctx.prune_tx_lookup.is_none_or(|m| !m.is_full());
1320 let write_account_history = ctx.storage_settings.storage_v2;
1321 let write_storage_history = ctx.storage_settings.storage_v2;
1322
1323 let span = tracing::Span::current();
1326 runtime.storage_pool().in_place_scope(|s| {
1327 if write_tx_hash {
1328 s.spawn(|_| {
1329 let _guard = span.enter();
1330 r_tx_hash = Some(self.write_tx_hash_numbers(blocks, tx_nums, &ctx));
1331 });
1332 }
1333
1334 if write_account_history {
1335 s.spawn(|_| {
1336 let _guard = span.enter();
1337 r_account_history = Some(self.write_account_history(blocks, &ctx));
1338 });
1339 }
1340
1341 if write_storage_history {
1342 s.spawn(|_| {
1343 let _guard = span.enter();
1344 r_storage_history = Some(self.write_storage_history(blocks, &ctx));
1345 });
1346 }
1347 });
1348
1349 if write_tx_hash {
1350 r_tx_hash.ok_or_else(|| {
1351 ProviderError::Database(DatabaseError::Other(
1352 "rocksdb tx-hash write thread panicked".into(),
1353 ))
1354 })??;
1355 }
1356 if write_account_history {
1357 r_account_history.ok_or_else(|| {
1358 ProviderError::Database(DatabaseError::Other(
1359 "rocksdb account-history write thread panicked".into(),
1360 ))
1361 })??;
1362 }
1363 if write_storage_history {
1364 r_storage_history.ok_or_else(|| {
1365 ProviderError::Database(DatabaseError::Other(
1366 "rocksdb storage-history write thread panicked".into(),
1367 ))
1368 })??;
1369 }
1370
1371 Ok(())
1372 }
1373
1374 #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
1376 fn write_tx_hash_numbers<N: reth_node_types::NodePrimitives>(
1377 &self,
1378 blocks: &[ExecutedBlock<N>],
1379 tx_nums: &[TxNumber],
1380 ctx: &RocksDBWriteCtx,
1381 ) -> ProviderResult<()> {
1382 let mut batch = self.batch();
1383 for (block, &first_tx_num) in blocks.iter().zip(tx_nums) {
1384 let body = block.recovered_block().body();
1385 for (tx_num, transaction) in (first_tx_num..).zip(body.transactions_iter()) {
1386 batch.put::<tables::TransactionHashNumbers>(*transaction.tx_hash(), &tx_num)?;
1387 }
1388 }
1389 ctx.pending_batches.lock().push(batch.into_inner());
1390 Ok(())
1391 }
1392
1393 #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
1397 fn write_account_history<N: reth_node_types::NodePrimitives>(
1398 &self,
1399 blocks: &[ExecutedBlock<N>],
1400 ctx: &RocksDBWriteCtx,
1401 ) -> ProviderResult<()> {
1402 let mut batch = self.batch();
1403 let mut account_history: BTreeMap<Address, Vec<u64>> = BTreeMap::new();
1404
1405 for (block_idx, block) in blocks.iter().enumerate() {
1406 let block_number = ctx.first_block_number + block_idx as u64;
1407 let reverts = block.execution_outcome().state.reverts.to_plain_state_reverts();
1408
1409 for account_block_reverts in reverts.accounts {
1412 for (address, _) in account_block_reverts {
1413 account_history.entry(address).or_default().push(block_number);
1414 }
1415 }
1416 }
1417
1418 for (address, indices) in account_history {
1420 batch.append_account_history_shard(address, indices)?;
1421 }
1422 ctx.pending_batches.lock().push(batch.into_inner());
1423 Ok(())
1424 }
1425
1426 #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
1430 fn write_storage_history<N: reth_node_types::NodePrimitives>(
1431 &self,
1432 blocks: &[ExecutedBlock<N>],
1433 ctx: &RocksDBWriteCtx,
1434 ) -> ProviderResult<()> {
1435 let mut storage_history: BTreeMap<(Address, B256), Vec<u64>> = BTreeMap::new();
1436
1437 for (block_idx, block) in blocks.iter().enumerate() {
1438 let block_number = ctx.first_block_number + block_idx as u64;
1439 let reverts = block.execution_outcome().state.reverts.to_plain_state_reverts();
1440
1441 for storage_block_reverts in reverts.storage {
1444 for revert in storage_block_reverts {
1445 for (slot, _) in revert.storage_revert {
1446 let plain_key = B256::new(slot.to_be_bytes());
1447 storage_history
1448 .entry((revert.address, plain_key))
1449 .or_default()
1450 .push(block_number);
1451 }
1452 }
1453 }
1454 }
1455
1456 let shard_puts = storage_history
1457 .into_par_iter()
1458 .map(|((address, slot), indices)| {
1459 self.storage_history_shards_to_put(address, slot, indices)
1460 })
1461 .collect::<ProviderResult<Vec<_>>>()?;
1462
1463 let mut batch = self.batch();
1464 for shards in shard_puts {
1465 for (key, shard) in shards {
1466 batch.put::<tables::StoragesHistory>(key, &shard)?;
1467 }
1468 }
1469 ctx.pending_batches.lock().push(batch.into_inner());
1470 Ok(())
1471 }
1472
1473 fn storage_history_shards_to_put(
1476 &self,
1477 address: Address,
1478 storage_key: B256,
1479 indices: Vec<u64>,
1480 ) -> ProviderResult<Vec<(StorageShardedKey, BlockNumberList)>> {
1481 if indices.is_empty() {
1482 return Ok(Vec::new());
1483 }
1484
1485 debug_assert!(
1486 indices.windows(2).all(|w| w[0] < w[1]),
1487 "indices must be strictly increasing: {:?}",
1488 indices
1489 );
1490
1491 let last_key = StorageShardedKey::last(address, storage_key);
1492 let last_shard_opt = self.get::<tables::StoragesHistory>(last_key.clone())?;
1493 let mut last_shard = last_shard_opt.unwrap_or_else(BlockNumberList::empty);
1494
1495 last_shard.append(indices).map_err(ProviderError::other)?;
1496
1497 if last_shard.len() <= NUM_OF_INDICES_IN_SHARD as u64 {
1498 return Ok(vec![(last_key, last_shard)]);
1499 }
1500
1501 let chunks = last_shard.iter().chunks(NUM_OF_INDICES_IN_SHARD);
1502 let mut chunks_peekable = chunks.into_iter().peekable();
1503 let mut shards = Vec::new();
1504
1505 while let Some(chunk) = chunks_peekable.next() {
1506 let shard = BlockNumberList::new_pre_sorted(chunk);
1507 let highest_block_number = if chunks_peekable.peek().is_some() {
1508 shard.iter().next_back().expect("`chunks` does not return empty list")
1509 } else {
1510 u64::MAX
1511 };
1512
1513 shards
1514 .push((StorageShardedKey::new(address, storage_key, highest_block_number), shard));
1515 }
1516
1517 Ok(shards)
1518 }
1519}
1520
/// Read-only view over a [`RocksDBProvider`].
///
/// Reads go through `inner`, which is either a snapshot of the read-write database or a
/// direct handle to a `DB` instance (see [`RocksReadSnapshotInner`]).
pub struct RocksReadSnapshot<'db> {
    /// Backend the reads are served from.
    inner: RocksReadSnapshotInner<'db>,
    /// Provider used to resolve column family handles.
    provider: &'db RocksDBProvider,
}
1532
/// Backend of a [`RocksReadSnapshot`].
enum RocksReadSnapshotInner<'db> {
    /// Snapshot taken from the read-write [`OptimisticTransactionDB`].
    ReadWrite(SnapshotWithThreadMode<'db, OptimisticTransactionDB>),
    /// Direct reference to a plain [`DB`] handle; reads are issued against the handle itself.
    // NOTE(review): the variant name suggests a secondary instance - confirm at the call site.
    Secondary(&'db DB),
}
1540
1541impl<'db> RocksReadSnapshotInner<'db> {
1542 fn raw_iterator_cf(&self, cf: &rocksdb::ColumnFamily) -> RocksDBRawIterEnum<'_> {
1544 match self {
1545 Self::ReadWrite(snap) => RocksDBRawIterEnum::ReadWrite(snap.raw_iterator_cf(cf)),
1546 Self::Secondary(db) => RocksDBRawIterEnum::ReadOnly(db.raw_iterator_cf(cf)),
1547 }
1548 }
1549}
1550
1551impl fmt::Debug for RocksReadSnapshot<'_> {
1552 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1553 f.debug_struct("RocksReadSnapshot")
1554 .field("provider", &self.provider)
1555 .finish_non_exhaustive()
1556 }
1557}
1558
1559impl<'db> RocksReadSnapshot<'db> {
    /// Resolves the column family handle for table `T` through the provider.
    fn cf_handle<T: Table>(&self) -> Result<&'db rocksdb::ColumnFamily, DatabaseError> {
        self.provider.get_cf_handle::<T>()
    }
1564
1565 pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
1567 let encoded_key = key.encode();
1568 let cf = self.cf_handle::<T>()?;
1569 let result = match &self.inner {
1570 RocksReadSnapshotInner::ReadWrite(snap) => snap.get_cf(cf, encoded_key.as_ref()),
1571 RocksReadSnapshotInner::Secondary(db) => db.get_cf(cf, encoded_key.as_ref()),
1572 }
1573 .map_err(|e| {
1574 ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
1575 message: e.to_string().into(),
1576 code: -1,
1577 }))
1578 })?;
1579
1580 Ok(result.and_then(|value| T::Value::decompress(&value).ok()))
1581 }
1582
1583 pub fn account_history_info(
1588 &self,
1589 address: Address,
1590 block_number: BlockNumber,
1591 lowest_available_block_number: Option<BlockNumber>,
1592 visible_tip: BlockNumber,
1593 ) -> ProviderResult<HistoryInfo> {
1594 let key = ShardedKey::new(address, block_number);
1595 self.history_info::<tables::AccountsHistory>(
1596 key.encode().as_ref(),
1597 block_number,
1598 lowest_available_block_number,
1599 visible_tip,
1600 |key_bytes| Ok(<ShardedKey<Address> as Decode>::decode(key_bytes)?.key == address),
1601 |prev_bytes| {
1602 <ShardedKey<Address> as Decode>::decode(prev_bytes)
1603 .map(|k| k.key == address)
1604 .unwrap_or(false)
1605 },
1606 )
1607 }
1608
1609 pub fn storage_history_info(
1614 &self,
1615 address: Address,
1616 storage_key: B256,
1617 block_number: BlockNumber,
1618 lowest_available_block_number: Option<BlockNumber>,
1619 visible_tip: BlockNumber,
1620 ) -> ProviderResult<HistoryInfo> {
1621 let key = StorageShardedKey::new(address, storage_key, block_number);
1622 self.history_info::<tables::StoragesHistory>(
1623 key.encode().as_ref(),
1624 block_number,
1625 lowest_available_block_number,
1626 visible_tip,
1627 |key_bytes| {
1628 let k = <StorageShardedKey as Decode>::decode(key_bytes)?;
1629 Ok(k.address == address && k.sharded_key.key == storage_key)
1630 },
1631 |prev_bytes| {
1632 <StorageShardedKey as Decode>::decode(prev_bytes)
1633 .map(|k| k.address == address && k.sharded_key.key == storage_key)
1634 .unwrap_or(false)
1635 },
1636 )
1637 }
1638
1639 fn history_info<T>(
1645 &self,
1646 encoded_key: &[u8],
1647 block_number: BlockNumber,
1648 lowest_available_block_number: Option<BlockNumber>,
1649 visible_tip: BlockNumber,
1650 key_matches: impl FnOnce(&[u8]) -> Result<bool, reth_db_api::DatabaseError>,
1651 prev_key_matches: impl Fn(&[u8]) -> bool,
1652 ) -> ProviderResult<HistoryInfo>
1653 where
1654 T: Table<Value = BlockNumberList>,
1655 {
1656 let is_maybe_pruned = lowest_available_block_number.is_some();
1657 let fallback = || {
1658 Ok(if is_maybe_pruned {
1659 HistoryInfo::MaybeInPlainState
1660 } else {
1661 HistoryInfo::NotYetWritten
1662 })
1663 };
1664
1665 let cf = self.cf_handle::<T>()?;
1666 let mut iter = self.inner.raw_iterator_cf(cf);
1667
1668 iter.seek(encoded_key);
1669 iter.status().map_err(|e| {
1670 ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
1671 message: e.to_string().into(),
1672 code: -1,
1673 }))
1674 })?;
1675
1676 if !iter.valid() {
1677 return fallback();
1678 }
1679
1680 let Some(key_bytes) = iter.key() else {
1681 return fallback();
1682 };
1683 if !key_matches(key_bytes)? {
1684 return fallback();
1685 }
1686
1687 let Some(value_bytes) = iter.value() else {
1688 return fallback();
1689 };
1690 let chunk = BlockNumberList::decompress(value_bytes)?;
1691
1692 let (rank, found_block) = compute_history_rank(&chunk, block_number);
1693 let found_block = found_block.filter(|block| *block <= visible_tip);
1695
1696 let is_before_first_write = if needs_prev_shard_check(rank, found_block, block_number) {
1697 iter.prev();
1698 iter.status().map_err(|e| {
1699 ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
1700 message: e.to_string().into(),
1701 code: -1,
1702 }))
1703 })?;
1704 let has_prev = iter.valid() && iter.key().is_some_and(&prev_key_matches);
1705
1706 if found_block.is_none() && !has_prev {
1710 return fallback()
1711 }
1712
1713 !has_prev
1714 } else {
1715 false
1716 };
1717
1718 Ok(HistoryInfo::from_lookup(
1719 found_block,
1720 is_before_first_write,
1721 lowest_available_block_number,
1722 ))
1723 }
1724}
1725
/// Result of pruning the history shards of a single key.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PruneShardOutcome {
    /// At least one shard was deleted (reported even if other shards were also rewritten).
    Deleted,
    /// No shard was deleted, but at least one was rewritten or re-keyed.
    Updated,
    /// Nothing was deleted or rewritten.
    Unchanged,
}
1736
/// Per-target tally of [`PruneShardOutcome`]s produced by the batch prune methods.
#[derive(Debug, Default, Clone, Copy)]
pub struct PrunedIndices {
    /// Number of targets whose outcome was [`PruneShardOutcome::Deleted`].
    pub deleted: usize,
    /// Number of targets whose outcome was [`PruneShardOutcome::Updated`].
    pub updated: usize,
    /// Number of targets whose outcome was [`PruneShardOutcome::Unchanged`].
    pub unchanged: usize,
}
1747
/// Write batch bound to a [`RocksDBProvider`].
///
/// Operations are staged in `inner` and only reach the database on [`RocksDBBatch::commit`],
/// or earlier if the auto-commit threshold is reached.
#[must_use = "batch must be committed"]
pub struct RocksDBBatch<'a> {
    /// Provider used to resolve column families, read current state and write the batch.
    provider: &'a RocksDBProvider,
    /// The staged RocksDB write batch.
    inner: WriteBatchWithTransaction<true>,
    /// Scratch buffer handed to `compress_to_buf_or_ref!` when encoding values.
    buf: Vec<u8>,
    /// When `Some`, the staged batch is written out as soon as its size in bytes reaches
    /// this value.
    auto_commit_threshold: Option<usize>,
}
1765
1766impl fmt::Debug for RocksDBBatch<'_> {
1767 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1768 f.debug_struct("RocksDBBatch")
1769 .field("provider", &self.provider)
1770 .field("batch", &"<WriteBatchWithTransaction>")
1771 .field("length", &self.inner.len())
1773 .field("size_in_bytes", &self.inner.size_in_bytes())
1776 .finish()
1777 }
1778}
1779
1780impl<'a> RocksDBBatch<'a> {
1781 pub fn put<T: Table>(&mut self, key: T::Key, value: &T::Value) -> ProviderResult<()> {
1785 let encoded_key = key.encode();
1786 self.put_encoded::<T>(&encoded_key, value)
1787 }
1788
1789 pub fn put_encoded<T: Table>(
1793 &mut self,
1794 key: &<T::Key as Encode>::Encoded,
1795 value: &T::Value,
1796 ) -> ProviderResult<()> {
1797 let value_bytes = compress_to_buf_or_ref!(self.buf, value).unwrap_or(&self.buf);
1798 self.inner.put_cf(self.provider.get_cf_handle::<T>()?, key, value_bytes);
1799 self.maybe_auto_commit()?;
1800 Ok(())
1801 }
1802
1803 pub fn delete<T: Table>(&mut self, key: T::Key) -> ProviderResult<()> {
1807 self.inner.delete_cf(self.provider.get_cf_handle::<T>()?, key.encode().as_ref());
1808 self.maybe_auto_commit()?;
1809 Ok(())
1810 }
1811
1812 fn maybe_auto_commit(&mut self) -> ProviderResult<()> {
1817 if let Some(threshold) = self.auto_commit_threshold &&
1818 self.inner.size_in_bytes() >= threshold
1819 {
1820 tracing::debug!(
1821 target: "providers::rocksdb",
1822 batch_size = self.inner.size_in_bytes(),
1823 threshold,
1824 "Auto-committing RocksDB batch"
1825 );
1826 let old_batch = std::mem::take(&mut self.inner);
1827 self.provider.0.db_rw().write_opt(old_batch, &synced_write_options()).map_err(|e| {
1828 ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
1829 message: e.to_string().into(),
1830 code: -1,
1831 }))
1832 })?;
1833 }
1834 Ok(())
1835 }
1836
1837 #[instrument(level = "debug", target = "providers::rocksdb", skip_all, fields(batch_len = self.inner.len(), batch_size = self.inner.size_in_bytes()))]
1844 pub fn commit(self) -> ProviderResult<()> {
1845 self.provider.0.db_rw().write_opt(self.inner, &synced_write_options()).map_err(|e| {
1846 ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
1847 message: e.to_string().into(),
1848 code: -1,
1849 }))
1850 })
1851 }
1852
    /// Returns the staged batch's `len()`, i.e. the number of operations currently staged.
    pub fn len(&self) -> usize {
        self.inner.len()
    }
1857
    /// Returns `true` if nothing is currently staged in the batch.
    pub fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }
1862
    /// Returns the size in bytes of the staged batch, as reported by RocksDB.
    ///
    /// This is the value compared against the auto-commit threshold.
    pub fn size_in_bytes(&self) -> usize {
        self.inner.size_in_bytes()
    }
1867
    /// Returns the provider this batch is bound to.
    pub const fn provider(&self) -> &RocksDBProvider {
        self.provider
    }
1872
    /// Consumes the wrapper and returns the raw staged batch without writing it.
    ///
    /// The caller becomes responsible for committing the returned batch.
    pub fn into_inner(self) -> WriteBatchWithTransaction<true> {
        self.inner
    }
1879
    /// Reads `key` from table `T` through the provider.
    ///
    /// The read is delegated to the provider and does not consult the staged batch, so writes
    /// staged here but not yet written out are not reflected in the result.
    pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
        self.provider.get::<T>(key)
    }
1887
1888 pub fn append_account_history_shard(
1900 &mut self,
1901 address: Address,
1902 indices: impl IntoIterator<Item = u64>,
1903 ) -> ProviderResult<()> {
1904 let indices: Vec<u64> = indices.into_iter().collect();
1905
1906 if indices.is_empty() {
1907 return Ok(());
1908 }
1909
1910 debug_assert!(
1911 indices.windows(2).all(|w| w[0] < w[1]),
1912 "indices must be strictly increasing: {:?}",
1913 indices
1914 );
1915
1916 let last_key = ShardedKey::new(address, u64::MAX);
1917 let last_shard_opt = self.provider.get::<tables::AccountsHistory>(last_key.clone())?;
1918 let mut last_shard = last_shard_opt.unwrap_or_else(BlockNumberList::empty);
1919
1920 last_shard.append(indices).map_err(ProviderError::other)?;
1921
1922 if last_shard.len() <= NUM_OF_INDICES_IN_SHARD as u64 {
1924 self.put::<tables::AccountsHistory>(last_key, &last_shard)?;
1925 return Ok(());
1926 }
1927
1928 let chunks = last_shard.iter().chunks(NUM_OF_INDICES_IN_SHARD);
1930 let mut chunks_peekable = chunks.into_iter().peekable();
1931
1932 while let Some(chunk) = chunks_peekable.next() {
1933 let shard = BlockNumberList::new_pre_sorted(chunk);
1934 let highest_block_number = if chunks_peekable.peek().is_some() {
1935 shard.iter().next_back().expect("`chunks` does not return empty list")
1936 } else {
1937 u64::MAX
1938 };
1939
1940 self.put::<tables::AccountsHistory>(
1941 ShardedKey::new(address, highest_block_number),
1942 &shard,
1943 )?;
1944 }
1945
1946 Ok(())
1947 }
1948
1949 pub fn append_storage_history_shard(
1961 &mut self,
1962 address: Address,
1963 storage_key: B256,
1964 indices: impl IntoIterator<Item = u64>,
1965 ) -> ProviderResult<()> {
1966 let indices: Vec<u64> = indices.into_iter().collect();
1967
1968 for (key, shard) in
1969 self.provider.storage_history_shards_to_put(address, storage_key, indices)?
1970 {
1971 self.put::<tables::StoragesHistory>(key, &shard)?;
1972 }
1973
1974 Ok(())
1975 }
1976
1977 pub fn unwind_account_history_to(
1984 &mut self,
1985 address: Address,
1986 keep_to: BlockNumber,
1987 ) -> ProviderResult<()> {
1988 let shards = self.provider.account_history_shards(address)?;
1989 if shards.is_empty() {
1990 return Ok(());
1991 }
1992
1993 let boundary_idx = shards.iter().position(|(key, _)| {
1996 key.highest_block_number == u64::MAX || key.highest_block_number > keep_to
1997 });
1998
1999 let Some(boundary_idx) = boundary_idx else {
2001 let (last_key, last_value) = shards.last().expect("shards is non-empty");
2002 if last_key.highest_block_number != u64::MAX {
2003 self.delete::<tables::AccountsHistory>(last_key.clone())?;
2004 self.put::<tables::AccountsHistory>(
2005 ShardedKey::new(address, u64::MAX),
2006 last_value,
2007 )?;
2008 }
2009 return Ok(());
2010 };
2011
2012 for (key, _) in shards.iter().skip(boundary_idx + 1) {
2014 self.delete::<tables::AccountsHistory>(key.clone())?;
2015 }
2016
2017 let (boundary_key, boundary_list) = &shards[boundary_idx];
2019
2020 self.delete::<tables::AccountsHistory>(boundary_key.clone())?;
2022
2023 let new_last =
2025 BlockNumberList::new_pre_sorted(boundary_list.iter().take_while(|&b| b <= keep_to));
2026
2027 if new_last.is_empty() {
2028 if boundary_idx == 0 {
2031 return Ok(());
2033 }
2034
2035 let (prev_key, prev_value) = &shards[boundary_idx - 1];
2036 if prev_key.highest_block_number != u64::MAX {
2037 self.delete::<tables::AccountsHistory>(prev_key.clone())?;
2038 self.put::<tables::AccountsHistory>(
2039 ShardedKey::new(address, u64::MAX),
2040 prev_value,
2041 )?;
2042 }
2043 return Ok(());
2044 }
2045
2046 self.put::<tables::AccountsHistory>(ShardedKey::new(address, u64::MAX), &new_last)?;
2047
2048 Ok(())
2049 }
2050
2051 #[expect(clippy::too_many_arguments)]
2057 fn prune_history_shards_inner<K>(
2058 &mut self,
2059 shards: Vec<(K, BlockNumberList)>,
2060 to_block: BlockNumber,
2061 get_highest: impl Fn(&K) -> u64,
2062 is_sentinel: impl Fn(&K) -> bool,
2063 delete_shard: impl Fn(&mut Self, K) -> ProviderResult<()>,
2064 put_shard: impl Fn(&mut Self, K, &BlockNumberList) -> ProviderResult<()>,
2065 create_sentinel: impl Fn() -> K,
2066 ) -> ProviderResult<PruneShardOutcome>
2067 where
2068 K: Clone,
2069 {
2070 if shards.is_empty() {
2071 return Ok(PruneShardOutcome::Unchanged);
2072 }
2073
2074 let mut deleted = false;
2075 let mut updated = false;
2076 let mut last_remaining: Option<(K, BlockNumberList)> = None;
2077
2078 for (key, block_list) in shards {
2079 if !is_sentinel(&key) && get_highest(&key) <= to_block {
2080 delete_shard(self, key)?;
2081 deleted = true;
2082 } else {
2083 let original_len = block_list.len();
2084 let filtered =
2085 BlockNumberList::new_pre_sorted(block_list.iter().filter(|&b| b > to_block));
2086
2087 if filtered.is_empty() {
2088 delete_shard(self, key)?;
2089 deleted = true;
2090 } else if filtered.len() < original_len {
2091 put_shard(self, key.clone(), &filtered)?;
2092 last_remaining = Some((key, filtered));
2093 updated = true;
2094 } else {
2095 last_remaining = Some((key, block_list));
2096 }
2097 }
2098 }
2099
2100 if let Some((last_key, last_value)) = last_remaining &&
2101 !is_sentinel(&last_key)
2102 {
2103 delete_shard(self, last_key)?;
2104 put_shard(self, create_sentinel(), &last_value)?;
2105 updated = true;
2106 }
2107
2108 if deleted {
2109 Ok(PruneShardOutcome::Deleted)
2110 } else if updated {
2111 Ok(PruneShardOutcome::Updated)
2112 } else {
2113 Ok(PruneShardOutcome::Unchanged)
2114 }
2115 }
2116
    /// Prunes the account history of `address` up to and including `to_block`.
    ///
    /// Loads all of the address's shards through the provider and runs the shared shard
    /// pruning routine against the `AccountsHistory` table.
    pub fn prune_account_history_to(
        &mut self,
        address: Address,
        to_block: BlockNumber,
    ) -> ProviderResult<PruneShardOutcome> {
        let shards = self.provider.account_history_shards(address)?;
        self.prune_history_shards_inner(
            shards,
            to_block,
            // Highest block number covered by a shard.
            |key| key.highest_block_number,
            // The sentinel (last) shard is keyed by `u64::MAX`.
            |key| key.highest_block_number == u64::MAX,
            |batch, key| batch.delete::<tables::AccountsHistory>(key),
            |batch, key, value| batch.put::<tables::AccountsHistory>(key, value),
            || ShardedKey::new(address, u64::MAX),
        )
    }
2137
2138 pub fn prune_account_history_batch(
2147 &mut self,
2148 targets: &[(Address, BlockNumber)],
2149 ) -> ProviderResult<PrunedIndices> {
2150 if targets.is_empty() {
2151 return Ok(PrunedIndices::default());
2152 }
2153
2154 debug_assert!(
2155 targets.windows(2).all(|w| w[0].0 <= w[1].0),
2156 "prune_account_history_batch: targets must be sorted by address"
2157 );
2158
2159 const PREFIX_LEN: usize = 20;
2162
2163 let cf = self.provider.get_cf_handle::<tables::AccountsHistory>()?;
2164 let mut iter = self.provider.0.raw_iterator_cf(cf);
2165 let mut outcomes = PrunedIndices::default();
2166
2167 for (address, to_block) in targets {
2168 let start_key = ShardedKey::new(*address, 0u64).encode();
2170 let target_prefix = &start_key[..PREFIX_LEN];
2171
2172 let needs_seek = if iter.valid() {
2178 if let Some(current_key) = iter.key() {
2179 current_key.get(..PREFIX_LEN).is_none_or(|p| p < target_prefix)
2183 } else {
2184 true
2185 }
2186 } else {
2187 true
2188 };
2189
2190 if needs_seek {
2191 iter.seek(start_key);
2192 iter.status().map_err(|e| {
2193 ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
2194 message: e.to_string().into(),
2195 code: -1,
2196 }))
2197 })?;
2198 }
2199
2200 let mut shards = Vec::new();
2202 while iter.valid() {
2203 let Some(key_bytes) = iter.key() else { break };
2204
2205 let current_prefix = key_bytes.get(..PREFIX_LEN);
2207 if current_prefix != Some(target_prefix) {
2208 break;
2209 }
2210
2211 let key = ShardedKey::<Address>::decode(key_bytes)
2213 .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
2214
2215 let Some(value_bytes) = iter.value() else { break };
2216 let value = BlockNumberList::decompress(value_bytes)
2217 .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
2218
2219 shards.push((key, value));
2220 iter.next();
2221 }
2222
2223 match self.prune_history_shards_inner(
2224 shards,
2225 *to_block,
2226 |key| key.highest_block_number,
2227 |key| key.highest_block_number == u64::MAX,
2228 |batch, key| batch.delete::<tables::AccountsHistory>(key),
2229 |batch, key, value| batch.put::<tables::AccountsHistory>(key, value),
2230 || ShardedKey::new(*address, u64::MAX),
2231 )? {
2232 PruneShardOutcome::Deleted => outcomes.deleted += 1,
2233 PruneShardOutcome::Updated => outcomes.updated += 1,
2234 PruneShardOutcome::Unchanged => outcomes.unchanged += 1,
2235 }
2236 }
2237
2238 Ok(outcomes)
2239 }
2240
    /// Prunes the storage history of `(address, storage_key)` up to and including `to_block`.
    ///
    /// Loads all of the slot's shards through the provider and runs the shared shard pruning
    /// routine against the `StoragesHistory` table.
    pub fn prune_storage_history_to(
        &mut self,
        address: Address,
        storage_key: B256,
        to_block: BlockNumber,
    ) -> ProviderResult<PruneShardOutcome> {
        let shards = self.provider.storage_history_shards(address, storage_key)?;
        self.prune_history_shards_inner(
            shards,
            to_block,
            // Highest block number covered by a shard.
            |key| key.sharded_key.highest_block_number,
            // The sentinel (last) shard is keyed by `u64::MAX`.
            |key| key.sharded_key.highest_block_number == u64::MAX,
            |batch, key| batch.delete::<tables::StoragesHistory>(key),
            |batch, key, value| batch.put::<tables::StoragesHistory>(key, value),
            || StorageShardedKey::last(address, storage_key),
        )
    }
2263
2264 pub fn prune_storage_history_batch(
2274 &mut self,
2275 targets: &[((Address, B256), BlockNumber)],
2276 ) -> ProviderResult<PrunedIndices> {
2277 if targets.is_empty() {
2278 return Ok(PrunedIndices::default());
2279 }
2280
2281 debug_assert!(
2282 targets.windows(2).all(|w| w[0].0 <= w[1].0),
2283 "prune_storage_history_batch: targets must be sorted by (address, storage_key)"
2284 );
2285
2286 const PREFIX_LEN: usize = 52;
2289
2290 let cf = self.provider.get_cf_handle::<tables::StoragesHistory>()?;
2291 let mut iter = self.provider.0.raw_iterator_cf(cf);
2292 let mut outcomes = PrunedIndices::default();
2293
2294 for ((address, storage_key), to_block) in targets {
2295 let start_key = StorageShardedKey::new(*address, *storage_key, 0u64).encode();
2297 let target_prefix = &start_key[..PREFIX_LEN];
2298
2299 let needs_seek = if iter.valid() {
2305 if let Some(current_key) = iter.key() {
2306 current_key.get(..PREFIX_LEN).is_none_or(|p| p < target_prefix)
2310 } else {
2311 true
2312 }
2313 } else {
2314 true
2315 };
2316
2317 if needs_seek {
2318 iter.seek(start_key);
2319 iter.status().map_err(|e| {
2320 ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
2321 message: e.to_string().into(),
2322 code: -1,
2323 }))
2324 })?;
2325 }
2326
2327 let mut shards = Vec::new();
2329 while iter.valid() {
2330 let Some(key_bytes) = iter.key() else { break };
2331
2332 let current_prefix = key_bytes.get(..PREFIX_LEN);
2334 if current_prefix != Some(target_prefix) {
2335 break;
2336 }
2337
2338 let key = StorageShardedKey::decode(key_bytes)
2340 .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
2341
2342 let Some(value_bytes) = iter.value() else { break };
2343 let value = BlockNumberList::decompress(value_bytes)
2344 .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
2345
2346 shards.push((key, value));
2347 iter.next();
2348 }
2349
2350 match self.prune_history_shards_inner(
2352 shards,
2353 *to_block,
2354 |key| key.sharded_key.highest_block_number,
2355 |key| key.sharded_key.highest_block_number == u64::MAX,
2356 |batch, key| batch.delete::<tables::StoragesHistory>(key),
2357 |batch, key, value| batch.put::<tables::StoragesHistory>(key, value),
2358 || StorageShardedKey::last(*address, *storage_key),
2359 )? {
2360 PruneShardOutcome::Deleted => outcomes.deleted += 1,
2361 PruneShardOutcome::Updated => outcomes.updated += 1,
2362 PruneShardOutcome::Unchanged => outcomes.unchanged += 1,
2363 }
2364 }
2365
2366 Ok(outcomes)
2367 }
2368
2369 pub fn unwind_storage_history_to(
2378 &mut self,
2379 address: Address,
2380 storage_key: B256,
2381 keep_to: BlockNumber,
2382 ) -> ProviderResult<()> {
2383 let shards = self.provider.storage_history_shards(address, storage_key)?;
2384 if shards.is_empty() {
2385 return Ok(());
2386 }
2387
2388 let boundary_idx = shards.iter().position(|(key, _)| {
2391 key.sharded_key.highest_block_number == u64::MAX ||
2392 key.sharded_key.highest_block_number > keep_to
2393 });
2394
2395 let Some(boundary_idx) = boundary_idx else {
2397 let (last_key, last_value) = shards.last().expect("shards is non-empty");
2398 if last_key.sharded_key.highest_block_number != u64::MAX {
2399 self.delete::<tables::StoragesHistory>(last_key.clone())?;
2400 self.put::<tables::StoragesHistory>(
2401 StorageShardedKey::last(address, storage_key),
2402 last_value,
2403 )?;
2404 }
2405 return Ok(());
2406 };
2407
2408 for (key, _) in shards.iter().skip(boundary_idx + 1) {
2410 self.delete::<tables::StoragesHistory>(key.clone())?;
2411 }
2412
2413 let (boundary_key, boundary_list) = &shards[boundary_idx];
2415
2416 self.delete::<tables::StoragesHistory>(boundary_key.clone())?;
2418
2419 let new_last =
2421 BlockNumberList::new_pre_sorted(boundary_list.iter().take_while(|&b| b <= keep_to));
2422
2423 if new_last.is_empty() {
2424 if boundary_idx == 0 {
2427 return Ok(());
2429 }
2430
2431 let (prev_key, prev_value) = &shards[boundary_idx - 1];
2432 if prev_key.sharded_key.highest_block_number != u64::MAX {
2433 self.delete::<tables::StoragesHistory>(prev_key.clone())?;
2434 self.put::<tables::StoragesHistory>(
2435 StorageShardedKey::last(address, storage_key),
2436 prev_value,
2437 )?;
2438 }
2439 return Ok(());
2440 }
2441
2442 self.put::<tables::StoragesHistory>(
2443 StorageShardedKey::last(address, storage_key),
2444 &new_last,
2445 )?;
2446
2447 Ok(())
2448 }
2449
2450 pub fn clear_account_history(&mut self, address: Address) -> ProviderResult<()> {
2454 let shards = self.provider.account_history_shards(address)?;
2455 for (key, _) in shards {
2456 self.delete::<tables::AccountsHistory>(key)?;
2457 }
2458 Ok(())
2459 }
2460
2461 pub fn clear_storage_history(
2465 &mut self,
2466 address: Address,
2467 storage_key: B256,
2468 ) -> ProviderResult<()> {
2469 let shards = self.provider.storage_history_shards(address, storage_key)?;
2470 for (key, _) in shards {
2471 self.delete::<tables::StoragesHistory>(key)?;
2472 }
2473 Ok(())
2474 }
2475}
2476
/// Transaction on the read-write [`OptimisticTransactionDB`], bound to a [`RocksDBProvider`].
///
/// Reads and writes go through the underlying RocksDB transaction and take effect on
/// [`RocksTx::commit`].
pub struct RocksTx<'db> {
    /// The underlying RocksDB transaction.
    inner: Transaction<'db, OptimisticTransactionDB>,
    /// Provider used to resolve column family handles.
    provider: &'db RocksDBProvider,
}
2490
2491impl fmt::Debug for RocksTx<'_> {
2492 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2493 f.debug_struct("RocksTx").field("provider", &self.provider).finish_non_exhaustive()
2494 }
2495}
2496
2497impl<'db> RocksTx<'db> {
2498 pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
2500 let encoded_key = key.encode();
2501 self.get_encoded::<T>(&encoded_key)
2502 }
2503
2504 pub fn get_encoded<T: Table>(
2506 &self,
2507 key: &<T::Key as Encode>::Encoded,
2508 ) -> ProviderResult<Option<T::Value>> {
2509 let cf = self.provider.get_cf_handle::<T>()?;
2510 let result = self.inner.get_cf(cf, key.as_ref()).map_err(|e| {
2511 ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
2512 message: e.to_string().into(),
2513 code: -1,
2514 }))
2515 })?;
2516
2517 Ok(result.and_then(|value| T::Value::decompress(&value).ok()))
2518 }
2519
2520 pub fn put<T: Table>(&self, key: T::Key, value: &T::Value) -> ProviderResult<()> {
2522 let encoded_key = key.encode();
2523 self.put_encoded::<T>(&encoded_key, value)
2524 }
2525
2526 pub fn put_encoded<T: Table>(
2528 &self,
2529 key: &<T::Key as Encode>::Encoded,
2530 value: &T::Value,
2531 ) -> ProviderResult<()> {
2532 let cf = self.provider.get_cf_handle::<T>()?;
2533 let mut buf = Vec::new();
2534 let value_bytes = compress_to_buf_or_ref!(buf, value).unwrap_or(&buf);
2535
2536 self.inner.put_cf(cf, key.as_ref(), value_bytes).map_err(|e| {
2537 ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
2538 info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
2539 operation: DatabaseWriteOperation::PutUpsert,
2540 table_name: T::NAME,
2541 key: key.as_ref().to_vec(),
2542 })))
2543 })
2544 }
2545
2546 pub fn delete<T: Table>(&self, key: T::Key) -> ProviderResult<()> {
2548 let cf = self.provider.get_cf_handle::<T>()?;
2549 self.inner.delete_cf(cf, key.encode().as_ref()).map_err(|e| {
2550 ProviderError::Database(DatabaseError::Delete(DatabaseErrorInfo {
2551 message: e.to_string().into(),
2552 code: -1,
2553 }))
2554 })
2555 }
2556
2557 pub fn iter<T: Table>(&self) -> ProviderResult<RocksTxIter<'_, T>> {
2561 let cf = self.provider.get_cf_handle::<T>()?;
2562 let iter = self.inner.iterator_cf(cf, IteratorMode::Start);
2563 Ok(RocksTxIter { inner: iter, _marker: std::marker::PhantomData })
2564 }
2565
2566 pub fn iter_from<T: Table>(&self, key: T::Key) -> ProviderResult<RocksTxIter<'_, T>> {
2568 let cf = self.provider.get_cf_handle::<T>()?;
2569 let encoded_key = key.encode();
2570 let iter = self
2571 .inner
2572 .iterator_cf(cf, IteratorMode::From(encoded_key.as_ref(), rocksdb::Direction::Forward));
2573 Ok(RocksTxIter { inner: iter, _marker: std::marker::PhantomData })
2574 }
2575
2576 #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
2578 pub fn commit(self) -> ProviderResult<()> {
2579 self.inner.commit().map_err(|e| {
2580 ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
2581 message: e.to_string().into(),
2582 code: -1,
2583 }))
2584 })
2585 }
2586
2587 #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
2589 pub fn rollback(self) -> ProviderResult<()> {
2590 self.inner.rollback().map_err(|e| {
2591 ProviderError::Database(DatabaseError::Other(format!("rollback failed: {e}")))
2592 })
2593 }
2594}
2595
/// Key/value iterator over either database flavour.
enum RocksDBIterEnum<'db> {
    /// Iterator over the read-write [`OptimisticTransactionDB`].
    ReadWrite(rocksdb::DBIteratorWithThreadMode<'db, OptimisticTransactionDB>),
    /// Iterator over a plain [`DB`] handle.
    ReadOnly(rocksdb::DBIteratorWithThreadMode<'db, DB>),
}
2603
impl Iterator for RocksDBIterEnum<'_> {
    /// Raw key/value pair, or the RocksDB error hit while advancing.
    type Item = Result<(Box<[u8]>, Box<[u8]>), rocksdb::Error>;

    /// Advances whichever underlying iterator this enum wraps.
    fn next(&mut self) -> Option<Self::Item> {
        match self {
            Self::ReadWrite(iter) => iter.next(),
            Self::ReadOnly(iter) => iter.next(),
        }
    }
}
2614
/// Raw (cursor-style) iterator over either database flavour.
///
/// Unlike [`RocksDBIterEnum`] this exposes explicit positioning (`seek`, `next`, `prev`) and
/// borrowed access to the current key and value.
enum RocksDBRawIterEnum<'db> {
    /// Raw iterator over the read-write [`OptimisticTransactionDB`].
    ReadWrite(DBRawIteratorWithThreadMode<'db, OptimisticTransactionDB>),
    /// Raw iterator over a plain [`DB`] handle.
    ReadOnly(DBRawIteratorWithThreadMode<'db, DB>),
}
2625
2626impl RocksDBRawIterEnum<'_> {
2627 fn seek(&mut self, key: impl AsRef<[u8]>) {
2629 match self {
2630 Self::ReadWrite(iter) => iter.seek(key),
2631 Self::ReadOnly(iter) => iter.seek(key),
2632 }
2633 }
2634
2635 fn valid(&self) -> bool {
2637 match self {
2638 Self::ReadWrite(iter) => iter.valid(),
2639 Self::ReadOnly(iter) => iter.valid(),
2640 }
2641 }
2642
2643 fn key(&self) -> Option<&[u8]> {
2645 match self {
2646 Self::ReadWrite(iter) => iter.key(),
2647 Self::ReadOnly(iter) => iter.key(),
2648 }
2649 }
2650
2651 fn value(&self) -> Option<&[u8]> {
2653 match self {
2654 Self::ReadWrite(iter) => iter.value(),
2655 Self::ReadOnly(iter) => iter.value(),
2656 }
2657 }
2658
2659 fn next(&mut self) {
2661 match self {
2662 Self::ReadWrite(iter) => iter.next(),
2663 Self::ReadOnly(iter) => iter.next(),
2664 }
2665 }
2666
2667 fn prev(&mut self) {
2669 match self {
2670 Self::ReadWrite(iter) => iter.prev(),
2671 Self::ReadOnly(iter) => iter.prev(),
2672 }
2673 }
2674
2675 fn status(&self) -> Result<(), rocksdb::Error> {
2677 match self {
2678 Self::ReadWrite(iter) => iter.status(),
2679 Self::ReadOnly(iter) => iter.status(),
2680 }
2681 }
2682}
2683
2684pub struct RocksDBIter<'db, T: Table> {
2688 inner: RocksDBIterEnum<'db>,
2689 _marker: std::marker::PhantomData<T>,
2690}
2691
2692impl<T: Table> fmt::Debug for RocksDBIter<'_, T> {
2693 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2694 f.debug_struct("RocksDBIter").field("table", &T::NAME).finish_non_exhaustive()
2695 }
2696}
2697
2698impl<T: Table> Iterator for RocksDBIter<'_, T> {
2699 type Item = ProviderResult<(T::Key, T::Value)>;
2700
2701 fn next(&mut self) -> Option<Self::Item> {
2702 Some(decode_iter_item::<T>(self.inner.next()?))
2703 }
2704}
2705
2706pub struct RocksDBRawIter<'db> {
2710 inner: RocksDBIterEnum<'db>,
2711}
2712
2713impl fmt::Debug for RocksDBRawIter<'_> {
2714 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2715 f.debug_struct("RocksDBRawIter").finish_non_exhaustive()
2716 }
2717}
2718
2719impl Iterator for RocksDBRawIter<'_> {
2720 type Item = ProviderResult<(Box<[u8]>, Box<[u8]>)>;
2721
2722 fn next(&mut self) -> Option<Self::Item> {
2723 match self.inner.next()? {
2724 Ok(kv) => Some(Ok(kv)),
2725 Err(e) => Some(Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
2726 message: e.to_string().into(),
2727 code: -1,
2728 })))),
2729 }
2730 }
2731}
2732
2733pub struct RocksTxIter<'tx, T: Table> {
2737 inner: rocksdb::DBIteratorWithThreadMode<'tx, Transaction<'tx, OptimisticTransactionDB>>,
2738 _marker: std::marker::PhantomData<T>,
2739}
2740
2741impl<T: Table> fmt::Debug for RocksTxIter<'_, T> {
2742 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2743 f.debug_struct("RocksTxIter").field("table", &T::NAME).finish_non_exhaustive()
2744 }
2745}
2746
2747impl<T: Table> Iterator for RocksTxIter<'_, T> {
2748 type Item = ProviderResult<(T::Key, T::Value)>;
2749
2750 fn next(&mut self) -> Option<Self::Item> {
2751 Some(decode_iter_item::<T>(self.inner.next()?))
2752 }
2753}
2754
2755fn decode_iter_item<T: Table>(result: RawKVResult) -> ProviderResult<(T::Key, T::Value)> {
2760 let (key_bytes, value_bytes) = result.map_err(|e| {
2761 ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
2762 message: e.to_string().into(),
2763 code: -1,
2764 }))
2765 })?;
2766
2767 let key = <T::Key as reth_db_api::table::Decode>::decode(&key_bytes)
2768 .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
2769
2770 let value = T::Value::decompress(&value_bytes)
2771 .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
2772
2773 Ok((key, value))
2774}
2775
2776const fn convert_log_level(level: LogLevel) -> rocksdb::LogLevel {
2778 match level {
2779 LogLevel::Fatal => rocksdb::LogLevel::Fatal,
2780 LogLevel::Error => rocksdb::LogLevel::Error,
2781 LogLevel::Warn => rocksdb::LogLevel::Warn,
2782 LogLevel::Notice | LogLevel::Verbose => rocksdb::LogLevel::Info,
2783 LogLevel::Debug | LogLevel::Trace | LogLevel::Extra => rocksdb::LogLevel::Debug,
2784 }
2785}
2786
2787#[cfg(test)]
2788mod tests {
2789 use super::*;
2790 use crate::providers::HistoryInfo;
2791 use alloy_primitives::{Address, TxHash, B256};
2792 use reth_db_api::{
2793 models::{
2794 sharded_key::{ShardedKey, NUM_OF_INDICES_IN_SHARD},
2795 storage_sharded_key::StorageShardedKey,
2796 IntegerList,
2797 },
2798 table::Table,
2799 tables,
2800 };
2801 use tempfile::TempDir;
2802
/// A provider built with `with_default_tables()` accepts reads and writes on
/// `TransactionHashNumbers`, `AccountsHistory` and `StoragesHistory`.
#[test]
fn test_with_default_tables_registers_required_column_families() {
    let dir = TempDir::new().unwrap();
    let db = RocksDBBuilder::new(dir.path()).with_default_tables().build().unwrap();

    // TransactionHashNumbers round-trip.
    let hash = TxHash::from(B256::from([1u8; 32]));
    db.put::<tables::TransactionHashNumbers>(hash, &100).unwrap();
    let stored_number = db.get::<tables::TransactionHashNumbers>(hash).unwrap();
    assert_eq!(stored_number, Some(100));

    // AccountsHistory round-trip with an empty list.
    let account_key = ShardedKey::new(Address::ZERO, 100);
    let empty_list = IntegerList::default();
    db.put::<tables::AccountsHistory>(account_key.clone(), &empty_list).unwrap();
    let account_entry = db.get::<tables::AccountsHistory>(account_key).unwrap();
    assert!(account_entry.is_some());

    // StoragesHistory round-trip with the same empty list.
    let storage_key = StorageShardedKey::new(Address::ZERO, B256::ZERO, 100);
    db.put::<tables::StoragesHistory>(storage_key.clone(), &empty_list).unwrap();
    let storage_entry = db.get::<tables::StoragesHistory>(storage_key).unwrap();
    assert!(storage_entry.is_some());
}
2826
2827 #[derive(Debug)]
2828 struct TestTable;
2829
2830 impl Table for TestTable {
2831 const NAME: &'static str = "TestTable";
2832 const DUPSORT: bool = false;
2833 type Key = u64;
2834 type Value = Vec<u8>;
2835 }
2836
/// Put, get and delete of a single key behave as expected on the provider.
#[test]
fn test_basic_operations() {
    let dir = TempDir::new().unwrap();
    let db = RocksDBBuilder::new(dir.path()).with_table::<TestTable>().build().unwrap();

    let key = 42u64;
    let payload = b"test_value".to_vec();

    // Write, then read the same value back.
    db.put::<TestTable>(key, &payload).unwrap();
    let fetched = db.get::<TestTable>(key).unwrap();
    assert_eq!(fetched, Some(payload));

    // After deletion the key must be gone.
    db.delete::<TestTable>(key).unwrap();
    let after_delete = db.get::<TestTable>(key).unwrap();
    assert_eq!(after_delete, None);
}
2862
/// `write_batch` applies all puts, and a second batch applies all deletes.
#[test]
fn test_batch_operations() {
    let dir = TempDir::new().unwrap();
    let db = RocksDBBuilder::new(dir.path()).with_table::<TestTable>().build().unwrap();

    // Value stored under key `i`.
    let payload_for = |i: u64| format!("value_{i}").into_bytes();

    // Insert ten entries in one batch.
    db.write_batch(|batch| {
        for i in 0..10u64 {
            let payload = payload_for(i);
            batch.put::<TestTable>(i, &payload)?;
        }
        Ok(())
    })
    .unwrap();

    for i in 0..10u64 {
        let fetched = db.get::<TestTable>(i).unwrap();
        assert_eq!(fetched, Some(payload_for(i)));
    }

    // Remove the same ten entries in one batch.
    db.write_batch(|batch| {
        for i in 0..10u64 {
            batch.delete::<TestTable>(i)?;
        }
        Ok(())
    })
    .unwrap();

    for i in 0..10u64 {
        let fetched = db.get::<TestTable>(i).unwrap();
        assert_eq!(fetched, None);
    }
}
2901
/// Single and batched writes work against the real `TransactionHashNumbers`
/// table with metrics enabled.
#[test]
fn test_with_real_table() {
    let dir = TempDir::new().unwrap();
    let db = RocksDBBuilder::new(dir.path())
        .with_table::<tables::TransactionHashNumbers>()
        .with_metrics()
        .build()
        .unwrap();

    // Hash whose 32 bytes all equal `byte`.
    let hash_of = |byte: u8| TxHash::from(B256::from([byte; 32]));

    // Single insert and lookup.
    let single = hash_of(1);
    db.put::<tables::TransactionHashNumbers>(single, &100).unwrap();
    assert_eq!(db.get::<tables::TransactionHashNumbers>(single).unwrap(), Some(100));

    // Batched inserts: hash built from `i` maps to `i * 100`.
    db.write_batch(|batch| {
        for i in 0..10u64 {
            let number = i * 100;
            batch.put::<tables::TransactionHashNumbers>(hash_of(i as u8), &number)?;
        }
        Ok(())
    })
    .unwrap();

    for i in 0..10u64 {
        let fetched = db.get::<tables::TransactionHashNumbers>(hash_of(i as u8)).unwrap();
        assert_eq!(fetched, Some(i * 100));
    }
}
/// A provider built with `with_statistics()` still serves puts and gets.
#[test]
fn test_statistics_enabled() {
    let dir = TempDir::new().unwrap();
    let db = RocksDBBuilder::new(dir.path())
        .with_table::<TestTable>()
        .with_statistics()
        .build()
        .unwrap();

    for key in 0..10u64 {
        let payload = vec![key as u8];
        db.put::<TestTable>(key, &payload).unwrap();
        let fetched = db.get::<TestTable>(key).unwrap();
        assert_eq!(fetched, Some(payload));
    }
}
2956
/// Writes 100 entries of 1000 bytes each and checks that all can be read back
/// through the same provider instance.
#[test]
fn test_data_persistence() {
    let dir = TempDir::new().unwrap();
    let db = RocksDBBuilder::new(dir.path()).with_table::<TestTable>().build().unwrap();

    let payload = vec![42u8; 1000];
    for key in 0..100u64 {
        db.put::<TestTable>(key, &payload).unwrap();
    }

    for key in 0..100u64 {
        let fetched = db.get::<TestTable>(key).unwrap();
        assert!(fetched.is_some(), "Data should be readable");
    }
}
2974
/// A transaction sees its own pending write, the provider does not until the
/// transaction is committed.
#[test]
fn test_transaction_read_your_writes() {
    let dir = TempDir::new().unwrap();
    let db = RocksDBBuilder::new(dir.path()).with_table::<TestTable>().build().unwrap();

    let txn = db.tx();

    let key = 42u64;
    let payload = b"test_value".to_vec();
    txn.put::<TestTable>(key, &payload).unwrap();

    // Visible inside the transaction.
    let inside = txn.get::<TestTable>(key).unwrap();
    assert_eq!(
        inside,
        Some(payload.clone()),
        "Transaction should see its own uncommitted writes"
    );

    // Not visible through the provider yet.
    let outside = db.get::<TestTable>(key).unwrap();
    assert_eq!(outside, None, "Uncommitted data should not be visible outside tx");

    txn.commit().unwrap();

    // Visible through the provider once committed.
    let after_commit = db.get::<TestTable>(key).unwrap();
    assert_eq!(after_commit, Some(payload), "Committed data should be visible");
}
3008
/// Rolling back a transaction leaves the previously stored value untouched.
#[test]
fn test_transaction_rollback() {
    let dir = TempDir::new().unwrap();
    let db = RocksDBBuilder::new(dir.path()).with_table::<TestTable>().build().unwrap();

    // Seed a value outside of any transaction.
    let key = 100u64;
    let original = b"initial".to_vec();
    db.put::<TestTable>(key, &original).unwrap();

    // Overwrite it inside a transaction and confirm the transaction sees the change.
    let txn = db.tx();
    let replacement = b"modified".to_vec();
    txn.put::<TestTable>(key, &replacement).unwrap();
    assert_eq!(txn.get::<TestTable>(key).unwrap(), Some(replacement));

    txn.rollback().unwrap();

    let after_rollback = db.get::<TestTable>(key).unwrap();
    assert_eq!(after_rollback, Some(original), "Rollback should preserve original data");
}
3035
/// Iterating inside a transaction yields every write made in that transaction
/// before it is committed.
#[test]
fn test_transaction_iterator() {
    let dir = TempDir::new().unwrap();
    let db = RocksDBBuilder::new(dir.path()).with_table::<TestTable>().build().unwrap();

    let txn = db.tx();

    for key in 0..5u64 {
        let payload = format!("value_{key}").into_bytes();
        txn.put::<TestTable>(key, &payload).unwrap();
    }

    // Check each entry while counting how many the iterator produces.
    let seen = txn
        .iter::<TestTable>()
        .unwrap()
        .map(|entry| {
            let (key, value) = entry.unwrap();
            assert_eq!(value, format!("value_{key}").into_bytes());
        })
        .count();
    assert_eq!(seen, 5, "Iterator should see all uncommitted writes");

    txn.commit().unwrap();
}
3063
/// A manually built batch tracks its pending operations and only becomes
/// visible through the provider after `commit()`.
#[test]
fn test_batch_manual_commit() {
    let dir = TempDir::new().unwrap();
    let db = RocksDBBuilder::new(dir.path()).with_table::<TestTable>().build().unwrap();

    // Value stored under key `i`.
    let payload_for = |i: u64| format!("batch_value_{i}").into_bytes();

    let mut pending = db.batch();
    for i in 0..10u64 {
        let payload = payload_for(i);
        pending.put::<TestTable>(i, &payload).unwrap();
    }

    // The batch reports its ten queued operations.
    assert_eq!(pending.len(), 10);
    assert!(!pending.is_empty());

    // Nothing has reached the database yet.
    assert_eq!(db.get::<TestTable>(0).unwrap(), None);

    pending.commit().unwrap();

    for i in 0..10u64 {
        let fetched = db.get::<TestTable>(i).unwrap();
        assert_eq!(fetched, Some(payload_for(i)));
    }
}
3095
/// `first()` and `last()` return `None` on an empty table and the smallest and
/// largest keys once entries exist, regardless of insertion order.
#[test]
fn test_first_and_last_entry() {
    let dir = TempDir::new().unwrap();
    let db = RocksDBBuilder::new(dir.path()).with_table::<TestTable>().build().unwrap();

    // Empty table.
    assert_eq!(db.first::<TestTable>().unwrap(), None);
    assert_eq!(db.last::<TestTable>().unwrap(), None);

    // Insert out of key order on purpose.
    db.put::<TestTable>(10, &b"value_10".to_vec()).unwrap();
    db.put::<TestTable>(20, &b"value_20".to_vec()).unwrap();
    db.put::<TestTable>(5, &b"value_5".to_vec()).unwrap();

    let lowest = db.first::<TestTable>().unwrap();
    assert_eq!(lowest, Some((5, b"value_5".to_vec())));

    let highest = db.last::<TestTable>().unwrap();
    assert_eq!(highest, Some((20, b"value_20".to_vec())));
}
3119
/// Querying block 50 against a sentinel shard `[100, 200, 300]`, with
/// `Some(100)` as the third argument, resolves to `InChangeset(100)`.
#[test]
fn test_account_history_info_pruned_before_first_entry() {
    let dir = TempDir::new().unwrap();
    let db = RocksDBBuilder::new(dir.path()).with_default_tables().build().unwrap();

    let address = Address::from([0x42; 20]);

    // Single sentinel shard (highest block number `u64::MAX`).
    let sentinel_key = ShardedKey::new(address, u64::MAX);
    let blocks = IntegerList::new([100, 200, 300]).unwrap();
    db.put::<tables::AccountsHistory>(sentinel_key, &blocks).unwrap();

    let snapshot = db.snapshot();
    let info = snapshot.account_history_info(address, 50, Some(100), u64::MAX).unwrap();
    assert_eq!(info, HistoryInfo::InChangeset(100));
}
3143
/// History lookups work through a read-only provider, and data written by the
/// read-write provider afterwards only shows up once the read-only provider
/// calls `try_catch_up_with_primary()`.
#[test]
fn test_account_history_info_read_only_and_catch_up() {
    let dir = TempDir::new().unwrap();
    let address = Address::from([0x42; 20]);
    let blocks = IntegerList::new([100, 200, 300]).unwrap();
    let sentinel_key = ShardedKey::new(address, u64::MAX);

    // Populate through the read-write provider first.
    let rw = RocksDBBuilder::new(dir.path()).with_default_tables().build().unwrap();
    rw.put::<tables::AccountsHistory>(sentinel_key, &blocks).unwrap();

    // Then open the same path in read-only mode.
    let ro = RocksDBBuilder::new(dir.path())
        .with_default_tables()
        .with_read_only(true)
        .build()
        .unwrap();

    // Every lookup goes through a fresh snapshot of the read-only provider.
    let lookup = |who: Address, block: u64| {
        ro.snapshot().account_history_info(who, block, None, u64::MAX).unwrap()
    };

    assert_eq!(lookup(address, 200), HistoryInfo::InChangeset(200));
    assert_eq!(lookup(address, 50), HistoryInfo::NotYetWritten);
    assert_eq!(lookup(address, 400), HistoryInfo::InPlainState);

    // Write a second account through the read-write provider.
    let second_address = Address::from([0x43; 20]);
    let second_blocks = IntegerList::new([500, 600]).unwrap();
    let second_key = ShardedKey::new(second_address, u64::MAX);
    rw.put::<tables::AccountsHistory>(second_key, &second_blocks).unwrap();

    // The read-only provider has not caught up yet.
    assert_eq!(lookup(second_address, 500), HistoryInfo::NotYetWritten);

    ro.try_catch_up_with_primary().unwrap();

    // After catching up, the new shard is visible.
    assert_eq!(lookup(second_address, 500), HistoryInfo::InChangeset(500));
}
3194
/// With the last argument set to 150, the entries 200 and 210 in the sentinel
/// shard are not reported: block 150 resolves to `InPlainState`.
#[test]
fn test_account_history_info_ignores_blocks_above_visible_tip() {
    let dir = TempDir::new().unwrap();
    let db = RocksDBBuilder::new(dir.path()).with_default_tables().build().unwrap();

    let address = Address::from([0x42; 20]);

    // Completed shard ending at block 110.
    let completed_key = ShardedKey::new(address, 110);
    let completed_blocks = IntegerList::new([100, 110]).unwrap();
    db.put::<tables::AccountsHistory>(completed_key, &completed_blocks).unwrap();

    // Sentinel shard containing only blocks above 150.
    let sentinel_key = ShardedKey::new(address, u64::MAX);
    let sentinel_blocks = IntegerList::new([200, 210]).unwrap();
    db.put::<tables::AccountsHistory>(sentinel_key, &sentinel_blocks).unwrap();

    let info = db.snapshot().account_history_info(address, 150, None, 150).unwrap();
    assert_eq!(info, HistoryInfo::InPlainState);
}
3218
/// A shard `[100, 150, 300]` queried with the last argument set to 200:
/// block 120 resolves to `InChangeset(150)`, block 201 to `InPlainState`.
#[test]
fn test_account_history_info_mixed_shard_respects_visible_tip() {
    let dir = TempDir::new().unwrap();
    let db = RocksDBBuilder::new(dir.path()).with_default_tables().build().unwrap();

    let address = Address::from([0x42; 20]);
    let sentinel_key = ShardedKey::new(address, u64::MAX);
    let blocks = IntegerList::new([100, 150, 300]).unwrap();
    db.put::<tables::AccountsHistory>(sentinel_key, &blocks).unwrap();

    let below = db.snapshot().account_history_info(address, 120, None, 200).unwrap();
    assert_eq!(below, HistoryInfo::InChangeset(150));

    let above = db.snapshot().account_history_info(address, 201, None, 200).unwrap();
    assert_eq!(above, HistoryInfo::InPlainState);
}
3238
/// A shard `[200, 210]` queried at block 150 with the last argument set to 150:
/// the result is `NotYetWritten` when the third argument is `None`, and
/// `MaybeInPlainState` when it is `Some(100)`.
#[test]
fn test_account_history_info_only_stale_entries_use_fallback() {
    let dir = TempDir::new().unwrap();
    let db = RocksDBBuilder::new(dir.path()).with_default_tables().build().unwrap();

    let address = Address::from([0x42; 20]);
    let sentinel_key = ShardedKey::new(address, u64::MAX);
    let blocks = IntegerList::new([200, 210]).unwrap();
    db.put::<tables::AccountsHistory>(sentinel_key, &blocks).unwrap();

    let without_bound = db.snapshot().account_history_info(address, 150, None, 150).unwrap();
    assert_eq!(without_bound, HistoryInfo::NotYetWritten);

    let with_bound = db.snapshot().account_history_info(address, 150, Some(100), 150).unwrap();
    assert_eq!(with_bound, HistoryInfo::MaybeInPlainState);
}
3259
/// Appending `NUM_OF_INDICES_IN_SHARD + 1` indices produces one full completed
/// shard plus a sentinel shard holding the single overflow index.
#[test]
fn test_account_history_shard_split_at_boundary() {
    let dir = TempDir::new().unwrap();
    let db = RocksDBBuilder::new(dir.path()).with_default_tables().build().unwrap();

    let address = Address::from([0x42; 20]);
    let shard_len = NUM_OF_INDICES_IN_SHARD as u64;

    // One more index than fits into a single shard.
    let indices: Vec<u64> = (0..=shard_len).collect();
    let mut pending = db.batch();
    pending.append_account_history_shard(address, indices).unwrap();
    pending.commit().unwrap();

    // The completed shard is keyed by its highest index, the rest by `u64::MAX`.
    let completed_key = ShardedKey::new(address, shard_len - 1);
    let sentinel_key = ShardedKey::new(address, u64::MAX);

    let completed = db.get::<tables::AccountsHistory>(completed_key).unwrap();
    let sentinel = db.get::<tables::AccountsHistory>(sentinel_key).unwrap();

    let completed = completed.expect("completed shard should exist");
    let sentinel = sentinel.expect("sentinel shard should exist");

    assert_eq!(completed.len(), shard_len, "completed shard should be full");
    assert_eq!(sentinel.len(), 1, "sentinel shard should have 1 element");
}
3290
/// Two successive appends that together exceed two shards' worth of indices
/// leave two completed shards and a sentinel shard behind.
#[test]
fn test_account_history_multiple_shard_splits() {
    let dir = TempDir::new().unwrap();
    let db = RocksDBBuilder::new(dir.path()).with_default_tables().build().unwrap();

    let address = Address::from([0x43; 20]);
    let shard_len = NUM_OF_INDICES_IN_SHARD as u64;

    // First append: exactly one shard's worth, which stays under the sentinel key.
    let first_indices: Vec<u64> = (0..shard_len).collect();
    let mut pending = db.batch();
    pending.append_account_history_shard(address, first_indices).unwrap();
    pending.commit().unwrap();

    let sentinel_key = ShardedKey::new(address, u64::MAX);
    let sentinel = db.get::<tables::AccountsHistory>(sentinel_key.clone()).unwrap();
    assert!(sentinel.is_some());
    assert_eq!(sentinel.unwrap().len(), shard_len);

    // Second append: one shard's worth plus one extra index.
    let second_indices: Vec<u64> = (shard_len..=2 * shard_len).collect();
    let mut pending = db.batch();
    pending.append_account_history_shard(address, second_indices).unwrap();
    pending.commit().unwrap();

    let first_completed = ShardedKey::new(address, shard_len - 1);
    let second_completed = ShardedKey::new(address, 2 * shard_len - 1);

    let first_entry = db.get::<tables::AccountsHistory>(first_completed).unwrap();
    assert!(first_entry.is_some(), "first completed shard should exist");

    let second_entry = db.get::<tables::AccountsHistory>(second_completed).unwrap();
    assert!(second_entry.is_some(), "second completed shard should exist");

    let sentinel_entry = db.get::<tables::AccountsHistory>(sentinel_key).unwrap();
    assert!(sentinel_entry.is_some(), "sentinel shard should exist");
}
3334
/// Storage-history counterpart of the boundary split test: appending
/// `NUM_OF_INDICES_IN_SHARD + 1` indices yields a full completed shard and a
/// one-element sentinel shard.
#[test]
fn test_storage_history_shard_split_at_boundary() {
    let dir = TempDir::new().unwrap();
    let db = RocksDBBuilder::new(dir.path()).with_default_tables().build().unwrap();

    let address = Address::from([0x44; 20]);
    let slot = B256::from([0x55; 32]);
    let shard_len = NUM_OF_INDICES_IN_SHARD as u64;

    // One more index than fits into a single shard.
    let indices: Vec<u64> = (0..=shard_len).collect();
    let mut pending = db.batch();
    pending.append_storage_history_shard(address, slot, indices).unwrap();
    pending.commit().unwrap();

    let completed_key = StorageShardedKey::new(address, slot, shard_len - 1);
    let sentinel_key = StorageShardedKey::new(address, slot, u64::MAX);

    let completed = db.get::<tables::StoragesHistory>(completed_key).unwrap();
    let sentinel = db.get::<tables::StoragesHistory>(sentinel_key).unwrap();

    let completed = completed.expect("completed shard should exist");
    let sentinel = sentinel.expect("sentinel shard should exist");

    assert_eq!(completed.len(), shard_len, "completed shard should be full");
    assert_eq!(sentinel.len(), 1, "sentinel shard should have 1 element");
}
3366
/// Storage-history counterpart of the multi-split test: two appends leave two
/// completed shards and a sentinel shard behind.
#[test]
fn test_storage_history_multiple_shard_splits() {
    let dir = TempDir::new().unwrap();
    let db = RocksDBBuilder::new(dir.path()).with_default_tables().build().unwrap();

    let address = Address::from([0x46; 20]);
    let slot = B256::from([0x57; 32]);
    let shard_len = NUM_OF_INDICES_IN_SHARD as u64;

    // First append: exactly one shard's worth, which stays under the sentinel key.
    let first_indices: Vec<u64> = (0..shard_len).collect();
    let mut pending = db.batch();
    pending.append_storage_history_shard(address, slot, first_indices).unwrap();
    pending.commit().unwrap();

    let sentinel_key = StorageShardedKey::new(address, slot, u64::MAX);
    let sentinel = db.get::<tables::StoragesHistory>(sentinel_key.clone()).unwrap();
    assert!(sentinel.is_some());
    assert_eq!(sentinel.unwrap().len(), shard_len);

    // Second append: one shard's worth plus one extra index.
    let second_indices: Vec<u64> = (shard_len..=2 * shard_len).collect();
    let mut pending = db.batch();
    pending.append_storage_history_shard(address, slot, second_indices).unwrap();
    pending.commit().unwrap();

    let first_completed = StorageShardedKey::new(address, slot, shard_len - 1);
    let second_completed = StorageShardedKey::new(address, slot, 2 * shard_len - 1);

    let first_entry = db.get::<tables::StoragesHistory>(first_completed).unwrap();
    assert!(first_entry.is_some(), "first completed shard should exist");

    let second_entry = db.get::<tables::StoragesHistory>(second_completed).unwrap();
    assert!(second_entry.is_some(), "second completed shard should exist");

    let sentinel_entry = db.get::<tables::StoragesHistory>(sentinel_key).unwrap();
    assert!(sentinel_entry.is_some(), "sentinel shard should exist");
}
3411
/// `clear()` removes every entry of the table it is called for.
#[test]
fn test_clear_table() {
    let dir = TempDir::new().unwrap();
    let db = RocksDBBuilder::new(dir.path()).with_default_tables().build().unwrap();

    let address = Address::from([0x42; 20]);
    let shard_key = ShardedKey::new(address, u64::MAX);
    let block_list = BlockNumberList::new_pre_sorted([1, 2, 3]);

    // Seed one entry and make sure it is there.
    db.put::<tables::AccountsHistory>(shard_key.clone(), &block_list).unwrap();
    let before = db.get::<tables::AccountsHistory>(shard_key.clone()).unwrap();
    assert!(before.is_some());

    db.clear::<tables::AccountsHistory>().unwrap();

    let after = db.get::<tables::AccountsHistory>(shard_key).unwrap();
    assert!(after.is_none(), "table should be empty after clear");

    let first = db.first::<tables::AccountsHistory>().unwrap();
    assert!(first.is_none(), "first() should return None after clear");
}
3435
/// Clearing an already empty table succeeds and leaves it empty.
#[test]
fn test_clear_empty_table() {
    let dir = TempDir::new().unwrap();
    let db = RocksDBBuilder::new(dir.path()).with_default_tables().build().unwrap();

    let before = db.first::<tables::AccountsHistory>().unwrap();
    assert!(before.is_none());

    db.clear::<tables::AccountsHistory>().unwrap();

    let after = db.first::<tables::AccountsHistory>().unwrap();
    assert!(after.is_none());
}
3447
/// Unwinding to block 5 keeps blocks `0..=5` and drops everything above.
#[test]
fn test_unwind_account_history_to_basic() {
    let dir = TempDir::new().unwrap();
    let db = RocksDBBuilder::new(dir.path()).with_default_tables().build().unwrap();

    let address = Address::from([0x42; 20]);
    let sentinel_key = ShardedKey::new(address, u64::MAX);

    // Seed blocks 0..=10.
    let mut pending = db.batch();
    pending.append_account_history_shard(address, 0..=10).unwrap();
    pending.commit().unwrap();

    let stored = db.get::<tables::AccountsHistory>(sentinel_key.clone()).unwrap();
    assert!(stored.is_some());
    let before: Vec<u64> = stored.unwrap().iter().collect();
    assert_eq!(before, (0..=10).collect::<Vec<_>>());

    // Unwind to block 5.
    let mut pending = db.batch();
    pending.unwind_account_history_to(address, 5).unwrap();
    pending.commit().unwrap();

    let stored = db.get::<tables::AccountsHistory>(sentinel_key).unwrap();
    assert!(stored.is_some());
    let after: Vec<u64> = stored.unwrap().iter().collect();
    assert_eq!(after, (0..=5).collect::<Vec<_>>());
}
3478
/// Unwinding to a block below every stored entry removes the shard entirely.
#[test]
fn test_unwind_account_history_to_removes_all() {
    let dir = TempDir::new().unwrap();
    let db = RocksDBBuilder::new(dir.path()).with_default_tables().build().unwrap();

    let address = Address::from([0x42; 20]);

    // Seed blocks 5..=10.
    let mut pending = db.batch();
    pending.append_account_history_shard(address, 5..=10).unwrap();
    pending.commit().unwrap();

    // Unwind to block 4, which is below the lowest stored block.
    let mut pending = db.batch();
    pending.unwind_account_history_to(address, 4).unwrap();
    pending.commit().unwrap();

    let sentinel_key = ShardedKey::new(address, u64::MAX);
    let stored = db.get::<tables::AccountsHistory>(sentinel_key).unwrap();
    assert!(stored.is_none(), "Should have no data after full unwind");
}
3501
/// Unwinding to a block above every stored entry leaves the shard unchanged.
#[test]
fn test_unwind_account_history_to_no_op() {
    let dir = TempDir::new().unwrap();
    let db = RocksDBBuilder::new(dir.path()).with_default_tables().build().unwrap();

    let address = Address::from([0x42; 20]);

    // Seed blocks 0..=5.
    let mut pending = db.batch();
    pending.append_account_history_shard(address, 0..=5).unwrap();
    pending.commit().unwrap();

    // Unwind to block 10, above the highest stored block.
    let mut pending = db.batch();
    pending.unwind_account_history_to(address, 10).unwrap();
    pending.commit().unwrap();

    let sentinel_key = ShardedKey::new(address, u64::MAX);
    let stored = db.get::<tables::AccountsHistory>(sentinel_key).unwrap();
    assert!(stored.is_some());
    let remaining: Vec<u64> = stored.unwrap().iter().collect();
    assert_eq!(remaining, (0..=5).collect::<Vec<_>>());
}
3526
/// Unwinding to block 0 keeps only the entry for block 0.
#[test]
fn test_unwind_account_history_to_block_zero() {
    let dir = TempDir::new().unwrap();
    let db = RocksDBBuilder::new(dir.path()).with_default_tables().build().unwrap();

    let address = Address::from([0x42; 20]);

    // Seed blocks 0..=5.
    let mut pending = db.batch();
    pending.append_account_history_shard(address, 0..=5).unwrap();
    pending.commit().unwrap();

    // Unwind all the way down to block 0.
    let mut pending = db.batch();
    pending.unwind_account_history_to(address, 0).unwrap();
    pending.commit().unwrap();

    let sentinel_key = ShardedKey::new(address, u64::MAX);
    let stored = db.get::<tables::AccountsHistory>(sentinel_key).unwrap();
    assert!(stored.is_some());
    let remaining: Vec<u64> = stored.unwrap().iter().collect();
    assert_eq!(remaining, vec![0]);
}
3552
/// With a completed shard `1..=50` and a sentinel shard `51..=100`, unwinding
/// to block 75 truncates only the sentinel shard to `51..=75`.
#[test]
fn test_unwind_account_history_to_multi_shard() {
    let dir = TempDir::new().unwrap();
    let db = RocksDBBuilder::new(dir.path()).with_default_tables().build().unwrap();

    let address = Address::from([0x42; 20]);

    // Write both shards by hand in a single batch.
    let mut pending = db.batch();
    let completed_blocks = BlockNumberList::new_pre_sorted(1..=50);
    pending
        .put::<tables::AccountsHistory>(ShardedKey::new(address, 50), &completed_blocks)
        .unwrap();
    let sentinel_blocks = BlockNumberList::new_pre_sorted(51..=100);
    pending
        .put::<tables::AccountsHistory>(ShardedKey::new(address, u64::MAX), &sentinel_blocks)
        .unwrap();
    pending.commit().unwrap();

    let before = db.account_history_shards(address).unwrap();
    assert_eq!(before.len(), 2);

    let mut pending = db.batch();
    pending.unwind_account_history_to(address, 75).unwrap();
    pending.commit().unwrap();

    let after = db.account_history_shards(address).unwrap();
    assert_eq!(after.len(), 2);

    // The completed shard is untouched.
    let (low_key, low_blocks) = &after[0];
    assert_eq!(low_key.highest_block_number, 50);
    assert_eq!(low_blocks.iter().collect::<Vec<_>>(), (1..=50).collect::<Vec<_>>());

    // The sentinel shard is cut at block 75.
    let (high_key, high_blocks) = &after[1];
    assert_eq!(high_key.highest_block_number, u64::MAX);
    assert_eq!(high_blocks.iter().collect::<Vec<_>>(), (51..=75).collect::<Vec<_>>());
}
3595
/// With a completed shard `1..=50` and a sentinel shard `75..=100`, unwinding
/// to block 60 leaves a single shard keyed by `u64::MAX` that holds `1..=50`.
#[test]
fn test_unwind_account_history_to_multi_shard_boundary_empty() {
    let dir = TempDir::new().unwrap();
    let db = RocksDBBuilder::new(dir.path()).with_default_tables().build().unwrap();

    let address = Address::from([0x42; 20]);

    // Write both shards by hand in a single batch.
    let mut pending = db.batch();
    let completed_blocks = BlockNumberList::new_pre_sorted(1..=50);
    pending
        .put::<tables::AccountsHistory>(ShardedKey::new(address, 50), &completed_blocks)
        .unwrap();
    let sentinel_blocks = BlockNumberList::new_pre_sorted(75..=100);
    pending
        .put::<tables::AccountsHistory>(ShardedKey::new(address, u64::MAX), &sentinel_blocks)
        .unwrap();
    pending.commit().unwrap();

    // Block 60 lies between the two shards, so every sentinel entry is above it.
    let mut pending = db.batch();
    pending.unwind_account_history_to(address, 60).unwrap();
    pending.commit().unwrap();

    let after = db.account_history_shards(address).unwrap();
    assert_eq!(after.len(), 1);

    let (only_key, only_blocks) = &after[0];
    assert_eq!(only_key.highest_block_number, u64::MAX);
    assert_eq!(only_blocks.iter().collect::<Vec<_>>(), (1..=50).collect::<Vec<_>>());
}
3627
/// `account_history_shards` returns only the shards of the requested address
/// and an empty result for an address without history.
#[test]
fn test_account_history_shards_iterator() {
    let dir = TempDir::new().unwrap();
    let db = RocksDBBuilder::new(dir.path()).with_default_tables().build().unwrap();

    let first_address = Address::from([0x42; 20]);
    let second_address = Address::from([0x43; 20]);

    // Seed history for two different addresses in one batch.
    let mut pending = db.batch();
    pending.append_account_history_shard(first_address, 0..=5).unwrap();
    pending.append_account_history_shard(second_address, 10..=15).unwrap();
    pending.commit().unwrap();

    let first_shards = db.account_history_shards(first_address).unwrap();
    assert_eq!(first_shards.len(), 1);
    assert_eq!(first_shards[0].0.key, first_address);

    let second_shards = db.account_history_shards(second_address).unwrap();
    assert_eq!(second_shards.len(), 1);
    assert_eq!(second_shards[0].0.key, second_address);

    // An address that was never written has no shards.
    let unknown_address = Address::from([0x99; 20]);
    let unknown_shards = db.account_history_shards(unknown_address).unwrap();
    assert!(unknown_shards.is_empty());
}
3657
/// `clear_account_history` deletes every shard stored for the address.
#[test]
fn test_clear_account_history() {
    let dir = TempDir::new().unwrap();
    let db = RocksDBBuilder::new(dir.path()).with_default_tables().build().unwrap();

    let address = Address::from([0x42; 20]);

    // Seed blocks 0..=10.
    let mut pending = db.batch();
    pending.append_account_history_shard(address, 0..=10).unwrap();
    pending.commit().unwrap();

    // Clear the address's history in a separate batch.
    let mut pending = db.batch();
    pending.clear_account_history(address).unwrap();
    pending.commit().unwrap();

    let remaining = db.account_history_shards(address).unwrap();
    assert!(remaining.is_empty(), "All shards should be deleted");
}
3679
/// Unwinding to a block that falls inside a non-sentinel shard: the test expects the
/// earlier shard to stay untouched and the truncated remainder to sit under the
/// `u64::MAX` key.
#[test]
fn test_unwind_non_sentinel_boundary() {
    let dir = TempDir::new().unwrap();
    let provider = RocksDBBuilder::new(dir.path()).with_default_tables().build().unwrap();
    let target = Address::from([0x42; 20]);

    // Initial layout, as (shard key, blocks held by that shard).
    let layout = [(50, 1..=50), (100, 51..=100), (u64::MAX, 101..=150)];

    let mut seed = provider.batch();
    for (highest, blocks) in layout {
        let list = BlockNumberList::new_pre_sorted(blocks);
        seed.put::<tables::AccountsHistory>(ShardedKey::new(target, highest), &list).unwrap();
    }
    seed.commit().unwrap();

    // Sanity check: all three shards were written.
    let before = provider.account_history_shards(target).unwrap();
    assert_eq!(before.len(), 3);

    let mut unwind = provider.batch();
    unwind.unwind_account_history_to(target, 75).unwrap();
    unwind.commit().unwrap();

    let after = provider.account_history_shards(target).unwrap();
    assert_eq!(after.len(), 2);

    // The first shard is untouched.
    let (head_key, head_blocks) = &after[0];
    assert_eq!(head_key.highest_block_number, 50);
    assert_eq!(head_blocks.iter().collect::<Vec<_>>(), (1..=50).collect::<Vec<_>>());

    // The second shard was cut at block 75 and is now keyed by `u64::MAX`.
    let (tail_key, tail_blocks) = &after[1];
    assert_eq!(tail_key.highest_block_number, u64::MAX);
    assert_eq!(tail_blocks.iter().collect::<Vec<_>>(), (51..=75).collect::<Vec<_>>());
}
3725
/// A batch built with `auto_commit_threshold: Some(1024)` is expected to make data visible
/// through the provider before the explicit `commit`, and to hold every entry afterwards.
#[test]
fn test_batch_auto_commit_on_threshold() {
    let dir = TempDir::new().unwrap();
    let provider = RocksDBBuilder::new(dir.path()).with_table::<TestTable>().build().unwrap();

    // Value stored under a given key; shared by the write and the verification loops.
    let payload = |key: u64| format!("value_{key:04}").into_bytes();

    // Built by hand so the threshold can be set explicitly.
    let mut writer = RocksDBBatch {
        provider: &provider,
        inner: WriteBatchWithTransaction::<true>::default(),
        buf: Vec::new(),
        auto_commit_threshold: Some(1024),
    };

    for key in 0..100u64 {
        let bytes = payload(key);
        writer.put::<TestTable>(key, &bytes).unwrap();
    }

    // No explicit commit has happened yet, but the first entry must already be readable.
    assert!(
        provider.get::<TestTable>(0).unwrap().is_some(),
        "Auto-committed data should be visible"
    );

    // Commit whatever is still pending in the batch.
    writer.commit().unwrap();

    for key in 0..100u64 {
        assert_eq!(provider.get::<TestTable>(key).unwrap(), Some(payload(key)));
    }
}
3761
3762 struct AccountPruneCase {
3766 name: &'static str,
3767 initial_shards: &'static [(u64, &'static [u64])],
3768 prune_to: u64,
3769 expected_outcome: PruneShardOutcome,
3770 expected_shards: &'static [(u64, &'static [u64])],
3771 }
3772
3773 struct StoragePruneCase {
3775 name: &'static str,
3776 initial_shards: &'static [(u64, &'static [u64])],
3777 prune_to: u64,
3778 expected_outcome: PruneShardOutcome,
3779 expected_shards: &'static [(u64, &'static [u64])],
3780 }
3781
/// Table-driven check of `prune_account_history_to`.
///
/// Every scenario runs against a fresh database: the initial shards are written, the prune
/// is committed, and then the reported outcome and the remaining shards are compared with
/// the expectations recorded in the table.
#[test]
fn test_prune_account_history_cases() {
    const MAX: u64 = u64::MAX;
    const CASES: &[AccountPruneCase] = &[
        AccountPruneCase {
            name: "single_shard_truncate",
            initial_shards: &[(MAX, &[10, 20, 30, 40])],
            prune_to: 25,
            expected_outcome: PruneShardOutcome::Updated,
            expected_shards: &[(MAX, &[30, 40])],
        },
        AccountPruneCase {
            name: "single_shard_delete_all",
            initial_shards: &[(MAX, &[10, 20])],
            prune_to: 20,
            expected_outcome: PruneShardOutcome::Deleted,
            expected_shards: &[],
        },
        AccountPruneCase {
            name: "single_shard_noop",
            initial_shards: &[(MAX, &[10, 20])],
            prune_to: 5,
            expected_outcome: PruneShardOutcome::Unchanged,
            expected_shards: &[(MAX, &[10, 20])],
        },
        AccountPruneCase {
            name: "no_shards",
            initial_shards: &[],
            prune_to: 100,
            expected_outcome: PruneShardOutcome::Unchanged,
            expected_shards: &[],
        },
        AccountPruneCase {
            name: "multi_shard_truncate_first",
            initial_shards: &[(30, &[10, 20, 30]), (MAX, &[40, 50, 60])],
            prune_to: 25,
            expected_outcome: PruneShardOutcome::Updated,
            expected_shards: &[(30, &[30]), (MAX, &[40, 50, 60])],
        },
        AccountPruneCase {
            name: "delete_first_shard_sentinel_unchanged",
            initial_shards: &[(20, &[10, 20]), (MAX, &[30, 40])],
            prune_to: 20,
            expected_outcome: PruneShardOutcome::Deleted,
            expected_shards: &[(MAX, &[30, 40])],
        },
        AccountPruneCase {
            name: "multi_shard_delete_all_but_last",
            initial_shards: &[(10, &[5, 10]), (20, &[15, 20]), (MAX, &[25, 30])],
            prune_to: 22,
            expected_outcome: PruneShardOutcome::Deleted,
            expected_shards: &[(MAX, &[25, 30])],
        },
        AccountPruneCase {
            name: "mid_shard_preserves_key",
            initial_shards: &[(50, &[10, 20, 30, 40, 50]), (MAX, &[60, 70])],
            prune_to: 25,
            expected_outcome: PruneShardOutcome::Updated,
            expected_shards: &[(50, &[30, 40, 50]), (MAX, &[60, 70])],
        },
        AccountPruneCase {
            name: "equiv_delete_early_shards_keep_sentinel",
            initial_shards: &[(20, &[10, 15, 20]), (50, &[30, 40, 50]), (MAX, &[60, 70])],
            prune_to: 55,
            expected_outcome: PruneShardOutcome::Deleted,
            expected_shards: &[(MAX, &[60, 70])],
        },
        AccountPruneCase {
            name: "equiv_sentinel_becomes_empty_with_prev",
            initial_shards: &[(50, &[30, 40, 50]), (MAX, &[35])],
            prune_to: 40,
            expected_outcome: PruneShardOutcome::Deleted,
            expected_shards: &[(MAX, &[50])],
        },
        AccountPruneCase {
            name: "equiv_all_shards_become_empty",
            initial_shards: &[(50, &[30, 40, 50]), (MAX, &[51])],
            prune_to: 51,
            expected_outcome: PruneShardOutcome::Deleted,
            expected_shards: &[],
        },
        AccountPruneCase {
            name: "equiv_non_sentinel_last_shard_promoted",
            initial_shards: &[(100, &[50, 75, 100])],
            prune_to: 60,
            expected_outcome: PruneShardOutcome::Updated,
            expected_shards: &[(MAX, &[75, 100])],
        },
        AccountPruneCase {
            name: "equiv_filter_within_shard",
            initial_shards: &[(MAX, &[10, 20, 30, 40])],
            prune_to: 25,
            expected_outcome: PruneShardOutcome::Updated,
            expected_shards: &[(MAX, &[30, 40])],
        },
        AccountPruneCase {
            name: "equiv_multi_shard_partial_delete",
            initial_shards: &[(20, &[10, 20]), (50, &[30, 40, 50]), (MAX, &[60, 70])],
            prune_to: 35,
            expected_outcome: PruneShardOutcome::Deleted,
            expected_shards: &[(50, &[40, 50]), (MAX, &[60, 70])],
        },
    ];

    let address = Address::from([0x42; 20]);

    for scenario in CASES {
        // Fresh database per scenario so cases cannot influence each other.
        let dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(dir.path()).with_default_tables().build().unwrap();

        // Write the initial shard layout.
        let mut seed = provider.batch();
        for &(highest, blocks) in scenario.initial_shards {
            let list = BlockNumberList::new_pre_sorted(blocks.iter().copied());
            let key = ShardedKey::new(address, highest);
            seed.put::<tables::AccountsHistory>(key, &list).unwrap();
        }
        seed.commit().unwrap();

        // Prune and commit.
        let mut prune = provider.batch();
        let outcome = prune.prune_account_history_to(address, scenario.prune_to).unwrap();
        prune.commit().unwrap();

        assert_eq!(outcome, scenario.expected_outcome, "case '{}': wrong outcome", scenario.name);

        // Compare what is left with the expected layout, shard by shard.
        let remaining = provider.account_history_shards(address).unwrap();
        assert_eq!(
            remaining.len(),
            scenario.expected_shards.len(),
            "case '{}': wrong shard count",
            scenario.name
        );
        for (i, (actual, expected)) in
            remaining.iter().zip(scenario.expected_shards.iter()).enumerate()
        {
            let (key, blocks) = actual;
            let (exp_key, exp_blocks) = expected;
            assert_eq!(
                key.highest_block_number, *exp_key,
                "case '{}': shard {} wrong key",
                scenario.name, i
            );
            assert_eq!(
                blocks.iter().collect::<Vec<_>>(),
                *exp_blocks,
                "case '{}': shard {} wrong blocks",
                scenario.name,
                i
            );
        }
    }
}
3938
/// Table-driven check of `prune_storage_history_to` for a single `(address, slot)` pair.
///
/// Every scenario runs against a fresh database: the initial shards are written, the prune
/// is committed, and then the reported outcome and the remaining shards are compared with
/// the expectations recorded in the table.
#[test]
fn test_prune_storage_history_cases() {
    const MAX: u64 = u64::MAX;
    const CASES: &[StoragePruneCase] = &[
        StoragePruneCase {
            name: "single_shard_truncate",
            initial_shards: &[(MAX, &[10, 20, 30, 40])],
            prune_to: 25,
            expected_outcome: PruneShardOutcome::Updated,
            expected_shards: &[(MAX, &[30, 40])],
        },
        StoragePruneCase {
            name: "single_shard_delete_all",
            initial_shards: &[(MAX, &[10, 20])],
            prune_to: 20,
            expected_outcome: PruneShardOutcome::Deleted,
            expected_shards: &[],
        },
        StoragePruneCase {
            name: "noop",
            initial_shards: &[(MAX, &[10, 20])],
            prune_to: 5,
            expected_outcome: PruneShardOutcome::Unchanged,
            expected_shards: &[(MAX, &[10, 20])],
        },
        StoragePruneCase {
            name: "no_shards",
            initial_shards: &[],
            prune_to: 100,
            expected_outcome: PruneShardOutcome::Unchanged,
            expected_shards: &[],
        },
        StoragePruneCase {
            name: "mid_shard_preserves_key",
            initial_shards: &[(50, &[10, 20, 30, 40, 50]), (MAX, &[60, 70])],
            prune_to: 25,
            expected_outcome: PruneShardOutcome::Updated,
            expected_shards: &[(50, &[30, 40, 50]), (MAX, &[60, 70])],
        },
        StoragePruneCase {
            name: "equiv_sentinel_promotion",
            initial_shards: &[(100, &[50, 75, 100])],
            prune_to: 60,
            expected_outcome: PruneShardOutcome::Updated,
            expected_shards: &[(MAX, &[75, 100])],
        },
        StoragePruneCase {
            name: "equiv_delete_early_shards_keep_sentinel",
            initial_shards: &[(20, &[10, 15, 20]), (50, &[30, 40, 50]), (MAX, &[60, 70])],
            prune_to: 55,
            expected_outcome: PruneShardOutcome::Deleted,
            expected_shards: &[(MAX, &[60, 70])],
        },
        StoragePruneCase {
            name: "equiv_sentinel_becomes_empty_with_prev",
            initial_shards: &[(50, &[30, 40, 50]), (MAX, &[35])],
            prune_to: 40,
            expected_outcome: PruneShardOutcome::Deleted,
            expected_shards: &[(MAX, &[50])],
        },
        StoragePruneCase {
            name: "equiv_all_shards_become_empty",
            initial_shards: &[(50, &[30, 40, 50]), (MAX, &[51])],
            prune_to: 51,
            expected_outcome: PruneShardOutcome::Deleted,
            expected_shards: &[],
        },
        StoragePruneCase {
            name: "equiv_filter_within_shard",
            initial_shards: &[(MAX, &[10, 20, 30, 40])],
            prune_to: 25,
            expected_outcome: PruneShardOutcome::Updated,
            expected_shards: &[(MAX, &[30, 40])],
        },
        StoragePruneCase {
            name: "equiv_multi_shard_partial_delete",
            initial_shards: &[(20, &[10, 20]), (50, &[30, 40, 50]), (MAX, &[60, 70])],
            prune_to: 35,
            expected_outcome: PruneShardOutcome::Deleted,
            expected_shards: &[(50, &[40, 50]), (MAX, &[60, 70])],
        },
    ];

    let address = Address::from([0x42; 20]);
    let storage_key = B256::from([0x01; 32]);

    for scenario in CASES {
        // Fresh database per scenario so cases cannot influence each other.
        let dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(dir.path()).with_default_tables().build().unwrap();

        // Write the initial shard layout.
        let mut seed = provider.batch();
        for &(highest, blocks) in scenario.initial_shards {
            let list = BlockNumberList::new_pre_sorted(blocks.iter().copied());
            // The `MAX` entry goes through the dedicated `last` constructor.
            let key = match highest {
                MAX => StorageShardedKey::last(address, storage_key),
                bounded => StorageShardedKey::new(address, storage_key, bounded),
            };
            seed.put::<tables::StoragesHistory>(key, &list).unwrap();
        }
        seed.commit().unwrap();

        // Prune and commit.
        let mut prune = provider.batch();
        let outcome =
            prune.prune_storage_history_to(address, storage_key, scenario.prune_to).unwrap();
        prune.commit().unwrap();

        assert_eq!(outcome, scenario.expected_outcome, "case '{}': wrong outcome", scenario.name);

        // Compare what is left with the expected layout, shard by shard.
        let remaining = provider.storage_history_shards(address, storage_key).unwrap();
        assert_eq!(
            remaining.len(),
            scenario.expected_shards.len(),
            "case '{}': wrong shard count",
            scenario.name
        );
        for (i, (actual, expected)) in
            remaining.iter().zip(scenario.expected_shards.iter()).enumerate()
        {
            let (key, blocks) = actual;
            let (exp_key, exp_blocks) = expected;
            assert_eq!(
                key.sharded_key.highest_block_number, *exp_key,
                "case '{}': shard {} wrong key",
                scenario.name, i
            );
            assert_eq!(
                blocks.iter().collect::<Vec<_>>(),
                *exp_blocks,
                "case '{}': shard {} wrong blocks",
                scenario.name,
                i
            );
        }
    }
}
4079
/// Pruning the history of one storage slot must leave another slot of the same address
/// untouched.
#[test]
fn test_prune_storage_history_does_not_affect_other_slots() {
    let dir = TempDir::new().unwrap();
    let provider = RocksDBBuilder::new(dir.path()).with_default_tables().build().unwrap();

    let address = Address::from([0x42; 20]);
    let pruned_slot = B256::from([0x01; 32]);
    let kept_slot = B256::from([0x02; 32]);

    // One shard per slot, both under the `last` key.
    let pruned_history = BlockNumberList::new_pre_sorted([10u64, 20]);
    let kept_history = BlockNumberList::new_pre_sorted([30u64, 40]);

    let mut seed = provider.batch();
    seed.put::<tables::StoragesHistory>(
        StorageShardedKey::last(address, pruned_slot),
        &pruned_history,
    )
    .unwrap();
    seed.put::<tables::StoragesHistory>(StorageShardedKey::last(address, kept_slot), &kept_history)
        .unwrap();
    seed.commit().unwrap();

    // Prune the first slot up to its highest block.
    let mut prune = provider.batch();
    let outcome = prune.prune_storage_history_to(address, pruned_slot, 20).unwrap();
    prune.commit().unwrap();

    assert_eq!(outcome, PruneShardOutcome::Deleted);

    // The pruned slot has no shards left.
    let pruned_shards = provider.storage_history_shards(address, pruned_slot).unwrap();
    assert!(pruned_shards.is_empty());

    // The other slot still holds its original blocks.
    let kept_shards = provider.storage_history_shards(address, kept_slot).unwrap();
    assert_eq!(kept_shards.len(), 1);
    let (_, kept_blocks) = &kept_shards[0];
    assert_eq!(kept_blocks.iter().collect::<Vec<_>>(), vec![30, 40]);
}
4121
/// Structural invariants that must hold after pruning, for both account and storage
/// history: no stored shard is empty, and if any shard remains, the last one is keyed by
/// `u64::MAX`.
#[test]
fn test_prune_invariants() {
    let address = Address::from([0x42; 20]);
    let storage_key = B256::from([0x01; 32]);

    // Each entry is (initial shards as `(highest_block_number, blocks)`, prune target).
    #[expect(clippy::type_complexity)]
    let invariant_cases: &[(&[(u64, &[u64])], u64)] = &[
        (&[(10, &[5, 10]), (20, &[15, 20]), (u64::MAX, &[25, 30])], 20),
        (&[(100, &[50, 100])], 60),
    ];

    for &(layout, prune_to) in invariant_cases {
        // Account history, in its own scope with its own database.
        {
            let dir = TempDir::new().unwrap();
            let provider =
                RocksDBBuilder::new(dir.path()).with_default_tables().build().unwrap();

            let mut seed = provider.batch();
            for &(highest, blocks) in layout {
                let list = BlockNumberList::new_pre_sorted(blocks.iter().copied());
                seed.put::<tables::AccountsHistory>(ShardedKey::new(address, highest), &list)
                    .unwrap();
            }
            seed.commit().unwrap();

            let mut prune = provider.batch();
            prune.prune_account_history_to(address, prune_to).unwrap();
            prune.commit().unwrap();

            let remaining = provider.account_history_shards(address).unwrap();

            // Invariant 1: no empty shard is left behind.
            for (key, blocks) in &remaining {
                assert!(
                    !blocks.is_empty(),
                    "Account: empty shard at key {}",
                    key.highest_block_number
                );
            }

            // Invariant 2: when shards remain, the last one carries the sentinel key.
            if let Some((last_key, _)) = remaining.last() {
                assert_eq!(
                    last_key.highest_block_number,
                    u64::MAX,
                    "Account: last shard must be sentinel"
                );
            }
        }

        // Storage history, same layout and the same two invariants.
        {
            let dir = TempDir::new().unwrap();
            let provider =
                RocksDBBuilder::new(dir.path()).with_default_tables().build().unwrap();

            let mut seed = provider.batch();
            for &(highest, blocks) in layout {
                let list = BlockNumberList::new_pre_sorted(blocks.iter().copied());
                let key = match highest {
                    u64::MAX => StorageShardedKey::last(address, storage_key),
                    bounded => StorageShardedKey::new(address, storage_key, bounded),
                };
                seed.put::<tables::StoragesHistory>(key, &list).unwrap();
            }
            seed.commit().unwrap();

            let mut prune = provider.batch();
            prune.prune_storage_history_to(address, storage_key, prune_to).unwrap();
            prune.commit().unwrap();

            let remaining = provider.storage_history_shards(address, storage_key).unwrap();

            for (key, blocks) in &remaining {
                assert!(
                    !blocks.is_empty(),
                    "Storage: empty shard at key {}",
                    key.sharded_key.highest_block_number
                );
            }

            if let Some((last_key, _)) = remaining.last() {
                assert_eq!(
                    last_key.sharded_key.highest_block_number,
                    u64::MAX,
                    "Storage: last shard must be sentinel"
                );
            }
        }
    }
}
4224
/// `prune_account_history_batch` over three addresses sorted by address: two shards are
/// truncated, and the third is left alone because its prune target lies below all of its
/// blocks.
#[test]
fn test_prune_account_history_batch_multiple_sorted_targets() {
    let dir = TempDir::new().unwrap();
    let provider = RocksDBBuilder::new(dir.path()).with_default_tables().build().unwrap();

    let addr1 = Address::from([0x01; 20]);
    let addr2 = Address::from([0x02; 20]);
    let addr3 = Address::from([0x03; 20]);

    // One `u64::MAX`-keyed shard per address.
    let initial: [(Address, &[u64]); 3] =
        [(addr1, &[10, 20, 30]), (addr2, &[5, 10, 15]), (addr3, &[100, 200])];

    let mut seed = provider.batch();
    for (addr, blocks) in initial {
        let list = BlockNumberList::new_pre_sorted(blocks.iter().copied());
        seed.put::<tables::AccountsHistory>(ShardedKey::new(addr, u64::MAX), &list).unwrap();
    }
    seed.commit().unwrap();

    // The targets are sorted by address before being handed to the batch call.
    let mut targets = vec![(addr1, 15), (addr2, 10), (addr3, 50)];
    targets.sort_by_key(|&(addr, _)| addr);

    let mut prune = provider.batch();
    let outcomes = prune.prune_account_history_batch(&targets).unwrap();
    prune.commit().unwrap();

    assert_eq!(outcomes.updated, 2);
    assert_eq!(outcomes.unchanged, 1);

    // Blocks held by the first shard of an address after pruning.
    let first_shard_blocks = |addr: Address| -> Vec<u64> {
        let shards = provider.account_history_shards(addr).unwrap();
        let blocks: Vec<u64> = shards[0].1.iter().collect();
        blocks
    };

    assert_eq!(first_shard_blocks(addr1), vec![20, 30]);
    assert_eq!(first_shard_blocks(addr2), vec![15]);
    assert_eq!(first_shard_blocks(addr3), vec![100, 200]);
}
4279
/// `prune_account_history_batch` where the middle target has no stored history: it is
/// counted as unchanged while both of its neighbours are still pruned.
#[test]
fn test_prune_account_history_batch_target_with_no_shards() {
    let dir = TempDir::new().unwrap();
    let provider = RocksDBBuilder::new(dir.path()).with_default_tables().build().unwrap();

    let addr1 = Address::from([0x01; 20]);
    // Never written to the database.
    let addr2 = Address::from([0x02; 20]);
    let addr3 = Address::from([0x03; 20]);

    // Only the first and the third address get a shard.
    let initial: [(Address, &[u64]); 2] = [(addr1, &[10, 20]), (addr3, &[30, 40])];

    let mut seed = provider.batch();
    for (addr, blocks) in initial {
        let list = BlockNumberList::new_pre_sorted(blocks.iter().copied());
        seed.put::<tables::AccountsHistory>(ShardedKey::new(addr, u64::MAX), &list).unwrap();
    }
    seed.commit().unwrap();

    // The targets are sorted by address before being handed to the batch call.
    let mut targets = vec![(addr1, 15), (addr2, 100), (addr3, 35)];
    targets.sort_by_key(|&(addr, _)| addr);

    let mut prune = provider.batch();
    let outcomes = prune.prune_account_history_batch(&targets).unwrap();
    prune.commit().unwrap();

    assert_eq!(outcomes.updated, 2);
    assert_eq!(outcomes.unchanged, 1);

    // Blocks held by the first shard of an address after pruning.
    let first_shard_blocks = |addr: Address| -> Vec<u64> {
        let shards = provider.account_history_shards(addr).unwrap();
        let blocks: Vec<u64> = shards[0].1.iter().collect();
        blocks
    };

    assert_eq!(first_shard_blocks(addr1), vec![20]);
    assert_eq!(first_shard_blocks(addr3), vec![40]);
}
4325
/// `prune_storage_history_batch` over two slots of one address, sorted by
/// `(address, slot)`: both shards are truncated at their respective targets.
#[test]
fn test_prune_storage_history_batch_multiple_sorted_targets() {
    let dir = TempDir::new().unwrap();
    let provider = RocksDBBuilder::new(dir.path()).with_default_tables().build().unwrap();

    let addr = Address::from([0x42; 20]);
    let slot1 = B256::from([0x01; 32]);
    let slot2 = B256::from([0x02; 32]);

    // One `u64::MAX`-keyed shard per slot.
    let initial: [(B256, &[u64]); 2] = [(slot1, &[10, 20, 30]), (slot2, &[5, 15, 25])];

    let mut seed = provider.batch();
    for (slot, blocks) in initial {
        let list = BlockNumberList::new_pre_sorted(blocks.iter().copied());
        seed.put::<tables::StoragesHistory>(StorageShardedKey::new(addr, slot, u64::MAX), &list)
            .unwrap();
    }
    seed.commit().unwrap();

    // The targets are sorted by (address, slot) before being handed to the batch call.
    let mut targets = vec![((addr, slot1), 15), ((addr, slot2), 10)];
    targets.sort_by_key(|&((account, slot), _)| (account, slot));

    let mut prune = provider.batch();
    let outcomes = prune.prune_storage_history_batch(&targets).unwrap();
    prune.commit().unwrap();

    assert_eq!(outcomes.updated, 2);

    // Blocks held by the first shard of a slot after pruning.
    let first_shard_blocks = |slot: B256| -> Vec<u64> {
        let shards = provider.storage_history_shards(addr, slot).unwrap();
        let blocks: Vec<u64> = shards[0].1.iter().collect();
        blocks
    };

    assert_eq!(first_shard_blocks(slot1), vec![20, 30]);
    assert_eq!(first_shard_blocks(slot2), vec![15, 25]);
}
4367}