1use super::metrics::{RocksDBMetrics, RocksDBOperation, ROCKSDB_TABLES};
2use crate::providers::{compute_history_rank, needs_prev_shard_check, HistoryInfo};
3use alloy_consensus::transaction::TxHashRef;
4use alloy_primitives::{
5 map::{AddressMap, HashMap},
6 Address, BlockNumber, TxNumber, B256,
7};
8use itertools::Itertools;
9use metrics::Label;
10use parking_lot::Mutex;
11use reth_chain_state::ExecutedBlock;
12use reth_db_api::{
13 database_metrics::DatabaseMetrics,
14 models::{
15 sharded_key::NUM_OF_INDICES_IN_SHARD, storage_sharded_key::StorageShardedKey, ShardedKey,
16 StorageSettings,
17 },
18 table::{Compress, Decode, Decompress, Encode, Table},
19 tables, BlockNumberList, DatabaseError,
20};
21use reth_primitives_traits::{BlockBody as _, FastInstant as Instant};
22use reth_prune_types::PruneMode;
23use reth_storage_errors::{
24 db::{DatabaseErrorInfo, DatabaseWriteError, DatabaseWriteOperation, LogLevel},
25 provider::{ProviderError, ProviderResult},
26};
27use rocksdb::{
28 BlockBasedOptions, Cache, ColumnFamilyDescriptor, CompactionPri, DBCompressionType,
29 DBRawIteratorWithThreadMode, IteratorMode, OptimisticTransactionDB,
30 OptimisticTransactionOptions, Options, Transaction, WriteBatchWithTransaction, WriteOptions,
31 DB,
32};
33use std::{
34 collections::BTreeMap,
35 fmt,
36 path::{Path, PathBuf},
37 sync::Arc,
38};
39use tracing::instrument;
40
/// Shared queue of RocksDB write batches produced by parallel writers and
/// committed later by the owner of the queue.
pub(crate) type PendingRocksDBBatches = Arc<Mutex<Vec<WriteBatchWithTransaction<true>>>>;

/// Raw key/value pair (or error) yielded by a RocksDB iterator.
type RawKVResult = Result<(Box<[u8]>, Box<[u8]>), rocksdb::Error>;
46
/// Per-column-family size and key-count statistics reported by RocksDB
/// properties (see [`RocksDBProvider::table_stats`]).
#[derive(Debug, Clone)]
pub struct RocksDBTableStats {
    /// Total size of live SST files, in bytes.
    pub sst_size_bytes: u64,
    /// Size of all memtables, in bytes.
    pub memtable_size_bytes: u64,
    /// Column family (table) name.
    pub name: String,
    /// Estimated number of keys in the column family.
    pub estimated_num_keys: u64,
    /// Estimated total on-disk + in-memory size (SST files + memtables), in bytes.
    pub estimated_size_bytes: u64,
    /// Estimated bytes pending compaction.
    pub pending_compaction_bytes: u64,
}
63
/// Aggregated statistics for the whole RocksDB instance.
#[derive(Debug, Clone)]
pub struct RocksDBStats {
    /// Per-table (column family) statistics.
    pub tables: Vec<RocksDBTableStats>,
    /// Combined size of WAL (`*.log`) files in the database directory, in bytes.
    pub wal_size_bytes: u64,
}
76
/// Context shared by the parallel RocksDB block-data writers
/// (see [`RocksDBProvider::write_blocks_data`]).
#[derive(Clone)]
pub(crate) struct RocksDBWriteCtx {
    /// Block number of the first block in the batch being written.
    pub first_block_number: BlockNumber,
    /// Prune mode for the tx-hash lookup table; a full prune mode skips
    /// writing `TransactionHashNumbers`.
    pub prune_tx_lookup: Option<PruneMode>,
    /// Storage schema settings; writes are skipped unless `storage_v2` is set.
    pub storage_settings: StorageSettings,
    /// Collector the writers push their finished batches onto.
    pub pending_batches: PendingRocksDBBatches,
}
89
impl fmt::Debug for RocksDBWriteCtx {
    // Manual impl: the raw rocksdb batches in `pending_batches` are not `Debug`,
    // so they are rendered as an opaque placeholder.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RocksDBWriteCtx")
            .field("first_block_number", &self.first_block_number)
            .field("prune_tx_lookup", &self.prune_tx_lookup)
            .field("storage_settings", &self.storage_settings)
            .field("pending_batches", &"<pending batches>")
            .finish()
    }
}
100
/// Default capacity of the shared LRU block cache: 128 MiB.
const DEFAULT_CACHE_SIZE: usize = 128 << 20;

/// Default block-based-table block size: 16 KiB.
const DEFAULT_BLOCK_SIZE: usize = 16 * 1024;

/// Default number of background compaction/flush jobs.
const DEFAULT_MAX_BACKGROUND_JOBS: i32 = 6;

/// Default cap on simultaneously open files.
const DEFAULT_MAX_OPEN_FILES: i32 = 512;

/// Default incremental-sync granularity for file writes: 1 MiB.
const DEFAULT_BYTES_PER_SYNC: u64 = 1_048_576;

/// Default per-column-family write buffer (memtable) size: 128 MiB.
const DEFAULT_WRITE_BUFFER_SIZE: usize = 128 << 20;

/// Initial capacity of the scratch buffer batches reuse when compressing values.
const DEFAULT_COMPRESS_BUF_CAPACITY: usize = 4096;

/// Byte-size threshold used by [`RocksDBProvider::batch_with_auto_commit`]: 4 GiB.
const DEFAULT_AUTO_COMMIT_THRESHOLD: usize = 4 * 1024 * 1024 * 1024;
140
/// Builder for [`RocksDBProvider`] with tunable caching, logging, metrics and
/// open mode.
pub struct RocksDBBuilder {
    // Filesystem path of the database directory.
    path: PathBuf,
    // Names of the column families to open/create.
    column_families: Vec<String>,
    // Whether to record per-operation metrics.
    enable_metrics: bool,
    // Whether to enable rocksdb-internal statistics collection.
    enable_statistics: bool,
    // RocksDB log verbosity.
    log_level: rocksdb::LogLevel,
    // LRU block cache shared across all column families.
    block_cache: Cache,
    // Open the database in read-only mode.
    read_only: bool,
}
151
152impl fmt::Debug for RocksDBBuilder {
153 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
154 f.debug_struct("RocksDBBuilder")
155 .field("path", &self.path)
156 .field("column_families", &self.column_families)
157 .field("enable_metrics", &self.enable_metrics)
158 .finish()
159 }
160}
161
162impl RocksDBBuilder {
    /// Creates a builder for the database at `path` with default settings:
    /// no column families registered, metrics/statistics disabled, `Info`
    /// logging, a default-sized block cache, read-write mode.
    pub fn new(path: impl AsRef<Path>) -> Self {
        let cache = Cache::new_lru_cache(DEFAULT_CACHE_SIZE);
        Self {
            path: path.as_ref().to_path_buf(),
            column_families: Vec::new(),
            enable_metrics: false,
            enable_statistics: false,
            log_level: rocksdb::LogLevel::Info,
            block_cache: cache,
            read_only: false,
        }
    }
176
    /// Block-based table options shared by all column families.
    fn default_table_options(cache: &Cache) -> BlockBasedOptions {
        let mut table_options = BlockBasedOptions::default();
        table_options.set_block_size(DEFAULT_BLOCK_SIZE);
        // Keep index/filter blocks in the block cache, and pin the L0 ones so
        // point lookups don't repeatedly re-read table metadata.
        table_options.set_cache_index_and_filter_blocks(true);
        table_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
        table_options.set_block_cache(cache);
        table_options
    }
187
    /// Base database options shared by both read-write and read-only opens:
    /// compaction/compression tuning, logging, file limits and WAL settings.
    fn default_options(
        log_level: rocksdb::LogLevel,
        cache: &Cache,
        enable_statistics: bool,
    ) -> Options {
        let table_options = Self::default_table_options(cache);

        let mut options = Options::default();
        options.set_block_based_table_factory(&table_options);
        options.create_if_missing(true);
        options.create_missing_column_families(true);
        options.set_max_background_jobs(DEFAULT_MAX_BACKGROUND_JOBS);
        options.set_bytes_per_sync(DEFAULT_BYTES_PER_SYNC);

        // LZ4 on upper levels, Zstd (no dictionary training) at the bottommost
        // level for better cold-data ratios.
        options.set_bottommost_compression_type(DBCompressionType::Zstd);
        options.set_bottommost_zstd_max_train_bytes(0, true);
        options.set_compression_type(DBCompressionType::Lz4);
        options.set_compaction_pri(CompactionPri::MinOverlappingRatio);

        options.set_log_level(log_level);

        options.set_max_open_files(DEFAULT_MAX_OPEN_FILES);

        // 0 disables TTL/size-based WAL archiving.
        options.set_wal_ttl_seconds(0);
        options.set_wal_size_limit_mb(0);

        if enable_statistics {
            // Extra rocksdb-internal counters; has a runtime cost, so opt-in.
            options.enable_statistics();
        }

        options
    }
225
    /// Default per-column-family options: dynamic level sizing, LZ4 with Zstd
    /// at the bottommost level, and an enlarged write buffer.
    fn default_column_family_options(cache: &Cache) -> Options {
        let table_options = Self::default_table_options(cache);

        let mut cf_options = Options::default();
        cf_options.set_block_based_table_factory(&table_options);
        cf_options.set_level_compaction_dynamic_level_bytes(true);
        cf_options.set_compression_type(DBCompressionType::Lz4);
        cf_options.set_bottommost_compression_type(DBCompressionType::Zstd);
        cf_options.set_bottommost_zstd_max_train_bytes(0, true);
        cf_options.set_write_buffer_size(DEFAULT_WRITE_BUFFER_SIZE);

        cf_options
    }
243
244 fn tx_hash_numbers_column_family_options(cache: &Cache) -> Options {
251 let mut table_options = BlockBasedOptions::default();
252 table_options.set_block_size(DEFAULT_BLOCK_SIZE);
253 table_options.set_cache_index_and_filter_blocks(true);
254 table_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
255 table_options.set_block_cache(cache);
256 let mut cf_options = Options::default();
260 cf_options.set_block_based_table_factory(&table_options);
261 cf_options.set_level_compaction_dynamic_level_bytes(true);
262 cf_options.set_compression_type(DBCompressionType::None);
265 cf_options.set_bottommost_compression_type(DBCompressionType::None);
266
267 cf_options
268 }
269
    /// Registers the column family for table `T`.
    pub fn with_table<T: Table>(mut self) -> Self {
        self.column_families.push(T::NAME.to_string());
        self
    }

    /// Registers the default set of RocksDB-backed tables.
    pub fn with_default_tables(self) -> Self {
        self.with_table::<tables::TransactionHashNumbers>()
            .with_table::<tables::AccountsHistory>()
            .with_table::<tables::StoragesHistory>()
    }
287
    /// Enables per-operation metrics recording.
    pub const fn with_metrics(mut self) -> Self {
        self.enable_metrics = true;
        self
    }

    /// Enables rocksdb-internal statistics collection.
    pub const fn with_statistics(mut self) -> Self {
        self.enable_statistics = true;
        self
    }

    /// Sets the rocksdb log verbosity; `None` keeps the current level.
    pub const fn with_database_log_level(mut self, log_level: Option<LogLevel>) -> Self {
        if let Some(level) = log_level {
            self.log_level = convert_log_level(level);
        }
        self
    }

    /// Replaces the shared block cache with an LRU cache of `capacity_bytes`.
    pub fn with_block_cache_size(mut self, capacity_bytes: usize) -> Self {
        self.block_cache = Cache::new_lru_cache(capacity_bytes);
        self
    }

    /// Selects read-only or read-write open mode.
    pub const fn with_read_only(mut self, read_only: bool) -> Self {
        self.read_only = read_only;
        self
    }
321
322 pub fn build(self) -> ProviderResult<RocksDBProvider> {
324 let options =
325 Self::default_options(self.log_level, &self.block_cache, self.enable_statistics);
326
327 let cf_descriptors: Vec<ColumnFamilyDescriptor> = self
328 .column_families
329 .iter()
330 .map(|name| {
331 let cf_options = if name == tables::TransactionHashNumbers::NAME {
332 Self::tx_hash_numbers_column_family_options(&self.block_cache)
333 } else {
334 Self::default_column_family_options(&self.block_cache)
335 };
336 ColumnFamilyDescriptor::new(name.clone(), cf_options)
337 })
338 .collect();
339
340 let metrics = self.enable_metrics.then(RocksDBMetrics::default);
341
342 if self.read_only {
343 let db = DB::open_cf_descriptors_read_only(&options, &self.path, cf_descriptors, false)
344 .map_err(|e| {
345 ProviderError::Database(DatabaseError::Open(DatabaseErrorInfo {
346 message: e.to_string().into(),
347 code: -1,
348 }))
349 })?;
350 Ok(RocksDBProvider(Arc::new(RocksDBProviderInner::ReadOnly { db, metrics })))
351 } else {
352 let db =
357 OptimisticTransactionDB::open_cf_descriptors(&options, &self.path, cf_descriptors)
358 .map_err(|e| {
359 ProviderError::Database(DatabaseError::Open(DatabaseErrorInfo {
360 message: e.to_string().into(),
361 code: -1,
362 }))
363 })?;
364 Ok(RocksDBProvider(Arc::new(RocksDBProviderInner::ReadWrite { db, metrics })))
365 }
366 }
367}
368
/// Compresses `$value` into `$buf` unless it exposes an uncompressable raw
/// reference.
///
/// Evaluates to `Some(bytes)` borrowing the value directly when no compression
/// is needed; otherwise clears `$buf`, compresses into it, and evaluates to
/// `None` (callers then read the bytes from `$buf`).
macro_rules! compress_to_buf_or_ref {
    ($buf:expr, $value:expr) => {
        if let Some(value) = $value.uncompressable_ref() {
            Some(value)
        } else {
            $buf.clear();
            $value.compress_to_buf(&mut $buf);
            None
        }
    };
}
382
/// Cheaply-cloneable handle to a RocksDB database (read-write or read-only).
#[derive(Debug)]
pub struct RocksDBProvider(Arc<RocksDBProviderInner>);
386
/// The two open modes a provider can be in; write operations panic on the
/// read-only variant (see `db_rw`).
enum RocksDBProviderInner {
    /// Read-write handle backed by an optimistic-transaction database.
    ReadWrite {
        db: OptimisticTransactionDB,
        metrics: Option<RocksDBMetrics>,
    },
    /// Read-only handle backed by a plain `DB`.
    ReadOnly {
        db: DB,
        metrics: Option<RocksDBMetrics>,
    },
}
405
impl RocksDBProviderInner {
    /// Returns the metrics recorder, if metrics were enabled at build time.
    const fn metrics(&self) -> Option<&RocksDBMetrics> {
        match self {
            Self::ReadWrite { metrics, .. } | Self::ReadOnly { metrics, .. } => metrics.as_ref(),
        }
    }

    /// Returns the read-write database handle.
    ///
    /// # Panics
    /// Panics if the provider was opened read-only.
    fn db_rw(&self) -> &OptimisticTransactionDB {
        match self {
            Self::ReadWrite { db, .. } => db,
            Self::ReadOnly { .. } => {
                panic!("Cannot perform write operation on read-only RocksDB provider")
            }
        }
    }

    /// Looks up the column family handle for table `T` in either open mode.
    fn cf_handle<T: Table>(&self) -> Result<&rocksdb::ColumnFamily, DatabaseError> {
        let cf = match self {
            Self::ReadWrite { db, .. } => db.cf_handle(T::NAME),
            Self::ReadOnly { db, .. } => db.cf_handle(T::NAME),
        };
        cf.ok_or_else(|| DatabaseError::Other(format!("Column family '{}' not found", T::NAME)))
    }

    /// Looks up a column family handle by name on the read-write handle.
    ///
    /// # Panics
    /// Panics (via `db_rw`) if the provider was opened read-only.
    fn cf_handle_rw(&self, name: &str) -> Result<&rocksdb::ColumnFamily, DatabaseError> {
        self.db_rw()
            .cf_handle(name)
            .ok_or_else(|| DatabaseError::Other(format!("Column family '{}' not found", name)))
    }
442
    /// Point lookup in `cf`; works in both open modes.
    fn get_cf(
        &self,
        cf: &rocksdb::ColumnFamily,
        key: impl AsRef<[u8]>,
    ) -> Result<Option<Vec<u8>>, rocksdb::Error> {
        match self {
            Self::ReadWrite { db, .. } => db.get_cf(cf, key),
            Self::ReadOnly { db, .. } => db.get_cf(cf, key),
        }
    }

    /// Inserts `key -> value` into `cf`; panics (via `db_rw`) if read-only.
    fn put_cf(
        &self,
        cf: &rocksdb::ColumnFamily,
        key: impl AsRef<[u8]>,
        value: impl AsRef<[u8]>,
    ) -> Result<(), rocksdb::Error> {
        self.db_rw().put_cf(cf, key, value)
    }

    /// Deletes `key` from `cf`; panics (via `db_rw`) if read-only.
    fn delete_cf(
        &self,
        cf: &rocksdb::ColumnFamily,
        key: impl AsRef<[u8]>,
    ) -> Result<(), rocksdb::Error> {
        self.db_rw().delete_cf(cf, key)
    }

    /// Deletes all keys in `[from, to)` from `cf`; panics (via `db_rw`) if
    /// read-only.
    fn delete_range_cf<K: AsRef<[u8]>>(
        &self,
        cf: &rocksdb::ColumnFamily,
        from: K,
        to: K,
    ) -> Result<(), rocksdb::Error> {
        self.db_rw().delete_range_cf(cf, from, to)
    }

    /// Item-yielding iterator over `cf`; works in both open modes.
    fn iterator_cf(
        &self,
        cf: &rocksdb::ColumnFamily,
        mode: IteratorMode<'_>,
    ) -> RocksDBIterEnum<'_> {
        match self {
            Self::ReadWrite { db, .. } => RocksDBIterEnum::ReadWrite(db.iterator_cf(cf, mode)),
            Self::ReadOnly { db, .. } => RocksDBIterEnum::ReadOnly(db.iterator_cf(cf, mode)),
        }
    }

    /// Raw cursor over `cf`; works in both open modes.
    fn raw_iterator_cf(&self, cf: &rocksdb::ColumnFamily) -> RocksDBRawIterEnum<'_> {
        match self {
            Self::ReadWrite { db, .. } => RocksDBRawIterEnum::ReadWrite(db.raw_iterator_cf(cf)),
            Self::ReadOnly { db, .. } => RocksDBRawIterEnum::ReadOnly(db.raw_iterator_cf(cf)),
        }
    }

    /// Filesystem path of the database directory.
    fn path(&self) -> &Path {
        match self {
            Self::ReadWrite { db, .. } => db.path(),
            Self::ReadOnly { db, .. } => db.path(),
        }
    }
514
515 fn wal_size_bytes(&self) -> u64 {
519 let path = self.path();
520
521 match std::fs::read_dir(path) {
522 Ok(entries) => entries
523 .filter_map(|e| e.ok())
524 .filter(|e| e.path().extension().is_some_and(|ext| ext == "log"))
525 .filter_map(|e| e.metadata().ok())
526 .map(|m| m.len())
527 .sum(),
528 Err(_) => 0,
529 }
530 }
531
    /// Collects size/key-count statistics for every known table by querying
    /// rocksdb integer properties per column family.
    ///
    /// Missing column families and failed property reads are skipped / treated
    /// as zero rather than erroring.
    fn table_stats(&self) -> Vec<RocksDBTableStats> {
        let mut stats = Vec::new();

        // Macro because the RW and RO database types don't share a trait for
        // `cf_handle`/`property_int_value_cf`.
        macro_rules! collect_stats {
            ($db:expr) => {
                for cf_name in ROCKSDB_TABLES {
                    if let Some(cf) = $db.cf_handle(cf_name) {
                        let estimated_num_keys = $db
                            .property_int_value_cf(cf, rocksdb::properties::ESTIMATE_NUM_KEYS)
                            .ok()
                            .flatten()
                            .unwrap_or(0);

                        let sst_size = $db
                            .property_int_value_cf(cf, rocksdb::properties::LIVE_SST_FILES_SIZE)
                            .ok()
                            .flatten()
                            .unwrap_or(0);

                        let memtable_size = $db
                            .property_int_value_cf(cf, rocksdb::properties::SIZE_ALL_MEM_TABLES)
                            .ok()
                            .flatten()
                            .unwrap_or(0);

                        let estimated_size_bytes = sst_size + memtable_size;

                        let pending_compaction_bytes = $db
                            .property_int_value_cf(
                                cf,
                                rocksdb::properties::ESTIMATE_PENDING_COMPACTION_BYTES,
                            )
                            .ok()
                            .flatten()
                            .unwrap_or(0);

                        stats.push(RocksDBTableStats {
                            sst_size_bytes: sst_size,
                            memtable_size_bytes: memtable_size,
                            name: cf_name.to_string(),
                            estimated_num_keys,
                            estimated_size_bytes,
                            pending_compaction_bytes,
                        });
                    }
                }
            };
        }

        match self {
            Self::ReadWrite { db, .. } => collect_stats!(db),
            Self::ReadOnly { db, .. } => collect_stats!(db),
        }

        stats
    }

    /// Aggregated statistics: per-table stats plus total WAL size.
    fn db_stats(&self) -> RocksDBStats {
        RocksDBStats { tables: self.table_stats(), wal_size_bytes: self.wal_size_bytes() }
    }
595}
596
impl fmt::Debug for RocksDBProviderInner {
    // Manual impl: the rocksdb database handles are not `Debug`, so they are
    // rendered as opaque placeholders while metrics keep their derived output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::ReadWrite { metrics, .. } => f
                .debug_struct("RocksDBProviderInner::ReadWrite")
                .field("db", &"<OptimisticTransactionDB>")
                .field("metrics", metrics)
                .finish(),
            Self::ReadOnly { metrics, .. } => f
                .debug_struct("RocksDBProviderInner::ReadOnly")
                .field("db", &"<DB (read-only)>")
                .field("metrics", metrics)
                .finish(),
        }
    }
}
613
impl Drop for RocksDBProviderInner {
    fn drop(&mut self) {
        match self {
            Self::ReadWrite { db, .. } => {
                // Best-effort shutdown: persist the WAL first, then flush each
                // column family's memtables; failures are only logged since
                // there is nothing more we can do while dropping.
                if let Err(e) = db.flush_wal(true) {
                    tracing::warn!(target: "providers::rocksdb", ?e, "Failed to flush WAL on drop");
                }
                for cf_name in ROCKSDB_TABLES {
                    if let Some(cf) = db.cf_handle(cf_name) &&
                        let Err(e) = db.flush_cf(&cf)
                    {
                        tracing::warn!(target: "providers::rocksdb", cf = cf_name, ?e, "Failed to flush CF on drop");
                    }
                }
                // Wait for background compactions/flushes to stop before close.
                db.cancel_all_background_work(true);
            }
            Self::ReadOnly { db, .. } => db.cancel_all_background_work(true),
        }
    }
}
636
637impl Clone for RocksDBProvider {
638 fn clone(&self) -> Self {
639 Self(self.0.clone())
640 }
641}
642
643impl DatabaseMetrics for RocksDBProvider {
644 fn gauge_metrics(&self) -> Vec<(&'static str, f64, Vec<Label>)> {
645 let mut metrics = Vec::new();
646
647 for stat in self.table_stats() {
648 metrics.push((
649 "rocksdb.table_size",
650 stat.estimated_size_bytes as f64,
651 vec![Label::new("table", stat.name.clone())],
652 ));
653 metrics.push((
654 "rocksdb.table_entries",
655 stat.estimated_num_keys as f64,
656 vec![Label::new("table", stat.name.clone())],
657 ));
658 metrics.push((
659 "rocksdb.pending_compaction_bytes",
660 stat.pending_compaction_bytes as f64,
661 vec![Label::new("table", stat.name.clone())],
662 ));
663 metrics.push((
664 "rocksdb.sst_size",
665 stat.sst_size_bytes as f64,
666 vec![Label::new("table", stat.name.clone())],
667 ));
668 metrics.push((
669 "rocksdb.memtable_size",
670 stat.memtable_size_bytes as f64,
671 vec![Label::new("table", stat.name)],
672 ));
673 }
674
675 metrics.push(("rocksdb.wal_size", self.wal_size_bytes() as f64, vec![]));
677
678 metrics
679 }
680}
681
682impl RocksDBProvider {
    /// Opens the database at `path` with default builder settings.
    pub fn new(path: impl AsRef<Path>) -> ProviderResult<Self> {
        RocksDBBuilder::new(path).build()
    }

    /// Returns a [`RocksDBBuilder`] for the database at `path`.
    pub fn builder(path: impl AsRef<Path>) -> RocksDBBuilder {
        RocksDBBuilder::new(path)
    }
692
693 pub fn exists(path: impl AsRef<Path>) -> bool {
698 path.as_ref().join("CURRENT").exists()
699 }
700
    /// Returns `true` if this provider was opened read-only.
    pub fn is_read_only(&self) -> bool {
        matches!(self.0.as_ref(), RocksDBProviderInner::ReadOnly { .. })
    }

    /// Starts an optimistic transaction with default options.
    ///
    /// # Panics
    /// Panics (via `db_rw`) if the provider was opened read-only.
    pub fn tx(&self) -> RocksTx<'_> {
        let write_options = WriteOptions::default();
        let txn_options = OptimisticTransactionOptions::default();
        let inner = self.0.db_rw().transaction_opt(&write_options, &txn_options);
        RocksTx { inner, provider: self }
    }
719
    /// Creates an empty write batch with no auto-commit threshold; the caller
    /// must commit it explicitly.
    pub fn batch(&self) -> RocksDBBatch<'_> {
        RocksDBBatch {
            provider: self,
            inner: WriteBatchWithTransaction::<true>::default(),
            buf: Vec::with_capacity(DEFAULT_COMPRESS_BUF_CAPACITY),
            auto_commit_threshold: None,
        }
    }

    /// Creates an empty write batch that auto-commits when it grows past
    /// [`DEFAULT_AUTO_COMMIT_THRESHOLD`] bytes.
    pub fn batch_with_auto_commit(&self) -> RocksDBBatch<'_> {
        RocksDBBatch {
            provider: self,
            inner: WriteBatchWithTransaction::<true>::default(),
            buf: Vec::with_capacity(DEFAULT_COMPRESS_BUF_CAPACITY),
            auto_commit_threshold: Some(DEFAULT_AUTO_COMMIT_THRESHOLD),
        }
    }
749
    /// Looks up the column family handle for table `T`.
    fn get_cf_handle<T: Table>(&self) -> Result<&rocksdb::ColumnFamily, DatabaseError> {
        self.0.cf_handle::<T>()
    }
754
755 fn execute_with_operation_metric<R>(
757 &self,
758 operation: RocksDBOperation,
759 table: &'static str,
760 f: impl FnOnce(&Self) -> R,
761 ) -> R {
762 let start = self.0.metrics().map(|_| Instant::now());
763 let res = f(self);
764
765 if let (Some(start), Some(metrics)) = (start, self.0.metrics()) {
766 metrics.record_operation(operation, table, start.elapsed());
767 }
768
769 res
770 }
771
    /// Fetches and decompresses the value stored under `key` in table `T`.
    pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
        self.get_encoded::<T>(&key.encode())
    }
776
777 pub fn get_encoded<T: Table>(
779 &self,
780 key: &<T::Key as Encode>::Encoded,
781 ) -> ProviderResult<Option<T::Value>> {
782 self.execute_with_operation_metric(RocksDBOperation::Get, T::NAME, |this| {
783 let result = this.0.get_cf(this.get_cf_handle::<T>()?, key.as_ref()).map_err(|e| {
784 ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
785 message: e.to_string().into(),
786 code: -1,
787 }))
788 })?;
789
790 Ok(result.and_then(|value| T::Value::decompress(&value).ok()))
791 })
792 }
793
    /// Encodes `key` and stores `value` (compressed if needed) in table `T`.
    pub fn put<T: Table>(&self, key: T::Key, value: &T::Value) -> ProviderResult<()> {
        let encoded_key = key.encode();
        self.put_encoded::<T>(&encoded_key, value)
    }

    /// Stores `value` under an already-encoded `key` in table `T`.
    ///
    /// # Errors
    /// Returns [`DatabaseError::Write`] if the rocksdb put fails.
    pub fn put_encoded<T: Table>(
        &self,
        key: &<T::Key as Encode>::Encoded,
        value: &T::Value,
    ) -> ProviderResult<()> {
        self.execute_with_operation_metric(RocksDBOperation::Put, T::NAME, |this| {
            // Either borrow the value's raw bytes or compress into `buf`.
            let mut buf = Vec::new();
            let value_bytes = compress_to_buf_or_ref!(buf, value).unwrap_or(&buf);

            this.0.put_cf(this.get_cf_handle::<T>()?, key, value_bytes).map_err(|e| {
                ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
                    info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
                    operation: DatabaseWriteOperation::PutUpsert,
                    table_name: T::NAME,
                    key: key.as_ref().to_vec(),
                })))
            })
        })
    }
829
    /// Deletes the entry stored under `key` in table `T`.
    ///
    /// # Errors
    /// Returns [`DatabaseError::Delete`] if the rocksdb delete fails.
    pub fn delete<T: Table>(&self, key: T::Key) -> ProviderResult<()> {
        self.execute_with_operation_metric(RocksDBOperation::Delete, T::NAME, |this| {
            this.0.delete_cf(this.get_cf_handle::<T>()?, key.encode().as_ref()).map_err(|e| {
                ProviderError::Database(DatabaseError::Delete(DatabaseErrorInfo {
                    message: e.to_string().into(),
                    code: -1,
                }))
            })
        })
    }
844
    /// Deletes every entry in table `T` via a single range delete over
    /// `[[], [0xFF; 256])`.
    ///
    /// NOTE(review): `delete_range_cf` excludes its upper bound, so any key
    /// sorting at or above 256 bytes of `0xFF` would survive — assumes encoded
    /// keys never reach that range; confirm for all key types stored here.
    pub fn clear<T: Table>(&self) -> ProviderResult<()> {
        let cf = self.get_cf_handle::<T>()?;

        self.0.delete_range_cf(cf, &[] as &[u8], &[0xFF; 256]).map_err(|e| {
            ProviderError::Database(DatabaseError::Delete(DatabaseErrorInfo {
                message: e.to_string().into(),
                code: -1,
            }))
        })?;

        Ok(())
    }
862
    /// Returns the single entry the iterator `mode` points at first — used to
    /// fetch the lowest (`Start`) or highest (`End`) key of table `T`.
    ///
    /// # Errors
    /// Returns [`DatabaseError::Decode`] on key/value decode failures and
    /// [`DatabaseError::Read`] on rocksdb iteration errors.
    fn get_boundary<T: Table>(
        &self,
        mode: IteratorMode<'_>,
    ) -> ProviderResult<Option<(T::Key, T::Value)>> {
        self.execute_with_operation_metric(RocksDBOperation::Get, T::NAME, |this| {
            let cf = this.get_cf_handle::<T>()?;
            let mut iter = this.0.iterator_cf(cf, mode);

            match iter.next() {
                Some(Ok((key_bytes, value_bytes))) => {
                    let key = <T::Key as reth_db_api::table::Decode>::decode(&key_bytes)
                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
                    let value = T::Value::decompress(&value_bytes)
                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
                    Ok(Some((key, value)))
                }
                Some(Err(e)) => {
                    Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
                        message: e.to_string().into(),
                        code: -1,
                    })))
                }
                None => Ok(None),
            }
        })
    }
890
    /// Returns the entry with the lowest key in table `T`, if any.
    #[inline]
    pub fn first<T: Table>(&self) -> ProviderResult<Option<(T::Key, T::Value)>> {
        self.get_boundary::<T>(IteratorMode::Start)
    }

    /// Returns the entry with the highest key in table `T`, if any.
    #[inline]
    pub fn last<T: Table>(&self) -> ProviderResult<Option<(T::Key, T::Value)>> {
        self.get_boundary::<T>(IteratorMode::End)
    }

    /// Returns a typed iterator over all entries of table `T` in key order.
    pub fn iter<T: Table>(&self) -> ProviderResult<RocksDBIter<'_, T>> {
        let cf = self.get_cf_handle::<T>()?;
        let iter = self.0.iterator_cf(cf, IteratorMode::Start);
        Ok(RocksDBIter { inner: iter, _marker: std::marker::PhantomData })
    }

    /// Per-column-family size and key-count statistics.
    pub fn table_stats(&self) -> Vec<RocksDBTableStats> {
        self.0.table_stats()
    }

    /// Combined size of WAL files on disk, in bytes.
    pub fn wal_size_bytes(&self) -> u64 {
        self.0.wal_size_bytes()
    }

    /// Aggregated database statistics (per-table stats plus WAL size).
    pub fn db_stats(&self) -> RocksDBStats {
        self.0.db_stats()
    }
934
    /// Flushes the memtables of the given column families, then the WAL.
    ///
    /// Unknown column family names are skipped silently.
    ///
    /// # Errors
    /// Returns [`DatabaseError::Write`] if any flush fails.
    #[instrument(level = "debug", target = "providers::rocksdb", skip_all, fields(tables = ?tables))]
    pub fn flush(&self, tables: &[&'static str]) -> ProviderResult<()> {
        let db = self.0.db_rw();

        for cf_name in tables {
            if let Some(cf) = db.cf_handle(cf_name) {
                db.flush_cf(&cf).map_err(|e| {
                    ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
                        info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
                        operation: DatabaseWriteOperation::Flush,
                        table_name: cf_name,
                        key: Vec::new(),
                    })))
                })?;
            }
        }

        db.flush_wal(true).map_err(|e| {
            ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
                info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
                operation: DatabaseWriteOperation::Flush,
                table_name: "WAL",
                key: Vec::new(),
            })))
        })?;

        Ok(())
    }
973
    /// Flushes every known table (and the WAL), then triggers a full-range
    /// compaction on each column family.
    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
    pub fn flush_and_compact(&self) -> ProviderResult<()> {
        self.flush(ROCKSDB_TABLES)?;

        let db = self.0.db_rw();

        for cf_name in ROCKSDB_TABLES {
            if let Some(cf) = db.cf_handle(cf_name) {
                // `None..None` bounds compact the entire key range.
                db.compact_range_cf(&cf, None::<&[u8]>, None::<&[u8]>);
            }
        }

        Ok(())
    }
999
    /// Returns an iterator over the raw (undecoded) key/value bytes of table `T`.
    ///
    /// NOTE(review): this wraps `iterator_cf`, not `raw_iterator_cf` —
    /// presumably intentional since `RocksDBRawIter` holds the item-yielding
    /// iterator; verify against the `RocksDBRawIter` definition.
    pub fn raw_iter<T: Table>(&self) -> ProviderResult<RocksDBRawIter<'_>> {
        let cf = self.get_cf_handle::<T>()?;
        let iter = self.0.iterator_cf(cf, IteratorMode::Start);
        Ok(RocksDBRawIter { inner: iter })
    }
1008
    /// Collects all account-history shards belonging to `address`, in key order.
    ///
    /// Seeks to the first shard (`highest_block_number == 0`) for the address
    /// and scans forward until a shard for a different address is reached.
    pub fn account_history_shards(
        &self,
        address: Address,
    ) -> ProviderResult<Vec<(ShardedKey<Address>, BlockNumberList)>> {
        let cf = self.get_cf_handle::<tables::AccountsHistory>()?;

        let start_key = ShardedKey::new(address, 0u64);
        let start_bytes = start_key.encode();

        let iter = self
            .0
            .iterator_cf(cf, IteratorMode::From(start_bytes.as_ref(), rocksdb::Direction::Forward));

        let mut result = Vec::new();
        for item in iter {
            match item {
                Ok((key_bytes, value_bytes)) => {
                    let key = ShardedKey::<Address>::decode(&key_bytes)
                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;

                    // Shards are contiguous per address; stop at the next one.
                    if key.key != address {
                        break;
                    }

                    let value = BlockNumberList::decompress(&value_bytes)
                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;

                    result.push((key, value));
                }
                Err(e) => {
                    return Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
                        message: e.to_string().into(),
                        code: -1,
                    })));
                }
            }
        }

        Ok(result)
    }
1060
    /// Collects all storage-history shards belonging to `(address, storage_key)`,
    /// in key order.
    ///
    /// Seeks to the first shard (`highest_block_number == 0`) for the pair and
    /// scans forward until a shard for a different address or slot is reached.
    pub fn storage_history_shards(
        &self,
        address: Address,
        storage_key: B256,
    ) -> ProviderResult<Vec<(StorageShardedKey, BlockNumberList)>> {
        let cf = self.get_cf_handle::<tables::StoragesHistory>()?;

        let start_key = StorageShardedKey::new(address, storage_key, 0u64);
        let start_bytes = start_key.encode();

        let iter = self
            .0
            .iterator_cf(cf, IteratorMode::From(start_bytes.as_ref(), rocksdb::Direction::Forward));

        let mut result = Vec::new();
        for item in iter {
            match item {
                Ok((key_bytes, value_bytes)) => {
                    let key = StorageShardedKey::decode(&key_bytes)
                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;

                    // Shards are contiguous per (address, slot); stop at the next pair.
                    if key.address != address || key.sharded_key.key != storage_key {
                        break;
                    }

                    let value = BlockNumberList::decompress(&value_bytes)
                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;

                    result.push((key, value));
                }
                Err(e) => {
                    return Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
                        message: e.to_string().into(),
                        code: -1,
                    })));
                }
            }
        }

        Ok(result)
    }
1106
    /// Builds (but does not commit) a batch that unwinds account-history
    /// indices: for each address only the lowest touched block matters, and
    /// history is kept up to `min_block - 1` (or cleared entirely when the
    /// minimum is block 0).
    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
    pub fn unwind_account_history_indices(
        &self,
        last_indices: &[(Address, BlockNumber)],
    ) -> ProviderResult<WriteBatchWithTransaction<true>> {
        // Reduce to the minimum block number per address.
        let mut address_min_block: AddressMap<BlockNumber> =
            AddressMap::with_capacity_and_hasher(last_indices.len(), Default::default());
        for &(address, block_number) in last_indices {
            address_min_block
                .entry(address)
                .and_modify(|min| *min = (*min).min(block_number))
                .or_insert(block_number);
        }

        let mut batch = self.batch();
        for (address, min_block) in address_min_block {
            match min_block.checked_sub(1) {
                Some(keep_to) => batch.unwind_account_history_to(address, keep_to)?,
                None => batch.clear_account_history(address)?,
            }
        }

        Ok(batch.into_inner())
    }
1138
    /// Builds (but does not commit) a batch that unwinds storage-history
    /// indices: for each `(address, slot)` only the lowest touched block
    /// matters, and history is kept up to `min_block - 1` (or cleared entirely
    /// when the minimum is block 0).
    pub fn unwind_storage_history_indices(
        &self,
        storage_changesets: &[(Address, B256, BlockNumber)],
    ) -> ProviderResult<WriteBatchWithTransaction<true>> {
        // Reduce to the minimum block number per (address, slot) pair.
        let mut key_min_block: HashMap<(Address, B256), BlockNumber> =
            HashMap::with_capacity_and_hasher(storage_changesets.len(), Default::default());
        for &(address, storage_key, block_number) in storage_changesets {
            key_min_block
                .entry((address, storage_key))
                .and_modify(|min| *min = (*min).min(block_number))
                .or_insert(block_number);
        }

        let mut batch = self.batch();
        for ((address, storage_key), min_block) in key_min_block {
            match min_block.checked_sub(1) {
                Some(keep_to) => batch.unwind_storage_history_to(address, storage_key, keep_to)?,
                None => batch.clear_storage_history(address, storage_key)?,
            }
        }

        Ok(batch.into_inner())
    }
1169
    /// Populates a fresh batch via `f` and commits it, recording the whole
    /// operation under the batch-write metric.
    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
    pub fn write_batch<F>(&self, f: F) -> ProviderResult<()>
    where
        F: FnOnce(&mut RocksDBBatch<'_>) -> ProviderResult<()>,
    {
        self.execute_with_operation_metric(RocksDBOperation::BatchWrite, "Batch", |this| {
            let mut batch_handle = this.batch();
            f(&mut batch_handle)?;
            batch_handle.commit()
        })
    }

    /// Commits a previously built raw write batch.
    ///
    /// # Errors
    /// Returns [`DatabaseError::Commit`] if the rocksdb write fails.
    #[instrument(level = "debug", target = "providers::rocksdb", skip_all, fields(batch_len = batch.len(), batch_size = batch.size_in_bytes()))]
    pub fn commit_batch(&self, batch: WriteBatchWithTransaction<true>) -> ProviderResult<()> {
        self.0.db_rw().write_opt(batch, &WriteOptions::default()).map_err(|e| {
            ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
                message: e.to_string().into(),
                code: -1,
            }))
        })
    }
1199
1200 #[instrument(level = "debug", target = "providers::rocksdb", skip_all, fields(num_blocks = blocks.len(), first_block = ctx.first_block_number))]
1206 pub(crate) fn write_blocks_data<N: reth_node_types::NodePrimitives>(
1207 &self,
1208 blocks: &[ExecutedBlock<N>],
1209 tx_nums: &[TxNumber],
1210 ctx: RocksDBWriteCtx,
1211 runtime: &reth_tasks::Runtime,
1212 ) -> ProviderResult<()> {
1213 if !ctx.storage_settings.storage_v2 {
1214 return Ok(());
1215 }
1216
1217 let mut r_tx_hash = None;
1218 let mut r_account_history = None;
1219 let mut r_storage_history = None;
1220
1221 let write_tx_hash =
1222 ctx.storage_settings.storage_v2 && ctx.prune_tx_lookup.is_none_or(|m| !m.is_full());
1223 let write_account_history = ctx.storage_settings.storage_v2;
1224 let write_storage_history = ctx.storage_settings.storage_v2;
1225
1226 let span = tracing::Span::current();
1229 runtime.storage_pool().in_place_scope(|s| {
1230 if write_tx_hash {
1231 s.spawn(|_| {
1232 let _guard = span.enter();
1233 r_tx_hash = Some(self.write_tx_hash_numbers(blocks, tx_nums, &ctx));
1234 });
1235 }
1236
1237 if write_account_history {
1238 s.spawn(|_| {
1239 let _guard = span.enter();
1240 r_account_history = Some(self.write_account_history(blocks, &ctx));
1241 });
1242 }
1243
1244 if write_storage_history {
1245 s.spawn(|_| {
1246 let _guard = span.enter();
1247 r_storage_history = Some(self.write_storage_history(blocks, &ctx));
1248 });
1249 }
1250 });
1251
1252 if write_tx_hash {
1253 r_tx_hash.ok_or_else(|| {
1254 ProviderError::Database(DatabaseError::Other(
1255 "rocksdb tx-hash write thread panicked".into(),
1256 ))
1257 })??;
1258 }
1259 if write_account_history {
1260 r_account_history.ok_or_else(|| {
1261 ProviderError::Database(DatabaseError::Other(
1262 "rocksdb account-history write thread panicked".into(),
1263 ))
1264 })??;
1265 }
1266 if write_storage_history {
1267 r_storage_history.ok_or_else(|| {
1268 ProviderError::Database(DatabaseError::Other(
1269 "rocksdb storage-history write thread panicked".into(),
1270 ))
1271 })??;
1272 }
1273
1274 Ok(())
1275 }
1276
    /// Builds a batch mapping every transaction hash in `blocks` to its
    /// sequential transaction number (starting from the per-block entries in
    /// `tx_nums`) and queues it on `ctx.pending_batches`.
    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
    fn write_tx_hash_numbers<N: reth_node_types::NodePrimitives>(
        &self,
        blocks: &[ExecutedBlock<N>],
        tx_nums: &[TxNumber],
        ctx: &RocksDBWriteCtx,
    ) -> ProviderResult<()> {
        let mut batch = self.batch();
        for (block, &first_tx_num) in blocks.iter().zip(tx_nums) {
            let body = block.recovered_block().body();
            // Number transactions consecutively from the block's first tx num.
            for (tx_num, transaction) in (first_tx_num..).zip(body.transactions_iter()) {
                batch.put::<tables::TransactionHashNumbers>(*transaction.tx_hash(), &tx_num)?;
            }
        }
        ctx.pending_batches.lock().push(batch.into_inner());
        Ok(())
    }
1295
    /// Builds a batch of account-history index updates for `blocks` and queues
    /// it on `ctx.pending_batches`.
    ///
    /// Every address that appears in a block's state reverts is recorded as
    /// touched at that block number; per-address block lists are then appended
    /// to the address's history shards.
    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
    fn write_account_history<N: reth_node_types::NodePrimitives>(
        &self,
        blocks: &[ExecutedBlock<N>],
        ctx: &RocksDBWriteCtx,
    ) -> ProviderResult<()> {
        let mut batch = self.batch();
        // BTreeMap keeps addresses sorted, so shard writes happen in key order.
        let mut account_history: BTreeMap<Address, Vec<u64>> = BTreeMap::new();

        for (block_idx, block) in blocks.iter().enumerate() {
            let block_number = ctx.first_block_number + block_idx as u64;
            let reverts = block.execution_outcome().state.reverts.to_plain_state_reverts();

            for account_block_reverts in reverts.accounts {
                for (address, _) in account_block_reverts {
                    account_history.entry(address).or_default().push(block_number);
                }
            }
        }

        for (address, indices) in account_history {
            batch.append_account_history_shard(address, indices)?;
        }
        ctx.pending_batches.lock().push(batch.into_inner());
        Ok(())
    }
1328
1329 #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
1333 fn write_storage_history<N: reth_node_types::NodePrimitives>(
1334 &self,
1335 blocks: &[ExecutedBlock<N>],
1336 ctx: &RocksDBWriteCtx,
1337 ) -> ProviderResult<()> {
1338 let mut batch = self.batch();
1339 let mut storage_history: BTreeMap<(Address, B256), Vec<u64>> = BTreeMap::new();
1340
1341 for (block_idx, block) in blocks.iter().enumerate() {
1342 let block_number = ctx.first_block_number + block_idx as u64;
1343 let reverts = block.execution_outcome().state.reverts.to_plain_state_reverts();
1344
1345 for storage_block_reverts in reverts.storage {
1348 for revert in storage_block_reverts {
1349 for (slot, _) in revert.storage_revert {
1350 let plain_key = B256::new(slot.to_be_bytes());
1351 storage_history
1352 .entry((revert.address, plain_key))
1353 .or_default()
1354 .push(block_number);
1355 }
1356 }
1357 }
1358 }
1359
1360 for ((address, slot), indices) in storage_history {
1362 batch.append_storage_history_shard(address, slot, indices)?;
1363 }
1364 ctx.pending_batches.lock().push(batch.into_inner());
1365 Ok(())
1366 }
1367}
1368
/// Result of pruning the history shards of a single key.
///
/// `Deleted` takes priority over `Updated` when both happened (see
/// `prune_history_shards_inner`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PruneShardOutcome {
    /// At least one shard was deleted outright.
    Deleted,
    /// Shards were rewritten (filtered or re-keyed) but none deleted.
    Updated,
    /// No shard needed any change.
    Unchanged,
}
1379
/// Per-outcome counters accumulated while pruning a batch of history targets.
#[derive(Debug, Default, Clone, Copy)]
pub struct PrunedIndices {
    /// Number of targets whose pruning deleted at least one shard.
    pub deleted: usize,
    /// Number of targets whose shards were only rewritten.
    pub updated: usize,
    /// Number of targets left untouched.
    pub unchanged: usize,
}
1390
/// A write batch staged against a `RocksDBProvider`.
///
/// Puts and deletes are buffered in `inner` and only reach the database on
/// [`RocksDBBatch::commit`] — or earlier, automatically, once the buffered size
/// reaches `auto_commit_threshold`.
#[must_use = "batch must be committed"]
pub struct RocksDBBatch<'a> {
    /// Provider used to resolve column-family handles and to serve reads.
    provider: &'a RocksDBProvider,
    /// The underlying RocksDB write batch.
    inner: WriteBatchWithTransaction<true>,
    /// Scratch buffer reused for value compression across `put` calls.
    buf: Vec<u8>,
    /// When `Some`, the batch auto-commits once `inner.size_in_bytes()` reaches
    /// this many bytes.
    auto_commit_threshold: Option<usize>,
}
1408
impl fmt::Debug for RocksDBBatch<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // The raw write batch is not `Debug`; report its length and byte size
        // instead of its contents.
        f.debug_struct("RocksDBBatch")
            .field("provider", &self.provider)
            .field("batch", &"<WriteBatchWithTransaction>")
            .field("length", &self.inner.len())
            .field("size_in_bytes", &self.inner.size_in_bytes())
            .finish()
    }
}
1422
impl<'a> RocksDBBatch<'a> {
    /// Buffers a `put` of `value` under `key` into table `T`.
    pub fn put<T: Table>(&mut self, key: T::Key, value: &T::Value) -> ProviderResult<()> {
        let encoded_key = key.encode();
        self.put_encoded::<T>(&encoded_key, value)
    }

    /// Buffers a `put` with an already-encoded key, compressing the value into
    /// the reusable scratch buffer first.
    ///
    /// May trigger an auto-commit if the batch grew past `auto_commit_threshold`.
    pub fn put_encoded<T: Table>(
        &mut self,
        key: &<T::Key as Encode>::Encoded,
        value: &T::Value,
    ) -> ProviderResult<()> {
        // The macro either returns the value's bytes directly or compresses them
        // into `self.buf`, in which case we read from the buffer.
        let value_bytes = compress_to_buf_or_ref!(self.buf, value).unwrap_or(&self.buf);
        self.inner.put_cf(self.provider.get_cf_handle::<T>()?, key, value_bytes);
        self.maybe_auto_commit()?;
        Ok(())
    }

    /// Buffers a delete of `key` from table `T`.
    ///
    /// May trigger an auto-commit if the batch grew past `auto_commit_threshold`.
    pub fn delete<T: Table>(&mut self, key: T::Key) -> ProviderResult<()> {
        self.inner.delete_cf(self.provider.get_cf_handle::<T>()?, key.encode().as_ref());
        self.maybe_auto_commit()?;
        Ok(())
    }

    /// Flushes the buffered batch to the database early when it has reached
    /// `auto_commit_threshold` bytes, leaving an empty batch behind so callers
    /// can keep writing.
    fn maybe_auto_commit(&mut self) -> ProviderResult<()> {
        if let Some(threshold) = self.auto_commit_threshold &&
            self.inner.size_in_bytes() >= threshold
        {
            tracing::debug!(
                target: "providers::rocksdb",
                batch_size = self.inner.size_in_bytes(),
                threshold,
                "Auto-committing RocksDB batch"
            );
            // Swap in a fresh (default) batch so subsequent writes accumulate anew.
            let old_batch = std::mem::take(&mut self.inner);
            self.provider.0.db_rw().write_opt(old_batch, &WriteOptions::default()).map_err(
                |e| {
                    ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
                        message: e.to_string().into(),
                        code: -1,
                    }))
                },
            )?;
        }
        Ok(())
    }

    /// Writes the buffered batch to the database.
    #[instrument(level = "debug", target = "providers::rocksdb", skip_all, fields(batch_len = self.inner.len(), batch_size = self.inner.size_in_bytes()))]
    pub fn commit(self) -> ProviderResult<()> {
        self.provider.0.db_rw().write_opt(self.inner, &WriteOptions::default()).map_err(|e| {
            ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
                message: e.to_string().into(),
                code: -1,
            }))
        })
    }

    /// Number of operations currently buffered.
    pub fn len(&self) -> usize {
        self.inner.len()
    }

    /// Whether the batch holds no buffered operations.
    pub fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }

    /// Serialized size of the buffered operations in bytes.
    pub fn size_in_bytes(&self) -> usize {
        self.inner.size_in_bytes()
    }

    /// The provider this batch writes through.
    pub const fn provider(&self) -> &RocksDBProvider {
        self.provider
    }

    /// Consumes the batch and returns the raw RocksDB write batch without
    /// committing it.
    pub fn into_inner(self) -> WriteBatchWithTransaction<true> {
        self.inner
    }

    /// Reads `key` from table `T` via the provider.
    ///
    /// NOTE(review): this delegates to the provider, so writes buffered in this
    /// batch are not visible to the read.
    pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
        self.provider.get::<T>(key)
    }

    /// Appends `indices` (strictly increasing block numbers) to the account
    /// history of `address`.
    ///
    /// New indices are merged into the sentinel shard (key with
    /// `highest_block_number == u64::MAX`). If the merged list outgrows
    /// `NUM_OF_INDICES_IN_SHARD`, it is re-split into full shards keyed by their
    /// highest block number, with the final chunk becoming the new sentinel.
    pub fn append_account_history_shard(
        &mut self,
        address: Address,
        indices: impl IntoIterator<Item = u64>,
    ) -> ProviderResult<()> {
        let indices: Vec<u64> = indices.into_iter().collect();

        if indices.is_empty() {
            return Ok(());
        }

        debug_assert!(
            indices.windows(2).all(|w| w[0] < w[1]),
            "indices must be strictly increasing: {:?}",
            indices
        );

        // The sentinel shard holds the most recent, still-growing block list.
        let last_key = ShardedKey::new(address, u64::MAX);
        let last_shard_opt = self.provider.get::<tables::AccountsHistory>(last_key.clone())?;
        let mut last_shard = last_shard_opt.unwrap_or_else(BlockNumberList::empty);

        last_shard.append(indices).map_err(ProviderError::other)?;

        // Common case: still fits in one shard — overwrite the sentinel in place.
        if last_shard.len() <= NUM_OF_INDICES_IN_SHARD as u64 {
            self.put::<tables::AccountsHistory>(last_key, &last_shard)?;
            return Ok(());
        }

        // Overflow: split into NUM_OF_INDICES_IN_SHARD-sized chunks.
        let chunks = last_shard.iter().chunks(NUM_OF_INDICES_IN_SHARD);
        let mut chunks_peekable = chunks.into_iter().peekable();

        while let Some(chunk) = chunks_peekable.next() {
            let shard = BlockNumberList::new_pre_sorted(chunk);
            // All but the last chunk are keyed by their highest block number;
            // the last chunk becomes the new sentinel shard.
            let highest_block_number = if chunks_peekable.peek().is_some() {
                shard.iter().next_back().expect("`chunks` does not return empty list")
            } else {
                u64::MAX
            };

            self.put::<tables::AccountsHistory>(
                ShardedKey::new(address, highest_block_number),
                &shard,
            )?;
        }

        Ok(())
    }

    /// Appends `indices` (strictly increasing block numbers) to the storage
    /// history of `(address, storage_key)`.
    ///
    /// Same sentinel/overflow scheme as [`Self::append_account_history_shard`],
    /// but keyed by `StorageShardedKey`.
    pub fn append_storage_history_shard(
        &mut self,
        address: Address,
        storage_key: B256,
        indices: impl IntoIterator<Item = u64>,
    ) -> ProviderResult<()> {
        let indices: Vec<u64> = indices.into_iter().collect();

        if indices.is_empty() {
            return Ok(());
        }

        debug_assert!(
            indices.windows(2).all(|w| w[0] < w[1]),
            "indices must be strictly increasing: {:?}",
            indices
        );

        // The sentinel shard holds the most recent, still-growing block list.
        let last_key = StorageShardedKey::last(address, storage_key);
        let last_shard_opt = self.provider.get::<tables::StoragesHistory>(last_key.clone())?;
        let mut last_shard = last_shard_opt.unwrap_or_else(BlockNumberList::empty);

        last_shard.append(indices).map_err(ProviderError::other)?;

        // Common case: still fits in one shard — overwrite the sentinel in place.
        if last_shard.len() <= NUM_OF_INDICES_IN_SHARD as u64 {
            self.put::<tables::StoragesHistory>(last_key, &last_shard)?;
            return Ok(());
        }

        // Overflow: split into NUM_OF_INDICES_IN_SHARD-sized chunks.
        let chunks = last_shard.iter().chunks(NUM_OF_INDICES_IN_SHARD);
        let mut chunks_peekable = chunks.into_iter().peekable();

        while let Some(chunk) = chunks_peekable.next() {
            let shard = BlockNumberList::new_pre_sorted(chunk);
            // All but the last chunk are keyed by their highest block number;
            // the last chunk becomes the new sentinel shard.
            let highest_block_number = if chunks_peekable.peek().is_some() {
                shard.iter().next_back().expect("`chunks` does not return empty list")
            } else {
                u64::MAX
            };

            self.put::<tables::StoragesHistory>(
                StorageShardedKey::new(address, storage_key, highest_block_number),
                &shard,
            )?;
        }

        Ok(())
    }

    /// Unwinds the account history of `address` so that only block numbers
    /// `<= keep_to` remain, restoring the sentinel-shard invariant afterwards.
    pub fn unwind_account_history_to(
        &mut self,
        address: Address,
        keep_to: BlockNumber,
    ) -> ProviderResult<()> {
        let shards = self.provider.account_history_shards(address)?;
        if shards.is_empty() {
            return Ok(());
        }

        // First shard that may contain block numbers above `keep_to`.
        let boundary_idx = shards.iter().position(|(key, _)| {
            key.highest_block_number == u64::MAX || key.highest_block_number > keep_to
        });

        let Some(boundary_idx) = boundary_idx else {
            // Every shard is entirely <= keep_to: nothing to remove, but the
            // last shard must be re-keyed as the sentinel if it isn't already.
            let (last_key, last_value) = shards.last().expect("shards is non-empty");
            if last_key.highest_block_number != u64::MAX {
                self.delete::<tables::AccountsHistory>(last_key.clone())?;
                self.put::<tables::AccountsHistory>(
                    ShardedKey::new(address, u64::MAX),
                    last_value,
                )?;
            }
            return Ok(());
        };

        // Shards strictly after the boundary are entirely above keep_to: drop them.
        for (key, _) in shards.iter().skip(boundary_idx + 1) {
            self.delete::<tables::AccountsHistory>(key.clone())?;
        }

        let (boundary_key, boundary_list) = &shards[boundary_idx];

        self.delete::<tables::AccountsHistory>(boundary_key.clone())?;

        // Keep only the prefix of the boundary shard that is <= keep_to.
        let new_last =
            BlockNumberList::new_pre_sorted(boundary_list.iter().take_while(|&b| b <= keep_to));

        if new_last.is_empty() {
            if boundary_idx == 0 {
                // No history at all remains for this address.
                return Ok(());
            }

            // Promote the previous shard to be the new sentinel.
            let (prev_key, prev_value) = &shards[boundary_idx - 1];
            if prev_key.highest_block_number != u64::MAX {
                self.delete::<tables::AccountsHistory>(prev_key.clone())?;
                self.put::<tables::AccountsHistory>(
                    ShardedKey::new(address, u64::MAX),
                    prev_value,
                )?;
            }
            return Ok(());
        }

        self.put::<tables::AccountsHistory>(ShardedKey::new(address, u64::MAX), &new_last)?;

        Ok(())
    }

    /// Shared pruning routine over a key-ordered list of history shards: removes
    /// all block numbers `<= to_block`, deletes shards that become empty, and
    /// re-keys the last surviving shard as the sentinel.
    ///
    /// The key type is abstracted via closures so the same logic serves both
    /// account and storage history.
    #[allow(clippy::too_many_arguments)]
    fn prune_history_shards_inner<K>(
        &mut self,
        shards: Vec<(K, BlockNumberList)>,
        to_block: BlockNumber,
        get_highest: impl Fn(&K) -> u64,
        is_sentinel: impl Fn(&K) -> bool,
        delete_shard: impl Fn(&mut Self, K) -> ProviderResult<()>,
        put_shard: impl Fn(&mut Self, K, &BlockNumberList) -> ProviderResult<()>,
        create_sentinel: impl Fn() -> K,
    ) -> ProviderResult<PruneShardOutcome>
    where
        K: Clone,
    {
        if shards.is_empty() {
            return Ok(PruneShardOutcome::Unchanged);
        }

        let mut deleted = false;
        let mut updated = false;
        // Last shard that still holds data after pruning — candidate for the
        // sentinel re-key below.
        let mut last_remaining: Option<(K, BlockNumberList)> = None;

        for (key, block_list) in shards {
            if !is_sentinel(&key) && get_highest(&key) <= to_block {
                // The whole shard is at or below the cutoff: drop it.
                delete_shard(self, key)?;
                deleted = true;
            } else {
                let original_len = block_list.len();
                let filtered =
                    BlockNumberList::new_pre_sorted(block_list.iter().filter(|&b| b > to_block));

                if filtered.is_empty() {
                    delete_shard(self, key)?;
                    deleted = true;
                } else if filtered.len() < original_len {
                    // Some entries pruned: rewrite the shard in place.
                    put_shard(self, key.clone(), &filtered)?;
                    last_remaining = Some((key, filtered));
                    updated = true;
                } else {
                    // Untouched shard; remember it as the current last survivor.
                    last_remaining = Some((key, block_list));
                }
            }
        }

        // Restore the invariant that the highest surviving shard carries the
        // sentinel key.
        if let Some((last_key, last_value)) = last_remaining &&
            !is_sentinel(&last_key)
        {
            delete_shard(self, last_key)?;
            put_shard(self, create_sentinel(), &last_value)?;
            updated = true;
        }

        if deleted {
            Ok(PruneShardOutcome::Deleted)
        } else if updated {
            Ok(PruneShardOutcome::Updated)
        } else {
            Ok(PruneShardOutcome::Unchanged)
        }
    }

    /// Prunes the account history of `address`, removing all block numbers
    /// `<= to_block`.
    pub fn prune_account_history_to(
        &mut self,
        address: Address,
        to_block: BlockNumber,
    ) -> ProviderResult<PruneShardOutcome> {
        let shards = self.provider.account_history_shards(address)?;
        self.prune_history_shards_inner(
            shards,
            to_block,
            |key| key.highest_block_number,
            |key| key.highest_block_number == u64::MAX,
            |batch, key| batch.delete::<tables::AccountsHistory>(key),
            |batch, key, value| batch.put::<tables::AccountsHistory>(key, value),
            || ShardedKey::new(address, u64::MAX),
        )
    }

    /// Prunes account history for many `(address, to_block)` targets using a
    /// single raw iterator over the `AccountsHistory` column family.
    ///
    /// `targets` must be sorted by address so the shared iterator only ever
    /// moves forward.
    pub fn prune_account_history_batch(
        &mut self,
        targets: &[(Address, BlockNumber)],
    ) -> ProviderResult<PrunedIndices> {
        if targets.is_empty() {
            return Ok(PrunedIndices::default());
        }

        debug_assert!(
            targets.windows(2).all(|w| w[0].0 <= w[1].0),
            "prune_account_history_batch: targets must be sorted by address"
        );

        // An encoded `ShardedKey<Address>` starts with the 20-byte address.
        const PREFIX_LEN: usize = 20;

        let cf = self.provider.get_cf_handle::<tables::AccountsHistory>()?;
        let mut iter = self.provider.0.raw_iterator_cf(cf);
        let mut outcomes = PrunedIndices::default();

        for (address, to_block) in targets {
            let start_key = ShardedKey::new(*address, 0u64).encode();
            let target_prefix = &start_key[..PREFIX_LEN];

            // Re-seek only when the iterator is invalid or still before the
            // target prefix; with sorted targets it is otherwise already
            // positioned at (or past) the address we need.
            let needs_seek = if iter.valid() {
                if let Some(current_key) = iter.key() {
                    current_key.get(..PREFIX_LEN).is_none_or(|p| p < target_prefix)
                } else {
                    true
                }
            } else {
                true
            };

            if needs_seek {
                iter.seek(start_key);
                iter.status().map_err(|e| {
                    ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
                        message: e.to_string().into(),
                        code: -1,
                    }))
                })?;
            }

            // Collect every shard whose key starts with the target address.
            let mut shards = Vec::new();
            while iter.valid() {
                let Some(key_bytes) = iter.key() else { break };

                let current_prefix = key_bytes.get(..PREFIX_LEN);
                if current_prefix != Some(target_prefix) {
                    break;
                }

                let key = ShardedKey::<Address>::decode(key_bytes)
                    .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;

                let Some(value_bytes) = iter.value() else { break };
                let value = BlockNumberList::decompress(value_bytes)
                    .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;

                shards.push((key, value));
                iter.next();
            }

            match self.prune_history_shards_inner(
                shards,
                *to_block,
                |key| key.highest_block_number,
                |key| key.highest_block_number == u64::MAX,
                |batch, key| batch.delete::<tables::AccountsHistory>(key),
                |batch, key, value| batch.put::<tables::AccountsHistory>(key, value),
                || ShardedKey::new(*address, u64::MAX),
            )? {
                PruneShardOutcome::Deleted => outcomes.deleted += 1,
                PruneShardOutcome::Updated => outcomes.updated += 1,
                PruneShardOutcome::Unchanged => outcomes.unchanged += 1,
            }
        }

        Ok(outcomes)
    }

    /// Prunes the storage history of `(address, storage_key)`, removing all
    /// block numbers `<= to_block`.
    pub fn prune_storage_history_to(
        &mut self,
        address: Address,
        storage_key: B256,
        to_block: BlockNumber,
    ) -> ProviderResult<PruneShardOutcome> {
        let shards = self.provider.storage_history_shards(address, storage_key)?;
        self.prune_history_shards_inner(
            shards,
            to_block,
            |key| key.sharded_key.highest_block_number,
            |key| key.sharded_key.highest_block_number == u64::MAX,
            |batch, key| batch.delete::<tables::StoragesHistory>(key),
            |batch, key, value| batch.put::<tables::StoragesHistory>(key, value),
            || StorageShardedKey::last(address, storage_key),
        )
    }

    /// Prunes storage history for many `((address, storage_key), to_block)`
    /// targets using a single raw iterator over the `StoragesHistory` column
    /// family.
    ///
    /// `targets` must be sorted by `(address, storage_key)` so the shared
    /// iterator only ever moves forward.
    pub fn prune_storage_history_batch(
        &mut self,
        targets: &[((Address, B256), BlockNumber)],
    ) -> ProviderResult<PrunedIndices> {
        if targets.is_empty() {
            return Ok(PrunedIndices::default());
        }

        debug_assert!(
            targets.windows(2).all(|w| w[0].0 <= w[1].0),
            "prune_storage_history_batch: targets must be sorted by (address, storage_key)"
        );

        // An encoded `StorageShardedKey` starts with the 20-byte address
        // followed by the 32-byte storage key.
        const PREFIX_LEN: usize = 52;

        let cf = self.provider.get_cf_handle::<tables::StoragesHistory>()?;
        let mut iter = self.provider.0.raw_iterator_cf(cf);
        let mut outcomes = PrunedIndices::default();

        for ((address, storage_key), to_block) in targets {
            let start_key = StorageShardedKey::new(*address, *storage_key, 0u64).encode();
            let target_prefix = &start_key[..PREFIX_LEN];

            // Re-seek only when the iterator is invalid or still before the
            // target prefix (see `prune_account_history_batch`).
            let needs_seek = if iter.valid() {
                if let Some(current_key) = iter.key() {
                    current_key.get(..PREFIX_LEN).is_none_or(|p| p < target_prefix)
                } else {
                    true
                }
            } else {
                true
            };

            if needs_seek {
                iter.seek(start_key);
                iter.status().map_err(|e| {
                    ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
                        message: e.to_string().into(),
                        code: -1,
                    }))
                })?;
            }

            // Collect every shard whose key starts with the target prefix.
            let mut shards = Vec::new();
            while iter.valid() {
                let Some(key_bytes) = iter.key() else { break };

                let current_prefix = key_bytes.get(..PREFIX_LEN);
                if current_prefix != Some(target_prefix) {
                    break;
                }

                let key = StorageShardedKey::decode(key_bytes)
                    .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;

                let Some(value_bytes) = iter.value() else { break };
                let value = BlockNumberList::decompress(value_bytes)
                    .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;

                shards.push((key, value));
                iter.next();
            }

            match self.prune_history_shards_inner(
                shards,
                *to_block,
                |key| key.sharded_key.highest_block_number,
                |key| key.sharded_key.highest_block_number == u64::MAX,
                |batch, key| batch.delete::<tables::StoragesHistory>(key),
                |batch, key, value| batch.put::<tables::StoragesHistory>(key, value),
                || StorageShardedKey::last(*address, *storage_key),
            )? {
                PruneShardOutcome::Deleted => outcomes.deleted += 1,
                PruneShardOutcome::Updated => outcomes.updated += 1,
                PruneShardOutcome::Unchanged => outcomes.unchanged += 1,
            }
        }

        Ok(outcomes)
    }

    /// Unwinds the storage history of `(address, storage_key)` so that only
    /// block numbers `<= keep_to` remain, restoring the sentinel-shard
    /// invariant afterwards. Mirrors [`Self::unwind_account_history_to`].
    pub fn unwind_storage_history_to(
        &mut self,
        address: Address,
        storage_key: B256,
        keep_to: BlockNumber,
    ) -> ProviderResult<()> {
        let shards = self.provider.storage_history_shards(address, storage_key)?;
        if shards.is_empty() {
            return Ok(());
        }

        // First shard that may contain block numbers above `keep_to`.
        let boundary_idx = shards.iter().position(|(key, _)| {
            key.sharded_key.highest_block_number == u64::MAX ||
                key.sharded_key.highest_block_number > keep_to
        });

        let Some(boundary_idx) = boundary_idx else {
            // Every shard is entirely <= keep_to: nothing to remove, but the
            // last shard must be re-keyed as the sentinel if it isn't already.
            let (last_key, last_value) = shards.last().expect("shards is non-empty");
            if last_key.sharded_key.highest_block_number != u64::MAX {
                self.delete::<tables::StoragesHistory>(last_key.clone())?;
                self.put::<tables::StoragesHistory>(
                    StorageShardedKey::last(address, storage_key),
                    last_value,
                )?;
            }
            return Ok(());
        };

        // Shards strictly after the boundary are entirely above keep_to: drop them.
        for (key, _) in shards.iter().skip(boundary_idx + 1) {
            self.delete::<tables::StoragesHistory>(key.clone())?;
        }

        let (boundary_key, boundary_list) = &shards[boundary_idx];

        self.delete::<tables::StoragesHistory>(boundary_key.clone())?;

        // Keep only the prefix of the boundary shard that is <= keep_to.
        let new_last =
            BlockNumberList::new_pre_sorted(boundary_list.iter().take_while(|&b| b <= keep_to));

        if new_last.is_empty() {
            if boundary_idx == 0 {
                // No history at all remains for this slot.
                return Ok(());
            }

            // Promote the previous shard to be the new sentinel.
            let (prev_key, prev_value) = &shards[boundary_idx - 1];
            if prev_key.sharded_key.highest_block_number != u64::MAX {
                self.delete::<tables::StoragesHistory>(prev_key.clone())?;
                self.put::<tables::StoragesHistory>(
                    StorageShardedKey::last(address, storage_key),
                    prev_value,
                )?;
            }
            return Ok(());
        }

        self.put::<tables::StoragesHistory>(
            StorageShardedKey::last(address, storage_key),
            &new_last,
        )?;

        Ok(())
    }

    /// Deletes every account-history shard of `address`.
    pub fn clear_account_history(&mut self, address: Address) -> ProviderResult<()> {
        let shards = self.provider.account_history_shards(address)?;
        for (key, _) in shards {
            self.delete::<tables::AccountsHistory>(key)?;
        }
        Ok(())
    }

    /// Deletes every storage-history shard of `(address, storage_key)`.
    pub fn clear_storage_history(
        &mut self,
        address: Address,
        storage_key: B256,
    ) -> ProviderResult<()> {
        let shards = self.provider.storage_history_shards(address, storage_key)?;
        for (key, _) in shards {
            self.delete::<tables::StoragesHistory>(key)?;
        }
        Ok(())
    }
}
2155
/// A RocksDB transaction bound to a `RocksDBProvider`.
pub struct RocksTx<'db> {
    /// The underlying optimistic-concurrency transaction.
    inner: Transaction<'db, OptimisticTransactionDB>,
    /// Provider used to resolve column-family handles.
    provider: &'db RocksDBProvider,
}
2169
impl fmt::Debug for RocksTx<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // The inner transaction is not `Debug`; only report the provider.
        f.debug_struct("RocksTx").field("provider", &self.provider).finish_non_exhaustive()
    }
}
2175
impl<'db> RocksTx<'db> {
    /// Reads the value for `key` from table `T` within this transaction.
    pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
        let encoded_key = key.encode();
        self.get_encoded::<T>(&encoded_key)
    }

    /// Reads the value for an already-encoded key from table `T`.
    ///
    /// NOTE(review): a stored value that fails to decompress is reported as
    /// `None` rather than an error (`decompress(..).ok()`) — confirm this lossy
    /// mapping is intentional; the iterator paths surface a decode error instead.
    pub fn get_encoded<T: Table>(
        &self,
        key: &<T::Key as Encode>::Encoded,
    ) -> ProviderResult<Option<T::Value>> {
        let cf = self.provider.get_cf_handle::<T>()?;
        let result = self.inner.get_cf(cf, key.as_ref()).map_err(|e| {
            ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
                message: e.to_string().into(),
                code: -1,
            }))
        })?;

        Ok(result.and_then(|value| T::Value::decompress(&value).ok()))
    }

    /// Writes `value` under `key` into table `T` within this transaction.
    pub fn put<T: Table>(&self, key: T::Key, value: &T::Value) -> ProviderResult<()> {
        let encoded_key = key.encode();
        self.put_encoded::<T>(&encoded_key, value)
    }

    /// Writes a value under an already-encoded key into table `T`.
    pub fn put_encoded<T: Table>(
        &self,
        key: &<T::Key as Encode>::Encoded,
        value: &T::Value,
    ) -> ProviderResult<()> {
        let cf = self.provider.get_cf_handle::<T>()?;
        let mut buf = Vec::new();
        // The macro either returns the value's bytes directly or compresses
        // them into `buf`, in which case we read from the buffer.
        let value_bytes = compress_to_buf_or_ref!(buf, value).unwrap_or(&buf);

        self.inner.put_cf(cf, key.as_ref(), value_bytes).map_err(|e| {
            ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
                info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
                operation: DatabaseWriteOperation::PutUpsert,
                table_name: T::NAME,
                key: key.as_ref().to_vec(),
            })))
        })
    }

    /// Deletes `key` from table `T` within this transaction.
    pub fn delete<T: Table>(&self, key: T::Key) -> ProviderResult<()> {
        let cf = self.provider.get_cf_handle::<T>()?;
        self.inner.delete_cf(cf, key.encode().as_ref()).map_err(|e| {
            ProviderError::Database(DatabaseError::Delete(DatabaseErrorInfo {
                message: e.to_string().into(),
                code: -1,
            }))
        })
    }

    /// Returns an iterator over all entries of table `T`, starting at the first key.
    pub fn iter<T: Table>(&self) -> ProviderResult<RocksTxIter<'_, T>> {
        let cf = self.provider.get_cf_handle::<T>()?;
        let iter = self.inner.iterator_cf(cf, IteratorMode::Start);
        Ok(RocksTxIter { inner: iter, _marker: std::marker::PhantomData })
    }

    /// Returns a forward iterator over table `T` starting at `key`.
    pub fn iter_from<T: Table>(&self, key: T::Key) -> ProviderResult<RocksTxIter<'_, T>> {
        let cf = self.provider.get_cf_handle::<T>()?;
        let encoded_key = key.encode();
        let iter = self
            .inner
            .iterator_cf(cf, IteratorMode::From(encoded_key.as_ref(), rocksdb::Direction::Forward));
        Ok(RocksTxIter { inner: iter, _marker: std::marker::PhantomData })
    }

    /// Commits the transaction, applying its writes.
    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
    pub fn commit(self) -> ProviderResult<()> {
        self.inner.commit().map_err(|e| {
            ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
                message: e.to_string().into(),
                code: -1,
            }))
        })
    }

    /// Rolls the transaction back, discarding its writes.
    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
    pub fn rollback(self) -> ProviderResult<()> {
        self.inner.rollback().map_err(|e| {
            ProviderError::Database(DatabaseError::Other(format!("rollback failed: {e}")))
        })
    }

    /// Classifies `block_number` against the recorded account history of `address`.
    pub fn account_history_info(
        &self,
        address: Address,
        block_number: BlockNumber,
        lowest_available_block_number: Option<BlockNumber>,
    ) -> ProviderResult<HistoryInfo> {
        let key = ShardedKey::new(address, block_number);
        self.history_info::<tables::AccountsHistory>(
            key.encode().as_ref(),
            block_number,
            lowest_available_block_number,
            |key_bytes| Ok(<ShardedKey<Address> as Decode>::decode(key_bytes)?.key == address),
            |prev_bytes| {
                <ShardedKey<Address> as Decode>::decode(prev_bytes)
                    .map(|k| k.key == address)
                    .unwrap_or(false)
            },
        )
    }

    /// Classifies `block_number` against the recorded storage history of
    /// `(address, storage_key)`.
    pub fn storage_history_info(
        &self,
        address: Address,
        storage_key: B256,
        block_number: BlockNumber,
        lowest_available_block_number: Option<BlockNumber>,
    ) -> ProviderResult<HistoryInfo> {
        let key = StorageShardedKey::new(address, storage_key, block_number);
        self.history_info::<tables::StoragesHistory>(
            key.encode().as_ref(),
            block_number,
            lowest_available_block_number,
            |key_bytes| {
                let k = <StorageShardedKey as Decode>::decode(key_bytes)?;
                Ok(k.address == address && k.sharded_key.key == storage_key)
            },
            |prev_bytes| {
                <StorageShardedKey as Decode>::decode(prev_bytes)
                    .map(|k| k.address == address && k.sharded_key.key == storage_key)
                    .unwrap_or(false)
            },
        )
    }

    /// Shared shard-lookup routine behind [`Self::account_history_info`] and
    /// [`Self::storage_history_info`].
    ///
    /// Seeks to the first shard at or after `encoded_key`. `key_matches` checks
    /// that the found shard still belongs to the queried subject, and
    /// `prev_key_matches` checks whether an earlier shard of the same subject
    /// exists when [`needs_prev_shard_check`] asks for it.
    fn history_info<T>(
        &self,
        encoded_key: &[u8],
        block_number: BlockNumber,
        lowest_available_block_number: Option<BlockNumber>,
        key_matches: impl FnOnce(&[u8]) -> Result<bool, reth_db_api::DatabaseError>,
        prev_key_matches: impl Fn(&[u8]) -> bool,
    ) -> ProviderResult<HistoryInfo>
    where
        T: Table<Value = BlockNumberList>,
    {
        // When older history may have been pruned, a miss means the value might
        // still live in the plain state rather than having never been written.
        let is_maybe_pruned = lowest_available_block_number.is_some();
        let fallback = || {
            Ok(if is_maybe_pruned {
                HistoryInfo::MaybeInPlainState
            } else {
                HistoryInfo::NotYetWritten
            })
        };

        let cf = self.provider.0.cf_handle_rw(T::NAME)?;

        let mut iter: DBRawIteratorWithThreadMode<'_, Transaction<'_, OptimisticTransactionDB>> =
            self.inner.raw_iterator_cf(&cf);

        // Position at the first shard whose key is >= the queried key.
        iter.seek(encoded_key);
        Self::raw_iter_status_ok(&iter)?;

        if !iter.valid() {
            return fallback();
        }

        let Some(key_bytes) = iter.key() else {
            return fallback();
        };
        // A shard belonging to a different subject means the queried subject
        // has no shard at or after this block number.
        if !key_matches(key_bytes)? {
            return fallback();
        }

        let Some(value_bytes) = iter.value() else {
            return fallback();
        };
        let chunk = BlockNumberList::decompress(value_bytes)?;
        let (rank, found_block) = compute_history_rank(&chunk, block_number);

        // When the rank/found-block combination is ambiguous, step back one
        // shard: no previous shard for the same subject means `block_number`
        // predates the first recorded write.
        let is_before_first_write = if needs_prev_shard_check(rank, found_block, block_number) {
            iter.prev();
            Self::raw_iter_status_ok(&iter)?;
            let has_prev = iter.valid() && iter.key().is_some_and(&prev_key_matches);
            !has_prev
        } else {
            false
        };

        Ok(HistoryInfo::from_lookup(
            found_block,
            is_before_first_write,
            lowest_available_block_number,
        ))
    }

    /// Converts a raw-iterator error status into a provider read error.
    fn raw_iter_status_ok(
        iter: &DBRawIteratorWithThreadMode<'_, Transaction<'_, OptimisticTransactionDB>>,
    ) -> ProviderResult<()> {
        iter.status().map_err(|e| {
            ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
                message: e.to_string().into(),
                code: -1,
            }))
        })
    }
}
2419
/// Key/value iterator over either the read-write or the read-only database handle.
enum RocksDBIterEnum<'db> {
    /// Iterates the read-write `OptimisticTransactionDB`.
    ReadWrite(rocksdb::DBIteratorWithThreadMode<'db, OptimisticTransactionDB>),
    /// Iterates a read-only `DB` handle.
    ReadOnly(rocksdb::DBIteratorWithThreadMode<'db, DB>),
}
2427
impl Iterator for RocksDBIterEnum<'_> {
    type Item = Result<(Box<[u8]>, Box<[u8]>), rocksdb::Error>;

    // Delegates to whichever underlying iterator variant is active.
    fn next(&mut self) -> Option<Self::Item> {
        match self {
            Self::ReadWrite(iter) => iter.next(),
            Self::ReadOnly(iter) => iter.next(),
        }
    }
}
2438
/// Raw (cursor-style) iterator over either the read-write or read-only handle.
enum RocksDBRawIterEnum<'db> {
    /// Raw iterator over the read-write `OptimisticTransactionDB`.
    ReadWrite(DBRawIteratorWithThreadMode<'db, OptimisticTransactionDB>),
    /// Raw iterator over a read-only `DB` handle.
    ReadOnly(DBRawIteratorWithThreadMode<'db, DB>),
}
2449
impl RocksDBRawIterEnum<'_> {
    /// Positions the cursor at the first key >= `key`.
    fn seek(&mut self, key: impl AsRef<[u8]>) {
        match self {
            Self::ReadWrite(iter) => iter.seek(key),
            Self::ReadOnly(iter) => iter.seek(key),
        }
    }

    /// Whether the cursor currently points at an entry.
    fn valid(&self) -> bool {
        match self {
            Self::ReadWrite(iter) => iter.valid(),
            Self::ReadOnly(iter) => iter.valid(),
        }
    }

    /// Key at the current cursor position, if valid.
    fn key(&self) -> Option<&[u8]> {
        match self {
            Self::ReadWrite(iter) => iter.key(),
            Self::ReadOnly(iter) => iter.key(),
        }
    }

    /// Value at the current cursor position, if valid.
    fn value(&self) -> Option<&[u8]> {
        match self {
            Self::ReadWrite(iter) => iter.value(),
            Self::ReadOnly(iter) => iter.value(),
        }
    }

    /// Advances the cursor to the next entry.
    fn next(&mut self) {
        match self {
            Self::ReadWrite(iter) => iter.next(),
            Self::ReadOnly(iter) => iter.next(),
        }
    }

    /// Error status of the underlying iterator, if any.
    fn status(&self) -> Result<(), rocksdb::Error> {
        match self {
            Self::ReadWrite(iter) => iter.status(),
            Self::ReadOnly(iter) => iter.status(),
        }
    }
}
2499
/// Typed iterator over all entries of table `T`.
pub struct RocksDBIter<'db, T: Table> {
    /// The underlying raw key/value iterator.
    inner: RocksDBIterEnum<'db>,
    // Ties the iterator to table `T` without storing any `T` data.
    _marker: std::marker::PhantomData<T>,
}
2507
impl<T: Table> fmt::Debug for RocksDBIter<'_, T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Only the table name is meaningful to show; the inner iterator has no
        // printable state.
        f.debug_struct("RocksDBIter").field("table", &T::NAME).finish_non_exhaustive()
    }
}
2513
impl<T: Table> Iterator for RocksDBIter<'_, T> {
    type Item = ProviderResult<(T::Key, T::Value)>;

    // Decodes each raw key/value pair into typed table entries.
    fn next(&mut self) -> Option<Self::Item> {
        Some(decode_iter_item::<T>(self.inner.next()?))
    }
}
2521
/// Untyped iterator yielding raw key/value byte pairs.
pub struct RocksDBRawIter<'db> {
    /// The underlying raw key/value iterator.
    inner: RocksDBIterEnum<'db>,
}
2528
impl fmt::Debug for RocksDBRawIter<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // The inner iterator has no printable state.
        f.debug_struct("RocksDBRawIter").finish_non_exhaustive()
    }
}
2534
2535impl Iterator for RocksDBRawIter<'_> {
2536 type Item = ProviderResult<(Box<[u8]>, Box<[u8]>)>;
2537
2538 fn next(&mut self) -> Option<Self::Item> {
2539 match self.inner.next()? {
2540 Ok(kv) => Some(Ok(kv)),
2541 Err(e) => Some(Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
2542 message: e.to_string().into(),
2543 code: -1,
2544 })))),
2545 }
2546 }
2547}
2548
/// Typed iterator over table `T` within a [`RocksTx`] transaction.
pub struct RocksTxIter<'tx, T: Table> {
    /// Iterator borrowed from the transaction's view of the column family.
    inner: rocksdb::DBIteratorWithThreadMode<'tx, Transaction<'tx, OptimisticTransactionDB>>,
    // Ties the iterator to table `T` without storing any `T` data.
    _marker: std::marker::PhantomData<T>,
}
2556
impl<T: Table> fmt::Debug for RocksTxIter<'_, T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Only the table name is meaningful to show.
        f.debug_struct("RocksTxIter").field("table", &T::NAME).finish_non_exhaustive()
    }
}
2562
impl<T: Table> Iterator for RocksTxIter<'_, T> {
    type Item = ProviderResult<(T::Key, T::Value)>;

    // Decodes each raw key/value pair into typed table entries.
    fn next(&mut self) -> Option<Self::Item> {
        Some(decode_iter_item::<T>(self.inner.next()?))
    }
}
2570
2571fn decode_iter_item<T: Table>(result: RawKVResult) -> ProviderResult<(T::Key, T::Value)> {
2576 let (key_bytes, value_bytes) = result.map_err(|e| {
2577 ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
2578 message: e.to_string().into(),
2579 code: -1,
2580 }))
2581 })?;
2582
2583 let key = <T::Key as reth_db_api::table::Decode>::decode(&key_bytes)
2584 .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
2585
2586 let value = T::Value::decompress(&value_bytes)
2587 .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
2588
2589 Ok((key, value))
2590}
2591
/// Maps the storage-layer [`LogLevel`] onto the closest RocksDB log level.
///
/// RocksDB exposes fewer levels than the storage layer, so `Notice`/`Verbose`
/// collapse to `Info` and `Debug`/`Trace`/`Extra` collapse to `Debug`.
const fn convert_log_level(level: LogLevel) -> rocksdb::LogLevel {
    match level {
        LogLevel::Fatal => rocksdb::LogLevel::Fatal,
        LogLevel::Error => rocksdb::LogLevel::Error,
        LogLevel::Warn => rocksdb::LogLevel::Warn,
        LogLevel::Notice | LogLevel::Verbose => rocksdb::LogLevel::Info,
        LogLevel::Debug | LogLevel::Trace | LogLevel::Extra => rocksdb::LogLevel::Debug,
    }
}
2602
2603#[cfg(test)]
2604mod tests {
2605 use super::*;
2606 use crate::providers::HistoryInfo;
2607 use alloy_primitives::{Address, TxHash, B256};
2608 use reth_db_api::{
2609 models::{
2610 sharded_key::{ShardedKey, NUM_OF_INDICES_IN_SHARD},
2611 storage_sharded_key::StorageShardedKey,
2612 IntegerList,
2613 },
2614 table::Table,
2615 tables,
2616 };
2617 use tempfile::TempDir;
2618
    /// Building with `with_default_tables` must register the column families for
    /// the tx-hash and history tables; each is exercised with a round-trip.
    #[test]
    fn test_with_default_tables_registers_required_column_families() {
        let temp_dir = TempDir::new().unwrap();

        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        // TransactionHashNumbers round-trips.
        let tx_hash = TxHash::from(B256::from([1u8; 32]));
        provider.put::<tables::TransactionHashNumbers>(tx_hash, &100).unwrap();
        assert_eq!(provider.get::<tables::TransactionHashNumbers>(tx_hash).unwrap(), Some(100));

        // AccountsHistory round-trips.
        let key = ShardedKey::new(Address::ZERO, 100);
        let value = IntegerList::default();
        provider.put::<tables::AccountsHistory>(key.clone(), &value).unwrap();
        assert!(provider.get::<tables::AccountsHistory>(key).unwrap().is_some());

        // StoragesHistory round-trips.
        let key = StorageShardedKey::new(Address::ZERO, B256::ZERO, 100);
        provider.put::<tables::StoragesHistory>(key.clone(), &value).unwrap();
        assert!(provider.get::<tables::StoragesHistory>(key).unwrap().is_some());
    }
2642
    /// Minimal table definition used by tests that don't need a real reth
    /// table schema: plain `u64` keys mapping to raw byte vectors.
    #[derive(Debug)]
    struct TestTable;

    impl Table for TestTable {
        const NAME: &'static str = "TestTable";
        const DUPSORT: bool = false;
        type Key = u64;
        type Value = Vec<u8>;
    }
2652
2653 #[test]
2654 fn test_basic_operations() {
2655 let temp_dir = TempDir::new().unwrap();
2656
2657 let provider = RocksDBBuilder::new(temp_dir.path())
2658 .with_table::<TestTable>() .build()
2660 .unwrap();
2661
2662 let key = 42u64;
2663 let value = b"test_value".to_vec();
2664
2665 provider.put::<TestTable>(key, &value).unwrap();
2667
2668 let result = provider.get::<TestTable>(key).unwrap();
2670 assert_eq!(result, Some(value));
2671
2672 provider.delete::<TestTable>(key).unwrap();
2674
2675 assert_eq!(provider.get::<TestTable>(key).unwrap(), None);
2677 }
2678
2679 #[test]
2680 fn test_batch_operations() {
2681 let temp_dir = TempDir::new().unwrap();
2682 let provider =
2683 RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
2684
2685 provider
2687 .write_batch(|batch| {
2688 for i in 0..10u64 {
2689 let value = format!("value_{i}").into_bytes();
2690 batch.put::<TestTable>(i, &value)?;
2691 }
2692 Ok(())
2693 })
2694 .unwrap();
2695
2696 for i in 0..10u64 {
2698 let value = format!("value_{i}").into_bytes();
2699 assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
2700 }
2701
2702 provider
2704 .write_batch(|batch| {
2705 for i in 0..10u64 {
2706 batch.delete::<TestTable>(i)?;
2707 }
2708 Ok(())
2709 })
2710 .unwrap();
2711
2712 for i in 0..10u64 {
2714 assert_eq!(provider.get::<TestTable>(i).unwrap(), None);
2715 }
2716 }
2717
    /// Runs single-key and batched writes against a real reth table
    /// (`TransactionHashNumbers`) with metrics collection enabled.
    #[test]
    fn test_with_real_table() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path())
            .with_table::<tables::TransactionHashNumbers>()
            .with_metrics()
            .build()
            .unwrap();

        let tx_hash = TxHash::from(B256::from([1u8; 32]));

        // Single-entry round-trip.
        provider.put::<tables::TransactionHashNumbers>(tx_hash, &100).unwrap();
        assert_eq!(provider.get::<tables::TransactionHashNumbers>(tx_hash).unwrap(), Some(100));

        // Batched writes over distinct hashes. Note: i == 1 produces the same
        // hash bytes as `tx_hash` above, and writes the same value (100).
        provider
            .write_batch(|batch| {
                for i in 0..10u64 {
                    let hash = TxHash::from(B256::from([i as u8; 32]));
                    let value = i * 100;
                    batch.put::<tables::TransactionHashNumbers>(hash, &value)?;
                }
                Ok(())
            })
            .unwrap();

        for i in 0..10u64 {
            let hash = TxHash::from(B256::from([i as u8; 32]));
            assert_eq!(
                provider.get::<tables::TransactionHashNumbers>(hash).unwrap(),
                Some(i * 100)
            );
        }
    }
2754 #[test]
2755 fn test_statistics_enabled() {
2756 let temp_dir = TempDir::new().unwrap();
2757 let provider = RocksDBBuilder::new(temp_dir.path())
2759 .with_table::<TestTable>()
2760 .with_statistics()
2761 .build()
2762 .unwrap();
2763
2764 for i in 0..10 {
2766 let value = vec![i as u8];
2767 provider.put::<TestTable>(i, &value).unwrap();
2768 assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
2770 }
2771 }
2772
2773 #[test]
2774 fn test_data_persistence() {
2775 let temp_dir = TempDir::new().unwrap();
2776 let provider =
2777 RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
2778
2779 let value = vec![42u8; 1000];
2781 for i in 0..100 {
2782 provider.put::<TestTable>(i, &value).unwrap();
2783 }
2784
2785 for i in 0..100 {
2787 assert!(provider.get::<TestTable>(i).unwrap().is_some(), "Data should be readable");
2788 }
2789 }
2790
    /// A transaction sees its own uncommitted writes ("read your writes"),
    /// while the provider only observes the data once `commit` succeeds.
    #[test]
    fn test_transaction_read_your_writes() {
        let temp_dir = TempDir::new().unwrap();
        let provider =
            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();

        let tx = provider.tx();

        // Stage a write inside the transaction only.
        let key = 42u64;
        let value = b"test_value".to_vec();
        tx.put::<TestTable>(key, &value).unwrap();

        let result = tx.get::<TestTable>(key).unwrap();
        assert_eq!(
            result,
            Some(value.clone()),
            "Transaction should see its own uncommitted writes"
        );

        // The provider must not see the staged write before commit.
        let provider_result = provider.get::<TestTable>(key).unwrap();
        assert_eq!(provider_result, None, "Uncommitted data should not be visible outside tx");

        tx.commit().unwrap();

        let committed_result = provider.get::<TestTable>(key).unwrap();
        assert_eq!(committed_result, Some(value), "Committed data should be visible");
    }
2824
    /// Rolling back a transaction must discard its staged write and leave the
    /// previously committed value intact.
    #[test]
    fn test_transaction_rollback() {
        let temp_dir = TempDir::new().unwrap();
        let provider =
            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();

        // Commit an initial value outside any transaction.
        let key = 100u64;
        let initial_value = b"initial".to_vec();
        provider.put::<TestTable>(key, &initial_value).unwrap();

        // Overwrite inside a transaction — visible only to the tx itself.
        let tx = provider.tx();
        let new_value = b"modified".to_vec();
        tx.put::<TestTable>(key, &new_value).unwrap();

        assert_eq!(tx.get::<TestTable>(key).unwrap(), Some(new_value));

        tx.rollback().unwrap();

        let result = provider.get::<TestTable>(key).unwrap();
        assert_eq!(result, Some(initial_value), "Rollback should preserve original data");
    }
2851
    /// A transaction-scoped iterator must yield the transaction's own
    /// uncommitted writes.
    #[test]
    fn test_transaction_iterator() {
        let temp_dir = TempDir::new().unwrap();
        let provider =
            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();

        let tx = provider.tx();

        // Stage five entries without committing.
        for i in 0..5u64 {
            let value = format!("value_{i}").into_bytes();
            tx.put::<TestTable>(i, &value).unwrap();
        }

        // Iterate inside the same transaction: all staged entries must appear.
        let mut count = 0;
        for result in tx.iter::<TestTable>().unwrap() {
            let (key, value) = result.unwrap();
            assert_eq!(value, format!("value_{key}").into_bytes());
            count += 1;
        }
        assert_eq!(count, 5, "Iterator should see all uncommitted writes");

        tx.commit().unwrap();
    }
2879
2880 #[test]
2881 fn test_batch_manual_commit() {
2882 let temp_dir = TempDir::new().unwrap();
2883 let provider =
2884 RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
2885
2886 let mut batch = provider.batch();
2888
2889 for i in 0..10u64 {
2891 let value = format!("batch_value_{i}").into_bytes();
2892 batch.put::<TestTable>(i, &value).unwrap();
2893 }
2894
2895 assert_eq!(batch.len(), 10);
2897 assert!(!batch.is_empty());
2898
2899 assert_eq!(provider.get::<TestTable>(0).unwrap(), None);
2901
2902 batch.commit().unwrap();
2904
2905 for i in 0..10u64 {
2907 let value = format!("batch_value_{i}").into_bytes();
2908 assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
2909 }
2910 }
2911
2912 #[test]
2913 fn test_first_and_last_entry() {
2914 let temp_dir = TempDir::new().unwrap();
2915 let provider =
2916 RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
2917
2918 assert_eq!(provider.first::<TestTable>().unwrap(), None);
2920 assert_eq!(provider.last::<TestTable>().unwrap(), None);
2921
2922 provider.put::<TestTable>(10, &b"value_10".to_vec()).unwrap();
2924 provider.put::<TestTable>(20, &b"value_20".to_vec()).unwrap();
2925 provider.put::<TestTable>(5, &b"value_5".to_vec()).unwrap();
2926
2927 let first = provider.first::<TestTable>().unwrap();
2929 assert_eq!(first, Some((5, b"value_5".to_vec())));
2930
2931 let last = provider.last::<TestTable>().unwrap();
2933 assert_eq!(last, Some((20, b"value_20".to_vec())));
2934 }
2935
    /// Queries history info for a block (50) below the first indexed block
    /// (100); the lookup is expected to report `HistoryInfo::InChangeset(100)`.
    #[test]
    fn test_account_history_info_pruned_before_first_entry() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);

        // Sentinel shard whose entries all lie above the queried block.
        let chunk = IntegerList::new([100, 200, 300]).unwrap();
        let shard_key = ShardedKey::new(address, u64::MAX);
        provider.put::<tables::AccountsHistory>(shard_key, &chunk).unwrap();

        let tx = provider.tx();

        let result = tx.account_history_info(address, 50, Some(100)).unwrap();
        assert_eq!(result, HistoryInfo::InChangeset(100));

        tx.rollback().unwrap();
    }
2962
    /// Appending `NUM_OF_INDICES_IN_SHARD + 1` indices must split the data into
    /// one full shard keyed by its highest block number plus a sentinel shard
    /// (key `u64::MAX`) holding the single overflow element.
    #[test]
    fn test_account_history_shard_split_at_boundary() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);
        let limit = NUM_OF_INDICES_IN_SHARD;

        // One more index than fits in a single shard.
        let indices: Vec<u64> = (0..=(limit as u64)).collect();
        let mut batch = provider.batch();
        batch.append_account_history_shard(address, indices).unwrap();
        batch.commit().unwrap();

        // The completed shard is keyed by its highest contained block number;
        // the remainder lives under the sentinel key.
        let completed_key = ShardedKey::new(address, (limit - 1) as u64);
        let sentinel_key = ShardedKey::new(address, u64::MAX);

        let completed_shard = provider.get::<tables::AccountsHistory>(completed_key).unwrap();
        let sentinel_shard = provider.get::<tables::AccountsHistory>(sentinel_key).unwrap();

        assert!(completed_shard.is_some(), "completed shard should exist");
        assert!(sentinel_shard.is_some(), "sentinel shard should exist");

        let completed_shard = completed_shard.unwrap();
        let sentinel_shard = sentinel_shard.unwrap();

        assert_eq!(completed_shard.len(), limit as u64, "completed shard should be full");
        assert_eq!(sentinel_shard.len(), 1, "sentinel shard should have 1 element");
    }
2993
    /// Two consecutive appends that each reach the shard-size limit must leave
    /// two completed shards plus the sentinel shard.
    #[test]
    fn test_account_history_multiple_shard_splits() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x43; 20]);
        let limit = NUM_OF_INDICES_IN_SHARD;

        // First append fills exactly one shard; it stays under the sentinel key.
        let first_batch_indices: Vec<u64> = (0..limit as u64).collect();
        let mut batch = provider.batch();
        batch.append_account_history_shard(address, first_batch_indices).unwrap();
        batch.commit().unwrap();

        let sentinel_key = ShardedKey::new(address, u64::MAX);
        let shard = provider.get::<tables::AccountsHistory>(sentinel_key.clone()).unwrap();
        assert!(shard.is_some());
        assert_eq!(shard.unwrap().len(), limit as u64);

        // Second append adds limit + 1 more indices, forcing two completed
        // shards to be written out.
        let second_batch_indices: Vec<u64> = (limit as u64..=(2 * limit) as u64).collect();
        let mut batch = provider.batch();
        batch.append_account_history_shard(address, second_batch_indices).unwrap();
        batch.commit().unwrap();

        let first_completed = ShardedKey::new(address, (limit - 1) as u64);
        let second_completed = ShardedKey::new(address, (2 * limit - 1) as u64);

        assert!(
            provider.get::<tables::AccountsHistory>(first_completed).unwrap().is_some(),
            "first completed shard should exist"
        );
        assert!(
            provider.get::<tables::AccountsHistory>(second_completed).unwrap().is_some(),
            "second completed shard should exist"
        );
        assert!(
            provider.get::<tables::AccountsHistory>(sentinel_key).unwrap().is_some(),
            "sentinel shard should exist"
        );
    }
3037
    /// Storage-history analogue of the account shard split: one overflow
    /// element beyond the shard limit yields a full completed shard plus a
    /// single-element sentinel shard.
    #[test]
    fn test_storage_history_shard_split_at_boundary() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x44; 20]);
        let slot = B256::from([0x55; 32]);
        let limit = NUM_OF_INDICES_IN_SHARD;

        // One more index than fits in a single shard.
        let indices: Vec<u64> = (0..=(limit as u64)).collect();
        let mut batch = provider.batch();
        batch.append_storage_history_shard(address, slot, indices).unwrap();
        batch.commit().unwrap();

        // Completed shard keyed by its highest block; overflow under sentinel.
        let completed_key = StorageShardedKey::new(address, slot, (limit - 1) as u64);
        let sentinel_key = StorageShardedKey::new(address, slot, u64::MAX);

        let completed_shard = provider.get::<tables::StoragesHistory>(completed_key).unwrap();
        let sentinel_shard = provider.get::<tables::StoragesHistory>(sentinel_key).unwrap();

        assert!(completed_shard.is_some(), "completed shard should exist");
        assert!(sentinel_shard.is_some(), "sentinel shard should exist");

        let completed_shard = completed_shard.unwrap();
        let sentinel_shard = sentinel_shard.unwrap();

        assert_eq!(completed_shard.len(), limit as u64, "completed shard should be full");
        assert_eq!(sentinel_shard.len(), 1, "sentinel shard should have 1 element");
    }
3069
    /// Storage-history analogue of the multi-split test: two appends crossing
    /// the limit leave two completed shards plus the sentinel shard.
    #[test]
    fn test_storage_history_multiple_shard_splits() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x46; 20]);
        let slot = B256::from([0x57; 32]);
        let limit = NUM_OF_INDICES_IN_SHARD;

        // First append fills exactly one shard under the sentinel key.
        let first_batch_indices: Vec<u64> = (0..limit as u64).collect();
        let mut batch = provider.batch();
        batch.append_storage_history_shard(address, slot, first_batch_indices).unwrap();
        batch.commit().unwrap();

        let sentinel_key = StorageShardedKey::new(address, slot, u64::MAX);
        let shard = provider.get::<tables::StoragesHistory>(sentinel_key.clone()).unwrap();
        assert!(shard.is_some());
        assert_eq!(shard.unwrap().len(), limit as u64);

        // Second append adds limit + 1 more indices, forcing two completed shards.
        let second_batch_indices: Vec<u64> = (limit as u64..=(2 * limit) as u64).collect();
        let mut batch = provider.batch();
        batch.append_storage_history_shard(address, slot, second_batch_indices).unwrap();
        batch.commit().unwrap();

        let first_completed = StorageShardedKey::new(address, slot, (limit - 1) as u64);
        let second_completed = StorageShardedKey::new(address, slot, (2 * limit - 1) as u64);

        assert!(
            provider.get::<tables::StoragesHistory>(first_completed).unwrap().is_some(),
            "first completed shard should exist"
        );
        assert!(
            provider.get::<tables::StoragesHistory>(second_completed).unwrap().is_some(),
            "second completed shard should exist"
        );
        assert!(
            provider.get::<tables::StoragesHistory>(sentinel_key).unwrap().is_some(),
            "sentinel shard should exist"
        );
    }
3114
3115 #[test]
3116 fn test_clear_table() {
3117 let temp_dir = TempDir::new().unwrap();
3118 let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3119
3120 let address = Address::from([0x42; 20]);
3121 let key = ShardedKey::new(address, u64::MAX);
3122 let blocks = BlockNumberList::new_pre_sorted([1, 2, 3]);
3123
3124 provider.put::<tables::AccountsHistory>(key.clone(), &blocks).unwrap();
3125 assert!(provider.get::<tables::AccountsHistory>(key.clone()).unwrap().is_some());
3126
3127 provider.clear::<tables::AccountsHistory>().unwrap();
3128
3129 assert!(
3130 provider.get::<tables::AccountsHistory>(key).unwrap().is_none(),
3131 "table should be empty after clear"
3132 );
3133 assert!(
3134 provider.first::<tables::AccountsHistory>().unwrap().is_none(),
3135 "first() should return None after clear"
3136 );
3137 }
3138
3139 #[test]
3140 fn test_clear_empty_table() {
3141 let temp_dir = TempDir::new().unwrap();
3142 let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3143
3144 assert!(provider.first::<tables::AccountsHistory>().unwrap().is_none());
3145
3146 provider.clear::<tables::AccountsHistory>().unwrap();
3147
3148 assert!(provider.first::<tables::AccountsHistory>().unwrap().is_none());
3149 }
3150
    /// Unwinding to block 5 must truncate the sentinel shard to blocks 0..=5.
    #[test]
    fn test_unwind_account_history_to_basic() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);

        // Seed blocks 0..=10 under the sentinel shard.
        let mut batch = provider.batch();
        batch.append_account_history_shard(address, 0..=10).unwrap();
        batch.commit().unwrap();

        let key = ShardedKey::new(address, u64::MAX);
        let result = provider.get::<tables::AccountsHistory>(key.clone()).unwrap();
        assert!(result.is_some());
        let blocks: Vec<u64> = result.unwrap().iter().collect();
        assert_eq!(blocks, (0..=10).collect::<Vec<_>>());

        // Unwind removes every block above 5.
        let mut batch = provider.batch();
        batch.unwind_account_history_to(address, 5).unwrap();
        batch.commit().unwrap();

        let result = provider.get::<tables::AccountsHistory>(key).unwrap();
        assert!(result.is_some());
        let blocks: Vec<u64> = result.unwrap().iter().collect();
        assert_eq!(blocks, (0..=5).collect::<Vec<_>>());
    }
3181
    /// Unwinding below the lowest recorded block must delete the shard
    /// entirely.
    #[test]
    fn test_unwind_account_history_to_removes_all() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);

        // Seed blocks 5..=10, all above the unwind target of 4.
        let mut batch = provider.batch();
        batch.append_account_history_shard(address, 5..=10).unwrap();
        batch.commit().unwrap();

        let mut batch = provider.batch();
        batch.unwind_account_history_to(address, 4).unwrap();
        batch.commit().unwrap();

        let key = ShardedKey::new(address, u64::MAX);
        let result = provider.get::<tables::AccountsHistory>(key).unwrap();
        assert!(result.is_none(), "Should have no data after full unwind");
    }
3204
    /// Unwinding to a block above everything recorded must leave the shard
    /// untouched.
    #[test]
    fn test_unwind_account_history_to_no_op() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);

        let mut batch = provider.batch();
        batch.append_account_history_shard(address, 0..=5).unwrap();
        batch.commit().unwrap();

        // Target (10) exceeds the highest recorded block (5): no-op.
        let mut batch = provider.batch();
        batch.unwind_account_history_to(address, 10).unwrap();
        batch.commit().unwrap();

        let key = ShardedKey::new(address, u64::MAX);
        let result = provider.get::<tables::AccountsHistory>(key).unwrap();
        assert!(result.is_some());
        let blocks: Vec<u64> = result.unwrap().iter().collect();
        assert_eq!(blocks, (0..=5).collect::<Vec<_>>());
    }
3229
    /// Unwinding to block 0 keeps block 0 itself (the target is inclusive) and
    /// removes everything above it.
    #[test]
    fn test_unwind_account_history_to_block_zero() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);

        let mut batch = provider.batch();
        batch.append_account_history_shard(address, 0..=5).unwrap();
        batch.commit().unwrap();

        let mut batch = provider.batch();
        batch.unwind_account_history_to(address, 0).unwrap();
        batch.commit().unwrap();

        // Only block 0 survives the unwind.
        let key = ShardedKey::new(address, u64::MAX);
        let result = provider.get::<tables::AccountsHistory>(key).unwrap();
        assert!(result.is_some());
        let blocks: Vec<u64> = result.unwrap().iter().collect();
        assert_eq!(blocks, vec![0]);
    }
3255
    /// Unwinding into the sentinel shard of a two-shard layout must truncate
    /// only the sentinel shard and leave the completed shard untouched.
    #[test]
    fn test_unwind_account_history_to_multi_shard() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);

        let mut batch = provider.batch();

        // Completed shard: blocks 1..=50, keyed by its highest block (50).
        let shard1 = BlockNumberList::new_pre_sorted(1..=50);
        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, 50), &shard1).unwrap();

        // Sentinel shard: blocks 51..=100.
        let shard2 = BlockNumberList::new_pre_sorted(51..=100);
        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, u64::MAX), &shard2).unwrap();

        batch.commit().unwrap();

        let shards = provider.account_history_shards(address).unwrap();
        assert_eq!(shards.len(), 2);

        // Unwind to 75: only the sentinel shard's tail (76..=100) is removed.
        let mut batch = provider.batch();
        batch.unwind_account_history_to(address, 75).unwrap();
        batch.commit().unwrap();

        let shards = provider.account_history_shards(address).unwrap();
        assert_eq!(shards.len(), 2);

        assert_eq!(shards[0].0.highest_block_number, 50);
        assert_eq!(shards[0].1.iter().collect::<Vec<_>>(), (1..=50).collect::<Vec<_>>());

        assert_eq!(shards[1].0.highest_block_number, u64::MAX);
        assert_eq!(shards[1].1.iter().collect::<Vec<_>>(), (51..=75).collect::<Vec<_>>());
    }
3298
    /// When the unwind target empties the sentinel shard, the preceding
    /// completed shard is promoted to the sentinel key.
    #[test]
    fn test_unwind_account_history_to_multi_shard_boundary_empty() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);

        let mut batch = provider.batch();

        // Completed shard: blocks 1..=50.
        let shard1 = BlockNumberList::new_pre_sorted(1..=50);
        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, 50), &shard1).unwrap();

        // Sentinel shard: blocks 75..=100, all above the unwind target (60).
        let shard2 = BlockNumberList::new_pre_sorted(75..=100);
        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, u64::MAX), &shard2).unwrap();

        batch.commit().unwrap();

        // Unwinding to 60 empties the sentinel; shard1 takes the sentinel key.
        let mut batch = provider.batch();
        batch.unwind_account_history_to(address, 60).unwrap();
        batch.commit().unwrap();

        let shards = provider.account_history_shards(address).unwrap();
        assert_eq!(shards.len(), 1);
        assert_eq!(shards[0].0.highest_block_number, u64::MAX);
        assert_eq!(shards[0].1.iter().collect::<Vec<_>>(), (1..=50).collect::<Vec<_>>());
    }
3330
    /// `account_history_shards` must return only the shards belonging to the
    /// queried address, and an empty list for unknown addresses.
    #[test]
    fn test_account_history_shards_iterator() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);
        let other_address = Address::from([0x43; 20]);

        // Seed one shard per address.
        let mut batch = provider.batch();
        batch.append_account_history_shard(address, 0..=5).unwrap();
        batch.append_account_history_shard(other_address, 10..=15).unwrap();
        batch.commit().unwrap();

        let shards = provider.account_history_shards(address).unwrap();
        assert_eq!(shards.len(), 1);
        assert_eq!(shards[0].0.key, address);

        let shards = provider.account_history_shards(other_address).unwrap();
        assert_eq!(shards.len(), 1);
        assert_eq!(shards[0].0.key, other_address);

        // Address with no history yields no shards.
        let non_existent = Address::from([0x99; 20]);
        let shards = provider.account_history_shards(non_existent).unwrap();
        assert!(shards.is_empty());
    }
3360
    /// `clear_account_history` must drop every shard belonging to an address.
    #[test]
    fn test_clear_account_history() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);

        // Seed a shard for the address.
        let mut batch = provider.batch();
        batch.append_account_history_shard(address, 0..=10).unwrap();
        batch.commit().unwrap();

        // Clear it in a separate batch.
        let mut batch = provider.batch();
        batch.clear_account_history(address).unwrap();
        batch.commit().unwrap();

        let shards = provider.account_history_shards(address).unwrap();
        assert!(shards.is_empty(), "All shards should be deleted");
    }
3382
    /// With three shards, unwinding into the middle one must drop the old
    /// sentinel shard, truncate the middle shard, and re-key it as the new
    /// sentinel.
    #[test]
    fn test_unwind_non_sentinel_boundary() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);

        let mut batch = provider.batch();

        // Shard layout: [1..=50]@50, [51..=100]@100, [101..=150]@sentinel.
        let shard1 = BlockNumberList::new_pre_sorted(1..=50);
        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, 50), &shard1).unwrap();

        let shard2 = BlockNumberList::new_pre_sorted(51..=100);
        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, 100), &shard2).unwrap();

        let shard3 = BlockNumberList::new_pre_sorted(101..=150);
        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, u64::MAX), &shard3).unwrap();

        batch.commit().unwrap();

        let shards = provider.account_history_shards(address).unwrap();
        assert_eq!(shards.len(), 3);

        // Unwind to 75: shard3 disappears; shard2 is truncated to 51..=75 and
        // becomes the new sentinel shard.
        let mut batch = provider.batch();
        batch.unwind_account_history_to(address, 75).unwrap();
        batch.commit().unwrap();

        let shards = provider.account_history_shards(address).unwrap();
        assert_eq!(shards.len(), 2);

        assert_eq!(shards[0].0.highest_block_number, 50);
        assert_eq!(shards[0].1.iter().collect::<Vec<_>>(), (1..=50).collect::<Vec<_>>());

        assert_eq!(shards[1].0.highest_block_number, u64::MAX);
        assert_eq!(shards[1].1.iter().collect::<Vec<_>>(), (51..=75).collect::<Vec<_>>());
    }
3428
    /// A batch built with a small `auto_commit_threshold` must flush
    /// automatically once staged data crosses the threshold; a final explicit
    /// `commit` persists the remainder.
    #[test]
    fn test_batch_auto_commit_on_threshold() {
        let temp_dir = TempDir::new().unwrap();
        let provider =
            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();

        // Construct the batch directly so the threshold (1024 bytes) is small
        // enough to trigger mid-loop auto-commits.
        let mut batch = RocksDBBatch {
            provider: &provider,
            inner: WriteBatchWithTransaction::<true>::default(),
            buf: Vec::new(),
            auto_commit_threshold: Some(1024),
        };

        for i in 0..100u64 {
            let value = format!("value_{i:04}").into_bytes();
            batch.put::<TestTable>(i, &value).unwrap();
        }

        // 100 entries exceed the threshold, so at least one auto-commit ran and
        // early keys are visible before the explicit commit below.
        let first_visible = provider.get::<TestTable>(0).unwrap();
        assert!(first_visible.is_some(), "Auto-committed data should be visible");

        batch.commit().unwrap();

        for i in 0..100u64 {
            let value = format!("value_{i:04}").into_bytes();
            assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
        }
    }
3464
    /// Table-driven scenario for `prune_account_history_to` tests.
    struct AccountPruneCase {
        /// Human-readable label used in assertion messages.
        name: &'static str,
        /// Shards to seed, as `(highest_block_number key, block list)` pairs.
        initial_shards: &'static [(u64, &'static [u64])],
        /// Block number passed to the prune call.
        prune_to: u64,
        /// Outcome the prune call itself is expected to report.
        expected_outcome: PruneShardOutcome,
        /// Shards expected to remain after the prune, in key order.
        expected_shards: &'static [(u64, &'static [u64])],
    }
3475
3476 struct StoragePruneCase {
3478 name: &'static str,
3479 initial_shards: &'static [(u64, &'static [u64])],
3480 prune_to: u64,
3481 expected_outcome: PruneShardOutcome,
3482 expected_shards: &'static [(u64, &'static [u64])],
3483 }
3484
    /// Table-driven checks for `prune_account_history_to`: each case seeds a
    /// shard layout, prunes to a block, and verifies both the reported
    /// `PruneShardOutcome` and the exact surviving shards.
    #[test]
    fn test_prune_account_history_cases() {
        const MAX: u64 = u64::MAX;
        const CASES: &[AccountPruneCase] = &[
            AccountPruneCase {
                name: "single_shard_truncate",
                initial_shards: &[(MAX, &[10, 20, 30, 40])],
                prune_to: 25,
                expected_outcome: PruneShardOutcome::Updated,
                expected_shards: &[(MAX, &[30, 40])],
            },
            AccountPruneCase {
                name: "single_shard_delete_all",
                initial_shards: &[(MAX, &[10, 20])],
                prune_to: 20,
                expected_outcome: PruneShardOutcome::Deleted,
                expected_shards: &[],
            },
            AccountPruneCase {
                name: "single_shard_noop",
                initial_shards: &[(MAX, &[10, 20])],
                prune_to: 5,
                expected_outcome: PruneShardOutcome::Unchanged,
                expected_shards: &[(MAX, &[10, 20])],
            },
            AccountPruneCase {
                name: "no_shards",
                initial_shards: &[],
                prune_to: 100,
                expected_outcome: PruneShardOutcome::Unchanged,
                expected_shards: &[],
            },
            AccountPruneCase {
                name: "multi_shard_truncate_first",
                initial_shards: &[(30, &[10, 20, 30]), (MAX, &[40, 50, 60])],
                prune_to: 25,
                expected_outcome: PruneShardOutcome::Updated,
                expected_shards: &[(30, &[30]), (MAX, &[40, 50, 60])],
            },
            AccountPruneCase {
                name: "delete_first_shard_sentinel_unchanged",
                initial_shards: &[(20, &[10, 20]), (MAX, &[30, 40])],
                prune_to: 20,
                expected_outcome: PruneShardOutcome::Deleted,
                expected_shards: &[(MAX, &[30, 40])],
            },
            AccountPruneCase {
                name: "multi_shard_delete_all_but_last",
                initial_shards: &[(10, &[5, 10]), (20, &[15, 20]), (MAX, &[25, 30])],
                prune_to: 22,
                expected_outcome: PruneShardOutcome::Deleted,
                expected_shards: &[(MAX, &[25, 30])],
            },
            AccountPruneCase {
                name: "mid_shard_preserves_key",
                initial_shards: &[(50, &[10, 20, 30, 40, 50]), (MAX, &[60, 70])],
                prune_to: 25,
                expected_outcome: PruneShardOutcome::Updated,
                expected_shards: &[(50, &[30, 40, 50]), (MAX, &[60, 70])],
            },
            AccountPruneCase {
                name: "equiv_delete_early_shards_keep_sentinel",
                initial_shards: &[(20, &[10, 15, 20]), (50, &[30, 40, 50]), (MAX, &[60, 70])],
                prune_to: 55,
                expected_outcome: PruneShardOutcome::Deleted,
                expected_shards: &[(MAX, &[60, 70])],
            },
            AccountPruneCase {
                name: "equiv_sentinel_becomes_empty_with_prev",
                initial_shards: &[(50, &[30, 40, 50]), (MAX, &[35])],
                prune_to: 40,
                expected_outcome: PruneShardOutcome::Deleted,
                expected_shards: &[(MAX, &[50])],
            },
            AccountPruneCase {
                name: "equiv_all_shards_become_empty",
                initial_shards: &[(50, &[30, 40, 50]), (MAX, &[51])],
                prune_to: 51,
                expected_outcome: PruneShardOutcome::Deleted,
                expected_shards: &[],
            },
            AccountPruneCase {
                name: "equiv_non_sentinel_last_shard_promoted",
                initial_shards: &[(100, &[50, 75, 100])],
                prune_to: 60,
                expected_outcome: PruneShardOutcome::Updated,
                expected_shards: &[(MAX, &[75, 100])],
            },
            AccountPruneCase {
                name: "equiv_filter_within_shard",
                initial_shards: &[(MAX, &[10, 20, 30, 40])],
                prune_to: 25,
                expected_outcome: PruneShardOutcome::Updated,
                expected_shards: &[(MAX, &[30, 40])],
            },
            AccountPruneCase {
                name: "equiv_multi_shard_partial_delete",
                initial_shards: &[(20, &[10, 20]), (50, &[30, 40, 50]), (MAX, &[60, 70])],
                prune_to: 35,
                expected_outcome: PruneShardOutcome::Deleted,
                expected_shards: &[(50, &[40, 50]), (MAX, &[60, 70])],
            },
        ];

        let address = Address::from([0x42; 20]);

        for case in CASES {
            // Each case starts from a fresh database.
            let temp_dir = TempDir::new().unwrap();
            let provider =
                RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

            // Seed the case's initial shard layout.
            let mut batch = provider.batch();
            for (highest, blocks) in case.initial_shards {
                let shard = BlockNumberList::new_pre_sorted(blocks.iter().copied());
                batch
                    .put::<tables::AccountsHistory>(ShardedKey::new(address, *highest), &shard)
                    .unwrap();
            }
            batch.commit().unwrap();

            // Run the prune under test.
            let mut batch = provider.batch();
            let outcome = batch.prune_account_history_to(address, case.prune_to).unwrap();
            batch.commit().unwrap();

            assert_eq!(outcome, case.expected_outcome, "case '{}': wrong outcome", case.name);

            // Verify the surviving shards match exactly, key by key.
            let shards = provider.account_history_shards(address).unwrap();
            assert_eq!(
                shards.len(),
                case.expected_shards.len(),
                "case '{}': wrong shard count",
                case.name
            );
            for (i, ((key, blocks), (exp_key, exp_blocks))) in
                shards.iter().zip(case.expected_shards.iter()).enumerate()
            {
                assert_eq!(
                    key.highest_block_number, *exp_key,
                    "case '{}': shard {} wrong key",
                    case.name, i
                );
                assert_eq!(
                    blocks.iter().collect::<Vec<_>>(),
                    *exp_blocks,
                    "case '{}': shard {} wrong blocks",
                    case.name,
                    i
                );
            }
        }
    }
3641
3642 #[test]
3643 fn test_prune_storage_history_cases() {
3644 const MAX: u64 = u64::MAX;
3645 const CASES: &[StoragePruneCase] = &[
3646 StoragePruneCase {
3647 name: "single_shard_truncate",
3648 initial_shards: &[(MAX, &[10, 20, 30, 40])],
3649 prune_to: 25,
3650 expected_outcome: PruneShardOutcome::Updated,
3651 expected_shards: &[(MAX, &[30, 40])],
3652 },
3653 StoragePruneCase {
3654 name: "single_shard_delete_all",
3655 initial_shards: &[(MAX, &[10, 20])],
3656 prune_to: 20,
3657 expected_outcome: PruneShardOutcome::Deleted,
3658 expected_shards: &[],
3659 },
3660 StoragePruneCase {
3661 name: "noop",
3662 initial_shards: &[(MAX, &[10, 20])],
3663 prune_to: 5,
3664 expected_outcome: PruneShardOutcome::Unchanged,
3665 expected_shards: &[(MAX, &[10, 20])],
3666 },
3667 StoragePruneCase {
3668 name: "no_shards",
3669 initial_shards: &[],
3670 prune_to: 100,
3671 expected_outcome: PruneShardOutcome::Unchanged,
3672 expected_shards: &[],
3673 },
3674 StoragePruneCase {
3675 name: "mid_shard_preserves_key",
3676 initial_shards: &[(50, &[10, 20, 30, 40, 50]), (MAX, &[60, 70])],
3677 prune_to: 25,
3678 expected_outcome: PruneShardOutcome::Updated,
3679 expected_shards: &[(50, &[30, 40, 50]), (MAX, &[60, 70])],
3680 },
3681 StoragePruneCase {
3683 name: "equiv_sentinel_promotion",
3684 initial_shards: &[(100, &[50, 75, 100])],
3685 prune_to: 60,
3686 expected_outcome: PruneShardOutcome::Updated,
3687 expected_shards: &[(MAX, &[75, 100])],
3688 },
3689 StoragePruneCase {
3690 name: "equiv_delete_early_shards_keep_sentinel",
3691 initial_shards: &[(20, &[10, 15, 20]), (50, &[30, 40, 50]), (MAX, &[60, 70])],
3692 prune_to: 55,
3693 expected_outcome: PruneShardOutcome::Deleted,
3694 expected_shards: &[(MAX, &[60, 70])],
3695 },
3696 StoragePruneCase {
3697 name: "equiv_sentinel_becomes_empty_with_prev",
3698 initial_shards: &[(50, &[30, 40, 50]), (MAX, &[35])],
3699 prune_to: 40,
3700 expected_outcome: PruneShardOutcome::Deleted,
3701 expected_shards: &[(MAX, &[50])],
3702 },
3703 StoragePruneCase {
3704 name: "equiv_all_shards_become_empty",
3705 initial_shards: &[(50, &[30, 40, 50]), (MAX, &[51])],
3706 prune_to: 51,
3707 expected_outcome: PruneShardOutcome::Deleted,
3708 expected_shards: &[],
3709 },
3710 StoragePruneCase {
3711 name: "equiv_filter_within_shard",
3712 initial_shards: &[(MAX, &[10, 20, 30, 40])],
3713 prune_to: 25,
3714 expected_outcome: PruneShardOutcome::Updated,
3715 expected_shards: &[(MAX, &[30, 40])],
3716 },
3717 StoragePruneCase {
3718 name: "equiv_multi_shard_partial_delete",
3719 initial_shards: &[(20, &[10, 20]), (50, &[30, 40, 50]), (MAX, &[60, 70])],
3720 prune_to: 35,
3721 expected_outcome: PruneShardOutcome::Deleted,
3722 expected_shards: &[(50, &[40, 50]), (MAX, &[60, 70])],
3723 },
3724 ];
3725
3726 let address = Address::from([0x42; 20]);
3727 let storage_key = B256::from([0x01; 32]);
3728
3729 for case in CASES {
3730 let temp_dir = TempDir::new().unwrap();
3731 let provider =
3732 RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3733
3734 let mut batch = provider.batch();
3736 for (highest, blocks) in case.initial_shards {
3737 let shard = BlockNumberList::new_pre_sorted(blocks.iter().copied());
3738 let key = if *highest == MAX {
3739 StorageShardedKey::last(address, storage_key)
3740 } else {
3741 StorageShardedKey::new(address, storage_key, *highest)
3742 };
3743 batch.put::<tables::StoragesHistory>(key, &shard).unwrap();
3744 }
3745 batch.commit().unwrap();
3746
3747 let mut batch = provider.batch();
3749 let outcome =
3750 batch.prune_storage_history_to(address, storage_key, case.prune_to).unwrap();
3751 batch.commit().unwrap();
3752
3753 assert_eq!(outcome, case.expected_outcome, "case '{}': wrong outcome", case.name);
3755
3756 let shards = provider.storage_history_shards(address, storage_key).unwrap();
3758 assert_eq!(
3759 shards.len(),
3760 case.expected_shards.len(),
3761 "case '{}': wrong shard count",
3762 case.name
3763 );
3764 for (i, ((key, blocks), (exp_key, exp_blocks))) in
3765 shards.iter().zip(case.expected_shards.iter()).enumerate()
3766 {
3767 assert_eq!(
3768 key.sharded_key.highest_block_number, *exp_key,
3769 "case '{}': shard {} wrong key",
3770 case.name, i
3771 );
3772 assert_eq!(
3773 blocks.iter().collect::<Vec<_>>(),
3774 *exp_blocks,
3775 "case '{}': shard {} wrong blocks",
3776 case.name,
3777 i
3778 );
3779 }
3780 }
3781 }
3782
3783 #[test]
3784 fn test_prune_storage_history_does_not_affect_other_slots() {
3785 let temp_dir = TempDir::new().unwrap();
3786 let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3787
3788 let address = Address::from([0x42; 20]);
3789 let slot1 = B256::from([0x01; 32]);
3790 let slot2 = B256::from([0x02; 32]);
3791
3792 let mut batch = provider.batch();
3794 batch
3795 .put::<tables::StoragesHistory>(
3796 StorageShardedKey::last(address, slot1),
3797 &BlockNumberList::new_pre_sorted([10u64, 20]),
3798 )
3799 .unwrap();
3800 batch
3801 .put::<tables::StoragesHistory>(
3802 StorageShardedKey::last(address, slot2),
3803 &BlockNumberList::new_pre_sorted([30u64, 40]),
3804 )
3805 .unwrap();
3806 batch.commit().unwrap();
3807
3808 let mut batch = provider.batch();
3810 let outcome = batch.prune_storage_history_to(address, slot1, 20).unwrap();
3811 batch.commit().unwrap();
3812
3813 assert_eq!(outcome, PruneShardOutcome::Deleted);
3814
3815 let shards1 = provider.storage_history_shards(address, slot1).unwrap();
3817 assert!(shards1.is_empty());
3818
3819 let shards2 = provider.storage_history_shards(address, slot2).unwrap();
3821 assert_eq!(shards2.len(), 1);
3822 assert_eq!(shards2[0].1.iter().collect::<Vec<_>>(), vec![30, 40]);
3823 }
3824
3825 #[test]
3826 fn test_prune_invariants() {
3827 let address = Address::from([0x42; 20]);
3829 let storage_key = B256::from([0x01; 32]);
3830
3831 #[allow(clippy::type_complexity)]
3833 let invariant_cases: &[(&[(u64, &[u64])], u64)] = &[
3834 (&[(10, &[5, 10]), (20, &[15, 20]), (u64::MAX, &[25, 30])], 20),
3836 (&[(100, &[50, 100])], 60),
3838 ];
3839
3840 for (initial_shards, prune_to) in invariant_cases {
3841 {
3843 let temp_dir = TempDir::new().unwrap();
3844 let provider =
3845 RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3846
3847 let mut batch = provider.batch();
3848 for (highest, blocks) in *initial_shards {
3849 let shard = BlockNumberList::new_pre_sorted(blocks.iter().copied());
3850 batch
3851 .put::<tables::AccountsHistory>(ShardedKey::new(address, *highest), &shard)
3852 .unwrap();
3853 }
3854 batch.commit().unwrap();
3855
3856 let mut batch = provider.batch();
3857 batch.prune_account_history_to(address, *prune_to).unwrap();
3858 batch.commit().unwrap();
3859
3860 let shards = provider.account_history_shards(address).unwrap();
3861
3862 for (key, blocks) in &shards {
3864 assert!(
3865 !blocks.is_empty(),
3866 "Account: empty shard at key {}",
3867 key.highest_block_number
3868 );
3869 }
3870
3871 if !shards.is_empty() {
3873 let last = shards.last().unwrap();
3874 assert_eq!(
3875 last.0.highest_block_number,
3876 u64::MAX,
3877 "Account: last shard must be sentinel"
3878 );
3879 }
3880 }
3881
3882 {
3884 let temp_dir = TempDir::new().unwrap();
3885 let provider =
3886 RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3887
3888 let mut batch = provider.batch();
3889 for (highest, blocks) in *initial_shards {
3890 let shard = BlockNumberList::new_pre_sorted(blocks.iter().copied());
3891 let key = if *highest == u64::MAX {
3892 StorageShardedKey::last(address, storage_key)
3893 } else {
3894 StorageShardedKey::new(address, storage_key, *highest)
3895 };
3896 batch.put::<tables::StoragesHistory>(key, &shard).unwrap();
3897 }
3898 batch.commit().unwrap();
3899
3900 let mut batch = provider.batch();
3901 batch.prune_storage_history_to(address, storage_key, *prune_to).unwrap();
3902 batch.commit().unwrap();
3903
3904 let shards = provider.storage_history_shards(address, storage_key).unwrap();
3905
3906 for (key, blocks) in &shards {
3908 assert!(
3909 !blocks.is_empty(),
3910 "Storage: empty shard at key {}",
3911 key.sharded_key.highest_block_number
3912 );
3913 }
3914
3915 if !shards.is_empty() {
3917 let last = shards.last().unwrap();
3918 assert_eq!(
3919 last.0.sharded_key.highest_block_number,
3920 u64::MAX,
3921 "Storage: last shard must be sentinel"
3922 );
3923 }
3924 }
3925 }
3926 }
3927
3928 #[test]
3929 fn test_prune_account_history_batch_multiple_sorted_targets() {
3930 let temp_dir = TempDir::new().unwrap();
3931 let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3932
3933 let addr1 = Address::from([0x01; 20]);
3934 let addr2 = Address::from([0x02; 20]);
3935 let addr3 = Address::from([0x03; 20]);
3936
3937 let mut batch = provider.batch();
3939 batch
3940 .put::<tables::AccountsHistory>(
3941 ShardedKey::new(addr1, u64::MAX),
3942 &BlockNumberList::new_pre_sorted([10, 20, 30]),
3943 )
3944 .unwrap();
3945 batch
3946 .put::<tables::AccountsHistory>(
3947 ShardedKey::new(addr2, u64::MAX),
3948 &BlockNumberList::new_pre_sorted([5, 10, 15]),
3949 )
3950 .unwrap();
3951 batch
3952 .put::<tables::AccountsHistory>(
3953 ShardedKey::new(addr3, u64::MAX),
3954 &BlockNumberList::new_pre_sorted([100, 200]),
3955 )
3956 .unwrap();
3957 batch.commit().unwrap();
3958
3959 let mut targets = vec![(addr1, 15), (addr2, 10), (addr3, 50)];
3961 targets.sort_by_key(|(addr, _)| *addr);
3962
3963 let mut batch = provider.batch();
3964 let outcomes = batch.prune_account_history_batch(&targets).unwrap();
3965 batch.commit().unwrap();
3966
3967 assert_eq!(outcomes.updated, 2);
3971 assert_eq!(outcomes.unchanged, 1);
3972
3973 let shards1 = provider.account_history_shards(addr1).unwrap();
3974 assert_eq!(shards1[0].1.iter().collect::<Vec<_>>(), vec![20, 30]);
3975
3976 let shards2 = provider.account_history_shards(addr2).unwrap();
3977 assert_eq!(shards2[0].1.iter().collect::<Vec<_>>(), vec![15]);
3978
3979 let shards3 = provider.account_history_shards(addr3).unwrap();
3980 assert_eq!(shards3[0].1.iter().collect::<Vec<_>>(), vec![100, 200]);
3981 }
3982
3983 #[test]
3984 fn test_prune_account_history_batch_target_with_no_shards() {
3985 let temp_dir = TempDir::new().unwrap();
3986 let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3987
3988 let addr1 = Address::from([0x01; 20]);
3989 let addr2 = Address::from([0x02; 20]); let addr3 = Address::from([0x03; 20]);
3991
3992 let mut batch = provider.batch();
3994 batch
3995 .put::<tables::AccountsHistory>(
3996 ShardedKey::new(addr1, u64::MAX),
3997 &BlockNumberList::new_pre_sorted([10, 20]),
3998 )
3999 .unwrap();
4000 batch
4001 .put::<tables::AccountsHistory>(
4002 ShardedKey::new(addr3, u64::MAX),
4003 &BlockNumberList::new_pre_sorted([30, 40]),
4004 )
4005 .unwrap();
4006 batch.commit().unwrap();
4007
4008 let mut targets = vec![(addr1, 15), (addr2, 100), (addr3, 35)];
4010 targets.sort_by_key(|(addr, _)| *addr);
4011
4012 let mut batch = provider.batch();
4013 let outcomes = batch.prune_account_history_batch(&targets).unwrap();
4014 batch.commit().unwrap();
4015
4016 assert_eq!(outcomes.updated, 2);
4020 assert_eq!(outcomes.unchanged, 1);
4021
4022 let shards1 = provider.account_history_shards(addr1).unwrap();
4023 assert_eq!(shards1[0].1.iter().collect::<Vec<_>>(), vec![20]);
4024
4025 let shards3 = provider.account_history_shards(addr3).unwrap();
4026 assert_eq!(shards3[0].1.iter().collect::<Vec<_>>(), vec![40]);
4027 }
4028
4029 #[test]
4030 fn test_prune_storage_history_batch_multiple_sorted_targets() {
4031 let temp_dir = TempDir::new().unwrap();
4032 let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
4033
4034 let addr = Address::from([0x42; 20]);
4035 let slot1 = B256::from([0x01; 32]);
4036 let slot2 = B256::from([0x02; 32]);
4037
4038 let mut batch = provider.batch();
4040 batch
4041 .put::<tables::StoragesHistory>(
4042 StorageShardedKey::new(addr, slot1, u64::MAX),
4043 &BlockNumberList::new_pre_sorted([10, 20, 30]),
4044 )
4045 .unwrap();
4046 batch
4047 .put::<tables::StoragesHistory>(
4048 StorageShardedKey::new(addr, slot2, u64::MAX),
4049 &BlockNumberList::new_pre_sorted([5, 15, 25]),
4050 )
4051 .unwrap();
4052 batch.commit().unwrap();
4053
4054 let mut targets = vec![((addr, slot1), 15), ((addr, slot2), 10)];
4056 targets.sort_by_key(|((a, s), _)| (*a, *s));
4057
4058 let mut batch = provider.batch();
4059 let outcomes = batch.prune_storage_history_batch(&targets).unwrap();
4060 batch.commit().unwrap();
4061
4062 assert_eq!(outcomes.updated, 2);
4063
4064 let shards1 = provider.storage_history_shards(addr, slot1).unwrap();
4065 assert_eq!(shards1[0].1.iter().collect::<Vec<_>>(), vec![20, 30]);
4066
4067 let shards2 = provider.storage_history_shards(addr, slot2).unwrap();
4068 assert_eq!(shards2[0].1.iter().collect::<Vec<_>>(), vec![15, 25]);
4069 }
4070}