use super::metrics::{RocksDBMetrics, RocksDBOperation, ROCKSDB_TABLES};
use crate::providers::{compute_history_rank, needs_prev_shard_check, HistoryInfo};
use alloy_consensus::transaction::TxHashRef;
use alloy_primitives::{
    keccak256,
    map::{AddressMap, HashMap},
    Address, BlockNumber, TxNumber, B256,
};
use itertools::Itertools;
use metrics::Label;
use parking_lot::Mutex;
use reth_chain_state::ExecutedBlock;
use reth_db_api::{
    database_metrics::DatabaseMetrics,
    models::{
        sharded_key::NUM_OF_INDICES_IN_SHARD, storage_sharded_key::StorageShardedKey, ShardedKey,
        StorageSettings,
    },
    table::{Compress, Decode, Decompress, Encode, Table},
    tables, BlockNumberList, DatabaseError,
};
use reth_primitives_traits::BlockBody as _;
use reth_prune_types::PruneMode;
use reth_storage_errors::{
    db::{DatabaseErrorInfo, DatabaseWriteError, DatabaseWriteOperation, LogLevel},
    provider::{ProviderError, ProviderResult},
};
use rocksdb::{
    BlockBasedOptions, Cache, ColumnFamilyDescriptor, CompactionPri, DBCompressionType,
    DBRawIteratorWithThreadMode, IteratorMode, OptimisticTransactionDB,
    OptimisticTransactionOptions, Options, Transaction, WriteBatchWithTransaction, WriteOptions,
    DB,
};
use std::{
    collections::BTreeMap,
    fmt,
    path::{Path, PathBuf},
    sync::Arc,
    time::Instant,
};
use tracing::instrument;
42
/// Shared queue of pending RocksDB write batches awaiting commit.
pub(crate) type PendingRocksDBBatches = Arc<Mutex<Vec<WriteBatchWithTransaction<true>>>>;

/// Raw key-value pair yielded by RocksDB iterators.
type RawKVResult = Result<(Box<[u8]>, Box<[u8]>), rocksdb::Error>;

/// Statistics for a single RocksDB column family.
#[derive(Debug, Clone)]
pub struct RocksDBTableStats {
    /// Total size of live SST files, in bytes.
    pub sst_size_bytes: u64,
    /// Total size of all memtables, in bytes.
    pub memtable_size_bytes: u64,
    /// Column family name.
    pub name: String,
    /// Estimated number of keys.
    pub estimated_num_keys: u64,
    /// Estimated total size (SST files plus memtables), in bytes.
    pub estimated_size_bytes: u64,
    /// Estimated bytes pending compaction.
    pub pending_compaction_bytes: u64,
}

/// Aggregated statistics for the whole RocksDB instance.
#[derive(Debug, Clone)]
pub struct RocksDBStats {
    /// Per-column-family statistics.
    pub tables: Vec<RocksDBTableStats>,
    /// Total size of write-ahead log files, in bytes.
    pub wal_size_bytes: u64,
}

/// Context for writing executed-block data to RocksDB.
#[derive(Clone)]
pub(crate) struct RocksDBWriteCtx {
    /// Number of the first block in the batch being written.
    pub first_block_number: BlockNumber,
    /// Prune mode configured for the transaction lookup table, if any.
    pub prune_tx_lookup: Option<PruneMode>,
    /// Active storage settings.
    pub storage_settings: StorageSettings,
    /// Queue collecting the write batches produced by each writer.
    pub pending_batches: PendingRocksDBBatches,
}

impl fmt::Debug for RocksDBWriteCtx {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RocksDBWriteCtx")
            .field("first_block_number", &self.first_block_number)
            .field("prune_tx_lookup", &self.prune_tx_lookup)
            .field("storage_settings", &self.storage_settings)
            .field("pending_batches", &"<pending batches>")
            .finish()
    }
}

/// Default block cache size: 128 MiB.
const DEFAULT_CACHE_SIZE: usize = 128 << 20;

/// Default block size for block-based tables: 16 KiB.
const DEFAULT_BLOCK_SIZE: usize = 16 * 1024;

/// Default number of background compaction and flush jobs.
const DEFAULT_MAX_BACKGROUND_JOBS: i32 = 6;

/// Default cap on simultaneously open files.
const DEFAULT_MAX_OPEN_FILES: i32 = 512;

/// Default number of bytes written between background syncs: 1 MiB.
const DEFAULT_BYTES_PER_SYNC: u64 = 1_048_576;

/// Default write buffer (memtable) size: 128 MiB.
const DEFAULT_WRITE_BUFFER_SIZE: usize = 128 << 20;

/// Default initial capacity of the reusable compression buffer.
const DEFAULT_COMPRESS_BUF_CAPACITY: usize = 4096;

/// Default batch size at which auto-committing batches flush: 4 GiB.
const DEFAULT_AUTO_COMMIT_THRESHOLD: usize = 4 * 1024 * 1024 * 1024;

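/// Builder for opening a [`RocksDBProvider`] with tuned options and a chosen
/// set of column families.
///
/// A minimal usage sketch (the path is hypothetical; every method shown is
/// defined on this builder):
///
/// ```ignore
/// let provider = RocksDBBuilder::new("/path/to/rocksdb")
///     .with_default_tables()
///     .with_metrics()
///     .with_block_cache_size(256 << 20)
///     .build()?;
/// ```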
pub struct RocksDBBuilder {
    path: PathBuf,
    column_families: Vec<String>,
    enable_metrics: bool,
    enable_statistics: bool,
    log_level: rocksdb::LogLevel,
    block_cache: Cache,
    read_only: bool,
}

impl fmt::Debug for RocksDBBuilder {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RocksDBBuilder")
            .field("path", &self.path)
            .field("column_families", &self.column_families)
            .field("enable_metrics", &self.enable_metrics)
            .finish()
    }
}

impl RocksDBBuilder {
    /// Creates a new builder for the given database path.
    pub fn new(path: impl AsRef<Path>) -> Self {
        let cache = Cache::new_lru_cache(DEFAULT_CACHE_SIZE);
        Self {
            path: path.as_ref().to_path_buf(),
            column_families: Vec::new(),
            enable_metrics: false,
            enable_statistics: false,
            log_level: rocksdb::LogLevel::Info,
            block_cache: cache,
            read_only: false,
        }
    }

    /// Block-based table options shared by all column families.
    fn default_table_options(cache: &Cache) -> BlockBasedOptions {
        let mut table_options = BlockBasedOptions::default();
        table_options.set_block_size(DEFAULT_BLOCK_SIZE);
        table_options.set_cache_index_and_filter_blocks(true);
        table_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
        table_options.set_block_cache(cache);
        table_options
    }

    /// Database-wide options.
    fn default_options(
        log_level: rocksdb::LogLevel,
        cache: &Cache,
        enable_statistics: bool,
    ) -> Options {
        let table_options = Self::default_table_options(cache);

        let mut options = Options::default();
        options.set_block_based_table_factory(&table_options);
        options.create_if_missing(true);
        options.create_missing_column_families(true);
        options.set_max_background_jobs(DEFAULT_MAX_BACKGROUND_JOBS);
        options.set_bytes_per_sync(DEFAULT_BYTES_PER_SYNC);

        options.set_bottommost_compression_type(DBCompressionType::Zstd);
        options.set_bottommost_zstd_max_train_bytes(0, true);
        options.set_compression_type(DBCompressionType::Lz4);
        options.set_compaction_pri(CompactionPri::MinOverlappingRatio);

        options.set_log_level(log_level);

        options.set_max_open_files(DEFAULT_MAX_OPEN_FILES);

        options.set_wal_ttl_seconds(0);
        options.set_wal_size_limit_mb(0);

        if enable_statistics {
            options.enable_statistics();
        }

        options
    }

    /// Default per-column-family options.
    fn default_column_family_options(cache: &Cache) -> Options {
        let table_options = Self::default_table_options(cache);

        let mut cf_options = Options::default();
        cf_options.set_block_based_table_factory(&table_options);
        cf_options.set_level_compaction_dynamic_level_bytes(true);
        cf_options.set_compression_type(DBCompressionType::Lz4);
        cf_options.set_bottommost_compression_type(DBCompressionType::Zstd);
        cf_options.set_bottommost_zstd_max_train_bytes(0, true);
        cf_options.set_write_buffer_size(DEFAULT_WRITE_BUFFER_SIZE);

        cf_options
    }

    /// Options for the `TransactionHashNumbers` column family, whose hash
    /// keys do not compress well, so compression is disabled.
    fn tx_hash_numbers_column_family_options(cache: &Cache) -> Options {
        let mut table_options = BlockBasedOptions::default();
        table_options.set_block_size(DEFAULT_BLOCK_SIZE);
        table_options.set_cache_index_and_filter_blocks(true);
        table_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
        table_options.set_block_cache(cache);

        let mut cf_options = Options::default();
        cf_options.set_block_based_table_factory(&table_options);
        cf_options.set_level_compaction_dynamic_level_bytes(true);
        cf_options.set_compression_type(DBCompressionType::None);
        cf_options.set_bottommost_compression_type(DBCompressionType::None);

        cf_options
    }

    /// Registers the column family for table `T`.
    pub fn with_table<T: Table>(mut self) -> Self {
        self.column_families.push(T::NAME.to_string());
        self
    }

    /// Registers the default set of tables backed by RocksDB.
    pub fn with_default_tables(self) -> Self {
        self.with_table::<tables::TransactionHashNumbers>()
            .with_table::<tables::AccountsHistory>()
            .with_table::<tables::StoragesHistory>()
    }

    /// Enables operation metrics.
    pub const fn with_metrics(mut self) -> Self {
        self.enable_metrics = true;
        self
    }

    /// Enables RocksDB internal statistics collection.
    pub const fn with_statistics(mut self) -> Self {
        self.enable_statistics = true;
        self
    }

    /// Overrides the database log level, if one is provided.
    pub const fn with_database_log_level(mut self, log_level: Option<LogLevel>) -> Self {
        if let Some(level) = log_level {
            self.log_level = convert_log_level(level);
        }
        self
    }

    /// Replaces the shared block cache with one of the given capacity.
    pub fn with_block_cache_size(mut self, capacity_bytes: usize) -> Self {
        self.block_cache = Cache::new_lru_cache(capacity_bytes);
        self
    }

    /// Opens the database read-only when set.
    pub const fn with_read_only(mut self, read_only: bool) -> Self {
        self.read_only = read_only;
        self
    }

    /// Opens the database and returns a [`RocksDBProvider`].
    pub fn build(self) -> ProviderResult<RocksDBProvider> {
        let options =
            Self::default_options(self.log_level, &self.block_cache, self.enable_statistics);

        let cf_descriptors: Vec<ColumnFamilyDescriptor> = self
            .column_families
            .iter()
            .map(|name| {
                let cf_options = if name == tables::TransactionHashNumbers::NAME {
                    Self::tx_hash_numbers_column_family_options(&self.block_cache)
                } else {
                    Self::default_column_family_options(&self.block_cache)
                };
                ColumnFamilyDescriptor::new(name.clone(), cf_options)
            })
            .collect();

        let metrics = self.enable_metrics.then(RocksDBMetrics::default);

        if self.read_only {
            let db = DB::open_cf_descriptors_read_only(&options, &self.path, cf_descriptors, false)
                .map_err(|e| {
                    ProviderError::Database(DatabaseError::Open(DatabaseErrorInfo {
                        message: e.to_string().into(),
                        code: -1,
                    }))
                })?;
            Ok(RocksDBProvider(Arc::new(RocksDBProviderInner::ReadOnly { db, metrics })))
        } else {
            let db =
                OptimisticTransactionDB::open_cf_descriptors(&options, &self.path, cf_descriptors)
                    .map_err(|e| {
                        ProviderError::Database(DatabaseError::Open(DatabaseErrorInfo {
                            message: e.to_string().into(),
                            code: -1,
                        }))
                    })?;
            Ok(RocksDBProvider(Arc::new(RocksDBProviderInner::ReadWrite { db, metrics })))
        }
    }
}

/// Compresses a value into the provided buffer, or returns a direct reference
/// when the value needs no compression.
///
/// Yields `Some(bytes)` if the value exposes an uncompressable reference;
/// otherwise clears `$buf`, compresses into it, and yields `None` (callers
/// then fall back to the buffer, e.g. via `.unwrap_or(&buf)`).
macro_rules! compress_to_buf_or_ref {
    ($buf:expr, $value:expr) => {
        if let Some(value) = $value.uncompressable_ref() {
            Some(value)
        } else {
            $buf.clear();
            $value.compress_to_buf(&mut $buf);
            None
        }
    };
}

/// Cheaply cloneable handle to a RocksDB database.
#[derive(Debug)]
pub struct RocksDBProvider(Arc<RocksDBProviderInner>);

/// Underlying database handle, opened either read-write or read-only.
enum RocksDBProviderInner {
    /// Read-write handle backed by an optimistic transaction database.
    ReadWrite {
        /// The transactional database.
        db: OptimisticTransactionDB,
        /// Optional operation metrics.
        metrics: Option<RocksDBMetrics>,
    },
    /// Read-only handle.
    ReadOnly {
        /// The read-only database.
        db: DB,
        /// Optional operation metrics.
        metrics: Option<RocksDBMetrics>,
    },
}

impl RocksDBProviderInner {
    /// Returns the metrics recorder, if enabled.
    const fn metrics(&self) -> Option<&RocksDBMetrics> {
        match self {
            Self::ReadWrite { metrics, .. } | Self::ReadOnly { metrics, .. } => metrics.as_ref(),
        }
    }

    /// Returns the read-write database handle.
    ///
    /// # Panics
    ///
    /// Panics if the provider was opened read-only.
    fn db_rw(&self) -> &OptimisticTransactionDB {
        match self {
            Self::ReadWrite { db, .. } => db,
            Self::ReadOnly { .. } => {
                panic!("Cannot perform write operation on read-only RocksDB provider")
            }
        }
    }

    /// Returns the column family handle for table `T`.
    fn cf_handle<T: Table>(&self) -> Result<&rocksdb::ColumnFamily, DatabaseError> {
        let cf = match self {
            Self::ReadWrite { db, .. } => db.cf_handle(T::NAME),
            Self::ReadOnly { db, .. } => db.cf_handle(T::NAME),
        };
        cf.ok_or_else(|| DatabaseError::Other(format!("Column family '{}' not found", T::NAME)))
    }

    /// Returns the column family handle by name from the read-write handle.
    fn cf_handle_rw(&self, name: &str) -> Result<&rocksdb::ColumnFamily, DatabaseError> {
        self.db_rw()
            .cf_handle(name)
            .ok_or_else(|| DatabaseError::Other(format!("Column family '{}' not found", name)))
    }

    fn get_cf(
        &self,
        cf: &rocksdb::ColumnFamily,
        key: impl AsRef<[u8]>,
    ) -> Result<Option<Vec<u8>>, rocksdb::Error> {
        match self {
            Self::ReadWrite { db, .. } => db.get_cf(cf, key),
            Self::ReadOnly { db, .. } => db.get_cf(cf, key),
        }
    }

    fn put_cf(
        &self,
        cf: &rocksdb::ColumnFamily,
        key: impl AsRef<[u8]>,
        value: impl AsRef<[u8]>,
    ) -> Result<(), rocksdb::Error> {
        self.db_rw().put_cf(cf, key, value)
    }

    fn delete_cf(
        &self,
        cf: &rocksdb::ColumnFamily,
        key: impl AsRef<[u8]>,
    ) -> Result<(), rocksdb::Error> {
        self.db_rw().delete_cf(cf, key)
    }

    fn delete_range_cf<K: AsRef<[u8]>>(
        &self,
        cf: &rocksdb::ColumnFamily,
        from: K,
        to: K,
    ) -> Result<(), rocksdb::Error> {
        self.db_rw().delete_range_cf(cf, from, to)
    }

    fn iterator_cf(
        &self,
        cf: &rocksdb::ColumnFamily,
        mode: IteratorMode<'_>,
    ) -> RocksDBIterEnum<'_> {
        match self {
            Self::ReadWrite { db, .. } => RocksDBIterEnum::ReadWrite(db.iterator_cf(cf, mode)),
            Self::ReadOnly { db, .. } => RocksDBIterEnum::ReadOnly(db.iterator_cf(cf, mode)),
        }
    }

    fn raw_iterator_cf(&self, cf: &rocksdb::ColumnFamily) -> RocksDBRawIterEnum<'_> {
        match self {
            Self::ReadWrite { db, .. } => RocksDBRawIterEnum::ReadWrite(db.raw_iterator_cf(cf)),
            Self::ReadOnly { db, .. } => RocksDBRawIterEnum::ReadOnly(db.raw_iterator_cf(cf)),
        }
    }

    fn path(&self) -> &Path {
        match self {
            Self::ReadWrite { db, .. } => db.path(),
            Self::ReadOnly { db, .. } => db.path(),
        }
    }

    /// Sums the sizes of write-ahead log files in the database directory.
    fn wal_size_bytes(&self) -> u64 {
        let path = self.path();

        match std::fs::read_dir(path) {
            Ok(entries) => entries
                .filter_map(|e| e.ok())
                .filter(|e| e.path().extension().is_some_and(|ext| ext == "log"))
                .filter_map(|e| e.metadata().ok())
                .map(|m| m.len())
                .sum(),
            Err(_) => 0,
        }
    }

    /// Collects per-column-family statistics from RocksDB properties.
    fn table_stats(&self) -> Vec<RocksDBTableStats> {
        let mut stats = Vec::new();

        macro_rules! collect_stats {
            ($db:expr) => {
                for cf_name in ROCKSDB_TABLES {
                    if let Some(cf) = $db.cf_handle(cf_name) {
                        let estimated_num_keys = $db
                            .property_int_value_cf(cf, rocksdb::properties::ESTIMATE_NUM_KEYS)
                            .ok()
                            .flatten()
                            .unwrap_or(0);

                        let sst_size = $db
                            .property_int_value_cf(cf, rocksdb::properties::LIVE_SST_FILES_SIZE)
                            .ok()
                            .flatten()
                            .unwrap_or(0);

                        let memtable_size = $db
                            .property_int_value_cf(cf, rocksdb::properties::SIZE_ALL_MEM_TABLES)
                            .ok()
                            .flatten()
                            .unwrap_or(0);

                        let estimated_size_bytes = sst_size + memtable_size;

                        let pending_compaction_bytes = $db
                            .property_int_value_cf(
                                cf,
                                rocksdb::properties::ESTIMATE_PENDING_COMPACTION_BYTES,
                            )
                            .ok()
                            .flatten()
                            .unwrap_or(0);

                        stats.push(RocksDBTableStats {
                            sst_size_bytes: sst_size,
                            memtable_size_bytes: memtable_size,
                            name: cf_name.to_string(),
                            estimated_num_keys,
                            estimated_size_bytes,
                            pending_compaction_bytes,
                        });
                    }
                }
            };
        }

        match self {
            Self::ReadWrite { db, .. } => collect_stats!(db),
            Self::ReadOnly { db, .. } => collect_stats!(db),
        }

        stats
    }

    /// Returns combined table and WAL statistics.
    fn db_stats(&self) -> RocksDBStats {
        RocksDBStats { tables: self.table_stats(), wal_size_bytes: self.wal_size_bytes() }
    }
}

impl fmt::Debug for RocksDBProviderInner {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::ReadWrite { metrics, .. } => f
                .debug_struct("RocksDBProviderInner::ReadWrite")
                .field("db", &"<OptimisticTransactionDB>")
                .field("metrics", metrics)
                .finish(),
            Self::ReadOnly { metrics, .. } => f
                .debug_struct("RocksDBProviderInner::ReadOnly")
                .field("db", &"<DB (read-only)>")
                .field("metrics", metrics)
                .finish(),
        }
    }
}

impl Drop for RocksDBProviderInner {
    fn drop(&mut self) {
        match self {
            Self::ReadWrite { db, .. } => {
                // Flush the WAL and every column family so buffered writes
                // reach disk before background work is cancelled.
                if let Err(e) = db.flush_wal(true) {
                    tracing::warn!(target: "providers::rocksdb", ?e, "Failed to flush WAL on drop");
                }
                for cf_name in ROCKSDB_TABLES {
                    if let Some(cf) = db.cf_handle(cf_name) &&
                        let Err(e) = db.flush_cf(&cf)
                    {
                        tracing::warn!(target: "providers::rocksdb", cf = cf_name, ?e, "Failed to flush CF on drop");
                    }
                }
                db.cancel_all_background_work(true);
            }
            Self::ReadOnly { db, .. } => db.cancel_all_background_work(true),
        }
    }
}

impl Clone for RocksDBProvider {
    fn clone(&self) -> Self {
        Self(self.0.clone())
    }
}

impl DatabaseMetrics for RocksDBProvider {
    fn gauge_metrics(&self) -> Vec<(&'static str, f64, Vec<Label>)> {
        let mut metrics = Vec::new();

        for stat in self.table_stats() {
            metrics.push((
                "rocksdb.table_size",
                stat.estimated_size_bytes as f64,
                vec![Label::new("table", stat.name.clone())],
            ));
            metrics.push((
                "rocksdb.table_entries",
                stat.estimated_num_keys as f64,
                vec![Label::new("table", stat.name.clone())],
            ));
            metrics.push((
                "rocksdb.pending_compaction_bytes",
                stat.pending_compaction_bytes as f64,
                vec![Label::new("table", stat.name.clone())],
            ));
            metrics.push((
                "rocksdb.sst_size",
                stat.sst_size_bytes as f64,
                vec![Label::new("table", stat.name.clone())],
            ));
            metrics.push((
                "rocksdb.memtable_size",
                stat.memtable_size_bytes as f64,
                vec![Label::new("table", stat.name)],
            ));
        }

        metrics.push(("rocksdb.wal_size", self.wal_size_bytes() as f64, vec![]));

        metrics
    }
}

impl RocksDBProvider {
    /// Opens a read-write provider at `path` with default options.
    pub fn new(path: impl AsRef<Path>) -> ProviderResult<Self> {
        RocksDBBuilder::new(path).build()
    }

    /// Returns a [`RocksDBBuilder`] for the given path.
    pub fn builder(path: impl AsRef<Path>) -> RocksDBBuilder {
        RocksDBBuilder::new(path)
    }

    /// Returns `true` if the provider was opened read-only.
    pub fn is_read_only(&self) -> bool {
        matches!(self.0.as_ref(), RocksDBProviderInner::ReadOnly { .. })
    }

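    /// Starts an optimistic transaction using default write and transaction
    /// options. Reads see the transaction's own uncommitted writes; conflicts
    /// surface on commit.
    ///
    /// # Panics
    ///
    /// Panics if the provider was opened read-only.
    ///
    /// A minimal usage sketch (key and value are illustrative):
    ///
    /// ```ignore
    /// let tx = provider.tx();
    /// tx.put::<tables::TransactionHashNumbers>(tx_hash, &tx_num)?;
    /// tx.commit()?;
    /// ```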
    pub fn tx(&self) -> RocksTx<'_> {
        let write_options = WriteOptions::default();
        let txn_options = OptimisticTransactionOptions::default();
        let inner = self.0.db_rw().transaction_opt(&write_options, &txn_options);
        RocksTx { inner, provider: self }
    }

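    /// Creates a write batch that buffers puts and deletes until
    /// [`RocksDBBatch::commit`] is called.
    ///
    /// A minimal usage sketch (key and value are illustrative):
    ///
    /// ```ignore
    /// let mut batch = provider.batch();
    /// batch.put::<tables::TransactionHashNumbers>(tx_hash, &tx_num)?;
    /// batch.commit()?;
    /// ```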
    pub fn batch(&self) -> RocksDBBatch<'_> {
        RocksDBBatch {
            provider: self,
            inner: WriteBatchWithTransaction::<true>::default(),
            buf: Vec::with_capacity(DEFAULT_COMPRESS_BUF_CAPACITY),
            auto_commit_threshold: None,
        }
    }

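    /// Creates a write batch that commits and resets itself whenever its
    /// serialized size reaches [`DEFAULT_AUTO_COMMIT_THRESHOLD`], bounding
    /// peak memory usage during large writes.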
    pub fn batch_with_auto_commit(&self) -> RocksDBBatch<'_> {
        RocksDBBatch {
            provider: self,
            inner: WriteBatchWithTransaction::<true>::default(),
            buf: Vec::with_capacity(DEFAULT_COMPRESS_BUF_CAPACITY),
            auto_commit_threshold: Some(DEFAULT_AUTO_COMMIT_THRESHOLD),
        }
    }

    /// Returns the column family handle for table `T`.
    fn get_cf_handle<T: Table>(&self) -> Result<&rocksdb::ColumnFamily, DatabaseError> {
        self.0.cf_handle::<T>()
    }

    /// Runs `f`, recording its latency under `operation`/`table` when metrics
    /// are enabled.
    fn execute_with_operation_metric<R>(
        &self,
        operation: RocksDBOperation,
        table: &'static str,
        f: impl FnOnce(&Self) -> R,
    ) -> R {
        let start = self.0.metrics().map(|_| Instant::now());
        let res = f(self);

        if let (Some(start), Some(metrics)) = (start, self.0.metrics()) {
            metrics.record_operation(operation, table, start.elapsed());
        }

        res
    }

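    /// Reads and decompresses the value stored under `key`, if any.
    ///
    /// A minimal usage sketch (the hash is illustrative):
    ///
    /// ```ignore
    /// if let Some(tx_num) = provider.get::<tables::TransactionHashNumbers>(tx_hash)? {
    ///     // `tx_num` is the transaction's sequential number.
    /// }
    /// ```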
    pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
        self.get_encoded::<T>(&key.encode())
    }

    /// Reads a value by its already-encoded key.
    pub fn get_encoded<T: Table>(
        &self,
        key: &<T::Key as Encode>::Encoded,
    ) -> ProviderResult<Option<T::Value>> {
        self.execute_with_operation_metric(RocksDBOperation::Get, T::NAME, |this| {
            let result = this.0.get_cf(this.get_cf_handle::<T>()?, key.as_ref()).map_err(|e| {
                ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
                    message: e.to_string().into(),
                    code: -1,
                }))
            })?;

            Ok(result.and_then(|value| T::Value::decompress(&value).ok()))
        })
    }

    /// Writes a value under the given table key.
    pub fn put<T: Table>(&self, key: T::Key, value: &T::Value) -> ProviderResult<()> {
        let encoded_key = key.encode();
        self.put_encoded::<T>(&encoded_key, value)
    }

    /// Writes a value under an already-encoded key, compressing it first.
    pub fn put_encoded<T: Table>(
        &self,
        key: &<T::Key as Encode>::Encoded,
        value: &T::Value,
    ) -> ProviderResult<()> {
        self.execute_with_operation_metric(RocksDBOperation::Put, T::NAME, |this| {
            let mut buf = Vec::new();
            let value_bytes = compress_to_buf_or_ref!(buf, value).unwrap_or(&buf);

            this.0.put_cf(this.get_cf_handle::<T>()?, key, value_bytes).map_err(|e| {
                ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
                    info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
                    operation: DatabaseWriteOperation::PutUpsert,
                    table_name: T::NAME,
                    key: key.as_ref().to_vec(),
                })))
            })
        })
    }

    /// Deletes the entry for the given table key.
    pub fn delete<T: Table>(&self, key: T::Key) -> ProviderResult<()> {
        self.execute_with_operation_metric(RocksDBOperation::Delete, T::NAME, |this| {
            this.0.delete_cf(this.get_cf_handle::<T>()?, key.encode().as_ref()).map_err(|e| {
                ProviderError::Database(DatabaseError::Delete(DatabaseErrorInfo {
                    message: e.to_string().into(),
                    code: -1,
                }))
            })
        })
    }

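    /// Deletes every entry in the column family backing `T` with a single
    /// full-range `delete_range_cf`; the 256-byte `0xFF` upper bound sorts
    /// above any key this module writes.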
    pub fn clear<T: Table>(&self) -> ProviderResult<()> {
        let cf = self.get_cf_handle::<T>()?;

        self.0.delete_range_cf(cf, &[] as &[u8], &[0xFF; 256]).map_err(|e| {
            ProviderError::Database(DatabaseError::Delete(DatabaseErrorInfo {
                message: e.to_string().into(),
                code: -1,
            }))
        })?;

        Ok(())
    }

    /// Returns the first or last entry of `T`, depending on the iterator mode.
    fn get_boundary<T: Table>(
        &self,
        mode: IteratorMode<'_>,
    ) -> ProviderResult<Option<(T::Key, T::Value)>> {
        self.execute_with_operation_metric(RocksDBOperation::Get, T::NAME, |this| {
            let cf = this.get_cf_handle::<T>()?;
            let mut iter = this.0.iterator_cf(cf, mode);

            match iter.next() {
                Some(Ok((key_bytes, value_bytes))) => {
                    let key = <T::Key as reth_db_api::table::Decode>::decode(&key_bytes)
                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
                    let value = T::Value::decompress(&value_bytes)
                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
                    Ok(Some((key, value)))
                }
                Some(Err(e)) => {
                    Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
                        message: e.to_string().into(),
                        code: -1,
                    })))
                }
                None => Ok(None),
            }
        })
    }

    /// Returns the first entry of `T`.
    #[inline]
    pub fn first<T: Table>(&self) -> ProviderResult<Option<(T::Key, T::Value)>> {
        self.get_boundary::<T>(IteratorMode::Start)
    }

    /// Returns the last entry of `T`.
    #[inline]
    pub fn last<T: Table>(&self) -> ProviderResult<Option<(T::Key, T::Value)>> {
        self.get_boundary::<T>(IteratorMode::End)
    }

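    /// Returns a typed iterator over all entries of `T`, starting at the
    /// first key.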
    pub fn iter<T: Table>(&self) -> ProviderResult<RocksDBIter<'_, T>> {
        let cf = self.get_cf_handle::<T>()?;
        let iter = self.0.iterator_cf(cf, IteratorMode::Start);
        Ok(RocksDBIter { inner: iter, _marker: std::marker::PhantomData })
    }

    /// Returns per-column-family statistics.
    pub fn table_stats(&self) -> Vec<RocksDBTableStats> {
        self.0.table_stats()
    }

    /// Returns the total size of WAL files on disk, in bytes.
    pub fn wal_size_bytes(&self) -> u64 {
        self.0.wal_size_bytes()
    }

    /// Returns combined table and WAL statistics.
    pub fn db_stats(&self) -> RocksDBStats {
        self.0.db_stats()
    }

    /// Flushes the given column families and then the WAL to disk.
    #[instrument(level = "debug", target = "providers::rocksdb", skip_all, fields(tables = ?tables))]
    pub fn flush(&self, tables: &[&'static str]) -> ProviderResult<()> {
        let db = self.0.db_rw();

        for cf_name in tables {
            if let Some(cf) = db.cf_handle(cf_name) {
                db.flush_cf(&cf).map_err(|e| {
                    ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
                        info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
                        operation: DatabaseWriteOperation::Flush,
                        table_name: cf_name,
                        key: Vec::new(),
                    })))
                })?;
            }
        }

        db.flush_wal(true).map_err(|e| {
            ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
                info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
                operation: DatabaseWriteOperation::Flush,
                table_name: "WAL",
                key: Vec::new(),
            })))
        })?;

        Ok(())
    }

    /// Flushes all tables, then requests a full-range compaction of each.
    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
    pub fn flush_and_compact(&self) -> ProviderResult<()> {
        self.flush(ROCKSDB_TABLES)?;

        let db = self.0.db_rw();

        for cf_name in ROCKSDB_TABLES {
            if let Some(cf) = db.cf_handle(cf_name) {
                db.compact_range_cf(&cf, None::<&[u8]>, None::<&[u8]>);
            }
        }

        Ok(())
    }

    /// Returns a raw (undecoded) iterator over all entries of `T`.
    pub fn raw_iter<T: Table>(&self) -> ProviderResult<RocksDBRawIter<'_>> {
        let cf = self.get_cf_handle::<T>()?;
        let iter = self.0.iterator_cf(cf, IteratorMode::Start);
        Ok(RocksDBRawIter { inner: iter })
    }

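    /// Collects all account history shards for `address`, in ascending shard
    /// order.
    ///
    /// Shard keys encode the address followed by a highest block number, so
    /// seeking to `(address, 0)` and scanning forward until the address
    /// changes yields exactly this account's shards.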
    pub fn account_history_shards(
        &self,
        address: Address,
    ) -> ProviderResult<Vec<(ShardedKey<Address>, BlockNumberList)>> {
        let cf = self.get_cf_handle::<tables::AccountsHistory>()?;

        let start_key = ShardedKey::new(address, 0u64);
        let start_bytes = start_key.encode();

        let iter = self
            .0
            .iterator_cf(cf, IteratorMode::From(start_bytes.as_ref(), rocksdb::Direction::Forward));

        let mut result = Vec::new();
        for item in iter {
            match item {
                Ok((key_bytes, value_bytes)) => {
                    let key = ShardedKey::<Address>::decode(&key_bytes)
                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;

                    // Stop once the iterator leaves this address's key range.
                    if key.key != address {
                        break;
                    }

                    let value = BlockNumberList::decompress(&value_bytes)
                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;

                    result.push((key, value));
                }
                Err(e) => {
                    return Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
                        message: e.to_string().into(),
                        code: -1,
                    })));
                }
            }
        }

        Ok(result)
    }

    /// Collects all storage history shards for the `(address, storage_key)`
    /// pair, in ascending shard order.
    pub fn storage_history_shards(
        &self,
        address: Address,
        storage_key: B256,
    ) -> ProviderResult<Vec<(StorageShardedKey, BlockNumberList)>> {
        let cf = self.get_cf_handle::<tables::StoragesHistory>()?;

        let start_key = StorageShardedKey::new(address, storage_key, 0u64);
        let start_bytes = start_key.encode();

        let iter = self
            .0
            .iterator_cf(cf, IteratorMode::From(start_bytes.as_ref(), rocksdb::Direction::Forward));

        let mut result = Vec::new();
        for item in iter {
            match item {
                Ok((key_bytes, value_bytes)) => {
                    let key = StorageShardedKey::decode(&key_bytes)
                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;

                    // Stop once the iterator leaves this slot's key range.
                    if key.address != address || key.sharded_key.key != storage_key {
                        break;
                    }

                    let value = BlockNumberList::decompress(&value_bytes)
                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;

                    result.push((key, value));
                }
                Err(e) => {
                    return Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
                        message: e.to_string().into(),
                        code: -1,
                    })));
                }
            }
        }

        Ok(result)
    }

    /// Builds a write batch that unwinds account history so only blocks below
    /// each address's minimum changeset block remain.
    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
    pub fn unwind_account_history_indices(
        &self,
        last_indices: &[(Address, BlockNumber)],
    ) -> ProviderResult<WriteBatchWithTransaction<true>> {
        let mut address_min_block: AddressMap<BlockNumber> =
            AddressMap::with_capacity_and_hasher(last_indices.len(), Default::default());
        for &(address, block_number) in last_indices {
            address_min_block
                .entry(address)
                .and_modify(|min| *min = (*min).min(block_number))
                .or_insert(block_number);
        }

        let mut batch = self.batch();
        for (address, min_block) in address_min_block {
            match min_block.checked_sub(1) {
                Some(keep_to) => batch.unwind_account_history_to(address, keep_to)?,
                None => batch.clear_account_history(address)?,
            }
        }

        Ok(batch.into_inner())
    }

    /// Builds a write batch that unwinds storage history so only blocks below
    /// each slot's minimum changeset block remain.
    pub fn unwind_storage_history_indices(
        &self,
        storage_changesets: &[(Address, B256, BlockNumber)],
    ) -> ProviderResult<WriteBatchWithTransaction<true>> {
        let mut key_min_block: HashMap<(Address, B256), BlockNumber> =
            HashMap::with_capacity_and_hasher(storage_changesets.len(), Default::default());
        for &(address, storage_key, block_number) in storage_changesets {
            key_min_block
                .entry((address, storage_key))
                .and_modify(|min| *min = (*min).min(block_number))
                .or_insert(block_number);
        }

        let mut batch = self.batch();
        for ((address, storage_key), min_block) in key_min_block {
            match min_block.checked_sub(1) {
                Some(keep_to) => batch.unwind_storage_history_to(address, storage_key, keep_to)?,
                None => batch.clear_storage_history(address, storage_key)?,
            }
        }

        Ok(batch.into_inner())
    }

    /// Populates a batch via `f` and commits it in one call.
    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
    pub fn write_batch<F>(&self, f: F) -> ProviderResult<()>
    where
        F: FnOnce(&mut RocksDBBatch<'_>) -> ProviderResult<()>,
    {
        self.execute_with_operation_metric(RocksDBOperation::BatchWrite, "Batch", |this| {
            let mut batch_handle = this.batch();
            f(&mut batch_handle)?;
            batch_handle.commit()
        })
    }

    /// Commits a previously built write batch.
    #[instrument(level = "debug", target = "providers::rocksdb", skip_all, fields(batch_len = batch.len(), batch_size = batch.size_in_bytes()))]
    pub fn commit_batch(&self, batch: WriteBatchWithTransaction<true>) -> ProviderResult<()> {
        self.0.db_rw().write_opt(batch, &WriteOptions::default()).map_err(|e| {
            ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
                message: e.to_string().into(),
                code: -1,
            }))
        })
    }

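    /// Writes the RocksDB-backed data for a batch of executed blocks:
    /// transaction hash-to-number mappings plus account and storage history
    /// indices.
    ///
    /// The three writers run in parallel on the storage thread pool; each
    /// builds a write batch and pushes it onto `ctx.pending_batches` rather
    /// than committing directly. No-op unless `storage_settings.storage_v2`
    /// is enabled.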
    #[instrument(level = "debug", target = "providers::rocksdb", skip_all, fields(num_blocks = blocks.len(), first_block = ctx.first_block_number))]
    pub(crate) fn write_blocks_data<N: reth_node_types::NodePrimitives>(
        &self,
        blocks: &[ExecutedBlock<N>],
        tx_nums: &[TxNumber],
        ctx: RocksDBWriteCtx,
        runtime: &reth_tasks::Runtime,
    ) -> ProviderResult<()> {
        if !ctx.storage_settings.storage_v2 {
            return Ok(());
        }

        let mut r_tx_hash = None;
        let mut r_account_history = None;
        let mut r_storage_history = None;

        let write_tx_hash =
            ctx.storage_settings.storage_v2 && ctx.prune_tx_lookup.is_none_or(|m| !m.is_full());
        let write_account_history = ctx.storage_settings.storage_v2;
        let write_storage_history = ctx.storage_settings.storage_v2;

        runtime.storage_pool().in_place_scope(|s| {
            if write_tx_hash {
                s.spawn(|_| {
                    r_tx_hash = Some(self.write_tx_hash_numbers(blocks, tx_nums, &ctx));
                });
            }

            if write_account_history {
                s.spawn(|_| {
                    r_account_history = Some(self.write_account_history(blocks, &ctx));
                });
            }

            if write_storage_history {
                s.spawn(|_| {
                    r_storage_history = Some(self.write_storage_history(blocks, &ctx));
                });
            }
        });

        if write_tx_hash {
            r_tx_hash.ok_or_else(|| {
                ProviderError::Database(DatabaseError::Other(
                    "rocksdb tx-hash write thread panicked".into(),
                ))
            })??;
        }
        if write_account_history {
            r_account_history.ok_or_else(|| {
                ProviderError::Database(DatabaseError::Other(
                    "rocksdb account-history write thread panicked".into(),
                ))
            })??;
        }
        if write_storage_history {
            r_storage_history.ok_or_else(|| {
                ProviderError::Database(DatabaseError::Other(
                    "rocksdb storage-history write thread panicked".into(),
                ))
            })??;
        }

        Ok(())
    }

    /// Maps each transaction hash to its sequential transaction number.
    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
    fn write_tx_hash_numbers<N: reth_node_types::NodePrimitives>(
        &self,
        blocks: &[ExecutedBlock<N>],
        tx_nums: &[TxNumber],
        ctx: &RocksDBWriteCtx,
    ) -> ProviderResult<()> {
        let mut batch = self.batch();
        for (block, &first_tx_num) in blocks.iter().zip(tx_nums) {
            let body = block.recovered_block().body();
            let mut tx_num = first_tx_num;
            for transaction in body.transactions_iter() {
                batch.put::<tables::TransactionHashNumbers>(*transaction.tx_hash(), &tx_num)?;
                tx_num += 1;
            }
        }
        ctx.pending_batches.lock().push(batch.into_inner());
        Ok(())
    }

    /// Indexes, per address, the blocks whose state reverts touch it, and
    /// appends those block numbers to the account history shards.
    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
    fn write_account_history<N: reth_node_types::NodePrimitives>(
        &self,
        blocks: &[ExecutedBlock<N>],
        ctx: &RocksDBWriteCtx,
    ) -> ProviderResult<()> {
        let mut batch = self.batch();
        let mut account_history: BTreeMap<Address, Vec<u64>> = BTreeMap::new();

        for (block_idx, block) in blocks.iter().enumerate() {
            let block_number = ctx.first_block_number + block_idx as u64;
            let reverts = block.execution_outcome().state.reverts.to_plain_state_reverts();

            for account_block_reverts in reverts.accounts {
                for (address, _) in account_block_reverts {
                    account_history.entry(address).or_default().push(block_number);
                }
            }
        }

        for (address, indices) in account_history {
            batch.append_account_history_shard(address, indices)?;
        }
        ctx.pending_batches.lock().push(batch.into_inner());
        Ok(())
    }

    /// Indexes, per `(address, hashed slot)` pair, the blocks whose storage
    /// reverts touch it, and appends those block numbers to the storage
    /// history shards.
    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
    fn write_storage_history<N: reth_node_types::NodePrimitives>(
        &self,
        blocks: &[ExecutedBlock<N>],
        ctx: &RocksDBWriteCtx,
    ) -> ProviderResult<()> {
        let mut batch = self.batch();
        let mut storage_history: BTreeMap<(Address, B256), Vec<u64>> = BTreeMap::new();

        for (block_idx, block) in blocks.iter().enumerate() {
            let block_number = ctx.first_block_number + block_idx as u64;
            let reverts = block.execution_outcome().state.reverts.to_plain_state_reverts();

            for storage_block_reverts in reverts.storage {
                for revert in storage_block_reverts {
                    for (slot, _) in revert.storage_revert {
                        let plain_key = B256::new(slot.to_be_bytes());
                        let key = keccak256(plain_key);
                        storage_history
                            .entry((revert.address, key))
                            .or_default()
                            .push(block_number);
                    }
                }
            }
        }

        for ((address, slot), indices) in storage_history {
            batch.append_storage_history_shard(address, slot, indices)?;
        }
        ctx.pending_batches.lock().push(batch.into_inner());
        Ok(())
    }
}

/// Outcome of pruning the history shards for a single key.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PruneShardOutcome {
    /// At least one shard was deleted.
    Deleted,
    /// At least one shard was rewritten, but none deleted.
    Updated,
    /// No shard was modified.
    Unchanged,
}

/// Counters aggregating [`PruneShardOutcome`]s over a batch of prune targets.
#[derive(Debug, Default, Clone, Copy)]
pub struct PrunedIndices {
    /// Number of targets that had shards deleted.
    pub deleted: usize,
    /// Number of targets that had shards updated.
    pub updated: usize,
    /// Number of targets left unchanged.
    pub unchanged: usize,
}

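/// Buffered writer over a [`RocksDBProvider`].
///
/// Values are compressed into a reusable scratch buffer before being queued;
/// nothing becomes visible to readers until [`RocksDBBatch::commit`] runs (or,
/// when an auto-commit threshold is set, until the batch grows past it).
///
/// A minimal usage sketch (keys and values are illustrative):
///
/// ```ignore
/// let mut batch = provider.batch();
/// batch.put::<tables::TransactionHashNumbers>(tx_hash, &tx_num)?;
/// batch.delete::<tables::TransactionHashNumbers>(stale_hash)?;
/// batch.commit()?;
/// ```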
#[must_use = "batch must be committed"]
pub struct RocksDBBatch<'a> {
    /// Provider the batch writes to.
    provider: &'a RocksDBProvider,
    /// Buffered RocksDB write batch.
    inner: WriteBatchWithTransaction<true>,
    /// Reusable compression scratch buffer.
    buf: Vec<u8>,
    /// Byte size at which the batch commits itself, if set.
    auto_commit_threshold: Option<usize>,
}

impl fmt::Debug for RocksDBBatch<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RocksDBBatch")
            .field("provider", &self.provider)
            .field("batch", &"<WriteBatchWithTransaction>")
            .field("length", &self.inner.len())
            .field("size_in_bytes", &self.inner.size_in_bytes())
            .finish()
    }
}

impl<'a> RocksDBBatch<'a> {
    /// Queues a put for the given table key.
    pub fn put<T: Table>(&mut self, key: T::Key, value: &T::Value) -> ProviderResult<()> {
        let encoded_key = key.encode();
        self.put_encoded::<T>(&encoded_key, value)
    }

    /// Queues a put for an already-encoded key, compressing the value into
    /// the scratch buffer.
    pub fn put_encoded<T: Table>(
        &mut self,
        key: &<T::Key as Encode>::Encoded,
        value: &T::Value,
    ) -> ProviderResult<()> {
        let value_bytes = compress_to_buf_or_ref!(self.buf, value).unwrap_or(&self.buf);
        self.inner.put_cf(self.provider.get_cf_handle::<T>()?, key, value_bytes);
        self.maybe_auto_commit()?;
        Ok(())
    }

    /// Queues a delete for the given table key.
    pub fn delete<T: Table>(&mut self, key: T::Key) -> ProviderResult<()> {
        self.inner.delete_cf(self.provider.get_cf_handle::<T>()?, key.encode().as_ref());
        self.maybe_auto_commit()?;
        Ok(())
    }

    /// Commits and resets the batch if it has grown past the auto-commit
    /// threshold.
    fn maybe_auto_commit(&mut self) -> ProviderResult<()> {
        if let Some(threshold) = self.auto_commit_threshold &&
            self.inner.size_in_bytes() >= threshold
        {
            tracing::debug!(
                target: "providers::rocksdb",
                batch_size = self.inner.size_in_bytes(),
                threshold,
                "Auto-committing RocksDB batch"
            );
            let old_batch = std::mem::take(&mut self.inner);
            self.provider.0.db_rw().write_opt(old_batch, &WriteOptions::default()).map_err(
                |e| {
                    ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
                        message: e.to_string().into(),
                        code: -1,
                    }))
                },
            )?;
        }
        Ok(())
    }

    /// Commits all queued writes to the database.
    #[instrument(level = "debug", target = "providers::rocksdb", skip_all, fields(batch_len = self.inner.len(), batch_size = self.inner.size_in_bytes()))]
    pub fn commit(self) -> ProviderResult<()> {
        self.provider.0.db_rw().write_opt(self.inner, &WriteOptions::default()).map_err(|e| {
            ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
                message: e.to_string().into(),
                code: -1,
            }))
        })
    }

    /// Number of queued operations.
    pub fn len(&self) -> usize {
        self.inner.len()
    }

    /// Returns `true` if no operations are queued.
    pub fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }

    /// Current serialized size of the batch, in bytes.
    pub fn size_in_bytes(&self) -> usize {
        self.inner.size_in_bytes()
    }

    /// Returns the underlying provider.
    pub const fn provider(&self) -> &RocksDBProvider {
        self.provider
    }

    /// Consumes the handle and returns the raw write batch without
    /// committing it.
    pub fn into_inner(self) -> WriteBatchWithTransaction<true> {
        self.inner
    }

    /// Reads through to the provider; writes queued in this batch but not yet
    /// committed are not visible.
    pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
        self.provider.get::<T>(key)
    }

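    /// Appends strictly increasing block numbers to the history of `address`.
    ///
    /// The running shard lives under the sentinel key
    /// `ShardedKey::new(address, u64::MAX)`. After appending, a list that
    /// still fits within `NUM_OF_INDICES_IN_SHARD` entries is written back in
    /// place; otherwise it is split into full shards keyed by their highest
    /// block number, with the final partial shard keeping the sentinel key.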
    pub fn append_account_history_shard(
        &mut self,
        address: Address,
        indices: impl IntoIterator<Item = u64>,
    ) -> ProviderResult<()> {
        let indices: Vec<u64> = indices.into_iter().collect();

        if indices.is_empty() {
            return Ok(());
        }

        debug_assert!(
            indices.windows(2).all(|w| w[0] < w[1]),
            "indices must be strictly increasing: {:?}",
            indices
        );

        let last_key = ShardedKey::new(address, u64::MAX);
        let last_shard_opt = self.provider.get::<tables::AccountsHistory>(last_key.clone())?;
        let mut last_shard = last_shard_opt.unwrap_or_else(BlockNumberList::empty);

        last_shard.append(indices).map_err(ProviderError::other)?;

        // Common case: everything still fits in the sentinel shard.
        if last_shard.len() <= NUM_OF_INDICES_IN_SHARD as u64 {
            self.put::<tables::AccountsHistory>(last_key, &last_shard)?;
            return Ok(());
        }

        // Overflow: split into full shards keyed by their highest block, with
        // the final partial shard keeping the `u64::MAX` sentinel key.
        let chunks = last_shard.iter().chunks(NUM_OF_INDICES_IN_SHARD);
        let mut chunks_peekable = chunks.into_iter().peekable();

        while let Some(chunk) = chunks_peekable.next() {
            let shard = BlockNumberList::new_pre_sorted(chunk);
            let highest_block_number = if chunks_peekable.peek().is_some() {
                shard.iter().next_back().expect("`chunks` does not return empty list")
            } else {
                u64::MAX
            };

            self.put::<tables::AccountsHistory>(
                ShardedKey::new(address, highest_block_number),
                &shard,
            )?;
        }

        Ok(())
    }

    /// Storage counterpart of [`Self::append_account_history_shard`]: appends
    /// strictly increasing block numbers to the history of the
    /// `(address, storage_key)` pair, splitting oversized shards the same way.
    pub fn append_storage_history_shard(
        &mut self,
        address: Address,
        storage_key: B256,
        indices: impl IntoIterator<Item = u64>,
    ) -> ProviderResult<()> {
        let indices: Vec<u64> = indices.into_iter().collect();

        if indices.is_empty() {
            return Ok(());
        }

        debug_assert!(
            indices.windows(2).all(|w| w[0] < w[1]),
            "indices must be strictly increasing: {:?}",
            indices
        );

        let last_key = StorageShardedKey::last(address, storage_key);
        let last_shard_opt = self.provider.get::<tables::StoragesHistory>(last_key.clone())?;
        let mut last_shard = last_shard_opt.unwrap_or_else(BlockNumberList::empty);

        last_shard.append(indices).map_err(ProviderError::other)?;

        // Common case: everything still fits in the sentinel shard.
        if last_shard.len() <= NUM_OF_INDICES_IN_SHARD as u64 {
            self.put::<tables::StoragesHistory>(last_key, &last_shard)?;
            return Ok(());
        }

        let chunks = last_shard.iter().chunks(NUM_OF_INDICES_IN_SHARD);
        let mut chunks_peekable = chunks.into_iter().peekable();

        while let Some(chunk) = chunks_peekable.next() {
            let shard = BlockNumberList::new_pre_sorted(chunk);
            let highest_block_number = if chunks_peekable.peek().is_some() {
                shard.iter().next_back().expect("`chunks` does not return empty list")
            } else {
                u64::MAX
            };

            self.put::<tables::StoragesHistory>(
                StorageShardedKey::new(address, storage_key, highest_block_number),
                &shard,
            )?;
        }

        Ok(())
    }

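    /// Unwinds the account history of `address` so that only blocks at or
    /// below `keep_to` remain: shards above the boundary are deleted, the
    /// boundary shard is truncated, and the surviving tail is re-keyed under
    /// the `u64::MAX` sentinel.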
    pub fn unwind_account_history_to(
        &mut self,
        address: Address,
        keep_to: BlockNumber,
    ) -> ProviderResult<()> {
        let shards = self.provider.account_history_shards(address)?;
        if shards.is_empty() {
            return Ok(());
        }

        // First shard that may contain blocks above `keep_to`.
        let boundary_idx = shards.iter().position(|(key, _)| {
            key.highest_block_number == u64::MAX || key.highest_block_number > keep_to
        });

        let Some(boundary_idx) = boundary_idx else {
            // Every shard is at or below `keep_to`; just ensure the last one
            // is keyed under the sentinel.
            let (last_key, last_value) = shards.last().expect("shards is non-empty");
            if last_key.highest_block_number != u64::MAX {
                self.delete::<tables::AccountsHistory>(last_key.clone())?;
                self.put::<tables::AccountsHistory>(
                    ShardedKey::new(address, u64::MAX),
                    last_value,
                )?;
            }
            return Ok(());
        };

        // Delete all shards past the boundary.
        for (key, _) in shards.iter().skip(boundary_idx + 1) {
            self.delete::<tables::AccountsHistory>(key.clone())?;
        }

        let (boundary_key, boundary_list) = &shards[boundary_idx];

        self.delete::<tables::AccountsHistory>(boundary_key.clone())?;

        // Keep only blocks at or below `keep_to` from the boundary shard.
        let new_last =
            BlockNumberList::new_pre_sorted(boundary_list.iter().take_while(|&b| b <= keep_to));

        if new_last.is_empty() {
            if boundary_idx == 0 {
                // Nothing remains for this address.
                return Ok(());
            }

            // Promote the previous shard to be the sentinel shard.
            let (prev_key, prev_value) = &shards[boundary_idx - 1];
            if prev_key.highest_block_number != u64::MAX {
                self.delete::<tables::AccountsHistory>(prev_key.clone())?;
                self.put::<tables::AccountsHistory>(
                    ShardedKey::new(address, u64::MAX),
                    prev_value,
                )?;
            }
            return Ok(());
        }

        self.put::<tables::AccountsHistory>(ShardedKey::new(address, u64::MAX), &new_last)?;

        Ok(())
    }

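    /// Shared pruning routine for account and storage history shards.
    ///
    /// Shards whose highest block number is at or below `to_block` are
    /// deleted outright; the rest are filtered down to blocks above
    /// `to_block`, and the last surviving shard is re-keyed under the
    /// sentinel. The closures abstract over the two key types so both history
    /// tables reuse the same logic.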
    #[allow(clippy::too_many_arguments)]
    fn prune_history_shards_inner<K>(
        &mut self,
        shards: Vec<(K, BlockNumberList)>,
        to_block: BlockNumber,
        get_highest: impl Fn(&K) -> u64,
        is_sentinel: impl Fn(&K) -> bool,
        delete_shard: impl Fn(&mut Self, K) -> ProviderResult<()>,
        put_shard: impl Fn(&mut Self, K, &BlockNumberList) -> ProviderResult<()>,
        create_sentinel: impl Fn() -> K,
    ) -> ProviderResult<PruneShardOutcome>
    where
        K: Clone,
    {
        if shards.is_empty() {
            return Ok(PruneShardOutcome::Unchanged);
        }

        let mut deleted = false;
        let mut updated = false;
        let mut last_remaining: Option<(K, BlockNumberList)> = None;

        for (key, block_list) in shards {
            if !is_sentinel(&key) && get_highest(&key) <= to_block {
                // Entire shard is at or below the prune target.
                delete_shard(self, key)?;
                deleted = true;
            } else {
                let original_len = block_list.len();
                let filtered =
                    BlockNumberList::new_pre_sorted(block_list.iter().filter(|&b| b > to_block));

                if filtered.is_empty() {
                    delete_shard(self, key)?;
                    deleted = true;
                } else if filtered.len() < original_len {
                    put_shard(self, key.clone(), &filtered)?;
                    last_remaining = Some((key, filtered));
                    updated = true;
                } else {
                    last_remaining = Some((key, block_list));
                }
            }
        }

        // Re-key the last surviving shard under the sentinel if needed.
        if let Some((last_key, last_value)) = last_remaining &&
            !is_sentinel(&last_key)
        {
            delete_shard(self, last_key)?;
            put_shard(self, create_sentinel(), &last_value)?;
            updated = true;
        }

        if deleted {
            Ok(PruneShardOutcome::Deleted)
        } else if updated {
            Ok(PruneShardOutcome::Updated)
        } else {
            Ok(PruneShardOutcome::Unchanged)
        }
    }

    /// Prunes the account history of `address` up to and including `to_block`.
    pub fn prune_account_history_to(
        &mut self,
        address: Address,
        to_block: BlockNumber,
    ) -> ProviderResult<PruneShardOutcome> {
        let shards = self.provider.account_history_shards(address)?;
        self.prune_history_shards_inner(
            shards,
            to_block,
            |key| key.highest_block_number,
            |key| key.highest_block_number == u64::MAX,
            |batch, key| batch.delete::<tables::AccountsHistory>(key),
            |batch, key, value| batch.put::<tables::AccountsHistory>(key, value),
            || ShardedKey::new(address, u64::MAX),
        )
    }

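    /// Prunes account history for many `(address, to_block)` targets in one
    /// pass.
    ///
    /// Targets must be sorted by address: a single raw iterator is reused
    /// across targets and only re-seeked when its position lags behind the
    /// next address prefix, avoiding a fresh seek per target.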
    pub fn prune_account_history_batch(
        &mut self,
        targets: &[(Address, BlockNumber)],
    ) -> ProviderResult<PrunedIndices> {
        if targets.is_empty() {
            return Ok(PrunedIndices::default());
        }

        debug_assert!(
            targets.windows(2).all(|w| w[0].0 <= w[1].0),
            "prune_account_history_batch: targets must be sorted by address"
        );

        // Account history keys start with the 20-byte address.
        const PREFIX_LEN: usize = 20;

        let cf = self.provider.get_cf_handle::<tables::AccountsHistory>()?;
        let mut iter = self.provider.0.raw_iterator_cf(cf);
        let mut outcomes = PrunedIndices::default();

        for (address, to_block) in targets {
            let start_key = ShardedKey::new(*address, 0u64).encode();
            let target_prefix = &start_key[..PREFIX_LEN];

            // Only re-seek when the iterator sits before the target prefix;
            // sorted targets let consecutive lookups reuse the position.
            let needs_seek = if iter.valid() {
                if let Some(current_key) = iter.key() {
                    current_key.get(..PREFIX_LEN).is_none_or(|p| p < target_prefix)
                } else {
                    true
                }
            } else {
                true
            };

            if needs_seek {
                iter.seek(start_key);
                iter.status().map_err(|e| {
                    ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
                        message: e.to_string().into(),
                        code: -1,
                    }))
                })?;
            }

            // Collect this address's shards.
            let mut shards = Vec::new();
            while iter.valid() {
                let Some(key_bytes) = iter.key() else { break };

                let current_prefix = key_bytes.get(..PREFIX_LEN);
                if current_prefix != Some(target_prefix) {
                    break;
                }

                let key = ShardedKey::<Address>::decode(key_bytes)
                    .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;

                let Some(value_bytes) = iter.value() else { break };
                let value = BlockNumberList::decompress(value_bytes)
                    .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;

                shards.push((key, value));
                iter.next();
            }

            match self.prune_history_shards_inner(
                shards,
                *to_block,
                |key| key.highest_block_number,
                |key| key.highest_block_number == u64::MAX,
                |batch, key| batch.delete::<tables::AccountsHistory>(key),
                |batch, key, value| batch.put::<tables::AccountsHistory>(key, value),
                || ShardedKey::new(*address, u64::MAX),
            )? {
                PruneShardOutcome::Deleted => outcomes.deleted += 1,
                PruneShardOutcome::Updated => outcomes.updated += 1,
                PruneShardOutcome::Unchanged => outcomes.unchanged += 1,
            }
        }

        Ok(outcomes)
    }

    /// Prunes the storage history of `(address, storage_key)` up to and
    /// including `to_block`.
    pub fn prune_storage_history_to(
        &mut self,
        address: Address,
        storage_key: B256,
        to_block: BlockNumber,
    ) -> ProviderResult<PruneShardOutcome> {
        let shards = self.provider.storage_history_shards(address, storage_key)?;
        self.prune_history_shards_inner(
            shards,
            to_block,
            |key| key.sharded_key.highest_block_number,
            |key| key.sharded_key.highest_block_number == u64::MAX,
            |batch, key| batch.delete::<tables::StoragesHistory>(key),
            |batch, key, value| batch.put::<tables::StoragesHistory>(key, value),
            || StorageShardedKey::last(address, storage_key),
        )
    }

    /// Storage counterpart of [`Self::prune_account_history_batch`]: prunes
    /// many sorted `((address, storage_key), to_block)` targets with a single
    /// reused raw iterator.
    pub fn prune_storage_history_batch(
        &mut self,
        targets: &[((Address, B256), BlockNumber)],
    ) -> ProviderResult<PrunedIndices> {
        if targets.is_empty() {
            return Ok(PrunedIndices::default());
        }

        debug_assert!(
            targets.windows(2).all(|w| w[0].0 <= w[1].0),
            "prune_storage_history_batch: targets must be sorted by (address, storage_key)"
        );

        // Storage history keys start with the 20-byte address followed by the
        // 32-byte storage key.
        const PREFIX_LEN: usize = 52;

        let cf = self.provider.get_cf_handle::<tables::StoragesHistory>()?;
        let mut iter = self.provider.0.raw_iterator_cf(cf);
        let mut outcomes = PrunedIndices::default();

        for ((address, storage_key), to_block) in targets {
            let start_key = StorageShardedKey::new(*address, *storage_key, 0u64).encode();
            let target_prefix = &start_key[..PREFIX_LEN];

            let needs_seek = if iter.valid() {
                if let Some(current_key) = iter.key() {
                    current_key.get(..PREFIX_LEN).is_none_or(|p| p < target_prefix)
                } else {
                    true
                }
            } else {
                true
            };

            if needs_seek {
                iter.seek(start_key);
                iter.status().map_err(|e| {
                    ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
                        message: e.to_string().into(),
                        code: -1,
                    }))
                })?;
            }

            let mut shards = Vec::new();
            while iter.valid() {
                let Some(key_bytes) = iter.key() else { break };

                let current_prefix = key_bytes.get(..PREFIX_LEN);
                if current_prefix != Some(target_prefix) {
                    break;
                }

                let key = StorageShardedKey::decode(key_bytes)
                    .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;

                let Some(value_bytes) = iter.value() else { break };
                let value = BlockNumberList::decompress(value_bytes)
                    .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;

                shards.push((key, value));
                iter.next();
            }

            match self.prune_history_shards_inner(
                shards,
                *to_block,
                |key| key.sharded_key.highest_block_number,
                |key| key.sharded_key.highest_block_number == u64::MAX,
                |batch, key| batch.delete::<tables::StoragesHistory>(key),
                |batch, key, value| batch.put::<tables::StoragesHistory>(key, value),
                || StorageShardedKey::last(*address, *storage_key),
            )? {
                PruneShardOutcome::Deleted => outcomes.deleted += 1,
                PruneShardOutcome::Updated => outcomes.updated += 1,
                PruneShardOutcome::Unchanged => outcomes.unchanged += 1,
            }
        }

        Ok(outcomes)
    }

    /// Storage counterpart of [`Self::unwind_account_history_to`]: keeps only
    /// blocks at or below `keep_to` for the `(address, storage_key)` pair.
    pub fn unwind_storage_history_to(
        &mut self,
        address: Address,
        storage_key: B256,
        keep_to: BlockNumber,
    ) -> ProviderResult<()> {
        let shards = self.provider.storage_history_shards(address, storage_key)?;
        if shards.is_empty() {
            return Ok(());
        }

        // First shard that may contain blocks above `keep_to`.
        let boundary_idx = shards.iter().position(|(key, _)| {
            key.sharded_key.highest_block_number == u64::MAX ||
                key.sharded_key.highest_block_number > keep_to
        });

        let Some(boundary_idx) = boundary_idx else {
            // Every shard is at or below `keep_to`; just ensure the last one
            // is keyed under the sentinel.
            let (last_key, last_value) = shards.last().expect("shards is non-empty");
            if last_key.sharded_key.highest_block_number != u64::MAX {
                self.delete::<tables::StoragesHistory>(last_key.clone())?;
                self.put::<tables::StoragesHistory>(
                    StorageShardedKey::last(address, storage_key),
                    last_value,
                )?;
            }
            return Ok(());
        };

        // Delete all shards past the boundary.
        for (key, _) in shards.iter().skip(boundary_idx + 1) {
            self.delete::<tables::StoragesHistory>(key.clone())?;
        }

        let (boundary_key, boundary_list) = &shards[boundary_idx];

        self.delete::<tables::StoragesHistory>(boundary_key.clone())?;

        // Keep only blocks at or below `keep_to` from the boundary shard.
        let new_last =
            BlockNumberList::new_pre_sorted(boundary_list.iter().take_while(|&b| b <= keep_to));

        if new_last.is_empty() {
            if boundary_idx == 0 {
                // Nothing remains for this slot.
                return Ok(());
            }

            // Promote the previous shard to be the sentinel shard.
            let (prev_key, prev_value) = &shards[boundary_idx - 1];
            if prev_key.sharded_key.highest_block_number != u64::MAX {
                self.delete::<tables::StoragesHistory>(prev_key.clone())?;
                self.put::<tables::StoragesHistory>(
                    StorageShardedKey::last(address, storage_key),
                    prev_value,
                )?;
            }
            return Ok(());
        }

        self.put::<tables::StoragesHistory>(
            StorageShardedKey::last(address, storage_key),
            &new_last,
        )?;

        Ok(())
    }

    /// Deletes every account history shard for `address`.
    pub fn clear_account_history(&mut self, address: Address) -> ProviderResult<()> {
        let shards = self.provider.account_history_shards(address)?;
        for (key, _) in shards {
            self.delete::<tables::AccountsHistory>(key)?;
        }
        Ok(())
    }

    /// Deletes every storage history shard for the `(address, storage_key)`
    /// pair.
    pub fn clear_storage_history(
        &mut self,
        address: Address,
        storage_key: B256,
    ) -> ProviderResult<()> {
        let shards = self.provider.storage_history_shards(address, storage_key)?;
        for (key, _) in shards {
            self.delete::<tables::StoragesHistory>(key)?;
        }
        Ok(())
    }
}

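/// An optimistic transaction scoped to a [`RocksDBProvider`].
///
/// A minimal usage sketch (key and value are illustrative); commit surfaces
/// optimistic-concurrency conflicts as errors:
///
/// ```ignore
/// let tx = provider.tx();
/// if tx.get::<tables::TransactionHashNumbers>(tx_hash)?.is_none() {
///     tx.put::<tables::TransactionHashNumbers>(tx_hash, &tx_num)?;
/// }
/// tx.commit()?;
/// ```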
pub struct RocksTx<'db> {
    /// The underlying optimistic transaction.
    inner: Transaction<'db, OptimisticTransactionDB>,
    /// Provider the transaction belongs to.
    provider: &'db RocksDBProvider,
}

impl fmt::Debug for RocksTx<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RocksTx").field("provider", &self.provider).finish_non_exhaustive()
    }
}

impl<'db> RocksTx<'db> {
    /// Reads a value by table key within this transaction.
    pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
        let encoded_key = key.encode();
        self.get_encoded::<T>(&encoded_key)
    }

    /// Reads a value by its already-encoded key within this transaction.
    pub fn get_encoded<T: Table>(
        &self,
        key: &<T::Key as Encode>::Encoded,
    ) -> ProviderResult<Option<T::Value>> {
        let cf = self.provider.get_cf_handle::<T>()?;
        let result = self.inner.get_cf(cf, key.as_ref()).map_err(|e| {
            ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
                message: e.to_string().into(),
                code: -1,
            }))
        })?;

        Ok(result.and_then(|value| T::Value::decompress(&value).ok()))
    }

    /// Writes a value under the given table key.
    pub fn put<T: Table>(&self, key: T::Key, value: &T::Value) -> ProviderResult<()> {
        let encoded_key = key.encode();
        self.put_encoded::<T>(&encoded_key, value)
    }

    /// Writes a value under an already-encoded key, compressing it first.
    pub fn put_encoded<T: Table>(
        &self,
        key: &<T::Key as Encode>::Encoded,
        value: &T::Value,
    ) -> ProviderResult<()> {
        let cf = self.provider.get_cf_handle::<T>()?;
        let mut buf = Vec::new();
        let value_bytes = compress_to_buf_or_ref!(buf, value).unwrap_or(&buf);

        self.inner.put_cf(cf, key.as_ref(), value_bytes).map_err(|e| {
            ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
                info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
                operation: DatabaseWriteOperation::PutUpsert,
                table_name: T::NAME,
                key: key.as_ref().to_vec(),
            })))
        })
    }

    /// Deletes the entry for the given table key.
    pub fn delete<T: Table>(&self, key: T::Key) -> ProviderResult<()> {
        let cf = self.provider.get_cf_handle::<T>()?;
        self.inner.delete_cf(cf, key.encode().as_ref()).map_err(|e| {
            ProviderError::Database(DatabaseError::Delete(DatabaseErrorInfo {
                message: e.to_string().into(),
                code: -1,
            }))
        })
    }

    /// Returns an iterator over all entries of `T` within this transaction.
    pub fn iter<T: Table>(&self) -> ProviderResult<RocksTxIter<'_, T>> {
        let cf = self.provider.get_cf_handle::<T>()?;
        let iter = self.inner.iterator_cf(cf, IteratorMode::Start);
        Ok(RocksTxIter { inner: iter, _marker: std::marker::PhantomData })
    }

    /// Returns an iterator over entries of `T`, starting at `key`.
    pub fn iter_from<T: Table>(&self, key: T::Key) -> ProviderResult<RocksTxIter<'_, T>> {
        let cf = self.provider.get_cf_handle::<T>()?;
        let encoded_key = key.encode();
        let iter = self
            .inner
            .iterator_cf(cf, IteratorMode::From(encoded_key.as_ref(), rocksdb::Direction::Forward));
        Ok(RocksTxIter { inner: iter, _marker: std::marker::PhantomData })
    }

    /// Commits the transaction.
    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
    pub fn commit(self) -> ProviderResult<()> {
        self.inner.commit().map_err(|e| {
            ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
                message: e.to_string().into(),
                code: -1,
            }))
        })
    }

    /// Rolls the transaction back, discarding its writes.
    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
    pub fn rollback(self) -> ProviderResult<()> {
        self.inner.rollback().map_err(|e| {
            ProviderError::Database(DatabaseError::Other(format!("rollback failed: {e}")))
        })
    }

    /// Looks up the history state of `address` at `block_number`.
    pub fn account_history_info(
        &self,
        address: Address,
        block_number: BlockNumber,
        lowest_available_block_number: Option<BlockNumber>,
    ) -> ProviderResult<HistoryInfo> {
        let key = ShardedKey::new(address, block_number);
        self.history_info::<tables::AccountsHistory>(
            key.encode().as_ref(),
            block_number,
            lowest_available_block_number,
            |key_bytes| Ok(<ShardedKey<Address> as Decode>::decode(key_bytes)?.key == address),
            |prev_bytes| {
                <ShardedKey<Address> as Decode>::decode(prev_bytes)
                    .map(|k| k.key == address)
                    .unwrap_or(false)
            },
        )
    }

    /// Looks up the history state of the `(address, storage_key)` pair at
    /// `block_number`.
    pub fn storage_history_info(
        &self,
        address: Address,
        storage_key: B256,
        block_number: BlockNumber,
        lowest_available_block_number: Option<BlockNumber>,
    ) -> ProviderResult<HistoryInfo> {
        let key = StorageShardedKey::new(address, storage_key, block_number);
        self.history_info::<tables::StoragesHistory>(
            key.encode().as_ref(),
            block_number,
            lowest_available_block_number,
            |key_bytes| {
                let k = <StorageShardedKey as Decode>::decode(key_bytes)?;
                Ok(k.address == address && k.sharded_key.key == storage_key)
            },
            |prev_bytes| {
                <StorageShardedKey as Decode>::decode(prev_bytes)
                    .map(|k| k.address == address && k.sharded_key.key == storage_key)
                    .unwrap_or(false)
            },
        )
    }

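    /// Core history lookup shared by account and storage queries.
    ///
    /// Seeks to the first shard at or after `encoded_key`, then uses
    /// [`compute_history_rank`] to find the first block in that shard at or
    /// above `block_number`. When the hit could predate the key's first
    /// write, [`needs_prev_shard_check`] triggers a step back to the previous
    /// shard to distinguish "before first write" from a genuine match.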
    fn history_info<T>(
        &self,
        encoded_key: &[u8],
        block_number: BlockNumber,
        lowest_available_block_number: Option<BlockNumber>,
        key_matches: impl FnOnce(&[u8]) -> Result<bool, reth_db_api::DatabaseError>,
        prev_key_matches: impl Fn(&[u8]) -> bool,
    ) -> ProviderResult<HistoryInfo>
    where
        T: Table<Value = BlockNumberList>,
    {
        let is_maybe_pruned = lowest_available_block_number.is_some();
        // If history may have been pruned we cannot distinguish "never written"
        // from "pruned away", so the miss case degrades to a plain-state lookup.
        let fallback = || {
            Ok(if is_maybe_pruned {
                HistoryInfo::MaybeInPlainState
            } else {
                HistoryInfo::NotYetWritten
            })
        };

        let cf = self.provider.0.cf_handle_rw(T::NAME)?;

        let mut iter: DBRawIteratorWithThreadMode<'_, Transaction<'_, OptimisticTransactionDB>> =
            self.inner.raw_iterator_cf(&cf);

        // Seek to the first shard whose key is >= (subject, block_number).
        iter.seek(encoded_key);
        Self::raw_iter_status_ok(&iter)?;

        if !iter.valid() {
            return fallback();
        }

        let Some(key_bytes) = iter.key() else {
            return fallback();
        };
        if !key_matches(key_bytes)? {
            // The cursor landed on a shard belonging to a different subject.
            return fallback();
        }

        let Some(value_bytes) = iter.value() else {
            return fallback();
        };
        let chunk = BlockNumberList::decompress(value_bytes)?;
        let (rank, found_block) = compute_history_rank(&chunk, block_number);

        // If the ranked position means the previous shard must be consulted
        // (`needs_prev_shard_check`), step back one entry: when no earlier shard
        // for the same subject exists, `block_number` predates the first write.
        let is_before_first_write = if needs_prev_shard_check(rank, found_block, block_number) {
            iter.prev();
            Self::raw_iter_status_ok(&iter)?;
            let has_prev = iter.valid() && iter.key().is_some_and(&prev_key_matches);
            !has_prev
        } else {
            false
        };

        Ok(HistoryInfo::from_lookup(
            found_block,
            is_before_first_write,
            lowest_available_block_number,
        ))
    }

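    /// Checks a raw iterator's status, mapping any RocksDB error to
    /// [`DatabaseError::Read`].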
    fn raw_iter_status_ok(
        iter: &DBRawIteratorWithThreadMode<'_, Transaction<'_, OptimisticTransactionDB>>,
    ) -> ProviderResult<()> {
        iter.status().map_err(|e| {
            ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
                message: e.to_string().into(),
                code: -1,
            }))
        })
    }
}

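/// Key-value iterator that abstracts over read-write and read-only database
/// handles so higher-level iterators can stay handle-agnostic.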
enum RocksDBIterEnum<'db> {
    /// Iterator over a read-write (optimistic transaction) database.
    ReadWrite(rocksdb::DBIteratorWithThreadMode<'db, OptimisticTransactionDB>),
    /// Iterator over a read-only database.
    ReadOnly(rocksdb::DBIteratorWithThreadMode<'db, DB>),
}

impl Iterator for RocksDBIterEnum<'_> {
    type Item = Result<(Box<[u8]>, Box<[u8]>), rocksdb::Error>;

    fn next(&mut self) -> Option<Self::Item> {
        match self {
            Self::ReadWrite(iter) => iter.next(),
            Self::ReadOnly(iter) => iter.next(),
        }
    }
}

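/// Cursor-style raw iterator that abstracts over read-write and read-only
/// database handles, exposing seek, key, value, next, and status.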
enum RocksDBRawIterEnum<'db> {
    /// Raw iterator over a read-write (optimistic transaction) database.
    ReadWrite(DBRawIteratorWithThreadMode<'db, OptimisticTransactionDB>),
    /// Raw iterator over a read-only database.
    ReadOnly(DBRawIteratorWithThreadMode<'db, DB>),
}

impl RocksDBRawIterEnum<'_> {
    fn seek(&mut self, key: impl AsRef<[u8]>) {
        match self {
            Self::ReadWrite(iter) => iter.seek(key),
            Self::ReadOnly(iter) => iter.seek(key),
        }
    }

    fn valid(&self) -> bool {
        match self {
            Self::ReadWrite(iter) => iter.valid(),
            Self::ReadOnly(iter) => iter.valid(),
        }
    }

    fn key(&self) -> Option<&[u8]> {
        match self {
            Self::ReadWrite(iter) => iter.key(),
            Self::ReadOnly(iter) => iter.key(),
        }
    }

    fn value(&self) -> Option<&[u8]> {
        match self {
            Self::ReadWrite(iter) => iter.value(),
            Self::ReadOnly(iter) => iter.value(),
        }
    }

    fn next(&mut self) {
        match self {
            Self::ReadWrite(iter) => iter.next(),
            Self::ReadOnly(iter) => iter.next(),
        }
    }

    fn status(&self) -> Result<(), rocksdb::Error> {
        match self {
            Self::ReadWrite(iter) => iter.status(),
            Self::ReadOnly(iter) => iter.status(),
        }
    }
}

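/// Typed table iterator: decodes each raw key and decompresses each raw value
/// into a `(T::Key, T::Value)` pair.
///
/// Illustrative sketch of consuming such an iterator (the binding names are for
/// illustration only):
///
/// ```ignore
/// for entry in iter {
///     // Each item is a ProviderResult<(T::Key, T::Value)>.
///     let (key, value) = entry?;
///     // ... use the decoded entry ...
/// }
/// ```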
pub struct RocksDBIter<'db, T: Table> {
    inner: RocksDBIterEnum<'db>,
    _marker: std::marker::PhantomData<T>,
}

impl<T: Table> fmt::Debug for RocksDBIter<'_, T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RocksDBIter").field("table", &T::NAME).finish_non_exhaustive()
    }
}

impl<T: Table> Iterator for RocksDBIter<'_, T> {
    type Item = ProviderResult<(T::Key, T::Value)>;

    fn next(&mut self) -> Option<Self::Item> {
        Some(decode_iter_item::<T>(self.inner.next()?))
    }
}

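/// Untyped table iterator yielding raw encoded `(key, value)` byte pairs,
/// mapping RocksDB errors to [`DatabaseError::Read`].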
pub struct RocksDBRawIter<'db> {
    inner: RocksDBIterEnum<'db>,
}

impl fmt::Debug for RocksDBRawIter<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RocksDBRawIter").finish_non_exhaustive()
    }
}

impl Iterator for RocksDBRawIter<'_> {
    type Item = ProviderResult<(Box<[u8]>, Box<[u8]>)>;

    fn next(&mut self) -> Option<Self::Item> {
        match self.inner.next()? {
            Ok(kv) => Some(Ok(kv)),
            Err(e) => Some(Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
                message: e.to_string().into(),
                code: -1,
            })))),
        }
    }
}

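/// Typed table iterator bound to a transaction; unlike [`RocksDBIter`] it also
/// observes the transaction's uncommitted writes.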
pub struct RocksTxIter<'tx, T: Table> {
    inner: rocksdb::DBIteratorWithThreadMode<'tx, Transaction<'tx, OptimisticTransactionDB>>,
    _marker: std::marker::PhantomData<T>,
}

impl<T: Table> fmt::Debug for RocksTxIter<'_, T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RocksTxIter").field("table", &T::NAME).finish_non_exhaustive()
    }
}

impl<T: Table> Iterator for RocksTxIter<'_, T> {
    type Item = ProviderResult<(T::Key, T::Value)>;

    fn next(&mut self) -> Option<Self::Item> {
        Some(decode_iter_item::<T>(self.inner.next()?))
    }
}

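/// Decodes one raw iterator item into a typed `(T::Key, T::Value)` pair,
/// mapping read failures to [`DatabaseError::Read`] and decode/decompress
/// failures to [`DatabaseError::Decode`].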
fn decode_iter_item<T: Table>(result: RawKVResult) -> ProviderResult<(T::Key, T::Value)> {
    let (key_bytes, value_bytes) = result.map_err(|e| {
        ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
            message: e.to_string().into(),
            code: -1,
        }))
    })?;

    let key = <T::Key as reth_db_api::table::Decode>::decode(&key_bytes)
        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;

    let value = T::Value::decompress(&value_bytes)
        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;

    Ok((key, value))
}

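/// Maps the database-agnostic [`LogLevel`] onto the closest [`rocksdb::LogLevel`];
/// levels without a direct RocksDB equivalent collapse to `Info` or `Debug`.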
const fn convert_log_level(level: LogLevel) -> rocksdb::LogLevel {
    match level {
        LogLevel::Fatal => rocksdb::LogLevel::Fatal,
        LogLevel::Error => rocksdb::LogLevel::Error,
        LogLevel::Warn => rocksdb::LogLevel::Warn,
        LogLevel::Notice | LogLevel::Verbose => rocksdb::LogLevel::Info,
        LogLevel::Debug | LogLevel::Trace | LogLevel::Extra => rocksdb::LogLevel::Debug,
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::providers::HistoryInfo;
    use alloy_primitives::{Address, TxHash, B256};
    use reth_db_api::{
        models::{
            sharded_key::{ShardedKey, NUM_OF_INDICES_IN_SHARD},
            storage_sharded_key::StorageShardedKey,
            IntegerList,
        },
        table::Table,
        tables,
    };
    use tempfile::TempDir;

    #[test]
    fn test_with_default_tables_registers_required_column_families() {
        let temp_dir = TempDir::new().unwrap();

        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let tx_hash = TxHash::from(B256::from([1u8; 32]));
        provider.put::<tables::TransactionHashNumbers>(tx_hash, &100).unwrap();
        assert_eq!(provider.get::<tables::TransactionHashNumbers>(tx_hash).unwrap(), Some(100));

        let key = ShardedKey::new(Address::ZERO, 100);
        let value = IntegerList::default();
        provider.put::<tables::AccountsHistory>(key.clone(), &value).unwrap();
        assert!(provider.get::<tables::AccountsHistory>(key).unwrap().is_some());

        let key = StorageShardedKey::new(Address::ZERO, B256::ZERO, 100);
        provider.put::<tables::StoragesHistory>(key.clone(), &value).unwrap();
        assert!(provider.get::<tables::StoragesHistory>(key).unwrap().is_some());
    }

    /// Minimal table used by the tests below; not part of the real schema.
    #[derive(Debug)]
    struct TestTable;

    impl Table for TestTable {
        const NAME: &'static str = "TestTable";
        const DUPSORT: bool = false;
        type Key = u64;
        type Value = Vec<u8>;
    }

    #[test]
    fn test_basic_operations() {
        let temp_dir = TempDir::new().unwrap();

        let provider = RocksDBBuilder::new(temp_dir.path())
            .with_table::<TestTable>()
            .build()
            .unwrap();

        let key = 42u64;
        let value = b"test_value".to_vec();

        provider.put::<TestTable>(key, &value).unwrap();

        let result = provider.get::<TestTable>(key).unwrap();
        assert_eq!(result, Some(value));

        provider.delete::<TestTable>(key).unwrap();

        assert_eq!(provider.get::<TestTable>(key).unwrap(), None);
    }

    #[test]
    fn test_batch_operations() {
        let temp_dir = TempDir::new().unwrap();
        let provider =
            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();

        provider
            .write_batch(|batch| {
                for i in 0..10u64 {
                    let value = format!("value_{i}").into_bytes();
                    batch.put::<TestTable>(i, &value)?;
                }
                Ok(())
            })
            .unwrap();

        for i in 0..10u64 {
            let value = format!("value_{i}").into_bytes();
            assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
        }

        provider
            .write_batch(|batch| {
                for i in 0..10u64 {
                    batch.delete::<TestTable>(i)?;
                }
                Ok(())
            })
            .unwrap();

        for i in 0..10u64 {
            assert_eq!(provider.get::<TestTable>(i).unwrap(), None);
        }
    }

    #[test]
    fn test_with_real_table() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path())
            .with_table::<tables::TransactionHashNumbers>()
            .with_metrics()
            .build()
            .unwrap();

        let tx_hash = TxHash::from(B256::from([1u8; 32]));

        provider.put::<tables::TransactionHashNumbers>(tx_hash, &100).unwrap();
        assert_eq!(provider.get::<tables::TransactionHashNumbers>(tx_hash).unwrap(), Some(100));

        provider
            .write_batch(|batch| {
                for i in 0..10u64 {
                    let hash = TxHash::from(B256::from([i as u8; 32]));
                    let value = i * 100;
                    batch.put::<tables::TransactionHashNumbers>(hash, &value)?;
                }
                Ok(())
            })
            .unwrap();

        for i in 0..10u64 {
            let hash = TxHash::from(B256::from([i as u8; 32]));
            assert_eq!(
                provider.get::<tables::TransactionHashNumbers>(hash).unwrap(),
                Some(i * 100)
            );
        }
    }

    #[test]
    fn test_statistics_enabled() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path())
            .with_table::<TestTable>()
            .with_statistics()
            .build()
            .unwrap();

        for i in 0..10 {
            let value = vec![i as u8];
            provider.put::<TestTable>(i, &value).unwrap();
            assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
        }
    }

    #[test]
    fn test_data_persistence() {
        let temp_dir = TempDir::new().unwrap();
        let provider =
            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();

        let value = vec![42u8; 1000];
        for i in 0..100 {
            provider.put::<TestTable>(i, &value).unwrap();
        }

        for i in 0..100 {
            assert!(provider.get::<TestTable>(i).unwrap().is_some(), "Data should be readable");
        }
    }

    #[test]
    fn test_transaction_read_your_writes() {
        let temp_dir = TempDir::new().unwrap();
        let provider =
            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();

        let tx = provider.tx();

        let key = 42u64;
        let value = b"test_value".to_vec();
        tx.put::<TestTable>(key, &value).unwrap();

        let result = tx.get::<TestTable>(key).unwrap();
        assert_eq!(
            result,
            Some(value.clone()),
            "Transaction should see its own uncommitted writes"
        );

        let provider_result = provider.get::<TestTable>(key).unwrap();
        assert_eq!(provider_result, None, "Uncommitted data should not be visible outside tx");

        tx.commit().unwrap();

        let committed_result = provider.get::<TestTable>(key).unwrap();
        assert_eq!(committed_result, Some(value), "Committed data should be visible");
    }

    #[test]
    fn test_transaction_rollback() {
        let temp_dir = TempDir::new().unwrap();
        let provider =
            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();

        let key = 100u64;
        let initial_value = b"initial".to_vec();
        provider.put::<TestTable>(key, &initial_value).unwrap();

        let tx = provider.tx();
        let new_value = b"modified".to_vec();
        tx.put::<TestTable>(key, &new_value).unwrap();

        assert_eq!(tx.get::<TestTable>(key).unwrap(), Some(new_value));

        tx.rollback().unwrap();

        let result = provider.get::<TestTable>(key).unwrap();
        assert_eq!(result, Some(initial_value), "Rollback should preserve original data");
    }

    #[test]
    fn test_transaction_iterator() {
        let temp_dir = TempDir::new().unwrap();
        let provider =
            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();

        let tx = provider.tx();

        for i in 0..5u64 {
            let value = format!("value_{i}").into_bytes();
            tx.put::<TestTable>(i, &value).unwrap();
        }

        let mut count = 0;
        for result in tx.iter::<TestTable>().unwrap() {
            let (key, value) = result.unwrap();
            assert_eq!(value, format!("value_{key}").into_bytes());
            count += 1;
        }
        assert_eq!(count, 5, "Iterator should see all uncommitted writes");

        tx.commit().unwrap();
    }

    #[test]
    fn test_batch_manual_commit() {
        let temp_dir = TempDir::new().unwrap();
        let provider =
            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();

        let mut batch = provider.batch();

        for i in 0..10u64 {
            let value = format!("batch_value_{i}").into_bytes();
            batch.put::<TestTable>(i, &value).unwrap();
        }

        assert_eq!(batch.len(), 10);
        assert!(!batch.is_empty());

        assert_eq!(provider.get::<TestTable>(0).unwrap(), None);

        batch.commit().unwrap();

        for i in 0..10u64 {
            let value = format!("batch_value_{i}").into_bytes();
            assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
        }
    }

    #[test]
    fn test_first_and_last_entry() {
        let temp_dir = TempDir::new().unwrap();
        let provider =
            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();

        assert_eq!(provider.first::<TestTable>().unwrap(), None);
        assert_eq!(provider.last::<TestTable>().unwrap(), None);

        provider.put::<TestTable>(10, &b"value_10".to_vec()).unwrap();
        provider.put::<TestTable>(20, &b"value_20".to_vec()).unwrap();
        provider.put::<TestTable>(5, &b"value_5".to_vec()).unwrap();

        let first = provider.first::<TestTable>().unwrap();
        assert_eq!(first, Some((5, b"value_5".to_vec())));

        let last = provider.last::<TestTable>().unwrap();
        assert_eq!(last, Some((20, b"value_20".to_vec())));
    }

    #[test]
    fn test_account_history_info_pruned_before_first_entry() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);

        let chunk = IntegerList::new([100, 200, 300]).unwrap();
        let shard_key = ShardedKey::new(address, u64::MAX);
        provider.put::<tables::AccountsHistory>(shard_key, &chunk).unwrap();

        let tx = provider.tx();

        // Block 50 predates the first history entry (100), but with a pruning
        // floor of 100 the lookup resolves to the first changeset at/after it.
        let result = tx.account_history_info(address, 50, Some(100)).unwrap();
        assert_eq!(result, HistoryInfo::InChangeset(100));

        tx.rollback().unwrap();
    }

    #[test]
    fn test_account_history_shard_split_at_boundary() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);
        let limit = NUM_OF_INDICES_IN_SHARD;

        let indices: Vec<u64> = (0..=(limit as u64)).collect();
        let mut batch = provider.batch();
        batch.append_account_history_shard(address, indices).unwrap();
        batch.commit().unwrap();

        let completed_key = ShardedKey::new(address, (limit - 1) as u64);
        let sentinel_key = ShardedKey::new(address, u64::MAX);

        let completed_shard = provider.get::<tables::AccountsHistory>(completed_key).unwrap();
        let sentinel_shard = provider.get::<tables::AccountsHistory>(sentinel_key).unwrap();

        assert!(completed_shard.is_some(), "completed shard should exist");
        assert!(sentinel_shard.is_some(), "sentinel shard should exist");

        let completed_shard = completed_shard.unwrap();
        let sentinel_shard = sentinel_shard.unwrap();

        assert_eq!(completed_shard.len(), limit as u64, "completed shard should be full");
        assert_eq!(sentinel_shard.len(), 1, "sentinel shard should have 1 element");
    }

    #[test]
    fn test_account_history_multiple_shard_splits() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x43; 20]);
        let limit = NUM_OF_INDICES_IN_SHARD;

        let first_batch_indices: Vec<u64> = (0..limit as u64).collect();
        let mut batch = provider.batch();
        batch.append_account_history_shard(address, first_batch_indices).unwrap();
        batch.commit().unwrap();

        let sentinel_key = ShardedKey::new(address, u64::MAX);
        let shard = provider.get::<tables::AccountsHistory>(sentinel_key.clone()).unwrap();
        assert!(shard.is_some());
        assert_eq!(shard.unwrap().len(), limit as u64);

        let second_batch_indices: Vec<u64> = (limit as u64..=(2 * limit) as u64).collect();
        let mut batch = provider.batch();
        batch.append_account_history_shard(address, second_batch_indices).unwrap();
        batch.commit().unwrap();

        let first_completed = ShardedKey::new(address, (limit - 1) as u64);
        let second_completed = ShardedKey::new(address, (2 * limit - 1) as u64);

        assert!(
            provider.get::<tables::AccountsHistory>(first_completed).unwrap().is_some(),
            "first completed shard should exist"
        );
        assert!(
            provider.get::<tables::AccountsHistory>(second_completed).unwrap().is_some(),
            "second completed shard should exist"
        );
        assert!(
            provider.get::<tables::AccountsHistory>(sentinel_key).unwrap().is_some(),
            "sentinel shard should exist"
        );
    }

    #[test]
    fn test_storage_history_shard_split_at_boundary() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x44; 20]);
        let slot = B256::from([0x55; 32]);
        let limit = NUM_OF_INDICES_IN_SHARD;

        let indices: Vec<u64> = (0..=(limit as u64)).collect();
        let mut batch = provider.batch();
        batch.append_storage_history_shard(address, slot, indices).unwrap();
        batch.commit().unwrap();

        let completed_key = StorageShardedKey::new(address, slot, (limit - 1) as u64);
        let sentinel_key = StorageShardedKey::new(address, slot, u64::MAX);

        let completed_shard = provider.get::<tables::StoragesHistory>(completed_key).unwrap();
        let sentinel_shard = provider.get::<tables::StoragesHistory>(sentinel_key).unwrap();

        assert!(completed_shard.is_some(), "completed shard should exist");
        assert!(sentinel_shard.is_some(), "sentinel shard should exist");

        let completed_shard = completed_shard.unwrap();
        let sentinel_shard = sentinel_shard.unwrap();

        assert_eq!(completed_shard.len(), limit as u64, "completed shard should be full");
        assert_eq!(sentinel_shard.len(), 1, "sentinel shard should have 1 element");
    }

    #[test]
    fn test_storage_history_multiple_shard_splits() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x46; 20]);
        let slot = B256::from([0x57; 32]);
        let limit = NUM_OF_INDICES_IN_SHARD;

        let first_batch_indices: Vec<u64> = (0..limit as u64).collect();
        let mut batch = provider.batch();
        batch.append_storage_history_shard(address, slot, first_batch_indices).unwrap();
        batch.commit().unwrap();

        let sentinel_key = StorageShardedKey::new(address, slot, u64::MAX);
        let shard = provider.get::<tables::StoragesHistory>(sentinel_key.clone()).unwrap();
        assert!(shard.is_some());
        assert_eq!(shard.unwrap().len(), limit as u64);

        let second_batch_indices: Vec<u64> = (limit as u64..=(2 * limit) as u64).collect();
        let mut batch = provider.batch();
        batch.append_storage_history_shard(address, slot, second_batch_indices).unwrap();
        batch.commit().unwrap();

        let first_completed = StorageShardedKey::new(address, slot, (limit - 1) as u64);
        let second_completed = StorageShardedKey::new(address, slot, (2 * limit - 1) as u64);

        assert!(
            provider.get::<tables::StoragesHistory>(first_completed).unwrap().is_some(),
            "first completed shard should exist"
        );
        assert!(
            provider.get::<tables::StoragesHistory>(second_completed).unwrap().is_some(),
            "second completed shard should exist"
        );
        assert!(
            provider.get::<tables::StoragesHistory>(sentinel_key).unwrap().is_some(),
            "sentinel shard should exist"
        );
    }

    #[test]
    fn test_clear_table() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);
        let key = ShardedKey::new(address, u64::MAX);
        let blocks = BlockNumberList::new_pre_sorted([1, 2, 3]);

        provider.put::<tables::AccountsHistory>(key.clone(), &blocks).unwrap();
        assert!(provider.get::<tables::AccountsHistory>(key.clone()).unwrap().is_some());

        provider.clear::<tables::AccountsHistory>().unwrap();

        assert!(
            provider.get::<tables::AccountsHistory>(key).unwrap().is_none(),
            "table should be empty after clear"
        );
        assert!(
            provider.first::<tables::AccountsHistory>().unwrap().is_none(),
            "first() should return None after clear"
        );
    }

    #[test]
    fn test_clear_empty_table() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        assert!(provider.first::<tables::AccountsHistory>().unwrap().is_none());

        provider.clear::<tables::AccountsHistory>().unwrap();

        assert!(provider.first::<tables::AccountsHistory>().unwrap().is_none());
    }

    #[test]
    fn test_unwind_account_history_to_basic() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);

        let mut batch = provider.batch();
        batch.append_account_history_shard(address, 0..=10).unwrap();
        batch.commit().unwrap();

        let key = ShardedKey::new(address, u64::MAX);
        let result = provider.get::<tables::AccountsHistory>(key.clone()).unwrap();
        assert!(result.is_some());
        let blocks: Vec<u64> = result.unwrap().iter().collect();
        assert_eq!(blocks, (0..=10).collect::<Vec<_>>());

        let mut batch = provider.batch();
        batch.unwind_account_history_to(address, 5).unwrap();
        batch.commit().unwrap();

        let result = provider.get::<tables::AccountsHistory>(key).unwrap();
        assert!(result.is_some());
        let blocks: Vec<u64> = result.unwrap().iter().collect();
        assert_eq!(blocks, (0..=5).collect::<Vec<_>>());
    }

    #[test]
    fn test_unwind_account_history_to_removes_all() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);

        let mut batch = provider.batch();
        batch.append_account_history_shard(address, 5..=10).unwrap();
        batch.commit().unwrap();

        let mut batch = provider.batch();
        batch.unwind_account_history_to(address, 4).unwrap();
        batch.commit().unwrap();

        let key = ShardedKey::new(address, u64::MAX);
        let result = provider.get::<tables::AccountsHistory>(key).unwrap();
        assert!(result.is_none(), "Should have no data after full unwind");
    }

    #[test]
    fn test_unwind_account_history_to_no_op() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);

        let mut batch = provider.batch();
        batch.append_account_history_shard(address, 0..=5).unwrap();
        batch.commit().unwrap();

        let mut batch = provider.batch();
        batch.unwind_account_history_to(address, 10).unwrap();
        batch.commit().unwrap();

        let key = ShardedKey::new(address, u64::MAX);
        let result = provider.get::<tables::AccountsHistory>(key).unwrap();
        assert!(result.is_some());
        let blocks: Vec<u64> = result.unwrap().iter().collect();
        assert_eq!(blocks, (0..=5).collect::<Vec<_>>());
    }

    #[test]
    fn test_unwind_account_history_to_block_zero() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);

        let mut batch = provider.batch();
        batch.append_account_history_shard(address, 0..=5).unwrap();
        batch.commit().unwrap();

        let mut batch = provider.batch();
        batch.unwind_account_history_to(address, 0).unwrap();
        batch.commit().unwrap();

        let key = ShardedKey::new(address, u64::MAX);
        let result = provider.get::<tables::AccountsHistory>(key).unwrap();
        assert!(result.is_some());
        let blocks: Vec<u64> = result.unwrap().iter().collect();
        assert_eq!(blocks, vec![0]);
    }

    #[test]
    fn test_unwind_account_history_to_multi_shard() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);

        let mut batch = provider.batch();

        let shard1 = BlockNumberList::new_pre_sorted(1..=50);
        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, 50), &shard1).unwrap();

        let shard2 = BlockNumberList::new_pre_sorted(51..=100);
        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, u64::MAX), &shard2).unwrap();

        batch.commit().unwrap();

        let shards = provider.account_history_shards(address).unwrap();
        assert_eq!(shards.len(), 2);

        let mut batch = provider.batch();
        batch.unwind_account_history_to(address, 75).unwrap();
        batch.commit().unwrap();

        let shards = provider.account_history_shards(address).unwrap();
        assert_eq!(shards.len(), 2);

        assert_eq!(shards[0].0.highest_block_number, 50);
        assert_eq!(shards[0].1.iter().collect::<Vec<_>>(), (1..=50).collect::<Vec<_>>());

        assert_eq!(shards[1].0.highest_block_number, u64::MAX);
        assert_eq!(shards[1].1.iter().collect::<Vec<_>>(), (51..=75).collect::<Vec<_>>());
    }

    #[test]
    fn test_unwind_account_history_to_multi_shard_boundary_empty() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);

        let mut batch = provider.batch();

        let shard1 = BlockNumberList::new_pre_sorted(1..=50);
        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, 50), &shard1).unwrap();

        let shard2 = BlockNumberList::new_pre_sorted(75..=100);
        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, u64::MAX), &shard2).unwrap();

        batch.commit().unwrap();

        let mut batch = provider.batch();
        batch.unwind_account_history_to(address, 60).unwrap();
        batch.commit().unwrap();

        let shards = provider.account_history_shards(address).unwrap();
        assert_eq!(shards.len(), 1);
        assert_eq!(shards[0].0.highest_block_number, u64::MAX);
        assert_eq!(shards[0].1.iter().collect::<Vec<_>>(), (1..=50).collect::<Vec<_>>());
    }

    #[test]
    fn test_account_history_shards_iterator() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);
        let other_address = Address::from([0x43; 20]);

        let mut batch = provider.batch();
        batch.append_account_history_shard(address, 0..=5).unwrap();
        batch.append_account_history_shard(other_address, 10..=15).unwrap();
        batch.commit().unwrap();

        let shards = provider.account_history_shards(address).unwrap();
        assert_eq!(shards.len(), 1);
        assert_eq!(shards[0].0.key, address);

        let shards = provider.account_history_shards(other_address).unwrap();
        assert_eq!(shards.len(), 1);
        assert_eq!(shards[0].0.key, other_address);

        let non_existent = Address::from([0x99; 20]);
        let shards = provider.account_history_shards(non_existent).unwrap();
        assert!(shards.is_empty());
    }

    #[test]
    fn test_clear_account_history() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);

        let mut batch = provider.batch();
        batch.append_account_history_shard(address, 0..=10).unwrap();
        batch.commit().unwrap();

        let mut batch = provider.batch();
        batch.clear_account_history(address).unwrap();
        batch.commit().unwrap();

        let shards = provider.account_history_shards(address).unwrap();
        assert!(shards.is_empty(), "All shards should be deleted");
    }

    #[test]
    fn test_unwind_non_sentinel_boundary() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);

        let mut batch = provider.batch();

        let shard1 = BlockNumberList::new_pre_sorted(1..=50);
        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, 50), &shard1).unwrap();

        let shard2 = BlockNumberList::new_pre_sorted(51..=100);
        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, 100), &shard2).unwrap();

        let shard3 = BlockNumberList::new_pre_sorted(101..=150);
        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, u64::MAX), &shard3).unwrap();

        batch.commit().unwrap();

        let shards = provider.account_history_shards(address).unwrap();
        assert_eq!(shards.len(), 3);

        let mut batch = provider.batch();
        batch.unwind_account_history_to(address, 75).unwrap();
        batch.commit().unwrap();

        let shards = provider.account_history_shards(address).unwrap();
        assert_eq!(shards.len(), 2);

        assert_eq!(shards[0].0.highest_block_number, 50);
        assert_eq!(shards[0].1.iter().collect::<Vec<_>>(), (1..=50).collect::<Vec<_>>());

        assert_eq!(shards[1].0.highest_block_number, u64::MAX);
        assert_eq!(shards[1].1.iter().collect::<Vec<_>>(), (51..=75).collect::<Vec<_>>());
    }

    #[test]
    fn test_batch_auto_commit_on_threshold() {
        let temp_dir = TempDir::new().unwrap();
        let provider =
            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();

        // Construct the batch directly with a tiny threshold so writes spill into
        // auto-commits long before the default threshold would be reached.
        let mut batch = RocksDBBatch {
            provider: &provider,
            inner: WriteBatchWithTransaction::<true>::default(),
            buf: Vec::new(),
            auto_commit_threshold: Some(1024),
        };

        for i in 0..100u64 {
            let value = format!("value_{i:04}").into_bytes();
            batch.put::<TestTable>(i, &value).unwrap();
        }

        let first_visible = provider.get::<TestTable>(0).unwrap();
        assert!(first_visible.is_some(), "Auto-committed data should be visible");

        batch.commit().unwrap();

        for i in 0..100u64 {
            let value = format!("value_{i:04}").into_bytes();
            assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
        }
    }

    /// Table-driven case for exercising account-history pruning.
    struct AccountPruneCase {
        name: &'static str,
        initial_shards: &'static [(u64, &'static [u64])],
        prune_to: u64,
        expected_outcome: PruneShardOutcome,
        expected_shards: &'static [(u64, &'static [u64])],
    }

    /// Table-driven case for exercising storage-history pruning.
    struct StoragePruneCase {
        name: &'static str,
        initial_shards: &'static [(u64, &'static [u64])],
        prune_to: u64,
        expected_outcome: PruneShardOutcome,
        expected_shards: &'static [(u64, &'static [u64])],
    }

    #[test]
    fn test_prune_account_history_cases() {
        const MAX: u64 = u64::MAX;
        const CASES: &[AccountPruneCase] = &[
            AccountPruneCase {
                name: "single_shard_truncate",
                initial_shards: &[(MAX, &[10, 20, 30, 40])],
                prune_to: 25,
                expected_outcome: PruneShardOutcome::Updated,
                expected_shards: &[(MAX, &[30, 40])],
            },
            AccountPruneCase {
                name: "single_shard_delete_all",
                initial_shards: &[(MAX, &[10, 20])],
                prune_to: 20,
                expected_outcome: PruneShardOutcome::Deleted,
                expected_shards: &[],
            },
            AccountPruneCase {
                name: "single_shard_noop",
                initial_shards: &[(MAX, &[10, 20])],
                prune_to: 5,
                expected_outcome: PruneShardOutcome::Unchanged,
                expected_shards: &[(MAX, &[10, 20])],
            },
            AccountPruneCase {
                name: "no_shards",
                initial_shards: &[],
                prune_to: 100,
                expected_outcome: PruneShardOutcome::Unchanged,
                expected_shards: &[],
            },
            AccountPruneCase {
                name: "multi_shard_truncate_first",
                initial_shards: &[(30, &[10, 20, 30]), (MAX, &[40, 50, 60])],
                prune_to: 25,
                expected_outcome: PruneShardOutcome::Updated,
                expected_shards: &[(30, &[30]), (MAX, &[40, 50, 60])],
            },
            AccountPruneCase {
                name: "delete_first_shard_sentinel_unchanged",
                initial_shards: &[(20, &[10, 20]), (MAX, &[30, 40])],
                prune_to: 20,
                expected_outcome: PruneShardOutcome::Deleted,
                expected_shards: &[(MAX, &[30, 40])],
            },
            AccountPruneCase {
                name: "multi_shard_delete_all_but_last",
                initial_shards: &[(10, &[5, 10]), (20, &[15, 20]), (MAX, &[25, 30])],
                prune_to: 22,
                expected_outcome: PruneShardOutcome::Deleted,
                expected_shards: &[(MAX, &[25, 30])],
            },
            AccountPruneCase {
                name: "mid_shard_preserves_key",
                initial_shards: &[(50, &[10, 20, 30, 40, 50]), (MAX, &[60, 70])],
                prune_to: 25,
                expected_outcome: PruneShardOutcome::Updated,
                expected_shards: &[(50, &[30, 40, 50]), (MAX, &[60, 70])],
            },
            AccountPruneCase {
                name: "equiv_delete_early_shards_keep_sentinel",
                initial_shards: &[(20, &[10, 15, 20]), (50, &[30, 40, 50]), (MAX, &[60, 70])],
                prune_to: 55,
                expected_outcome: PruneShardOutcome::Deleted,
                expected_shards: &[(MAX, &[60, 70])],
            },
            AccountPruneCase {
                name: "equiv_sentinel_becomes_empty_with_prev",
                initial_shards: &[(50, &[30, 40, 50]), (MAX, &[35])],
                prune_to: 40,
                expected_outcome: PruneShardOutcome::Deleted,
                expected_shards: &[(MAX, &[50])],
            },
            AccountPruneCase {
                name: "equiv_all_shards_become_empty",
                initial_shards: &[(50, &[30, 40, 50]), (MAX, &[51])],
                prune_to: 51,
                expected_outcome: PruneShardOutcome::Deleted,
                expected_shards: &[],
            },
            AccountPruneCase {
                name: "equiv_non_sentinel_last_shard_promoted",
                initial_shards: &[(100, &[50, 75, 100])],
                prune_to: 60,
                expected_outcome: PruneShardOutcome::Updated,
                expected_shards: &[(MAX, &[75, 100])],
            },
            AccountPruneCase {
                name: "equiv_filter_within_shard",
                initial_shards: &[(MAX, &[10, 20, 30, 40])],
                prune_to: 25,
                expected_outcome: PruneShardOutcome::Updated,
                expected_shards: &[(MAX, &[30, 40])],
            },
            AccountPruneCase {
                name: "equiv_multi_shard_partial_delete",
                initial_shards: &[(20, &[10, 20]), (50, &[30, 40, 50]), (MAX, &[60, 70])],
                prune_to: 35,
                expected_outcome: PruneShardOutcome::Deleted,
                expected_shards: &[(50, &[40, 50]), (MAX, &[60, 70])],
            },
        ];

        let address = Address::from([0x42; 20]);

        for case in CASES {
            let temp_dir = TempDir::new().unwrap();
            let provider =
                RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

            let mut batch = provider.batch();
            for (highest, blocks) in case.initial_shards {
                let shard = BlockNumberList::new_pre_sorted(blocks.iter().copied());
                batch
                    .put::<tables::AccountsHistory>(ShardedKey::new(address, *highest), &shard)
                    .unwrap();
            }
            batch.commit().unwrap();

            let mut batch = provider.batch();
            let outcome = batch.prune_account_history_to(address, case.prune_to).unwrap();
            batch.commit().unwrap();

            assert_eq!(outcome, case.expected_outcome, "case '{}': wrong outcome", case.name);

            let shards = provider.account_history_shards(address).unwrap();
            assert_eq!(
                shards.len(),
                case.expected_shards.len(),
                "case '{}': wrong shard count",
                case.name
            );
            for (i, ((key, blocks), (exp_key, exp_blocks))) in
                shards.iter().zip(case.expected_shards.iter()).enumerate()
            {
                assert_eq!(
                    key.highest_block_number, *exp_key,
                    "case '{}': shard {} wrong key",
                    case.name, i
                );
                assert_eq!(
                    blocks.iter().collect::<Vec<_>>(),
                    *exp_blocks,
                    "case '{}': shard {} wrong blocks",
                    case.name,
                    i
                );
            }
        }
    }

    #[test]
    fn test_prune_storage_history_cases() {
        const MAX: u64 = u64::MAX;
        const CASES: &[StoragePruneCase] = &[
            StoragePruneCase {
                name: "single_shard_truncate",
                initial_shards: &[(MAX, &[10, 20, 30, 40])],
                prune_to: 25,
                expected_outcome: PruneShardOutcome::Updated,
                expected_shards: &[(MAX, &[30, 40])],
            },
            StoragePruneCase {
                name: "single_shard_delete_all",
                initial_shards: &[(MAX, &[10, 20])],
                prune_to: 20,
                expected_outcome: PruneShardOutcome::Deleted,
                expected_shards: &[],
            },
            StoragePruneCase {
                name: "noop",
                initial_shards: &[(MAX, &[10, 20])],
                prune_to: 5,
                expected_outcome: PruneShardOutcome::Unchanged,
                expected_shards: &[(MAX, &[10, 20])],
            },
            StoragePruneCase {
                name: "no_shards",
                initial_shards: &[],
                prune_to: 100,
                expected_outcome: PruneShardOutcome::Unchanged,
                expected_shards: &[],
            },
            StoragePruneCase {
                name: "mid_shard_preserves_key",
                initial_shards: &[(50, &[10, 20, 30, 40, 50]), (MAX, &[60, 70])],
                prune_to: 25,
                expected_outcome: PruneShardOutcome::Updated,
                expected_shards: &[(50, &[30, 40, 50]), (MAX, &[60, 70])],
            },
            StoragePruneCase {
                name: "equiv_sentinel_promotion",
                initial_shards: &[(100, &[50, 75, 100])],
                prune_to: 60,
                expected_outcome: PruneShardOutcome::Updated,
                expected_shards: &[(MAX, &[75, 100])],
            },
            StoragePruneCase {
                name: "equiv_delete_early_shards_keep_sentinel",
                initial_shards: &[(20, &[10, 15, 20]), (50, &[30, 40, 50]), (MAX, &[60, 70])],
                prune_to: 55,
                expected_outcome: PruneShardOutcome::Deleted,
                expected_shards: &[(MAX, &[60, 70])],
            },
            StoragePruneCase {
                name: "equiv_sentinel_becomes_empty_with_prev",
                initial_shards: &[(50, &[30, 40, 50]), (MAX, &[35])],
                prune_to: 40,
                expected_outcome: PruneShardOutcome::Deleted,
                expected_shards: &[(MAX, &[50])],
            },
            StoragePruneCase {
                name: "equiv_all_shards_become_empty",
                initial_shards: &[(50, &[30, 40, 50]), (MAX, &[51])],
                prune_to: 51,
                expected_outcome: PruneShardOutcome::Deleted,
                expected_shards: &[],
            },
            StoragePruneCase {
                name: "equiv_filter_within_shard",
                initial_shards: &[(MAX, &[10, 20, 30, 40])],
                prune_to: 25,
                expected_outcome: PruneShardOutcome::Updated,
                expected_shards: &[(MAX, &[30, 40])],
            },
            StoragePruneCase {
                name: "equiv_multi_shard_partial_delete",
                initial_shards: &[(20, &[10, 20]), (50, &[30, 40, 50]), (MAX, &[60, 70])],
                prune_to: 35,
                expected_outcome: PruneShardOutcome::Deleted,
                expected_shards: &[(50, &[40, 50]), (MAX, &[60, 70])],
            },
        ];

        let address = Address::from([0x42; 20]);
        let storage_key = B256::from([0x01; 32]);

        for case in CASES {
            let temp_dir = TempDir::new().unwrap();
            let provider =
                RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

            let mut batch = provider.batch();
            for (highest, blocks) in case.initial_shards {
                let shard = BlockNumberList::new_pre_sorted(blocks.iter().copied());
                let key = if *highest == MAX {
                    StorageShardedKey::last(address, storage_key)
                } else {
                    StorageShardedKey::new(address, storage_key, *highest)
                };
                batch.put::<tables::StoragesHistory>(key, &shard).unwrap();
            }
            batch.commit().unwrap();

            let mut batch = provider.batch();
            let outcome =
                batch.prune_storage_history_to(address, storage_key, case.prune_to).unwrap();
            batch.commit().unwrap();

            assert_eq!(outcome, case.expected_outcome, "case '{}': wrong outcome", case.name);

            let shards = provider.storage_history_shards(address, storage_key).unwrap();
            assert_eq!(
                shards.len(),
                case.expected_shards.len(),
                "case '{}': wrong shard count",
                case.name
            );
            for (i, ((key, blocks), (exp_key, exp_blocks))) in
                shards.iter().zip(case.expected_shards.iter()).enumerate()
            {
                assert_eq!(
                    key.sharded_key.highest_block_number, *exp_key,
                    "case '{}': shard {} wrong key",
                    case.name, i
                );
                assert_eq!(
                    blocks.iter().collect::<Vec<_>>(),
                    *exp_blocks,
                    "case '{}': shard {} wrong blocks",
                    case.name,
                    i
                );
            }
        }
    }

    #[test]
    fn test_prune_storage_history_does_not_affect_other_slots() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);
        let slot1 = B256::from([0x01; 32]);
        let slot2 = B256::from([0x02; 32]);

        let mut batch = provider.batch();
        batch
            .put::<tables::StoragesHistory>(
                StorageShardedKey::last(address, slot1),
                &BlockNumberList::new_pre_sorted([10u64, 20]),
            )
            .unwrap();
        batch
            .put::<tables::StoragesHistory>(
                StorageShardedKey::last(address, slot2),
                &BlockNumberList::new_pre_sorted([30u64, 40]),
            )
            .unwrap();
        batch.commit().unwrap();

        let mut batch = provider.batch();
        let outcome = batch.prune_storage_history_to(address, slot1, 20).unwrap();
        batch.commit().unwrap();

        assert_eq!(outcome, PruneShardOutcome::Deleted);

        let shards1 = provider.storage_history_shards(address, slot1).unwrap();
        assert!(shards1.is_empty());

        let shards2 = provider.storage_history_shards(address, slot2).unwrap();
        assert_eq!(shards2.len(), 1);
        assert_eq!(shards2[0].1.iter().collect::<Vec<_>>(), vec![30, 40]);
    }

    #[test]
    fn test_prune_invariants() {
        let address = Address::from([0x42; 20]);
        let storage_key = B256::from([0x01; 32]);

        #[allow(clippy::type_complexity)]
        let invariant_cases: &[(&[(u64, &[u64])], u64)] = &[
            (&[(10, &[5, 10]), (20, &[15, 20]), (u64::MAX, &[25, 30])], 20),
            (&[(100, &[50, 100])], 60),
        ];

        for (initial_shards, prune_to) in invariant_cases {
            {
                let temp_dir = TempDir::new().unwrap();
                let provider =
                    RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

                let mut batch = provider.batch();
                for (highest, blocks) in *initial_shards {
                    let shard = BlockNumberList::new_pre_sorted(blocks.iter().copied());
                    batch
                        .put::<tables::AccountsHistory>(ShardedKey::new(address, *highest), &shard)
                        .unwrap();
                }
                batch.commit().unwrap();

                let mut batch = provider.batch();
                batch.prune_account_history_to(address, *prune_to).unwrap();
                batch.commit().unwrap();

                let shards = provider.account_history_shards(address).unwrap();

                for (key, blocks) in &shards {
                    assert!(
                        !blocks.is_empty(),
                        "Account: empty shard at key {}",
                        key.highest_block_number
                    );
                }

                if !shards.is_empty() {
                    let last = shards.last().unwrap();
                    assert_eq!(
                        last.0.highest_block_number,
                        u64::MAX,
                        "Account: last shard must be sentinel"
                    );
                }
            }

            {
                let temp_dir = TempDir::new().unwrap();
                let provider =
                    RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

                let mut batch = provider.batch();
                for (highest, blocks) in *initial_shards {
                    let shard = BlockNumberList::new_pre_sorted(blocks.iter().copied());
                    let key = if *highest == u64::MAX {
                        StorageShardedKey::last(address, storage_key)
                    } else {
                        StorageShardedKey::new(address, storage_key, *highest)
                    };
                    batch.put::<tables::StoragesHistory>(key, &shard).unwrap();
                }
                batch.commit().unwrap();

                let mut batch = provider.batch();
                batch.prune_storage_history_to(address, storage_key, *prune_to).unwrap();
                batch.commit().unwrap();

                let shards = provider.storage_history_shards(address, storage_key).unwrap();

                for (key, blocks) in &shards {
                    assert!(
                        !blocks.is_empty(),
                        "Storage: empty shard at key {}",
                        key.sharded_key.highest_block_number
                    );
                }

                if !shards.is_empty() {
                    let last = shards.last().unwrap();
                    assert_eq!(
                        last.0.sharded_key.highest_block_number,
                        u64::MAX,
                        "Storage: last shard must be sentinel"
                    );
                }
            }
        }
    }

    #[test]
    fn test_prune_account_history_batch_multiple_sorted_targets() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let addr1 = Address::from([0x01; 20]);
        let addr2 = Address::from([0x02; 20]);
        let addr3 = Address::from([0x03; 20]);

        let mut batch = provider.batch();
        batch
            .put::<tables::AccountsHistory>(
                ShardedKey::new(addr1, u64::MAX),
                &BlockNumberList::new_pre_sorted([10, 20, 30]),
            )
            .unwrap();
        batch
            .put::<tables::AccountsHistory>(
                ShardedKey::new(addr2, u64::MAX),
                &BlockNumberList::new_pre_sorted([5, 10, 15]),
            )
            .unwrap();
        batch
            .put::<tables::AccountsHistory>(
                ShardedKey::new(addr3, u64::MAX),
                &BlockNumberList::new_pre_sorted([100, 200]),
            )
            .unwrap();
        batch.commit().unwrap();

        let mut targets = vec![(addr1, 15), (addr2, 10), (addr3, 50)];
        targets.sort_by_key(|(addr, _)| *addr);

        let mut batch = provider.batch();
        let outcomes = batch.prune_account_history_batch(&targets).unwrap();
        batch.commit().unwrap();

        // addr1 and addr2 lose entries at/below their targets; addr3's shard
        // starts above its target and is left untouched.
        assert_eq!(outcomes.updated, 2);
        assert_eq!(outcomes.unchanged, 1);

        let shards1 = provider.account_history_shards(addr1).unwrap();
        assert_eq!(shards1[0].1.iter().collect::<Vec<_>>(), vec![20, 30]);

        let shards2 = provider.account_history_shards(addr2).unwrap();
        assert_eq!(shards2[0].1.iter().collect::<Vec<_>>(), vec![15]);

        let shards3 = provider.account_history_shards(addr3).unwrap();
        assert_eq!(shards3[0].1.iter().collect::<Vec<_>>(), vec![100, 200]);
    }

    #[test]
    fn test_prune_account_history_batch_target_with_no_shards() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let addr1 = Address::from([0x01; 20]);
        let addr2 = Address::from([0x02; 20]);
        let addr3 = Address::from([0x03; 20]);

        let mut batch = provider.batch();
        batch
            .put::<tables::AccountsHistory>(
                ShardedKey::new(addr1, u64::MAX),
                &BlockNumberList::new_pre_sorted([10, 20]),
            )
            .unwrap();
        batch
            .put::<tables::AccountsHistory>(
                ShardedKey::new(addr3, u64::MAX),
                &BlockNumberList::new_pre_sorted([30, 40]),
            )
            .unwrap();
        batch.commit().unwrap();

        let mut targets = vec![(addr1, 15), (addr2, 100), (addr3, 35)];
        targets.sort_by_key(|(addr, _)| *addr);

        let mut batch = provider.batch();
        let outcomes = batch.prune_account_history_batch(&targets).unwrap();
        batch.commit().unwrap();

        // addr2 has no shards at all, so its target is a no-op.
        assert_eq!(outcomes.updated, 2);
        assert_eq!(outcomes.unchanged, 1);

        let shards1 = provider.account_history_shards(addr1).unwrap();
        assert_eq!(shards1[0].1.iter().collect::<Vec<_>>(), vec![20]);

        let shards3 = provider.account_history_shards(addr3).unwrap();
        assert_eq!(shards3[0].1.iter().collect::<Vec<_>>(), vec![40]);
    }

    #[test]
    fn test_prune_storage_history_batch_multiple_sorted_targets() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let addr = Address::from([0x42; 20]);
        let slot1 = B256::from([0x01; 32]);
        let slot2 = B256::from([0x02; 32]);

        let mut batch = provider.batch();
        batch
            .put::<tables::StoragesHistory>(
                StorageShardedKey::new(addr, slot1, u64::MAX),
                &BlockNumberList::new_pre_sorted([10, 20, 30]),
            )
            .unwrap();
        batch
            .put::<tables::StoragesHistory>(
                StorageShardedKey::new(addr, slot2, u64::MAX),
                &BlockNumberList::new_pre_sorted([5, 15, 25]),
            )
            .unwrap();
        batch.commit().unwrap();

        let mut targets = vec![((addr, slot1), 15), ((addr, slot2), 10)];
        targets.sort_by_key(|((a, s), _)| (*a, *s));

        let mut batch = provider.batch();
        let outcomes = batch.prune_storage_history_batch(&targets).unwrap();
        batch.commit().unwrap();

        assert_eq!(outcomes.updated, 2);

        let shards1 = provider.storage_history_shards(addr, slot1).unwrap();
        assert_eq!(shards1[0].1.iter().collect::<Vec<_>>(), vec![20, 30]);

        let shards2 = provider.storage_history_shards(addr, slot2).unwrap();
        assert_eq!(shards2[0].1.iter().collect::<Vec<_>>(), vec![15, 25]);
    }
}