// reth_provider/providers/rocksdb/provider.rs
1use super::metrics::{RocksDBMetrics, RocksDBOperation, ROCKSDB_TABLES};
2use crate::providers::{compute_history_rank, needs_prev_shard_check, HistoryInfo};
3use alloy_consensus::transaction::TxHashRef;
4use alloy_primitives::{
5    map::{AddressMap, HashMap},
6    Address, BlockNumber, TxNumber, B256,
7};
8use itertools::Itertools;
9use metrics::Label;
10use parking_lot::Mutex;
11use reth_chain_state::ExecutedBlock;
12use reth_db_api::{
13    database_metrics::DatabaseMetrics,
14    models::{
15        sharded_key::NUM_OF_INDICES_IN_SHARD, storage_sharded_key::StorageShardedKey, ShardedKey,
16        StorageSettings,
17    },
18    table::{Compress, Decode, Decompress, Encode, Table},
19    tables, BlockNumberList, DatabaseError,
20};
21use reth_primitives_traits::{BlockBody as _, FastInstant as Instant};
22use reth_prune_types::PruneMode;
23use reth_storage_errors::{
24    db::{DatabaseErrorInfo, DatabaseWriteError, DatabaseWriteOperation, LogLevel},
25    provider::{ProviderError, ProviderResult},
26};
27use rocksdb::{
28    BlockBasedOptions, Cache, ColumnFamilyDescriptor, CompactionPri, DBCompressionType,
29    DBRawIteratorWithThreadMode, IteratorMode, OptimisticTransactionDB,
30    OptimisticTransactionOptions, Options, SnapshotWithThreadMode, Transaction,
31    WriteBatchWithTransaction, WriteOptions, DB,
32};
33use std::{
34    collections::BTreeMap,
35    fmt,
36    path::{Path, PathBuf},
37    sync::Arc,
38};
39use tracing::instrument;
40
/// Pending `RocksDB` batches type alias.
///
/// Shared list of staged-but-not-yet-committed write batches; wrapped in
/// `Arc<Mutex<..>>` so multiple writers can push batches concurrently.
pub(crate) type PendingRocksDBBatches = Arc<Mutex<Vec<WriteBatchWithTransaction<true>>>>;

/// Raw key-value result from a `RocksDB` iterator.
///
/// Both key and value are owned byte boxes as returned by the `rocksdb` crate.
type RawKVResult = Result<(Box<[u8]>, Box<[u8]>), rocksdb::Error>;
46
/// Statistics for a single `RocksDB` table (column family).
///
/// All values come from `RocksDB` internal estimate properties and may lag
/// behind the exact on-disk state.
#[derive(Debug, Clone)]
pub struct RocksDBTableStats {
    /// Size of SST files on disk in bytes.
    pub sst_size_bytes: u64,
    /// Size of memtables in memory in bytes.
    pub memtable_size_bytes: u64,
    /// Name of the table/column family.
    pub name: String,
    /// Estimated number of keys in the table.
    pub estimated_num_keys: u64,
    /// Estimated size of live data in bytes (SST files + memtables).
    pub estimated_size_bytes: u64,
    /// Estimated bytes pending compaction (reclaimable space).
    pub pending_compaction_bytes: u64,
}
63
/// Database-level statistics for `RocksDB`.
///
/// Contains both per-table statistics and DB-level metrics like WAL size.
#[derive(Debug, Clone)]
pub struct RocksDBStats {
    /// Statistics for each table (column family).
    pub tables: Vec<RocksDBTableStats>,
    /// Total size of WAL (Write-Ahead Log) files in bytes.
    ///
    /// WAL is shared across all tables and not included in per-table metrics,
    /// so it is reported once at the database level.
    pub wal_size_bytes: u64,
}
76
/// Context for `RocksDB` block writes.
///
/// Bundles the parameters a block writer needs to decide what to write to
/// `RocksDB` and where to stage the resulting batches.
#[derive(Clone)]
pub(crate) struct RocksDBWriteCtx {
    /// The first block number being written.
    pub first_block_number: BlockNumber,
    /// The prune mode for transaction lookup, if any.
    pub prune_tx_lookup: Option<PruneMode>,
    /// Storage settings determining what goes to `RocksDB`.
    pub storage_settings: StorageSettings,
    /// Pending batches to push to after writing.
    pub pending_batches: PendingRocksDBBatches,
}
89
90impl fmt::Debug for RocksDBWriteCtx {
91    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
92        f.debug_struct("RocksDBWriteCtx")
93            .field("first_block_number", &self.first_block_number)
94            .field("prune_tx_lookup", &self.prune_tx_lookup)
95            .field("storage_settings", &self.storage_settings)
96            .field("pending_batches", &"<pending batches>")
97            .finish()
98    }
99}
100
/// Default cache size for `RocksDB` block cache (128 MB).
const DEFAULT_CACHE_SIZE: usize = 128 << 20;

/// Default block size for `RocksDB` tables (16 KB).
const DEFAULT_BLOCK_SIZE: usize = 16 * 1024;

/// Default max background jobs for `RocksDB` compaction and flushing.
const DEFAULT_MAX_BACKGROUND_JOBS: i32 = 6;

/// Default max open file descriptors for `RocksDB`.
///
/// Caps the number of SST file handles `RocksDB` keeps open simultaneously.
/// Set to 512 to stay within the common default OS `ulimit -n` of 1024,
/// leaving headroom for MDBX, static files, and other I/O.
/// `RocksDB` uses an internal table cache and re-opens files on demand,
/// so this has negligible performance impact on read-heavy workloads.
const DEFAULT_MAX_OPEN_FILES: i32 = 512;

/// Default bytes per sync for `RocksDB` WAL writes (1 MB = 1_048_576 bytes).
const DEFAULT_BYTES_PER_SYNC: u64 = 1_048_576;

/// Default write buffer size for `RocksDB` memtables (128 MB).
///
/// Larger memtables reduce flush frequency during burst writes, providing more consistent
/// tail latency. Benchmarks showed 128 MB reduces p99 latency variance by ~80% compared
/// to 64 MB default, with negligible impact on mean throughput.
const DEFAULT_WRITE_BUFFER_SIZE: usize = 128 << 20;

/// Default buffer capacity for compression in batches.
/// 4 KiB matches common block/page sizes and comfortably holds typical history values,
/// reducing the first few reallocations without over-allocating.
const DEFAULT_COMPRESS_BUF_CAPACITY: usize = 4096;

/// Default auto-commit threshold for batch writes (4 GiB).
///
/// When a batch exceeds this size, it is automatically committed to prevent OOM
/// during large bulk writes. The consistency check on startup heals any crash
/// that occurs between auto-commits.
const DEFAULT_AUTO_COMMIT_THRESHOLD: usize = 4 * 1024 * 1024 * 1024;
140
/// Builder for [`RocksDBProvider`].
pub struct RocksDBBuilder {
    /// Directory where the database files live.
    path: PathBuf,
    /// Names of the column families to open or create.
    column_families: Vec<String>,
    /// Whether to record per-operation latency metrics.
    enable_metrics: bool,
    /// Whether to enable `RocksDB`'s internal statistics collection.
    enable_statistics: bool,
    /// Log level passed through to `RocksDB`.
    log_level: rocksdb::LogLevel,
    /// Block cache shared by all column families.
    block_cache: Cache,
    /// Whether to open the database in read-only mode.
    read_only: bool,
}
151
152impl fmt::Debug for RocksDBBuilder {
153    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
154        f.debug_struct("RocksDBBuilder")
155            .field("path", &self.path)
156            .field("column_families", &self.column_families)
157            .field("enable_metrics", &self.enable_metrics)
158            .finish()
159    }
160}
161
impl RocksDBBuilder {
    /// Creates a new builder with optimized default options.
    ///
    /// Allocates the shared LRU block cache up front so every column family
    /// registered later reuses the same cache.
    pub fn new(path: impl AsRef<Path>) -> Self {
        let cache = Cache::new_lru_cache(DEFAULT_CACHE_SIZE);
        Self {
            path: path.as_ref().to_path_buf(),
            column_families: Vec::new(),
            enable_metrics: false,
            enable_statistics: false,
            log_level: rocksdb::LogLevel::Info,
            block_cache: cache,
            read_only: false,
        }
    }

    /// Creates default table options with shared block cache.
    fn default_table_options(cache: &Cache) -> BlockBasedOptions {
        let mut table_options = BlockBasedOptions::default();
        table_options.set_block_size(DEFAULT_BLOCK_SIZE);
        table_options.set_cache_index_and_filter_blocks(true);
        table_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
        // Shared block cache for all column families.
        table_options.set_block_cache(cache);
        table_options
    }

    /// Creates optimized `RocksDB` options per `RocksDB` wiki recommendations.
    fn default_options(
        log_level: rocksdb::LogLevel,
        cache: &Cache,
        enable_statistics: bool,
    ) -> Options {
        // Follows the recommended tuning guide from the RocksDB wiki, see https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning
        let table_options = Self::default_table_options(cache);

        let mut options = Options::default();
        options.set_block_based_table_factory(&table_options);
        options.create_if_missing(true);
        options.create_missing_column_families(true);
        options.set_max_background_jobs(DEFAULT_MAX_BACKGROUND_JOBS);
        options.set_bytes_per_sync(DEFAULT_BYTES_PER_SYNC);

        // Zstd for the bottommost level (best ratio), Lz4 elsewhere (cheap CPU).
        options.set_bottommost_compression_type(DBCompressionType::Zstd);
        options.set_bottommost_zstd_max_train_bytes(0, true);
        options.set_compression_type(DBCompressionType::Lz4);
        options.set_compaction_pri(CompactionPri::MinOverlappingRatio);

        options.set_log_level(log_level);

        options.set_max_open_files(DEFAULT_MAX_OPEN_FILES);

        // Delete obsolete WAL files immediately after all column families have flushed.
        // Both set to 0 means "delete ASAP, no archival".
        options.set_wal_ttl_seconds(0);
        options.set_wal_size_limit_mb(0);

        // Statistics can be viewed in the RocksDB log file.
        if enable_statistics {
            options.enable_statistics();
        }

        options
    }

    /// Creates optimized column family options.
    fn default_column_family_options(cache: &Cache) -> Options {
        // Follows the recommended tuning guide from the RocksDB wiki, see https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning
        let table_options = Self::default_table_options(cache);

        let mut cf_options = Options::default();
        cf_options.set_block_based_table_factory(&table_options);
        cf_options.set_level_compaction_dynamic_level_bytes(true);
        // Zstd is recommended for bottommost compression and Lz4 for other levels, see https://github.com/facebook/rocksdb/wiki/Compression#configuration
        cf_options.set_compression_type(DBCompressionType::Lz4);
        cf_options.set_bottommost_compression_type(DBCompressionType::Zstd);
        // Only use Zstd compression, disable dictionary training
        cf_options.set_bottommost_zstd_max_train_bytes(0, true);
        cf_options.set_write_buffer_size(DEFAULT_WRITE_BUFFER_SIZE);

        cf_options
    }

    /// Creates optimized column family options for `TransactionHashNumbers`.
    ///
    /// This table stores `B256 -> TxNumber` mappings where:
    /// - Keys are incompressible 32-byte hashes (compression wastes CPU for zero benefit)
    /// - Values are varint-encoded `u64` (a few bytes - too small to benefit from compression)
    /// - Every lookup expects a hit (bloom filters only help when checking non-existent keys)
    fn tx_hash_numbers_column_family_options(cache: &Cache) -> Options {
        let mut table_options = BlockBasedOptions::default();
        table_options.set_block_size(DEFAULT_BLOCK_SIZE);
        table_options.set_cache_index_and_filter_blocks(true);
        table_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
        table_options.set_block_cache(cache);
        // No bloom filter is configured: every lookup expects a hit, so bloom filters
        // provide no benefit and waste memory

        let mut cf_options = Options::default();
        cf_options.set_block_based_table_factory(&table_options);
        cf_options.set_level_compaction_dynamic_level_bytes(true);
        // Disable compression: B256 keys are incompressible hashes, TxNumber values are
        // varint-encoded u64 (a few bytes). Compression wastes CPU cycles for zero space savings.
        cf_options.set_compression_type(DBCompressionType::None);
        cf_options.set_bottommost_compression_type(DBCompressionType::None);

        cf_options
    }

    /// Adds a column family for a specific table type.
    pub fn with_table<T: Table>(mut self) -> Self {
        self.column_families.push(T::NAME.to_string());
        self
    }

    /// Registers the default tables used by reth for `RocksDB` storage.
    ///
    /// This registers:
    /// - [`tables::TransactionHashNumbers`] - Transaction hash to number mapping
    /// - [`tables::AccountsHistory`] - Account history index
    /// - [`tables::StoragesHistory`] - Storage history index
    pub fn with_default_tables(self) -> Self {
        self.with_table::<tables::TransactionHashNumbers>()
            .with_table::<tables::AccountsHistory>()
            .with_table::<tables::StoragesHistory>()
    }

    /// Enables metrics.
    pub const fn with_metrics(mut self) -> Self {
        self.enable_metrics = true;
        self
    }

    /// Enables `RocksDB` internal statistics collection.
    pub const fn with_statistics(mut self) -> Self {
        self.enable_statistics = true;
        self
    }

    /// Sets the log level from `DatabaseArgs` configuration.
    ///
    /// `None` keeps the builder's current (default) log level.
    pub const fn with_database_log_level(mut self, log_level: Option<LogLevel>) -> Self {
        if let Some(level) = log_level {
            self.log_level = convert_log_level(level);
        }
        self
    }

    /// Sets a custom block cache size.
    ///
    /// Replaces the default shared LRU cache with a new one of the given capacity.
    pub fn with_block_cache_size(mut self, capacity_bytes: usize) -> Self {
        self.block_cache = Cache::new_lru_cache(capacity_bytes);
        self
    }

    /// Sets read-only mode.
    ///
    /// Note: Write operations on a read-only provider will panic at runtime.
    pub const fn with_read_only(mut self, read_only: bool) -> Self {
        self.read_only = read_only;
        self
    }

    /// Builds the [`RocksDBProvider`].
    ///
    /// Opens the database in read-only or read-write mode depending on the builder
    /// configuration, applying per-column-family options.
    ///
    /// # Errors
    /// Returns [`ProviderError::Database`] if the database cannot be opened.
    pub fn build(self) -> ProviderResult<RocksDBProvider> {
        let options =
            Self::default_options(self.log_level, &self.block_cache, self.enable_statistics);

        let cf_descriptors: Vec<ColumnFamilyDescriptor> = self
            .column_families
            .iter()
            .map(|name| {
                // TransactionHashNumbers gets special no-compression/no-bloom options.
                let cf_options = if name == tables::TransactionHashNumbers::NAME {
                    Self::tx_hash_numbers_column_family_options(&self.block_cache)
                } else {
                    Self::default_column_family_options(&self.block_cache)
                };
                ColumnFamilyDescriptor::new(name.clone(), cf_options)
            })
            .collect();

        let metrics = self.enable_metrics.then(RocksDBMetrics::default);

        if self.read_only {
            let db = DB::open_cf_descriptors_read_only(&options, &self.path, cf_descriptors, false)
                .map_err(|e| {
                    ProviderError::Database(DatabaseError::Open(DatabaseErrorInfo {
                        message: e.to_string().into(),
                        code: -1,
                    }))
                })?;
            Ok(RocksDBProvider(Arc::new(RocksDBProviderInner::ReadOnly { db, metrics })))
        } else {
            // Use OptimisticTransactionDB for MDBX-like transaction semantics (read-your-writes,
            // rollback) OptimisticTransactionDB uses optimistic concurrency control (conflict
            // detection at commit) and is backed by DBCommon, giving us access to
            // cancel_all_background_work for clean shutdown.
            let db =
                OptimisticTransactionDB::open_cf_descriptors(&options, &self.path, cf_descriptors)
                    .map_err(|e| {
                        ProviderError::Database(DatabaseError::Open(DatabaseErrorInfo {
                            message: e.to_string().into(),
                            code: -1,
                        }))
                    })?;
            Ok(RocksDBProvider(Arc::new(RocksDBProviderInner::ReadWrite { db, metrics })))
        }
    }
}
368
/// Some types don't support compression (eg. B256), and we don't want to be copying them to the
/// allocated buffer when we can just use their reference.
///
/// Evaluates to `Some(&[u8])` when the value exposes an uncompressable reference; otherwise
/// clears `$buf`, compresses the value into it, and evaluates to `None` — in which case the
/// caller must use `$buf` as the encoded bytes.
macro_rules! compress_to_buf_or_ref {
    ($buf:expr, $value:expr) => {
        if let Some(value) = $value.uncompressable_ref() {
            Some(value)
        } else {
            $buf.clear();
            $value.compress_to_buf(&mut $buf);
            None
        }
    };
}
382
/// `RocksDB` provider for auxiliary storage layer beside main database MDBX.
///
/// Cheap to clone: all state lives behind a shared [`Arc`].
#[derive(Debug)]
pub struct RocksDBProvider(Arc<RocksDBProviderInner>);
386
/// Inner state for `RocksDB` provider.
///
/// The two variants use different `rocksdb` handle types, so most accessors
/// below match on the variant and dispatch to the corresponding handle.
enum RocksDBProviderInner {
    /// Read-write mode using `OptimisticTransactionDB`.
    ReadWrite {
        /// `RocksDB` database instance with optimistic transaction support.
        db: OptimisticTransactionDB,
        /// Metrics latency & operations.
        metrics: Option<RocksDBMetrics>,
    },
    /// Read-only mode using `DB` opened with `open_cf_descriptors_read_only`.
    /// This doesn't acquire an exclusive lock, allowing concurrent reads.
    ReadOnly {
        /// Read-only `RocksDB` database instance.
        db: DB,
        /// Metrics latency & operations.
        metrics: Option<RocksDBMetrics>,
    },
}
405
impl RocksDBProviderInner {
    /// Returns the metrics for this provider, if metrics were enabled at build time.
    const fn metrics(&self) -> Option<&RocksDBMetrics> {
        match self {
            Self::ReadWrite { metrics, .. } | Self::ReadOnly { metrics, .. } => metrics.as_ref(),
        }
    }

    /// Returns the read-write database, panicking if in read-only mode.
    ///
    /// # Panics
    /// Panics if this provider was opened read-only.
    fn db_rw(&self) -> &OptimisticTransactionDB {
        match self {
            Self::ReadWrite { db, .. } => db,
            Self::ReadOnly { .. } => {
                panic!("Cannot perform write operation on read-only RocksDB provider")
            }
        }
    }

    /// Gets the column family handle for a table.
    ///
    /// Errors if the column family was not registered when the DB was opened.
    fn cf_handle<T: Table>(&self) -> Result<&rocksdb::ColumnFamily, DatabaseError> {
        let cf = match self {
            Self::ReadWrite { db, .. } => db.cf_handle(T::NAME),
            Self::ReadOnly { db, .. } => db.cf_handle(T::NAME),
        };
        cf.ok_or_else(|| DatabaseError::Other(format!("Column family '{}' not found", T::NAME)))
    }

    /// Gets a value from a column family. Works in both read-write and read-only mode.
    fn get_cf(
        &self,
        cf: &rocksdb::ColumnFamily,
        key: impl AsRef<[u8]>,
    ) -> Result<Option<Vec<u8>>, rocksdb::Error> {
        match self {
            Self::ReadWrite { db, .. } => db.get_cf(cf, key),
            Self::ReadOnly { db, .. } => db.get_cf(cf, key),
        }
    }

    /// Puts a value into a column family.
    ///
    /// # Panics
    /// Panics if the provider is in read-only mode (via [`Self::db_rw`]).
    fn put_cf(
        &self,
        cf: &rocksdb::ColumnFamily,
        key: impl AsRef<[u8]>,
        value: impl AsRef<[u8]>,
    ) -> Result<(), rocksdb::Error> {
        self.db_rw().put_cf(cf, key, value)
    }

    /// Deletes a value from a column family.
    ///
    /// # Panics
    /// Panics if the provider is in read-only mode (via [`Self::db_rw`]).
    fn delete_cf(
        &self,
        cf: &rocksdb::ColumnFamily,
        key: impl AsRef<[u8]>,
    ) -> Result<(), rocksdb::Error> {
        self.db_rw().delete_cf(cf, key)
    }

    /// Deletes a range of values from a column family.
    ///
    /// Per `RocksDB` semantics the range is `[from, to)` — `to` is exclusive.
    ///
    /// # Panics
    /// Panics if the provider is in read-only mode (via [`Self::db_rw`]).
    fn delete_range_cf<K: AsRef<[u8]>>(
        &self,
        cf: &rocksdb::ColumnFamily,
        from: K,
        to: K,
    ) -> Result<(), rocksdb::Error> {
        self.db_rw().delete_range_cf(cf, from, to)
    }

    /// Returns an iterator over a column family.
    fn iterator_cf(
        &self,
        cf: &rocksdb::ColumnFamily,
        mode: IteratorMode<'_>,
    ) -> RocksDBIterEnum<'_> {
        match self {
            Self::ReadWrite { db, .. } => RocksDBIterEnum::ReadWrite(db.iterator_cf(cf, mode)),
            Self::ReadOnly { db, .. } => RocksDBIterEnum::ReadOnly(db.iterator_cf(cf, mode)),
        }
    }

    /// Returns a raw iterator over a column family.
    ///
    /// Unlike [`Self::iterator_cf`], raw iterators support `seek()` for efficient
    /// repositioning without creating a new iterator.
    fn raw_iterator_cf(&self, cf: &rocksdb::ColumnFamily) -> RocksDBRawIterEnum<'_> {
        match self {
            Self::ReadWrite { db, .. } => RocksDBRawIterEnum::ReadWrite(db.raw_iterator_cf(cf)),
            Self::ReadOnly { db, .. } => RocksDBRawIterEnum::ReadOnly(db.raw_iterator_cf(cf)),
        }
    }

    /// Returns a read-only, point-in-time snapshot of the database.
    fn snapshot(&self) -> RocksReadSnapshotInner<'_> {
        match self {
            Self::ReadWrite { db, .. } => RocksReadSnapshotInner::ReadWrite(db.snapshot()),
            Self::ReadOnly { db, .. } => RocksReadSnapshotInner::ReadOnly(db.snapshot()),
        }
    }

    /// Returns the path to the database directory.
    fn path(&self) -> &Path {
        match self {
            Self::ReadWrite { db, .. } => db.path(),
            Self::ReadOnly { db, .. } => db.path(),
        }
    }

    /// Returns the total size of WAL (Write-Ahead Log) files in bytes.
    ///
    /// WAL files have a `.log` extension in the `RocksDB` directory.
    /// Returns 0 if the directory cannot be read (best-effort metric).
    fn wal_size_bytes(&self) -> u64 {
        let path = self.path();

        match std::fs::read_dir(path) {
            Ok(entries) => entries
                .filter_map(|e| e.ok())
                .filter(|e| e.path().extension().is_some_and(|ext| ext == "log"))
                .filter_map(|e| e.metadata().ok())
                .map(|m| m.len())
                .sum(),
            Err(_) => 0,
        }
    }

    /// Returns statistics for all column families in the database.
    ///
    /// Every property read is best-effort: a missing or failed property defaults to 0.
    fn table_stats(&self) -> Vec<RocksDBTableStats> {
        let mut stats = Vec::new();

        // Macro because the read-write and read-only handles are different types
        // that both expose the same property API.
        macro_rules! collect_stats {
            ($db:expr) => {
                for cf_name in ROCKSDB_TABLES {
                    if let Some(cf) = $db.cf_handle(cf_name) {
                        let estimated_num_keys = $db
                            .property_int_value_cf(cf, rocksdb::properties::ESTIMATE_NUM_KEYS)
                            .ok()
                            .flatten()
                            .unwrap_or(0);

                        // SST files size (on-disk) + memtable size (in-memory)
                        let sst_size = $db
                            .property_int_value_cf(cf, rocksdb::properties::LIVE_SST_FILES_SIZE)
                            .ok()
                            .flatten()
                            .unwrap_or(0);

                        let memtable_size = $db
                            .property_int_value_cf(cf, rocksdb::properties::SIZE_ALL_MEM_TABLES)
                            .ok()
                            .flatten()
                            .unwrap_or(0);

                        let estimated_size_bytes = sst_size + memtable_size;

                        let pending_compaction_bytes = $db
                            .property_int_value_cf(
                                cf,
                                rocksdb::properties::ESTIMATE_PENDING_COMPACTION_BYTES,
                            )
                            .ok()
                            .flatten()
                            .unwrap_or(0);

                        stats.push(RocksDBTableStats {
                            sst_size_bytes: sst_size,
                            memtable_size_bytes: memtable_size,
                            name: cf_name.to_string(),
                            estimated_num_keys,
                            estimated_size_bytes,
                            pending_compaction_bytes,
                        });
                    }
                }
            };
        }

        match self {
            Self::ReadWrite { db, .. } => collect_stats!(db),
            Self::ReadOnly { db, .. } => collect_stats!(db),
        }

        stats
    }

    /// Returns database-level statistics including per-table stats and WAL size.
    fn db_stats(&self) -> RocksDBStats {
        RocksDBStats { tables: self.table_stats(), wal_size_bytes: self.wal_size_bytes() }
    }
}
594
595impl fmt::Debug for RocksDBProviderInner {
596    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
597        match self {
598            Self::ReadWrite { metrics, .. } => f
599                .debug_struct("RocksDBProviderInner::ReadWrite")
600                .field("db", &"<OptimisticTransactionDB>")
601                .field("metrics", metrics)
602                .finish(),
603            Self::ReadOnly { metrics, .. } => f
604                .debug_struct("RocksDBProviderInner::ReadOnly")
605                .field("db", &"<DB (read-only)>")
606                .field("metrics", metrics)
607                .finish(),
608        }
609    }
610}
611
impl Drop for RocksDBProviderInner {
    /// Best-effort clean shutdown: flush the WAL and every column family, then
    /// stop background work. All failures are logged rather than propagated,
    /// since `RocksDB` can recover unflushed state from the WAL on restart.
    fn drop(&mut self) {
        match self {
            Self::ReadWrite { db, .. } => {
                // Flush all memtables if possible. If not, they will be rebuilt from the WAL on
                // restart
                if let Err(e) = db.flush_wal(true) {
                    tracing::warn!(target: "providers::rocksdb", ?e, "Failed to flush WAL on drop");
                }
                // Flush each known column family; a failure on one does not stop the others.
                for cf_name in ROCKSDB_TABLES {
                    if let Some(cf) = db.cf_handle(cf_name) &&
                        let Err(e) = db.flush_cf(&cf)
                    {
                        tracing::warn!(target: "providers::rocksdb", cf = cf_name, ?e, "Failed to flush CF on drop");
                    }
                }
                // Stop compactions/flushes so the process can exit promptly.
                db.cancel_all_background_work(true);
            }
            Self::ReadOnly { db, .. } => db.cancel_all_background_work(true),
        }
    }
}
634
635impl Clone for RocksDBProvider {
636    fn clone(&self) -> Self {
637        Self(self.0.clone())
638    }
639}
640
641impl DatabaseMetrics for RocksDBProvider {
642    fn gauge_metrics(&self) -> Vec<(&'static str, f64, Vec<Label>)> {
643        let mut metrics = Vec::new();
644
645        for stat in self.table_stats() {
646            metrics.push((
647                "rocksdb.table_size",
648                stat.estimated_size_bytes as f64,
649                vec![Label::new("table", stat.name.clone())],
650            ));
651            metrics.push((
652                "rocksdb.table_entries",
653                stat.estimated_num_keys as f64,
654                vec![Label::new("table", stat.name.clone())],
655            ));
656            metrics.push((
657                "rocksdb.pending_compaction_bytes",
658                stat.pending_compaction_bytes as f64,
659                vec![Label::new("table", stat.name.clone())],
660            ));
661            metrics.push((
662                "rocksdb.sst_size",
663                stat.sst_size_bytes as f64,
664                vec![Label::new("table", stat.name.clone())],
665            ));
666            metrics.push((
667                "rocksdb.memtable_size",
668                stat.memtable_size_bytes as f64,
669                vec![Label::new("table", stat.name)],
670            ));
671        }
672
673        // WAL size (DB-level, shared across all tables)
674        metrics.push(("rocksdb.wal_size", self.wal_size_bytes() as f64, vec![]));
675
676        metrics
677    }
678}
679
680impl RocksDBProvider {
681    /// Creates a new `RocksDB` provider.
682    pub fn new(path: impl AsRef<Path>) -> ProviderResult<Self> {
683        RocksDBBuilder::new(path).build()
684    }
685
    /// Creates a new `RocksDB` provider builder rooted at `path`.
    pub fn builder(path: impl AsRef<Path>) -> RocksDBBuilder {
        RocksDBBuilder::new(path)
    }
690
691    /// Returns `true` if a `RocksDB` database exists at the given path.
692    ///
693    /// Checks for the presence of the `CURRENT` file, which `RocksDB` creates
694    /// when initializing a database.
695    pub fn exists(path: impl AsRef<Path>) -> bool {
696        path.as_ref().join("CURRENT").exists()
697    }
698
    /// Returns `true` if this provider is in read-only mode.
    ///
    /// Write operations on a read-only provider panic at runtime.
    pub fn is_read_only(&self) -> bool {
        matches!(self.0.as_ref(), RocksDBProviderInner::ReadOnly { .. })
    }
703
    /// Returns a read-only, point-in-time snapshot of the database.
    ///
    /// Lighter weight than [`RocksTx`] — no write-conflict tracking, and `Send + Sync`.
    pub fn snapshot(&self) -> RocksReadSnapshot<'_> {
        RocksReadSnapshot { inner: self.0.snapshot(), provider: self }
    }
710
711    /// Creates a new transaction with MDBX-like semantics (read-your-writes, rollback).
712    ///
713    /// Note: With `OptimisticTransactionDB`, commits may fail if there are conflicts.
714    /// Conflict detection happens at commit time, not at write time.
715    ///
716    /// # Panics
717    /// Panics if the provider is in read-only mode.
718    pub fn tx(&self) -> RocksTx<'_> {
719        let write_options = WriteOptions::default();
720        let txn_options = OptimisticTransactionOptions::default();
721        let inner = self.0.db_rw().transaction_opt(&write_options, &txn_options);
722        RocksTx { inner, provider: self }
723    }
724
    /// Creates a new batch for atomic writes.
    ///
    /// Use [`Self::write_batch`] for closure-based atomic writes.
    /// Use this method when the batch needs to be held by [`crate::EitherWriter`].
    ///
    /// The returned batch has no auto-commit threshold; it only commits when the
    /// caller explicitly commits it.
    ///
    /// # Panics
    /// Panics if the provider is in read-only mode when attempting to commit.
    pub fn batch(&self) -> RocksDBBatch<'_> {
        RocksDBBatch {
            provider: self,
            inner: WriteBatchWithTransaction::<true>::default(),
            // Pre-sized scratch buffer reused for value compression.
            buf: Vec::with_capacity(DEFAULT_COMPRESS_BUF_CAPACITY),
            auto_commit_threshold: None,
        }
    }
740
    /// Creates a new batch with auto-commit enabled.
    ///
    /// When the batch size exceeds the threshold (4 GiB), the batch is automatically
    /// committed and reset. This prevents OOM during large bulk writes while maintaining
    /// crash-safety via the consistency check on startup.
    pub fn batch_with_auto_commit(&self) -> RocksDBBatch<'_> {
        RocksDBBatch {
            provider: self,
            inner: WriteBatchWithTransaction::<true>::default(),
            // Pre-sized scratch buffer reused for value compression.
            buf: Vec::with_capacity(DEFAULT_COMPRESS_BUF_CAPACITY),
            auto_commit_threshold: Some(DEFAULT_AUTO_COMMIT_THRESHOLD),
        }
    }
754
    /// Gets the column family handle for a table.
    ///
    /// Errors if the column family was not registered when the DB was opened.
    fn get_cf_handle<T: Table>(&self) -> Result<&rocksdb::ColumnFamily, DatabaseError> {
        self.0.cf_handle::<T>()
    }
759
760    /// Executes a function and records metrics with the given operation and table name.
761    fn execute_with_operation_metric<R>(
762        &self,
763        operation: RocksDBOperation,
764        table: &'static str,
765        f: impl FnOnce(&Self) -> R,
766    ) -> R {
767        let start = self.0.metrics().map(|_| Instant::now());
768        let res = f(self);
769
770        if let (Some(start), Some(metrics)) = (start, self.0.metrics()) {
771            metrics.record_operation(operation, table, start.elapsed());
772        }
773
774        res
775    }
776
    /// Gets a value from the specified table.
    ///
    /// Convenience wrapper around [`Self::get_encoded`] that encodes the key first.
    pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
        self.get_encoded::<T>(&key.encode())
    }
781
782    /// Gets a value from the specified table using pre-encoded key.
783    pub fn get_encoded<T: Table>(
784        &self,
785        key: &<T::Key as Encode>::Encoded,
786    ) -> ProviderResult<Option<T::Value>> {
787        self.execute_with_operation_metric(RocksDBOperation::Get, T::NAME, |this| {
788            let result = this.0.get_cf(this.get_cf_handle::<T>()?, key.as_ref()).map_err(|e| {
789                ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
790                    message: e.to_string().into(),
791                    code: -1,
792                }))
793            })?;
794
795            Ok(result.and_then(|value| T::Value::decompress(&value).ok()))
796        })
797    }
798
799    /// Puts upsert a value into the specified table with the given key.
800    ///
801    /// # Panics
802    /// Panics if the provider is in read-only mode.
803    pub fn put<T: Table>(&self, key: T::Key, value: &T::Value) -> ProviderResult<()> {
804        let encoded_key = key.encode();
805        self.put_encoded::<T>(&encoded_key, value)
806    }
807
    /// Puts a value into the specified table using pre-encoded key.
    ///
    /// The value is compressed before being written.
    ///
    /// # Panics
    /// Panics if the provider is in read-only mode.
    pub fn put_encoded<T: Table>(
        &self,
        key: &<T::Key as Encode>::Encoded,
        value: &T::Value,
    ) -> ProviderResult<()> {
        self.execute_with_operation_metric(RocksDBOperation::Put, T::NAME, |this| {
            // To keep the code simple we allocate a fresh compression buffer on
            // every call, since `RocksDBProvider` must remain thread-safe. Callers
            // that want to reuse a buffer across writes should use the
            // write-batch API instead.
            let mut buf = Vec::new();
            let value_bytes = compress_to_buf_or_ref!(buf, value).unwrap_or(&buf);

            this.0.put_cf(this.get_cf_handle::<T>()?, key, value_bytes).map_err(|e| {
                ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
                    info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
                    operation: DatabaseWriteOperation::PutUpsert,
                    table_name: T::NAME,
                    key: key.as_ref().to_vec(),
                })))
            })
        })
    }
834
835    /// Deletes a value from the specified table.
836    ///
837    /// # Panics
838    /// Panics if the provider is in read-only mode.
839    pub fn delete<T: Table>(&self, key: T::Key) -> ProviderResult<()> {
840        self.execute_with_operation_metric(RocksDBOperation::Delete, T::NAME, |this| {
841            this.0.delete_cf(this.get_cf_handle::<T>()?, key.encode().as_ref()).map_err(|e| {
842                ProviderError::Database(DatabaseError::Delete(DatabaseErrorInfo {
843                    message: e.to_string().into(),
844                    code: -1,
845                }))
846            })
847        })
848    }
849
850    /// Clears all entries from the specified table.
851    ///
852    /// Uses `delete_range_cf` from empty key to a max key (256 bytes of 0xFF).
853    /// This end key must exceed the maximum encoded key size for any table.
854    /// Current max is ~60 bytes (`StorageShardedKey` = 20 + 32 + 8).
855    pub fn clear<T: Table>(&self) -> ProviderResult<()> {
856        let cf = self.get_cf_handle::<T>()?;
857
858        self.0.delete_range_cf(cf, &[] as &[u8], &[0xFF; 256]).map_err(|e| {
859            ProviderError::Database(DatabaseError::Delete(DatabaseErrorInfo {
860                message: e.to_string().into(),
861                code: -1,
862            }))
863        })?;
864
865        Ok(())
866    }
867
868    /// Retrieves the first or last entry from a table based on the iterator mode.
869    fn get_boundary<T: Table>(
870        &self,
871        mode: IteratorMode<'_>,
872    ) -> ProviderResult<Option<(T::Key, T::Value)>> {
873        self.execute_with_operation_metric(RocksDBOperation::Get, T::NAME, |this| {
874            let cf = this.get_cf_handle::<T>()?;
875            let mut iter = this.0.iterator_cf(cf, mode);
876
877            match iter.next() {
878                Some(Ok((key_bytes, value_bytes))) => {
879                    let key = <T::Key as reth_db_api::table::Decode>::decode(&key_bytes)
880                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
881                    let value = T::Value::decompress(&value_bytes)
882                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
883                    Ok(Some((key, value)))
884                }
885                Some(Err(e)) => {
886                    Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
887                        message: e.to_string().into(),
888                        code: -1,
889                    })))
890                }
891                None => Ok(None),
892            }
893        })
894    }
895
    /// Gets the first (smallest key) entry from the specified table.
    ///
    /// Returns `Ok(None)` when the table is empty.
    #[inline]
    pub fn first<T: Table>(&self) -> ProviderResult<Option<(T::Key, T::Value)>> {
        self.get_boundary::<T>(IteratorMode::Start)
    }
901
    /// Gets the last (largest key) entry from the specified table.
    ///
    /// Returns `Ok(None)` when the table is empty.
    #[inline]
    pub fn last<T: Table>(&self) -> ProviderResult<Option<(T::Key, T::Value)>> {
        self.get_boundary::<T>(IteratorMode::End)
    }
907
908    /// Creates an iterator over all entries in the specified table.
909    ///
910    /// Returns decoded `(Key, Value)` pairs in key order.
911    pub fn iter<T: Table>(&self) -> ProviderResult<RocksDBIter<'_, T>> {
912        let cf = self.get_cf_handle::<T>()?;
913        let iter = self.0.iterator_cf(cf, IteratorMode::Start);
914        Ok(RocksDBIter { inner: iter, _marker: std::marker::PhantomData })
915    }
916
    /// Returns statistics for all column families in the database.
    ///
    /// Returns one [`RocksDBTableStats`] per column family (the previous doc's
    /// claim of `(table_name, estimated_keys, estimated_size_bytes)` tuples was
    /// stale — see the return type).
    pub fn table_stats(&self) -> Vec<RocksDBTableStats> {
        self.0.table_stats()
    }
923
    /// Returns the total size of WAL (Write-Ahead Log) files in bytes.
    ///
    /// This scans the `RocksDB` directory for `.log` files and sums their sizes.
    /// WAL files can be significant (e.g., 2.7GB observed) and are not included
    /// in `table_size`, `sst_size`, or `memtable_size` metrics.
    pub fn wal_size_bytes(&self) -> u64 {
        self.0.wal_size_bytes()
    }
932
    /// Returns database-level statistics including per-table stats and WAL size.
    ///
    /// This combines [`Self::table_stats`] and [`Self::wal_size_bytes`] into a
    /// single struct; see those methods for the semantics of each field.
    pub fn db_stats(&self) -> RocksDBStats {
        self.0.db_stats()
    }
939
940    /// Flushes pending writes for the specified tables to disk.
941    ///
942    /// This performs a flush of:
943    /// 1. The column family memtables for the specified table names to SST files
944    /// 2. The Write-Ahead Log (WAL) with sync
945    ///
946    /// After this call completes, all data for the specified tables is durably persisted to disk.
947    ///
948    /// # Panics
949    /// Panics if the provider is in read-only mode.
950    #[instrument(level = "debug", target = "providers::rocksdb", skip_all, fields(tables = ?tables))]
951    pub fn flush(&self, tables: &[&'static str]) -> ProviderResult<()> {
952        let db = self.0.db_rw();
953
954        for cf_name in tables {
955            if let Some(cf) = db.cf_handle(cf_name) {
956                db.flush_cf(&cf).map_err(|e| {
957                    ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
958                        info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
959                        operation: DatabaseWriteOperation::Flush,
960                        table_name: cf_name,
961                        key: Vec::new(),
962                    })))
963                })?;
964            }
965        }
966
967        db.flush_wal(true).map_err(|e| {
968            ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
969                info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
970                operation: DatabaseWriteOperation::Flush,
971                table_name: "WAL",
972                key: Vec::new(),
973            })))
974        })?;
975
976        Ok(())
977    }
978
979    /// Flushes and compacts all tables in `RocksDB`.
980    ///
981    /// This:
982    /// 1. Flushes all column family memtables to SST files
983    /// 2. Flushes the Write-Ahead Log (WAL) with sync
984    /// 3. Triggers manual compaction on all column families to reclaim disk space
985    ///
986    /// Use this after large delete operations (like pruning) to reclaim disk space.
987    ///
988    /// # Panics
989    /// Panics if the provider is in read-only mode.
990    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
991    pub fn flush_and_compact(&self) -> ProviderResult<()> {
992        self.flush(ROCKSDB_TABLES)?;
993
994        let db = self.0.db_rw();
995
996        for cf_name in ROCKSDB_TABLES {
997            if let Some(cf) = db.cf_handle(cf_name) {
998                db.compact_range_cf(&cf, None::<&[u8]>, None::<&[u8]>);
999            }
1000        }
1001
1002        Ok(())
1003    }
1004
1005    /// Creates a raw iterator over all entries in the specified table.
1006    ///
1007    /// Returns raw `(key_bytes, value_bytes)` pairs without decoding.
1008    pub fn raw_iter<T: Table>(&self) -> ProviderResult<RocksDBRawIter<'_>> {
1009        let cf = self.get_cf_handle::<T>()?;
1010        let iter = self.0.iterator_cf(cf, IteratorMode::Start);
1011        Ok(RocksDBRawIter { inner: iter })
1012    }
1013
1014    /// Returns all account history shards for the given address in ascending key order.
1015    ///
1016    /// This is used for unwind operations where we need to scan all shards for an address
1017    /// and potentially delete or truncate them.
1018    pub fn account_history_shards(
1019        &self,
1020        address: Address,
1021    ) -> ProviderResult<Vec<(ShardedKey<Address>, BlockNumberList)>> {
1022        // Get the column family handle for the AccountsHistory table.
1023        let cf = self.get_cf_handle::<tables::AccountsHistory>()?;
1024
1025        // Build a seek key starting at the first shard (highest_block_number = 0) for this address.
1026        // ShardedKey is (address, highest_block_number) so this positions us at the beginning.
1027        let start_key = ShardedKey::new(address, 0u64);
1028        let start_bytes = start_key.encode();
1029
1030        // Create a forward iterator starting from our seek position.
1031        let iter = self
1032            .0
1033            .iterator_cf(cf, IteratorMode::From(start_bytes.as_ref(), rocksdb::Direction::Forward));
1034
1035        let mut result = Vec::new();
1036        for item in iter {
1037            match item {
1038                Ok((key_bytes, value_bytes)) => {
1039                    // Decode the sharded key to check if we're still on the same address.
1040                    let key = ShardedKey::<Address>::decode(&key_bytes)
1041                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
1042
1043                    // Stop when we reach a different address (keys are sorted by address first).
1044                    if key.key != address {
1045                        break;
1046                    }
1047
1048                    // Decompress the block number list stored in this shard.
1049                    let value = BlockNumberList::decompress(&value_bytes)
1050                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
1051
1052                    result.push((key, value));
1053                }
1054                Err(e) => {
1055                    return Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
1056                        message: e.to_string().into(),
1057                        code: -1,
1058                    })));
1059                }
1060            }
1061        }
1062
1063        Ok(result)
1064    }
1065
1066    /// Returns all storage history shards for the given `(address, storage_key)` pair.
1067    ///
1068    /// Iterates through all shards in ascending `highest_block_number` order until
1069    /// a different `(address, storage_key)` is encountered.
1070    pub fn storage_history_shards(
1071        &self,
1072        address: Address,
1073        storage_key: B256,
1074    ) -> ProviderResult<Vec<(StorageShardedKey, BlockNumberList)>> {
1075        let cf = self.get_cf_handle::<tables::StoragesHistory>()?;
1076
1077        let start_key = StorageShardedKey::new(address, storage_key, 0u64);
1078        let start_bytes = start_key.encode();
1079
1080        let iter = self
1081            .0
1082            .iterator_cf(cf, IteratorMode::From(start_bytes.as_ref(), rocksdb::Direction::Forward));
1083
1084        let mut result = Vec::new();
1085        for item in iter {
1086            match item {
1087                Ok((key_bytes, value_bytes)) => {
1088                    let key = StorageShardedKey::decode(&key_bytes)
1089                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
1090
1091                    if key.address != address || key.sharded_key.key != storage_key {
1092                        break;
1093                    }
1094
1095                    let value = BlockNumberList::decompress(&value_bytes)
1096                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
1097
1098                    result.push((key, value));
1099                }
1100                Err(e) => {
1101                    return Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
1102                        message: e.to_string().into(),
1103                        code: -1,
1104                    })));
1105                }
1106            }
1107        }
1108
1109        Ok(result)
1110    }
1111
1112    /// Unwinds account history indices for the given `(address, block_number)` pairs.
1113    ///
1114    /// Groups addresses by their minimum block number and calls the appropriate unwind
1115    /// operations. For each address, keeps only blocks less than the minimum block
1116    /// (i.e., removes the minimum block and all higher blocks).
1117    ///
1118    /// Returns a `WriteBatchWithTransaction` that can be committed later.
1119    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
1120    pub fn unwind_account_history_indices(
1121        &self,
1122        last_indices: &[(Address, BlockNumber)],
1123    ) -> ProviderResult<WriteBatchWithTransaction<true>> {
1124        let mut address_min_block: AddressMap<BlockNumber> =
1125            AddressMap::with_capacity_and_hasher(last_indices.len(), Default::default());
1126        for &(address, block_number) in last_indices {
1127            address_min_block
1128                .entry(address)
1129                .and_modify(|min| *min = (*min).min(block_number))
1130                .or_insert(block_number);
1131        }
1132
1133        let mut batch = self.batch();
1134        for (address, min_block) in address_min_block {
1135            match min_block.checked_sub(1) {
1136                Some(keep_to) => batch.unwind_account_history_to(address, keep_to)?,
1137                None => batch.clear_account_history(address)?,
1138            }
1139        }
1140
1141        Ok(batch.into_inner())
1142    }
1143
1144    /// Unwinds storage history indices for the given `(address, storage_key, block_number)` tuples.
1145    ///
1146    /// Groups by `(address, storage_key)` and finds the minimum block number for each.
1147    /// For each key, keeps only blocks less than the minimum block
1148    /// (i.e., removes the minimum block and all higher blocks).
1149    ///
1150    /// Returns a `WriteBatchWithTransaction` that can be committed later.
1151    pub fn unwind_storage_history_indices(
1152        &self,
1153        storage_changesets: &[(Address, B256, BlockNumber)],
1154    ) -> ProviderResult<WriteBatchWithTransaction<true>> {
1155        let mut key_min_block: HashMap<(Address, B256), BlockNumber> =
1156            HashMap::with_capacity_and_hasher(storage_changesets.len(), Default::default());
1157        for &(address, storage_key, block_number) in storage_changesets {
1158            key_min_block
1159                .entry((address, storage_key))
1160                .and_modify(|min| *min = (*min).min(block_number))
1161                .or_insert(block_number);
1162        }
1163
1164        let mut batch = self.batch();
1165        for ((address, storage_key), min_block) in key_min_block {
1166            match min_block.checked_sub(1) {
1167                Some(keep_to) => batch.unwind_storage_history_to(address, storage_key, keep_to)?,
1168                None => batch.clear_storage_history(address, storage_key)?,
1169            }
1170        }
1171
1172        Ok(batch.into_inner())
1173    }
1174
1175    /// Writes a batch of operations atomically.
1176    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
1177    pub fn write_batch<F>(&self, f: F) -> ProviderResult<()>
1178    where
1179        F: FnOnce(&mut RocksDBBatch<'_>) -> ProviderResult<()>,
1180    {
1181        self.execute_with_operation_metric(RocksDBOperation::BatchWrite, "Batch", |this| {
1182            let mut batch_handle = this.batch();
1183            f(&mut batch_handle)?;
1184            batch_handle.commit()
1185        })
1186    }
1187
1188    /// Commits a raw `WriteBatchWithTransaction` to `RocksDB`.
1189    ///
1190    /// This is used when the batch was extracted via [`RocksDBBatch::into_inner`]
1191    /// and needs to be committed at a later point (e.g., at provider commit time).
1192    ///
1193    /// # Panics
1194    /// Panics if the provider is in read-only mode.
1195    #[instrument(level = "debug", target = "providers::rocksdb", skip_all, fields(batch_len = batch.len(), batch_size = batch.size_in_bytes()))]
1196    pub fn commit_batch(&self, batch: WriteBatchWithTransaction<true>) -> ProviderResult<()> {
1197        self.0.db_rw().write_opt(batch, &WriteOptions::default()).map_err(|e| {
1198            ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
1199                message: e.to_string().into(),
1200                code: -1,
1201            }))
1202        })
1203    }
1204
    /// Writes all `RocksDB` data for multiple blocks in parallel.
    ///
    /// This handles transaction hash numbers, account history, and storage history based on
    /// the provided storage settings. Each operation runs in parallel with its own batch,
    /// pushing to `ctx.pending_batches` for later commit.
    ///
    /// Returns `Ok(())` immediately (writing nothing) when `storage_v2` is disabled.
    #[instrument(level = "debug", target = "providers::rocksdb", skip_all, fields(num_blocks = blocks.len(), first_block = ctx.first_block_number))]
    pub(crate) fn write_blocks_data<N: reth_node_types::NodePrimitives>(
        &self,
        blocks: &[ExecutedBlock<N>],
        tx_nums: &[TxNumber],
        ctx: RocksDBWriteCtx,
        runtime: &reth_tasks::Runtime,
    ) -> ProviderResult<()> {
        if !ctx.storage_settings.storage_v2 {
            return Ok(());
        }

        // Each slot stays `None` if the corresponding task never ran (flag off)
        // or panicked before storing its result.
        let mut r_tx_hash = None;
        let mut r_account_history = None;
        let mut r_storage_history = None;

        // NOTE(review): `storage_v2` is re-checked here although the early return
        // above already guarantees it is true, so all three flags except the
        // prune-lookup condition are constant at this point.
        let write_tx_hash =
            ctx.storage_settings.storage_v2 && ctx.prune_tx_lookup.is_none_or(|m| !m.is_full());
        let write_account_history = ctx.storage_settings.storage_v2;
        let write_storage_history = ctx.storage_settings.storage_v2;

        // Propagate tracing context into rayon-spawned threads so that RocksDB
        // write spans appear as children of write_blocks_data in traces.
        let span = tracing::Span::current();
        // `in_place_scope` blocks until all spawned tasks complete, so the
        // `r_*` results are fully populated once it returns.
        runtime.storage_pool().in_place_scope(|s| {
            if write_tx_hash {
                s.spawn(|_| {
                    let _guard = span.enter();
                    r_tx_hash = Some(self.write_tx_hash_numbers(blocks, tx_nums, &ctx));
                });
            }

            if write_account_history {
                s.spawn(|_| {
                    let _guard = span.enter();
                    r_account_history = Some(self.write_account_history(blocks, &ctx));
                });
            }

            if write_storage_history {
                s.spawn(|_| {
                    let _guard = span.enter();
                    r_storage_history = Some(self.write_storage_history(blocks, &ctx));
                });
            }
        });

        // A scheduled task that left its slot `None` must have panicked before
        // storing a result; surface that as a database error instead of
        // silently dropping the write.
        if write_tx_hash {
            r_tx_hash.ok_or_else(|| {
                ProviderError::Database(DatabaseError::Other(
                    "rocksdb tx-hash write thread panicked".into(),
                ))
            })??;
        }
        if write_account_history {
            r_account_history.ok_or_else(|| {
                ProviderError::Database(DatabaseError::Other(
                    "rocksdb account-history write thread panicked".into(),
                ))
            })??;
        }
        if write_storage_history {
            r_storage_history.ok_or_else(|| {
                ProviderError::Database(DatabaseError::Other(
                    "rocksdb storage-history write thread panicked".into(),
                ))
            })??;
        }

        Ok(())
    }
1281
1282    /// Writes transaction hash to number mappings for the given blocks.
1283    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
1284    fn write_tx_hash_numbers<N: reth_node_types::NodePrimitives>(
1285        &self,
1286        blocks: &[ExecutedBlock<N>],
1287        tx_nums: &[TxNumber],
1288        ctx: &RocksDBWriteCtx,
1289    ) -> ProviderResult<()> {
1290        let mut batch = self.batch();
1291        for (block, &first_tx_num) in blocks.iter().zip(tx_nums) {
1292            let body = block.recovered_block().body();
1293            for (tx_num, transaction) in (first_tx_num..).zip(body.transactions_iter()) {
1294                batch.put::<tables::TransactionHashNumbers>(*transaction.tx_hash(), &tx_num)?;
1295            }
1296        }
1297        ctx.pending_batches.lock().push(batch.into_inner());
1298        Ok(())
1299    }
1300
1301    /// Writes account history indices for the given blocks.
1302    ///
1303    /// Derives history indices from reverts (same source as changesets) to ensure consistency.
1304    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
1305    fn write_account_history<N: reth_node_types::NodePrimitives>(
1306        &self,
1307        blocks: &[ExecutedBlock<N>],
1308        ctx: &RocksDBWriteCtx,
1309    ) -> ProviderResult<()> {
1310        let mut batch = self.batch();
1311        let mut account_history: BTreeMap<Address, Vec<u64>> = BTreeMap::new();
1312
1313        for (block_idx, block) in blocks.iter().enumerate() {
1314            let block_number = ctx.first_block_number + block_idx as u64;
1315            let reverts = block.execution_outcome().state.reverts.to_plain_state_reverts();
1316
1317            // Iterate through account reverts - these are exactly the accounts that have
1318            // changesets written, ensuring history indices match changeset entries.
1319            for account_block_reverts in reverts.accounts {
1320                for (address, _) in account_block_reverts {
1321                    account_history.entry(address).or_default().push(block_number);
1322                }
1323            }
1324        }
1325
1326        // Write account history using proper shard append logic
1327        for (address, indices) in account_history {
1328            batch.append_account_history_shard(address, indices)?;
1329        }
1330        ctx.pending_batches.lock().push(batch.into_inner());
1331        Ok(())
1332    }
1333
1334    /// Writes storage history indices for the given blocks.
1335    ///
1336    /// Derives history indices from reverts (same source as changesets) to ensure consistency.
1337    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
1338    fn write_storage_history<N: reth_node_types::NodePrimitives>(
1339        &self,
1340        blocks: &[ExecutedBlock<N>],
1341        ctx: &RocksDBWriteCtx,
1342    ) -> ProviderResult<()> {
1343        let mut batch = self.batch();
1344        let mut storage_history: BTreeMap<(Address, B256), Vec<u64>> = BTreeMap::new();
1345
1346        for (block_idx, block) in blocks.iter().enumerate() {
1347            let block_number = ctx.first_block_number + block_idx as u64;
1348            let reverts = block.execution_outcome().state.reverts.to_plain_state_reverts();
1349
1350            // Iterate through storage reverts - these are exactly the slots that have
1351            // changesets written, ensuring history indices match changeset entries.
1352            for storage_block_reverts in reverts.storage {
1353                for revert in storage_block_reverts {
1354                    for (slot, _) in revert.storage_revert {
1355                        let plain_key = B256::new(slot.to_be_bytes());
1356                        storage_history
1357                            .entry((revert.address, plain_key))
1358                            .or_default()
1359                            .push(block_number);
1360                    }
1361                }
1362            }
1363        }
1364
1365        // Write storage history using proper shard append logic
1366        for ((address, slot), indices) in storage_history {
1367            batch.append_storage_history_shard(address, slot, indices)?;
1368        }
1369        ctx.pending_batches.lock().push(batch.into_inner());
1370        Ok(())
1371    }
1372}
1373
/// A point-in-time read snapshot of the `RocksDB` database.
///
/// All reads through this snapshot see a consistent view of the database at the point
/// the snapshot was created, regardless of concurrent writes. This is the primary reader
/// used by [`EitherReader::RocksDB`](crate::either_writer::EitherReader) for history lookups.
///
/// Lighter weight than [`RocksTx`] — no transaction overhead, no write support.
pub struct RocksReadSnapshot<'db> {
    /// The underlying snapshot, taken from either the read-write or read-only DB.
    inner: RocksReadSnapshotInner<'db>,
    /// Provider used to resolve column family handles (and for `Debug` output).
    provider: &'db RocksDBProvider,
}
1385
/// Inner enum to hold the snapshot for either read-write or read-only mode.
///
/// The two variants exist because `rocksdb` parameterizes snapshots over the
/// database type they were taken from.
enum RocksReadSnapshotInner<'db> {
    /// Snapshot from read-write `OptimisticTransactionDB`.
    ReadWrite(SnapshotWithThreadMode<'db, OptimisticTransactionDB>),
    /// Snapshot from read-only `DB`.
    ReadOnly(SnapshotWithThreadMode<'db, DB>),
}
1393
impl<'db> RocksReadSnapshotInner<'db> {
    /// Returns a raw iterator over a column family.
    ///
    /// The iterator reads through this snapshot, so it observes the database state
    /// captured at snapshot creation; the returned enum erases the read-write vs
    /// read-only distinction for callers.
    fn raw_iterator_cf(&self, cf: &rocksdb::ColumnFamily) -> RocksDBRawIterEnum<'_> {
        match self {
            Self::ReadWrite(snap) => RocksDBRawIterEnum::ReadWrite(snap.raw_iterator_cf(cf)),
            Self::ReadOnly(snap) => RocksDBRawIterEnum::ReadOnly(snap.raw_iterator_cf(cf)),
        }
    }
}
1403
1404impl fmt::Debug for RocksReadSnapshot<'_> {
1405    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1406        f.debug_struct("RocksReadSnapshot")
1407            .field("provider", &self.provider)
1408            .finish_non_exhaustive()
1409    }
1410}
1411
1412impl<'db> RocksReadSnapshot<'db> {
1413    /// Gets the column family handle for a table.
1414    fn cf_handle<T: Table>(&self) -> Result<&'db rocksdb::ColumnFamily, DatabaseError> {
1415        self.provider.get_cf_handle::<T>()
1416    }
1417
1418    /// Gets a value from the specified table.
1419    pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
1420        let encoded_key = key.encode();
1421        let cf = self.cf_handle::<T>()?;
1422        let result = match &self.inner {
1423            RocksReadSnapshotInner::ReadWrite(snap) => snap.get_cf(cf, encoded_key.as_ref()),
1424            RocksReadSnapshotInner::ReadOnly(snap) => snap.get_cf(cf, encoded_key.as_ref()),
1425        }
1426        .map_err(|e| {
1427            ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
1428                message: e.to_string().into(),
1429                code: -1,
1430            }))
1431        })?;
1432
1433        Ok(result.and_then(|value| T::Value::decompress(&value).ok()))
1434    }
1435
1436    /// Lookup account history and return [`HistoryInfo`] directly.
1437    pub fn account_history_info(
1438        &self,
1439        address: Address,
1440        block_number: BlockNumber,
1441        lowest_available_block_number: Option<BlockNumber>,
1442    ) -> ProviderResult<HistoryInfo> {
1443        let key = ShardedKey::new(address, block_number);
1444        self.history_info::<tables::AccountsHistory>(
1445            key.encode().as_ref(),
1446            block_number,
1447            lowest_available_block_number,
1448            |key_bytes| Ok(<ShardedKey<Address> as Decode>::decode(key_bytes)?.key == address),
1449            |prev_bytes| {
1450                <ShardedKey<Address> as Decode>::decode(prev_bytes)
1451                    .map(|k| k.key == address)
1452                    .unwrap_or(false)
1453            },
1454        )
1455    }
1456
1457    /// Lookup storage history and return [`HistoryInfo`] directly.
1458    pub fn storage_history_info(
1459        &self,
1460        address: Address,
1461        storage_key: B256,
1462        block_number: BlockNumber,
1463        lowest_available_block_number: Option<BlockNumber>,
1464    ) -> ProviderResult<HistoryInfo> {
1465        let key = StorageShardedKey::new(address, storage_key, block_number);
1466        self.history_info::<tables::StoragesHistory>(
1467            key.encode().as_ref(),
1468            block_number,
1469            lowest_available_block_number,
1470            |key_bytes| {
1471                let k = <StorageShardedKey as Decode>::decode(key_bytes)?;
1472                Ok(k.address == address && k.sharded_key.key == storage_key)
1473            },
1474            |prev_bytes| {
1475                <StorageShardedKey as Decode>::decode(prev_bytes)
1476                    .map(|k| k.address == address && k.sharded_key.key == storage_key)
1477                    .unwrap_or(false)
1478            },
1479        )
1480    }
1481
1482    /// Generic history lookup using the snapshot's raw iterator.
1483    fn history_info<T>(
1484        &self,
1485        encoded_key: &[u8],
1486        block_number: BlockNumber,
1487        lowest_available_block_number: Option<BlockNumber>,
1488        key_matches: impl FnOnce(&[u8]) -> Result<bool, reth_db_api::DatabaseError>,
1489        prev_key_matches: impl Fn(&[u8]) -> bool,
1490    ) -> ProviderResult<HistoryInfo>
1491    where
1492        T: Table<Value = BlockNumberList>,
1493    {
1494        let is_maybe_pruned = lowest_available_block_number.is_some();
1495        let fallback = || {
1496            Ok(if is_maybe_pruned {
1497                HistoryInfo::MaybeInPlainState
1498            } else {
1499                HistoryInfo::NotYetWritten
1500            })
1501        };
1502
1503        let cf = self.cf_handle::<T>()?;
1504        let mut iter = self.inner.raw_iterator_cf(cf);
1505
1506        iter.seek(encoded_key);
1507        iter.status().map_err(|e| {
1508            ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
1509                message: e.to_string().into(),
1510                code: -1,
1511            }))
1512        })?;
1513
1514        if !iter.valid() {
1515            return fallback();
1516        }
1517
1518        let Some(key_bytes) = iter.key() else {
1519            return fallback();
1520        };
1521        if !key_matches(key_bytes)? {
1522            return fallback();
1523        }
1524
1525        let Some(value_bytes) = iter.value() else {
1526            return fallback();
1527        };
1528        let chunk = BlockNumberList::decompress(value_bytes)?;
1529        let (rank, found_block) = compute_history_rank(&chunk, block_number);
1530
1531        let is_before_first_write = if needs_prev_shard_check(rank, found_block, block_number) {
1532            iter.prev();
1533            iter.status().map_err(|e| {
1534                ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
1535                    message: e.to_string().into(),
1536                    code: -1,
1537                }))
1538            })?;
1539            let has_prev = iter.valid() && iter.key().is_some_and(&prev_key_matches);
1540            !has_prev
1541        } else {
1542            false
1543        };
1544
1545        Ok(HistoryInfo::from_lookup(
1546            found_block,
1547            is_before_first_write,
1548            lowest_available_block_number,
1549        ))
1550    }
1551}
1552
/// Outcome of pruning a history shard group in `RocksDB`.
///
/// Returned per pruning target by the shard-pruning helpers on `RocksDBBatch`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PruneShardOutcome {
    /// At least one shard was deleted entirely.
    Deleted,
    /// Shard contents were filtered and rewritten (entries remain after pruning).
    Updated,
    /// Shard was unchanged (no blocks <= `to_block`).
    Unchanged,
}
1563
/// Tracks pruning outcomes for batch operations.
///
/// Each counter is incremented once per pruning target, according to the
/// [`PruneShardOutcome`] reported for that target.
#[derive(Debug, Default, Clone, Copy)]
pub struct PrunedIndices {
    /// Number of shards completely deleted.
    pub deleted: usize,
    /// Number of shards that were updated (filtered but still have entries).
    pub updated: usize,
    /// Number of shards that were unchanged.
    pub unchanged: usize,
}
1574
/// Handle for building a batch of operations atomically.
///
/// Uses `WriteBatchWithTransaction` for atomic writes without full transaction overhead.
/// Unlike [`RocksTx`], this does NOT support read-your-writes. Use for write-only flows
/// where you don't need to read back uncommitted data within the same operation
/// (e.g., history index writes).
///
/// When `auto_commit_threshold` is set, the batch will automatically commit and reset
/// when the batch size exceeds the threshold. This prevents OOM during large bulk writes.
#[must_use = "batch must be committed"]
pub struct RocksDBBatch<'a> {
    /// Provider used for column-family lookups, committed-state reads, and commits.
    provider: &'a RocksDBProvider,
    /// Pending write operations; applied atomically on commit.
    inner: WriteBatchWithTransaction<true>,
    /// Scratch buffer reused across `put` calls for value compression.
    buf: Vec<u8>,
    /// If set, batch auto-commits when size exceeds this threshold (in bytes).
    auto_commit_threshold: Option<usize>,
}
1592
1593impl fmt::Debug for RocksDBBatch<'_> {
1594    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1595        f.debug_struct("RocksDBBatch")
1596            .field("provider", &self.provider)
1597            .field("batch", &"<WriteBatchWithTransaction>")
1598            // Number of operations in this batch
1599            .field("length", &self.inner.len())
1600            // Total serialized size (encoded key + compressed value + metadata) of this batch
1601            // in bytes
1602            .field("size_in_bytes", &self.inner.size_in_bytes())
1603            .finish()
1604    }
1605}
1606
1607impl<'a> RocksDBBatch<'a> {
1608    /// Puts a value into the batch.
1609    ///
1610    /// If auto-commit is enabled and the batch exceeds the threshold, commits and resets.
1611    pub fn put<T: Table>(&mut self, key: T::Key, value: &T::Value) -> ProviderResult<()> {
1612        let encoded_key = key.encode();
1613        self.put_encoded::<T>(&encoded_key, value)
1614    }
1615
    /// Puts a value into the batch using pre-encoded key.
    ///
    /// If auto-commit is enabled and the batch exceeds the threshold, commits and resets.
    pub fn put_encoded<T: Table>(
        &mut self,
        key: &<T::Key as Encode>::Encoded,
        value: &T::Value,
    ) -> ProviderResult<()> {
        // Compress the value via the shared scratch buffer `self.buf`.
        // NOTE(review): `compress_to_buf_or_ref!` is defined elsewhere — it appears to
        // yield `Some(&[u8])` when the value can be referenced directly, and `None`
        // after compressing into `self.buf` (hence the `unwrap_or(&self.buf)` fallback).
        // Confirm against the macro definition.
        let value_bytes = compress_to_buf_or_ref!(self.buf, value).unwrap_or(&self.buf);
        self.inner.put_cf(self.provider.get_cf_handle::<T>()?, key, value_bytes);
        self.maybe_auto_commit()?;
        Ok(())
    }
1629
1630    /// Deletes a value from the batch.
1631    ///
1632    /// If auto-commit is enabled and the batch exceeds the threshold, commits and resets.
1633    pub fn delete<T: Table>(&mut self, key: T::Key) -> ProviderResult<()> {
1634        self.inner.delete_cf(self.provider.get_cf_handle::<T>()?, key.encode().as_ref());
1635        self.maybe_auto_commit()?;
1636        Ok(())
1637    }
1638
1639    /// Commits and resets the batch if it exceeds the auto-commit threshold.
1640    ///
1641    /// This is called after each `put` or `delete` operation to prevent unbounded memory growth.
1642    /// Returns immediately if auto-commit is disabled or threshold not reached.
1643    fn maybe_auto_commit(&mut self) -> ProviderResult<()> {
1644        if let Some(threshold) = self.auto_commit_threshold &&
1645            self.inner.size_in_bytes() >= threshold
1646        {
1647            tracing::debug!(
1648                target: "providers::rocksdb",
1649                batch_size = self.inner.size_in_bytes(),
1650                threshold,
1651                "Auto-committing RocksDB batch"
1652            );
1653            let old_batch = std::mem::take(&mut self.inner);
1654            self.provider.0.db_rw().write_opt(old_batch, &WriteOptions::default()).map_err(
1655                |e| {
1656                    ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
1657                        message: e.to_string().into(),
1658                        code: -1,
1659                    }))
1660                },
1661            )?;
1662        }
1663        Ok(())
1664    }
1665
1666    /// Commits the batch to the database.
1667    ///
1668    /// This consumes the batch and writes all operations atomically to `RocksDB`.
1669    ///
1670    /// # Panics
1671    /// Panics if the provider is in read-only mode.
1672    #[instrument(level = "debug", target = "providers::rocksdb", skip_all, fields(batch_len = self.inner.len(), batch_size = self.inner.size_in_bytes()))]
1673    pub fn commit(self) -> ProviderResult<()> {
1674        self.provider.0.db_rw().write_opt(self.inner, &WriteOptions::default()).map_err(|e| {
1675            ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
1676                message: e.to_string().into(),
1677                code: -1,
1678            }))
1679        })
1680    }
1681
    /// Returns the number of write operations (puts + deletes) queued in this batch.
    ///
    /// When auto-commit is enabled this counts only operations queued since the last
    /// automatic flush, because the inner batch is reset on auto-commit.
    pub fn len(&self) -> usize {
        self.inner.len()
    }
1686
    /// Returns `true` if the batch contains no operations.
    ///
    /// Like `len`, this reflects only operations queued since the last auto-commit
    /// flush (if any occurred).
    pub fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }
1691
    /// Returns the size of the batch in bytes.
    ///
    /// This is the serialized size of all queued operations and is the quantity
    /// compared against the auto-commit threshold.
    pub fn size_in_bytes(&self) -> usize {
        self.inner.size_in_bytes()
    }
1696
    /// Returns a reference to the underlying `RocksDB` provider.
    ///
    /// This is the provider the batch was created from; reads and commits go through it.
    pub const fn provider(&self) -> &RocksDBProvider {
        self.provider
    }
1701
    /// Consumes the batch and returns the underlying `WriteBatchWithTransaction`.
    ///
    /// This is used to defer commits to the provider level (e.g. collecting raw batches
    /// for a later combined write instead of committing here).
    pub fn into_inner(self) -> WriteBatchWithTransaction<true> {
        self.inner
    }
1708
    /// Gets a value from the database, delegating to the underlying provider.
    ///
    /// **Important constraint:** This reads only committed state, not pending writes in this
    /// batch or other pending batches in `pending_rocksdb_batches`.
    pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
        self.provider.get::<T>(key)
    }
1716
1717    /// Appends indices to an account history shard with proper shard management.
1718    ///
1719    /// Loads the existing shard (if any), appends new indices, and rechunks into
1720    /// multiple shards if needed (respecting `NUM_OF_INDICES_IN_SHARD` limit).
1721    ///
1722    /// # Requirements
1723    ///
1724    /// - The `indices` MUST be strictly increasing and contain no duplicates.
1725    /// - This method MUST only be called once per address per batch. The batch reads existing
1726    ///   shards from committed DB state, not from pending writes. Calling twice for the same
1727    ///   address will cause the second call to overwrite the first.
1728    pub fn append_account_history_shard(
1729        &mut self,
1730        address: Address,
1731        indices: impl IntoIterator<Item = u64>,
1732    ) -> ProviderResult<()> {
1733        let indices: Vec<u64> = indices.into_iter().collect();
1734
1735        if indices.is_empty() {
1736            return Ok(());
1737        }
1738
1739        debug_assert!(
1740            indices.windows(2).all(|w| w[0] < w[1]),
1741            "indices must be strictly increasing: {:?}",
1742            indices
1743        );
1744
1745        let last_key = ShardedKey::new(address, u64::MAX);
1746        let last_shard_opt = self.provider.get::<tables::AccountsHistory>(last_key.clone())?;
1747        let mut last_shard = last_shard_opt.unwrap_or_else(BlockNumberList::empty);
1748
1749        last_shard.append(indices).map_err(ProviderError::other)?;
1750
1751        // Fast path: all indices fit in one shard
1752        if last_shard.len() <= NUM_OF_INDICES_IN_SHARD as u64 {
1753            self.put::<tables::AccountsHistory>(last_key, &last_shard)?;
1754            return Ok(());
1755        }
1756
1757        // Slow path: rechunk into multiple shards
1758        let chunks = last_shard.iter().chunks(NUM_OF_INDICES_IN_SHARD);
1759        let mut chunks_peekable = chunks.into_iter().peekable();
1760
1761        while let Some(chunk) = chunks_peekable.next() {
1762            let shard = BlockNumberList::new_pre_sorted(chunk);
1763            let highest_block_number = if chunks_peekable.peek().is_some() {
1764                shard.iter().next_back().expect("`chunks` does not return empty list")
1765            } else {
1766                u64::MAX
1767            };
1768
1769            self.put::<tables::AccountsHistory>(
1770                ShardedKey::new(address, highest_block_number),
1771                &shard,
1772            )?;
1773        }
1774
1775        Ok(())
1776    }
1777
1778    /// Appends indices to a storage history shard with proper shard management.
1779    ///
1780    /// Loads the existing shard (if any), appends new indices, and rechunks into
1781    /// multiple shards if needed (respecting `NUM_OF_INDICES_IN_SHARD` limit).
1782    ///
1783    /// # Requirements
1784    ///
1785    /// - The `indices` MUST be strictly increasing and contain no duplicates.
1786    /// - This method MUST only be called once per (address, `storage_key`) pair per batch. The
1787    ///   batch reads existing shards from committed DB state, not from pending writes. Calling
1788    ///   twice for the same key will cause the second call to overwrite the first.
1789    pub fn append_storage_history_shard(
1790        &mut self,
1791        address: Address,
1792        storage_key: B256,
1793        indices: impl IntoIterator<Item = u64>,
1794    ) -> ProviderResult<()> {
1795        let indices: Vec<u64> = indices.into_iter().collect();
1796
1797        if indices.is_empty() {
1798            return Ok(());
1799        }
1800
1801        debug_assert!(
1802            indices.windows(2).all(|w| w[0] < w[1]),
1803            "indices must be strictly increasing: {:?}",
1804            indices
1805        );
1806
1807        let last_key = StorageShardedKey::last(address, storage_key);
1808        let last_shard_opt = self.provider.get::<tables::StoragesHistory>(last_key.clone())?;
1809        let mut last_shard = last_shard_opt.unwrap_or_else(BlockNumberList::empty);
1810
1811        last_shard.append(indices).map_err(ProviderError::other)?;
1812
1813        // Fast path: all indices fit in one shard
1814        if last_shard.len() <= NUM_OF_INDICES_IN_SHARD as u64 {
1815            self.put::<tables::StoragesHistory>(last_key, &last_shard)?;
1816            return Ok(());
1817        }
1818
1819        // Slow path: rechunk into multiple shards
1820        let chunks = last_shard.iter().chunks(NUM_OF_INDICES_IN_SHARD);
1821        let mut chunks_peekable = chunks.into_iter().peekable();
1822
1823        while let Some(chunk) = chunks_peekable.next() {
1824            let shard = BlockNumberList::new_pre_sorted(chunk);
1825            let highest_block_number = if chunks_peekable.peek().is_some() {
1826                shard.iter().next_back().expect("`chunks` does not return empty list")
1827            } else {
1828                u64::MAX
1829            };
1830
1831            self.put::<tables::StoragesHistory>(
1832                StorageShardedKey::new(address, storage_key, highest_block_number),
1833                &shard,
1834            )?;
1835        }
1836
1837        Ok(())
1838    }
1839
    /// Unwinds account history for the given address, keeping only blocks <= `keep_to`.
    ///
    /// Mirrors MDBX `unwind_history_shards` behavior:
    /// - Deletes shards entirely above `keep_to`
    /// - Truncates boundary shards and re-keys to `u64::MAX` sentinel
    /// - Preserves shards entirely below `keep_to`
    ///
    /// All mutations are queued into this batch; batch operations apply in insertion
    /// order, so a `put` following a `delete` of the same key wins.
    pub fn unwind_account_history_to(
        &mut self,
        address: Address,
        keep_to: BlockNumber,
    ) -> ProviderResult<()> {
        let shards = self.provider.account_history_shards(address)?;
        if shards.is_empty() {
            return Ok(());
        }

        // Find the first shard that might contain blocks > keep_to.
        // A shard is affected if it's the sentinel (u64::MAX) or its highest_block_number > keep_to
        let boundary_idx = shards.iter().position(|(key, _)| {
            key.highest_block_number == u64::MAX || key.highest_block_number > keep_to
        });

        // Repair path: no shards affected means all blocks <= keep_to, just ensure sentinel exists
        let Some(boundary_idx) = boundary_idx else {
            let (last_key, last_value) = shards.last().expect("shards is non-empty");
            if last_key.highest_block_number != u64::MAX {
                // Re-key the last shard to the u64::MAX sentinel.
                self.delete::<tables::AccountsHistory>(last_key.clone())?;
                self.put::<tables::AccountsHistory>(
                    ShardedKey::new(address, u64::MAX),
                    last_value,
                )?;
            }
            return Ok(());
        };

        // Delete all shards strictly after the boundary (they are entirely > keep_to)
        for (key, _) in shards.iter().skip(boundary_idx + 1) {
            self.delete::<tables::AccountsHistory>(key.clone())?;
        }

        // Process the boundary shard: filter out blocks > keep_to
        let (boundary_key, boundary_list) = &shards[boundary_idx];

        // Delete the boundary shard (we'll either drop it or rewrite at u64::MAX).
        // When the boundary key is already the sentinel, the later put at u64::MAX
        // re-inserts it (in-order batch application).
        self.delete::<tables::AccountsHistory>(boundary_key.clone())?;

        // Build truncated list once; check emptiness directly (avoids double iteration)
        let new_last =
            BlockNumberList::new_pre_sorted(boundary_list.iter().take_while(|&b| b <= keep_to));

        if new_last.is_empty() {
            // Boundary shard is now empty. Previous shard becomes the last and must be keyed
            // u64::MAX.
            if boundary_idx == 0 {
                // Nothing left for this address
                return Ok(());
            }

            let (prev_key, prev_value) = &shards[boundary_idx - 1];
            if prev_key.highest_block_number != u64::MAX {
                self.delete::<tables::AccountsHistory>(prev_key.clone())?;
                self.put::<tables::AccountsHistory>(
                    ShardedKey::new(address, u64::MAX),
                    prev_value,
                )?;
            }
            return Ok(());
        }

        // Truncated boundary shard survives as the new sentinel-keyed last shard.
        self.put::<tables::AccountsHistory>(ShardedKey::new(address, u64::MAX), &new_last)?;

        Ok(())
    }
1913
    /// Prunes history shards, removing blocks <= `to_block`.
    ///
    /// Generic implementation for both account and storage history pruning.
    /// Mirrors MDBX `prune_shard` semantics. After pruning, the last remaining shard
    /// (if any) will have the sentinel key (`u64::MAX`).
    ///
    /// The closure parameters abstract over the concrete key/table:
    /// - `get_highest`: extracts the highest block number from a shard key
    /// - `is_sentinel`: whether a key is the `u64::MAX` sentinel
    /// - `delete_shard` / `put_shard`: queue delete/put operations on this batch
    /// - `create_sentinel`: builds the sentinel key for the entity being pruned
    #[allow(clippy::too_many_arguments)]
    fn prune_history_shards_inner<K>(
        &mut self,
        shards: Vec<(K, BlockNumberList)>,
        to_block: BlockNumber,
        get_highest: impl Fn(&K) -> u64,
        is_sentinel: impl Fn(&K) -> bool,
        delete_shard: impl Fn(&mut Self, K) -> ProviderResult<()>,
        put_shard: impl Fn(&mut Self, K, &BlockNumberList) -> ProviderResult<()>,
        create_sentinel: impl Fn() -> K,
    ) -> ProviderResult<PruneShardOutcome>
    where
        K: Clone,
    {
        if shards.is_empty() {
            return Ok(PruneShardOutcome::Unchanged);
        }

        let mut deleted = false;
        let mut updated = false;
        // Last shard that still has entries after filtering; it must end up sentinel-keyed.
        let mut last_remaining: Option<(K, BlockNumberList)> = None;

        for (key, block_list) in shards {
            if !is_sentinel(&key) && get_highest(&key) <= to_block {
                // Non-sentinel shard entirely <= to_block: drop it without filtering.
                delete_shard(self, key)?;
                deleted = true;
            } else {
                let original_len = block_list.len();
                let filtered =
                    BlockNumberList::new_pre_sorted(block_list.iter().filter(|&b| b > to_block));

                if filtered.is_empty() {
                    delete_shard(self, key)?;
                    deleted = true;
                } else if filtered.len() < original_len {
                    // Some (but not all) entries pruned: rewrite under the same key.
                    put_shard(self, key.clone(), &filtered)?;
                    last_remaining = Some((key, filtered));
                    updated = true;
                } else {
                    last_remaining = Some((key, block_list));
                }
            }
        }

        // Ensure the surviving last shard carries the sentinel key so future appends
        // and lookups find it (batch ops apply in order: delete then put).
        if let Some((last_key, last_value)) = last_remaining &&
            !is_sentinel(&last_key)
        {
            delete_shard(self, last_key)?;
            put_shard(self, create_sentinel(), &last_value)?;
            updated = true;
        }

        // Deleted takes precedence over Updated in the reported outcome.
        if deleted {
            Ok(PruneShardOutcome::Deleted)
        } else if updated {
            Ok(PruneShardOutcome::Updated)
        } else {
            Ok(PruneShardOutcome::Unchanged)
        }
    }
1979
1980    /// Prunes account history for the given address, removing blocks <= `to_block`.
1981    ///
1982    /// Mirrors MDBX `prune_shard` semantics. After pruning, the last remaining shard
1983    /// (if any) will have the sentinel key (`u64::MAX`).
1984    pub fn prune_account_history_to(
1985        &mut self,
1986        address: Address,
1987        to_block: BlockNumber,
1988    ) -> ProviderResult<PruneShardOutcome> {
1989        let shards = self.provider.account_history_shards(address)?;
1990        self.prune_history_shards_inner(
1991            shards,
1992            to_block,
1993            |key| key.highest_block_number,
1994            |key| key.highest_block_number == u64::MAX,
1995            |batch, key| batch.delete::<tables::AccountsHistory>(key),
1996            |batch, key, value| batch.put::<tables::AccountsHistory>(key, value),
1997            || ShardedKey::new(address, u64::MAX),
1998        )
1999    }
2000
2001    /// Prunes account history for multiple addresses in a single iterator pass.
2002    ///
2003    /// This is more efficient than calling [`Self::prune_account_history_to`] repeatedly
2004    /// because it reuses a single raw iterator and skips seeks when the iterator is already
2005    /// positioned correctly (which happens when targets are sorted and adjacent in key order).
2006    ///
2007    /// `targets` MUST be sorted by address for correctness and optimal performance
2008    /// (matches on-disk key order).
2009    pub fn prune_account_history_batch(
2010        &mut self,
2011        targets: &[(Address, BlockNumber)],
2012    ) -> ProviderResult<PrunedIndices> {
2013        if targets.is_empty() {
2014            return Ok(PrunedIndices::default());
2015        }
2016
2017        debug_assert!(
2018            targets.windows(2).all(|w| w[0].0 <= w[1].0),
2019            "prune_account_history_batch: targets must be sorted by address"
2020        );
2021
2022        // ShardedKey<Address> layout: [address: 20][block: 8] = 28 bytes
2023        // The first 20 bytes are the "prefix" that identifies the address
2024        const PREFIX_LEN: usize = 20;
2025
2026        let cf = self.provider.get_cf_handle::<tables::AccountsHistory>()?;
2027        let mut iter = self.provider.0.raw_iterator_cf(cf);
2028        let mut outcomes = PrunedIndices::default();
2029
2030        for (address, to_block) in targets {
2031            // Build the target prefix (first 20 bytes = address)
2032            let start_key = ShardedKey::new(*address, 0u64).encode();
2033            let target_prefix = &start_key[..PREFIX_LEN];
2034
2035            // Check if we need to seek or if the iterator is already positioned correctly.
2036            // After processing the previous target, the iterator is either:
2037            // 1. Positioned at a key with a different prefix (we iterated past our shards)
2038            // 2. Invalid (no more keys)
2039            // If the current key's prefix >= our target prefix, we may be able to skip the seek.
2040            let needs_seek = if iter.valid() {
2041                if let Some(current_key) = iter.key() {
2042                    // If current key's prefix < target prefix, we need to seek forward
2043                    // If current key's prefix > target prefix, this target has no shards (skip)
2044                    // If current key's prefix == target prefix, we're already positioned
2045                    current_key.get(..PREFIX_LEN).is_none_or(|p| p < target_prefix)
2046                } else {
2047                    true
2048                }
2049            } else {
2050                true
2051            };
2052
2053            if needs_seek {
2054                iter.seek(start_key);
2055                iter.status().map_err(|e| {
2056                    ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
2057                        message: e.to_string().into(),
2058                        code: -1,
2059                    }))
2060                })?;
2061            }
2062
2063            // Collect all shards for this address using raw prefix comparison
2064            let mut shards = Vec::new();
2065            while iter.valid() {
2066                let Some(key_bytes) = iter.key() else { break };
2067
2068                // Use raw prefix comparison instead of full decode for the prefix check
2069                let current_prefix = key_bytes.get(..PREFIX_LEN);
2070                if current_prefix != Some(target_prefix) {
2071                    break;
2072                }
2073
2074                // Now decode the full key (we need the block number)
2075                let key = ShardedKey::<Address>::decode(key_bytes)
2076                    .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
2077
2078                let Some(value_bytes) = iter.value() else { break };
2079                let value = BlockNumberList::decompress(value_bytes)
2080                    .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
2081
2082                shards.push((key, value));
2083                iter.next();
2084            }
2085
2086            match self.prune_history_shards_inner(
2087                shards,
2088                *to_block,
2089                |key| key.highest_block_number,
2090                |key| key.highest_block_number == u64::MAX,
2091                |batch, key| batch.delete::<tables::AccountsHistory>(key),
2092                |batch, key, value| batch.put::<tables::AccountsHistory>(key, value),
2093                || ShardedKey::new(*address, u64::MAX),
2094            )? {
2095                PruneShardOutcome::Deleted => outcomes.deleted += 1,
2096                PruneShardOutcome::Updated => outcomes.updated += 1,
2097                PruneShardOutcome::Unchanged => outcomes.unchanged += 1,
2098            }
2099        }
2100
2101        Ok(outcomes)
2102    }
2103
2104    /// Prunes storage history for the given address and storage key, removing blocks <=
2105    /// `to_block`.
2106    ///
2107    /// Mirrors MDBX `prune_shard` semantics. After pruning, the last remaining shard
2108    /// (if any) will have the sentinel key (`u64::MAX`).
2109    pub fn prune_storage_history_to(
2110        &mut self,
2111        address: Address,
2112        storage_key: B256,
2113        to_block: BlockNumber,
2114    ) -> ProviderResult<PruneShardOutcome> {
2115        let shards = self.provider.storage_history_shards(address, storage_key)?;
2116        self.prune_history_shards_inner(
2117            shards,
2118            to_block,
2119            |key| key.sharded_key.highest_block_number,
2120            |key| key.sharded_key.highest_block_number == u64::MAX,
2121            |batch, key| batch.delete::<tables::StoragesHistory>(key),
2122            |batch, key, value| batch.put::<tables::StoragesHistory>(key, value),
2123            || StorageShardedKey::last(address, storage_key),
2124        )
2125    }
2126
2127    /// Prunes storage history for multiple (address, `storage_key`) pairs in a single iterator
2128    /// pass.
2129    ///
2130    /// This is more efficient than calling [`Self::prune_storage_history_to`] repeatedly
2131    /// because it reuses a single raw iterator and skips seeks when the iterator is already
2132    /// positioned correctly (which happens when targets are sorted and adjacent in key order).
2133    ///
2134    /// `targets` MUST be sorted by (address, `storage_key`) for correctness and optimal
2135    /// performance (matches on-disk key order).
2136    pub fn prune_storage_history_batch(
2137        &mut self,
2138        targets: &[((Address, B256), BlockNumber)],
2139    ) -> ProviderResult<PrunedIndices> {
2140        if targets.is_empty() {
2141            return Ok(PrunedIndices::default());
2142        }
2143
2144        debug_assert!(
2145            targets.windows(2).all(|w| w[0].0 <= w[1].0),
2146            "prune_storage_history_batch: targets must be sorted by (address, storage_key)"
2147        );
2148
2149        // StorageShardedKey layout: [address: 20][storage_key: 32][block: 8] = 60 bytes
2150        // The first 52 bytes are the "prefix" that identifies (address, storage_key)
2151        const PREFIX_LEN: usize = 52;
2152
2153        let cf = self.provider.get_cf_handle::<tables::StoragesHistory>()?;
2154        let mut iter = self.provider.0.raw_iterator_cf(cf);
2155        let mut outcomes = PrunedIndices::default();
2156
2157        for ((address, storage_key), to_block) in targets {
2158            // Build the target prefix (first 52 bytes of encoded key)
2159            let start_key = StorageShardedKey::new(*address, *storage_key, 0u64).encode();
2160            let target_prefix = &start_key[..PREFIX_LEN];
2161
2162            // Check if we need to seek or if the iterator is already positioned correctly.
2163            // After processing the previous target, the iterator is either:
2164            // 1. Positioned at a key with a different prefix (we iterated past our shards)
2165            // 2. Invalid (no more keys)
2166            // If the current key's prefix >= our target prefix, we may be able to skip the seek.
2167            let needs_seek = if iter.valid() {
2168                if let Some(current_key) = iter.key() {
2169                    // If current key's prefix < target prefix, we need to seek forward
2170                    // If current key's prefix > target prefix, this target has no shards (skip)
2171                    // If current key's prefix == target prefix, we're already positioned
2172                    current_key.get(..PREFIX_LEN).is_none_or(|p| p < target_prefix)
2173                } else {
2174                    true
2175                }
2176            } else {
2177                true
2178            };
2179
2180            if needs_seek {
2181                iter.seek(start_key);
2182                iter.status().map_err(|e| {
2183                    ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
2184                        message: e.to_string().into(),
2185                        code: -1,
2186                    }))
2187                })?;
2188            }
2189
2190            // Collect all shards for this (address, storage_key) pair using prefix comparison
2191            let mut shards = Vec::new();
2192            while iter.valid() {
2193                let Some(key_bytes) = iter.key() else { break };
2194
2195                // Use raw prefix comparison instead of full decode for the prefix check
2196                let current_prefix = key_bytes.get(..PREFIX_LEN);
2197                if current_prefix != Some(target_prefix) {
2198                    break;
2199                }
2200
2201                // Now decode the full key (we need the block number)
2202                let key = StorageShardedKey::decode(key_bytes)
2203                    .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
2204
2205                let Some(value_bytes) = iter.value() else { break };
2206                let value = BlockNumberList::decompress(value_bytes)
2207                    .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
2208
2209                shards.push((key, value));
2210                iter.next();
2211            }
2212
2213            // Use existing prune_history_shards_inner logic
2214            match self.prune_history_shards_inner(
2215                shards,
2216                *to_block,
2217                |key| key.sharded_key.highest_block_number,
2218                |key| key.sharded_key.highest_block_number == u64::MAX,
2219                |batch, key| batch.delete::<tables::StoragesHistory>(key),
2220                |batch, key, value| batch.put::<tables::StoragesHistory>(key, value),
2221                || StorageShardedKey::last(*address, *storage_key),
2222            )? {
2223                PruneShardOutcome::Deleted => outcomes.deleted += 1,
2224                PruneShardOutcome::Updated => outcomes.updated += 1,
2225                PruneShardOutcome::Unchanged => outcomes.unchanged += 1,
2226            }
2227        }
2228
2229        Ok(outcomes)
2230    }
2231
2232    /// Unwinds storage history to keep only blocks `<= keep_to`.
2233    ///
2234    /// Handles multi-shard scenarios by:
2235    /// 1. Loading all shards for the `(address, storage_key)` pair
2236    /// 2. Finding the boundary shard containing `keep_to`
2237    /// 3. Deleting all shards after the boundary
2238    /// 4. Truncating the boundary shard to keep only indices `<= keep_to`
2239    /// 5. Ensuring the last shard is keyed with `u64::MAX`
2240    pub fn unwind_storage_history_to(
2241        &mut self,
2242        address: Address,
2243        storage_key: B256,
2244        keep_to: BlockNumber,
2245    ) -> ProviderResult<()> {
2246        let shards = self.provider.storage_history_shards(address, storage_key)?;
2247        if shards.is_empty() {
2248            return Ok(());
2249        }
2250
2251        // Find the first shard that might contain blocks > keep_to.
2252        // A shard is affected if it's the sentinel (u64::MAX) or its highest_block_number > keep_to
2253        let boundary_idx = shards.iter().position(|(key, _)| {
2254            key.sharded_key.highest_block_number == u64::MAX ||
2255                key.sharded_key.highest_block_number > keep_to
2256        });
2257
2258        // Repair path: no shards affected means all blocks <= keep_to, just ensure sentinel exists
2259        let Some(boundary_idx) = boundary_idx else {
2260            let (last_key, last_value) = shards.last().expect("shards is non-empty");
2261            if last_key.sharded_key.highest_block_number != u64::MAX {
2262                self.delete::<tables::StoragesHistory>(last_key.clone())?;
2263                self.put::<tables::StoragesHistory>(
2264                    StorageShardedKey::last(address, storage_key),
2265                    last_value,
2266                )?;
2267            }
2268            return Ok(());
2269        };
2270
2271        // Delete all shards strictly after the boundary (they are entirely > keep_to)
2272        for (key, _) in shards.iter().skip(boundary_idx + 1) {
2273            self.delete::<tables::StoragesHistory>(key.clone())?;
2274        }
2275
2276        // Process the boundary shard: filter out blocks > keep_to
2277        let (boundary_key, boundary_list) = &shards[boundary_idx];
2278
2279        // Delete the boundary shard (we'll either drop it or rewrite at u64::MAX)
2280        self.delete::<tables::StoragesHistory>(boundary_key.clone())?;
2281
2282        // Build truncated list once; check emptiness directly (avoids double iteration)
2283        let new_last =
2284            BlockNumberList::new_pre_sorted(boundary_list.iter().take_while(|&b| b <= keep_to));
2285
2286        if new_last.is_empty() {
2287            // Boundary shard is now empty. Previous shard becomes the last and must be keyed
2288            // u64::MAX.
2289            if boundary_idx == 0 {
2290                // Nothing left for this (address, storage_key) pair
2291                return Ok(());
2292            }
2293
2294            let (prev_key, prev_value) = &shards[boundary_idx - 1];
2295            if prev_key.sharded_key.highest_block_number != u64::MAX {
2296                self.delete::<tables::StoragesHistory>(prev_key.clone())?;
2297                self.put::<tables::StoragesHistory>(
2298                    StorageShardedKey::last(address, storage_key),
2299                    prev_value,
2300                )?;
2301            }
2302            return Ok(());
2303        }
2304
2305        self.put::<tables::StoragesHistory>(
2306            StorageShardedKey::last(address, storage_key),
2307            &new_last,
2308        )?;
2309
2310        Ok(())
2311    }
2312
2313    /// Clears all account history shards for the given address.
2314    ///
2315    /// Used when unwinding from block 0 (i.e., removing all history).
2316    pub fn clear_account_history(&mut self, address: Address) -> ProviderResult<()> {
2317        let shards = self.provider.account_history_shards(address)?;
2318        for (key, _) in shards {
2319            self.delete::<tables::AccountsHistory>(key)?;
2320        }
2321        Ok(())
2322    }
2323
2324    /// Clears all storage history shards for the given `(address, storage_key)` pair.
2325    ///
2326    /// Used when unwinding from block 0 (i.e., removing all history for this storage slot).
2327    pub fn clear_storage_history(
2328        &mut self,
2329        address: Address,
2330        storage_key: B256,
2331    ) -> ProviderResult<()> {
2332        let shards = self.provider.storage_history_shards(address, storage_key)?;
2333        for (key, _) in shards {
2334            self.delete::<tables::StoragesHistory>(key)?;
2335        }
2336        Ok(())
2337    }
2338}
2339
/// `RocksDB` transaction wrapper providing MDBX-like semantics.
///
/// Supports:
/// - Read-your-writes: reads see uncommitted writes within the same transaction
/// - Atomic commit/rollback
/// - Iteration over uncommitted data
///
/// Note: `Transaction` is `Send` but NOT `Sync`. This wrapper does not implement
/// `DbTx`/`DbTxMut` traits directly; use RocksDB-specific methods instead.
pub struct RocksTx<'db> {
    /// Underlying optimistic `RocksDB` transaction; buffers writes until commit.
    inner: Transaction<'db, OptimisticTransactionDB>,
    /// Owning provider, used to resolve per-table column-family handles.
    provider: &'db RocksDBProvider,
}
2353
2354impl fmt::Debug for RocksTx<'_> {
2355    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2356        f.debug_struct("RocksTx").field("provider", &self.provider).finish_non_exhaustive()
2357    }
2358}
2359
2360impl<'db> RocksTx<'db> {
2361    /// Gets a value from the specified table. Sees uncommitted writes in this transaction.
2362    pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
2363        let encoded_key = key.encode();
2364        self.get_encoded::<T>(&encoded_key)
2365    }
2366
2367    /// Gets a value using pre-encoded key. Sees uncommitted writes in this transaction.
2368    pub fn get_encoded<T: Table>(
2369        &self,
2370        key: &<T::Key as Encode>::Encoded,
2371    ) -> ProviderResult<Option<T::Value>> {
2372        let cf = self.provider.get_cf_handle::<T>()?;
2373        let result = self.inner.get_cf(cf, key.as_ref()).map_err(|e| {
2374            ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
2375                message: e.to_string().into(),
2376                code: -1,
2377            }))
2378        })?;
2379
2380        Ok(result.and_then(|value| T::Value::decompress(&value).ok()))
2381    }
2382
2383    /// Puts a value into the specified table.
2384    pub fn put<T: Table>(&self, key: T::Key, value: &T::Value) -> ProviderResult<()> {
2385        let encoded_key = key.encode();
2386        self.put_encoded::<T>(&encoded_key, value)
2387    }
2388
2389    /// Puts a value using pre-encoded key.
2390    pub fn put_encoded<T: Table>(
2391        &self,
2392        key: &<T::Key as Encode>::Encoded,
2393        value: &T::Value,
2394    ) -> ProviderResult<()> {
2395        let cf = self.provider.get_cf_handle::<T>()?;
2396        let mut buf = Vec::new();
2397        let value_bytes = compress_to_buf_or_ref!(buf, value).unwrap_or(&buf);
2398
2399        self.inner.put_cf(cf, key.as_ref(), value_bytes).map_err(|e| {
2400            ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
2401                info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
2402                operation: DatabaseWriteOperation::PutUpsert,
2403                table_name: T::NAME,
2404                key: key.as_ref().to_vec(),
2405            })))
2406        })
2407    }
2408
2409    /// Deletes a value from the specified table.
2410    pub fn delete<T: Table>(&self, key: T::Key) -> ProviderResult<()> {
2411        let cf = self.provider.get_cf_handle::<T>()?;
2412        self.inner.delete_cf(cf, key.encode().as_ref()).map_err(|e| {
2413            ProviderError::Database(DatabaseError::Delete(DatabaseErrorInfo {
2414                message: e.to_string().into(),
2415                code: -1,
2416            }))
2417        })
2418    }
2419
2420    /// Creates an iterator for the specified table. Sees uncommitted writes in this transaction.
2421    ///
2422    /// Returns an iterator that yields `(encoded_key, compressed_value)` pairs.
2423    pub fn iter<T: Table>(&self) -> ProviderResult<RocksTxIter<'_, T>> {
2424        let cf = self.provider.get_cf_handle::<T>()?;
2425        let iter = self.inner.iterator_cf(cf, IteratorMode::Start);
2426        Ok(RocksTxIter { inner: iter, _marker: std::marker::PhantomData })
2427    }
2428
2429    /// Creates an iterator starting from the given key (inclusive).
2430    pub fn iter_from<T: Table>(&self, key: T::Key) -> ProviderResult<RocksTxIter<'_, T>> {
2431        let cf = self.provider.get_cf_handle::<T>()?;
2432        let encoded_key = key.encode();
2433        let iter = self
2434            .inner
2435            .iterator_cf(cf, IteratorMode::From(encoded_key.as_ref(), rocksdb::Direction::Forward));
2436        Ok(RocksTxIter { inner: iter, _marker: std::marker::PhantomData })
2437    }
2438
2439    /// Commits the transaction, persisting all changes.
2440    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
2441    pub fn commit(self) -> ProviderResult<()> {
2442        self.inner.commit().map_err(|e| {
2443            ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
2444                message: e.to_string().into(),
2445                code: -1,
2446            }))
2447        })
2448    }
2449
2450    /// Rolls back the transaction, discarding all changes.
2451    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
2452    pub fn rollback(self) -> ProviderResult<()> {
2453        self.inner.rollback().map_err(|e| {
2454            ProviderError::Database(DatabaseError::Other(format!("rollback failed: {e}")))
2455        })
2456    }
2457}
2458
/// Wrapper enum for `RocksDB` iterators that works in both read-write and read-only modes.
///
/// Lets callers drive iteration uniformly regardless of whether the database was
/// opened as an [`OptimisticTransactionDB`] (read-write) or a plain [`DB`] (read-only).
enum RocksDBIterEnum<'db> {
    /// Iterator from read-write `OptimisticTransactionDB`.
    ReadWrite(rocksdb::DBIteratorWithThreadMode<'db, OptimisticTransactionDB>),
    /// Iterator from read-only `DB`.
    ReadOnly(rocksdb::DBIteratorWithThreadMode<'db, DB>),
}
2466
2467impl Iterator for RocksDBIterEnum<'_> {
2468    type Item = Result<(Box<[u8]>, Box<[u8]>), rocksdb::Error>;
2469
2470    fn next(&mut self) -> Option<Self::Item> {
2471        match self {
2472            Self::ReadWrite(iter) => iter.next(),
2473            Self::ReadOnly(iter) => iter.next(),
2474        }
2475    }
2476}
2477
/// Wrapper enum for raw `RocksDB` iterators that works in both read-write and read-only modes.
///
/// Unlike [`RocksDBIterEnum`], raw iterators expose `seek()` for efficient repositioning
/// without reinitializing the iterator. Callers should check `valid()` (and `status()`
/// on exhaustion) after positioning, mirroring the raw `RocksDB` iterator contract.
enum RocksDBRawIterEnum<'db> {
    /// Raw iterator from read-write `OptimisticTransactionDB`.
    ReadWrite(DBRawIteratorWithThreadMode<'db, OptimisticTransactionDB>),
    /// Raw iterator from read-only `DB`.
    ReadOnly(DBRawIteratorWithThreadMode<'db, DB>),
}
2488
2489impl RocksDBRawIterEnum<'_> {
2490    /// Positions the iterator at the first key >= `key`.
2491    fn seek(&mut self, key: impl AsRef<[u8]>) {
2492        match self {
2493            Self::ReadWrite(iter) => iter.seek(key),
2494            Self::ReadOnly(iter) => iter.seek(key),
2495        }
2496    }
2497
2498    /// Returns true if the iterator is positioned at a valid key-value pair.
2499    fn valid(&self) -> bool {
2500        match self {
2501            Self::ReadWrite(iter) => iter.valid(),
2502            Self::ReadOnly(iter) => iter.valid(),
2503        }
2504    }
2505
2506    /// Returns the current key, if valid.
2507    fn key(&self) -> Option<&[u8]> {
2508        match self {
2509            Self::ReadWrite(iter) => iter.key(),
2510            Self::ReadOnly(iter) => iter.key(),
2511        }
2512    }
2513
2514    /// Returns the current value, if valid.
2515    fn value(&self) -> Option<&[u8]> {
2516        match self {
2517            Self::ReadWrite(iter) => iter.value(),
2518            Self::ReadOnly(iter) => iter.value(),
2519        }
2520    }
2521
2522    /// Advances the iterator to the next key.
2523    fn next(&mut self) {
2524        match self {
2525            Self::ReadWrite(iter) => iter.next(),
2526            Self::ReadOnly(iter) => iter.next(),
2527        }
2528    }
2529
2530    /// Moves the iterator to the previous key.
2531    fn prev(&mut self) {
2532        match self {
2533            Self::ReadWrite(iter) => iter.prev(),
2534            Self::ReadOnly(iter) => iter.prev(),
2535        }
2536    }
2537
2538    /// Returns the status of the iterator.
2539    fn status(&self) -> Result<(), rocksdb::Error> {
2540        match self {
2541            Self::ReadWrite(iter) => iter.status(),
2542            Self::ReadOnly(iter) => iter.status(),
2543        }
2544    }
2545}
2546
/// Iterator over a `RocksDB` table (non-transactional).
///
/// Yields decoded `(Key, Value)` pairs in key order.
pub struct RocksDBIter<'db, T: Table> {
    /// Underlying iterator over either a read-write or read-only database handle.
    inner: RocksDBIterEnum<'db>,
    /// Ties the iterator to table `T` without storing any `T` data.
    _marker: std::marker::PhantomData<T>,
}
2554
2555impl<T: Table> fmt::Debug for RocksDBIter<'_, T> {
2556    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2557        f.debug_struct("RocksDBIter").field("table", &T::NAME).finish_non_exhaustive()
2558    }
2559}
2560
2561impl<T: Table> Iterator for RocksDBIter<'_, T> {
2562    type Item = ProviderResult<(T::Key, T::Value)>;
2563
2564    fn next(&mut self) -> Option<Self::Item> {
2565        Some(decode_iter_item::<T>(self.inner.next()?))
2566    }
2567}
2568
/// Raw iterator over a `RocksDB` table (non-transactional).
///
/// Yields raw `(key_bytes, value_bytes)` pairs without decoding.
pub struct RocksDBRawIter<'db> {
    /// Underlying iterator over either a read-write or read-only database handle.
    inner: RocksDBIterEnum<'db>,
}
2575
2576impl fmt::Debug for RocksDBRawIter<'_> {
2577    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2578        f.debug_struct("RocksDBRawIter").finish_non_exhaustive()
2579    }
2580}
2581
2582impl Iterator for RocksDBRawIter<'_> {
2583    type Item = ProviderResult<(Box<[u8]>, Box<[u8]>)>;
2584
2585    fn next(&mut self) -> Option<Self::Item> {
2586        match self.inner.next()? {
2587            Ok(kv) => Some(Ok(kv)),
2588            Err(e) => Some(Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
2589                message: e.to_string().into(),
2590                code: -1,
2591            })))),
2592        }
2593    }
2594}
2595
/// Iterator over a `RocksDB` table within a transaction.
///
/// Yields decoded `(Key, Value)` pairs. Sees uncommitted writes.
pub struct RocksTxIter<'tx, T: Table> {
    /// Iterator borrowed from the owning transaction, so staged writes are visible.
    inner: rocksdb::DBIteratorWithThreadMode<'tx, Transaction<'tx, OptimisticTransactionDB>>,
    /// Ties the iterator to table `T` without storing any `T` data.
    _marker: std::marker::PhantomData<T>,
}
2603
2604impl<T: Table> fmt::Debug for RocksTxIter<'_, T> {
2605    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2606        f.debug_struct("RocksTxIter").field("table", &T::NAME).finish_non_exhaustive()
2607    }
2608}
2609
2610impl<T: Table> Iterator for RocksTxIter<'_, T> {
2611    type Item = ProviderResult<(T::Key, T::Value)>;
2612
2613    fn next(&mut self) -> Option<Self::Item> {
2614        Some(decode_iter_item::<T>(self.inner.next()?))
2615    }
2616}
2617
2618/// Decodes a raw key-value pair from a `RocksDB` iterator into typed table entries.
2619///
2620/// Handles both error propagation from the underlying iterator and
2621/// decoding/decompression of the key and value bytes.
2622fn decode_iter_item<T: Table>(result: RawKVResult) -> ProviderResult<(T::Key, T::Value)> {
2623    let (key_bytes, value_bytes) = result.map_err(|e| {
2624        ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
2625            message: e.to_string().into(),
2626            code: -1,
2627        }))
2628    })?;
2629
2630    let key = <T::Key as reth_db_api::table::Decode>::decode(&key_bytes)
2631        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
2632
2633    let value = T::Value::decompress(&value_bytes)
2634        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
2635
2636    Ok((key, value))
2637}
2638
2639/// Converts Reth's [`LogLevel`] to `RocksDB`'s [`rocksdb::LogLevel`].
2640const fn convert_log_level(level: LogLevel) -> rocksdb::LogLevel {
2641    match level {
2642        LogLevel::Fatal => rocksdb::LogLevel::Fatal,
2643        LogLevel::Error => rocksdb::LogLevel::Error,
2644        LogLevel::Warn => rocksdb::LogLevel::Warn,
2645        LogLevel::Notice | LogLevel::Verbose => rocksdb::LogLevel::Info,
2646        LogLevel::Debug | LogLevel::Trace | LogLevel::Extra => rocksdb::LogLevel::Debug,
2647    }
2648}
2649
2650#[cfg(test)]
2651mod tests {
2652    use super::*;
2653    use crate::providers::HistoryInfo;
2654    use alloy_primitives::{Address, TxHash, B256};
2655    use reth_db_api::{
2656        models::{
2657            sharded_key::{ShardedKey, NUM_OF_INDICES_IN_SHARD},
2658            storage_sharded_key::StorageShardedKey,
2659            IntegerList,
2660        },
2661        table::Table,
2662        tables,
2663    };
2664    use tempfile::TempDir;
2665
2666    #[test]
2667    fn test_with_default_tables_registers_required_column_families() {
2668        let temp_dir = TempDir::new().unwrap();
2669
2670        // Build with default tables
2671        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
2672
2673        // Should be able to write/read TransactionHashNumbers
2674        let tx_hash = TxHash::from(B256::from([1u8; 32]));
2675        provider.put::<tables::TransactionHashNumbers>(tx_hash, &100).unwrap();
2676        assert_eq!(provider.get::<tables::TransactionHashNumbers>(tx_hash).unwrap(), Some(100));
2677
2678        // Should be able to write/read AccountsHistory
2679        let key = ShardedKey::new(Address::ZERO, 100);
2680        let value = IntegerList::default();
2681        provider.put::<tables::AccountsHistory>(key.clone(), &value).unwrap();
2682        assert!(provider.get::<tables::AccountsHistory>(key).unwrap().is_some());
2683
2684        // Should be able to write/read StoragesHistory
2685        let key = StorageShardedKey::new(Address::ZERO, B256::ZERO, 100);
2686        provider.put::<tables::StoragesHistory>(key.clone(), &value).unwrap();
2687        assert!(provider.get::<tables::StoragesHistory>(key).unwrap().is_some());
2688    }
2689
    /// Minimal `u64 -> Vec<u8>` table used only by the tests in this module.
    #[derive(Debug)]
    struct TestTable;

    impl Table for TestTable {
        const NAME: &'static str = "TestTable";
        const DUPSORT: bool = false;
        type Key = u64;
        type Value = Vec<u8>;
    }
2699
2700    #[test]
2701    fn test_basic_operations() {
2702        let temp_dir = TempDir::new().unwrap();
2703
2704        let provider = RocksDBBuilder::new(temp_dir.path())
2705            .with_table::<TestTable>() // Type-safe!
2706            .build()
2707            .unwrap();
2708
2709        let key = 42u64;
2710        let value = b"test_value".to_vec();
2711
2712        // Test write
2713        provider.put::<TestTable>(key, &value).unwrap();
2714
2715        // Test read
2716        let result = provider.get::<TestTable>(key).unwrap();
2717        assert_eq!(result, Some(value));
2718
2719        // Test delete
2720        provider.delete::<TestTable>(key).unwrap();
2721
2722        // Verify deletion
2723        assert_eq!(provider.get::<TestTable>(key).unwrap(), None);
2724    }
2725
2726    #[test]
2727    fn test_batch_operations() {
2728        let temp_dir = TempDir::new().unwrap();
2729        let provider =
2730            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
2731
2732        // Write multiple entries in a batch
2733        provider
2734            .write_batch(|batch| {
2735                for i in 0..10u64 {
2736                    let value = format!("value_{i}").into_bytes();
2737                    batch.put::<TestTable>(i, &value)?;
2738                }
2739                Ok(())
2740            })
2741            .unwrap();
2742
2743        // Read all entries
2744        for i in 0..10u64 {
2745            let value = format!("value_{i}").into_bytes();
2746            assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
2747        }
2748
2749        // Delete all entries in a batch
2750        provider
2751            .write_batch(|batch| {
2752                for i in 0..10u64 {
2753                    batch.delete::<TestTable>(i)?;
2754                }
2755                Ok(())
2756            })
2757            .unwrap();
2758
2759        // Verify all deleted
2760        for i in 0..10u64 {
2761            assert_eq!(provider.get::<TestTable>(i).unwrap(), None);
2762        }
2763    }
2764
2765    #[test]
2766    fn test_with_real_table() {
2767        let temp_dir = TempDir::new().unwrap();
2768        let provider = RocksDBBuilder::new(temp_dir.path())
2769            .with_table::<tables::TransactionHashNumbers>()
2770            .with_metrics()
2771            .build()
2772            .unwrap();
2773
2774        let tx_hash = TxHash::from(B256::from([1u8; 32]));
2775
2776        // Insert and retrieve
2777        provider.put::<tables::TransactionHashNumbers>(tx_hash, &100).unwrap();
2778        assert_eq!(provider.get::<tables::TransactionHashNumbers>(tx_hash).unwrap(), Some(100));
2779
2780        // Batch insert multiple transactions
2781        provider
2782            .write_batch(|batch| {
2783                for i in 0..10u64 {
2784                    let hash = TxHash::from(B256::from([i as u8; 32]));
2785                    let value = i * 100;
2786                    batch.put::<tables::TransactionHashNumbers>(hash, &value)?;
2787                }
2788                Ok(())
2789            })
2790            .unwrap();
2791
2792        // Verify batch insertions
2793        for i in 0..10u64 {
2794            let hash = TxHash::from(B256::from([i as u8; 32]));
2795            assert_eq!(
2796                provider.get::<tables::TransactionHashNumbers>(hash).unwrap(),
2797                Some(i * 100)
2798            );
2799        }
2800    }
2801    #[test]
2802    fn test_statistics_enabled() {
2803        let temp_dir = TempDir::new().unwrap();
2804        // Just verify that building with statistics doesn't panic
2805        let provider = RocksDBBuilder::new(temp_dir.path())
2806            .with_table::<TestTable>()
2807            .with_statistics()
2808            .build()
2809            .unwrap();
2810
2811        // Do operations - data should be immediately readable with OptimisticTransactionDB
2812        for i in 0..10 {
2813            let value = vec![i as u8];
2814            provider.put::<TestTable>(i, &value).unwrap();
2815            // Verify write is visible
2816            assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
2817        }
2818    }
2819
2820    #[test]
2821    fn test_data_persistence() {
2822        let temp_dir = TempDir::new().unwrap();
2823        let provider =
2824            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
2825
2826        // Insert data - OptimisticTransactionDB writes are immediately visible
2827        let value = vec![42u8; 1000];
2828        for i in 0..100 {
2829            provider.put::<TestTable>(i, &value).unwrap();
2830        }
2831
2832        // Verify data is readable
2833        for i in 0..100 {
2834            assert!(provider.get::<TestTable>(i).unwrap().is_some(), "Data should be readable");
2835        }
2836    }
2837
2838    #[test]
2839    fn test_transaction_read_your_writes() {
2840        let temp_dir = TempDir::new().unwrap();
2841        let provider =
2842            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
2843
2844        // Create a transaction
2845        let tx = provider.tx();
2846
2847        // Write data within the transaction
2848        let key = 42u64;
2849        let value = b"test_value".to_vec();
2850        tx.put::<TestTable>(key, &value).unwrap();
2851
2852        // Read-your-writes: should see uncommitted data in same transaction
2853        let result = tx.get::<TestTable>(key).unwrap();
2854        assert_eq!(
2855            result,
2856            Some(value.clone()),
2857            "Transaction should see its own uncommitted writes"
2858        );
2859
2860        // Data should NOT be visible via provider (outside transaction)
2861        let provider_result = provider.get::<TestTable>(key).unwrap();
2862        assert_eq!(provider_result, None, "Uncommitted data should not be visible outside tx");
2863
2864        // Commit the transaction
2865        tx.commit().unwrap();
2866
2867        // Now data should be visible via provider
2868        let committed_result = provider.get::<TestTable>(key).unwrap();
2869        assert_eq!(committed_result, Some(value), "Committed data should be visible");
2870    }
2871
2872    #[test]
2873    fn test_transaction_rollback() {
2874        let temp_dir = TempDir::new().unwrap();
2875        let provider =
2876            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
2877
2878        // First, put some initial data
2879        let key = 100u64;
2880        let initial_value = b"initial".to_vec();
2881        provider.put::<TestTable>(key, &initial_value).unwrap();
2882
2883        // Create a transaction and modify data
2884        let tx = provider.tx();
2885        let new_value = b"modified".to_vec();
2886        tx.put::<TestTable>(key, &new_value).unwrap();
2887
2888        // Verify modification is visible within transaction
2889        assert_eq!(tx.get::<TestTable>(key).unwrap(), Some(new_value));
2890
2891        // Rollback instead of commit
2892        tx.rollback().unwrap();
2893
2894        // Data should be unchanged (initial value)
2895        let result = provider.get::<TestTable>(key).unwrap();
2896        assert_eq!(result, Some(initial_value), "Rollback should preserve original data");
2897    }
2898
2899    #[test]
2900    fn test_transaction_iterator() {
2901        let temp_dir = TempDir::new().unwrap();
2902        let provider =
2903            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
2904
2905        // Create a transaction
2906        let tx = provider.tx();
2907
2908        // Write multiple entries
2909        for i in 0..5u64 {
2910            let value = format!("value_{i}").into_bytes();
2911            tx.put::<TestTable>(i, &value).unwrap();
2912        }
2913
2914        // Iterate - should see uncommitted writes
2915        let mut count = 0;
2916        for result in tx.iter::<TestTable>().unwrap() {
2917            let (key, value) = result.unwrap();
2918            assert_eq!(value, format!("value_{key}").into_bytes());
2919            count += 1;
2920        }
2921        assert_eq!(count, 5, "Iterator should see all uncommitted writes");
2922
2923        // Commit
2924        tx.commit().unwrap();
2925    }
2926
2927    #[test]
2928    fn test_batch_manual_commit() {
2929        let temp_dir = TempDir::new().unwrap();
2930        let provider =
2931            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
2932
2933        // Create a batch via provider.batch()
2934        let mut batch = provider.batch();
2935
2936        // Add entries
2937        for i in 0..10u64 {
2938            let value = format!("batch_value_{i}").into_bytes();
2939            batch.put::<TestTable>(i, &value).unwrap();
2940        }
2941
2942        // Verify len/is_empty
2943        assert_eq!(batch.len(), 10);
2944        assert!(!batch.is_empty());
2945
2946        // Data should NOT be visible before commit
2947        assert_eq!(provider.get::<TestTable>(0).unwrap(), None);
2948
2949        // Commit the batch
2950        batch.commit().unwrap();
2951
2952        // Now data should be visible
2953        for i in 0..10u64 {
2954            let value = format!("batch_value_{i}").into_bytes();
2955            assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
2956        }
2957    }
2958
2959    #[test]
2960    fn test_first_and_last_entry() {
2961        let temp_dir = TempDir::new().unwrap();
2962        let provider =
2963            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
2964
2965        // Empty table should return None for both
2966        assert_eq!(provider.first::<TestTable>().unwrap(), None);
2967        assert_eq!(provider.last::<TestTable>().unwrap(), None);
2968
2969        // Insert some entries
2970        provider.put::<TestTable>(10, &b"value_10".to_vec()).unwrap();
2971        provider.put::<TestTable>(20, &b"value_20".to_vec()).unwrap();
2972        provider.put::<TestTable>(5, &b"value_5".to_vec()).unwrap();
2973
2974        // First should return the smallest key
2975        let first = provider.first::<TestTable>().unwrap();
2976        assert_eq!(first, Some((5, b"value_5".to_vec())));
2977
2978        // Last should return the largest key
2979        let last = provider.last::<TestTable>().unwrap();
2980        assert_eq!(last, Some((20, b"value_20".to_vec())));
2981    }
2982
2983    /// Tests the edge case where block < `lowest_available_block_number`.
2984    /// This case cannot be tested via `HistoricalStateProviderRef` (which errors before lookup),
2985    /// so we keep this RocksDB-specific test to verify the low-level behavior.
2986    #[test]
2987    fn test_account_history_info_pruned_before_first_entry() {
2988        let temp_dir = TempDir::new().unwrap();
2989        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
2990
2991        let address = Address::from([0x42; 20]);
2992
2993        // Create a single shard starting at block 100
2994        let chunk = IntegerList::new([100, 200, 300]).unwrap();
2995        let shard_key = ShardedKey::new(address, u64::MAX);
2996        provider.put::<tables::AccountsHistory>(shard_key, &chunk).unwrap();
2997
2998        // Query for block 50 with lowest_available_block_number = 100
2999        // This simulates a pruned state where data before block 100 is not available.
3000        // Since we're before the first write AND pruning boundary is set, we need to
3001        // check the changeset at the first write block.
3002        let result = provider.snapshot().account_history_info(address, 50, Some(100)).unwrap();
3003        assert_eq!(result, HistoryInfo::InChangeset(100));
3004    }
3005
3006    /// Verifies that history lookups work on a read-only `RocksDB` provider.
3007    /// This was the original bug — read-only mode panicked on `account_history_info`.
3008    #[test]
3009    fn test_account_history_info_read_only() {
3010        let temp_dir = TempDir::new().unwrap();
3011        let address = Address::from([0x42; 20]);
3012        let chunk = IntegerList::new([100, 200, 300]).unwrap();
3013        let shard_key = ShardedKey::new(address, u64::MAX);
3014
3015        // Write data with a read-write provider, then drop it.
3016        {
3017            let provider =
3018                RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3019            provider.put::<tables::AccountsHistory>(shard_key, &chunk).unwrap();
3020        }
3021
3022        // Reopen in read-only mode and verify the lookup succeeds (no panic).
3023        let ro_provider = RocksDBBuilder::new(temp_dir.path())
3024            .with_default_tables()
3025            .with_read_only(true)
3026            .build()
3027            .unwrap();
3028
3029        let result = ro_provider.snapshot().account_history_info(address, 200, None).unwrap();
3030        assert_eq!(result, HistoryInfo::InChangeset(200));
3031
3032        let result = ro_provider.snapshot().account_history_info(address, 50, None).unwrap();
3033        assert_eq!(result, HistoryInfo::NotYetWritten);
3034
3035        let result = ro_provider.snapshot().account_history_info(address, 400, None).unwrap();
3036        assert_eq!(result, HistoryInfo::InPlainState);
3037    }
3038
3039    #[test]
3040    fn test_account_history_shard_split_at_boundary() {
3041        let temp_dir = TempDir::new().unwrap();
3042        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3043
3044        let address = Address::from([0x42; 20]);
3045        let limit = NUM_OF_INDICES_IN_SHARD;
3046
3047        // Add exactly NUM_OF_INDICES_IN_SHARD + 1 indices to trigger a split
3048        let indices: Vec<u64> = (0..=(limit as u64)).collect();
3049        let mut batch = provider.batch();
3050        batch.append_account_history_shard(address, indices).unwrap();
3051        batch.commit().unwrap();
3052
3053        // Should have 2 shards: one completed shard and one sentinel shard
3054        let completed_key = ShardedKey::new(address, (limit - 1) as u64);
3055        let sentinel_key = ShardedKey::new(address, u64::MAX);
3056
3057        let completed_shard = provider.get::<tables::AccountsHistory>(completed_key).unwrap();
3058        let sentinel_shard = provider.get::<tables::AccountsHistory>(sentinel_key).unwrap();
3059
3060        assert!(completed_shard.is_some(), "completed shard should exist");
3061        assert!(sentinel_shard.is_some(), "sentinel shard should exist");
3062
3063        let completed_shard = completed_shard.unwrap();
3064        let sentinel_shard = sentinel_shard.unwrap();
3065
3066        assert_eq!(completed_shard.len(), limit as u64, "completed shard should be full");
3067        assert_eq!(sentinel_shard.len(), 1, "sentinel shard should have 1 element");
3068    }
3069
3070    #[test]
3071    fn test_account_history_multiple_shard_splits() {
3072        let temp_dir = TempDir::new().unwrap();
3073        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3074
3075        let address = Address::from([0x43; 20]);
3076        let limit = NUM_OF_INDICES_IN_SHARD;
3077
3078        // First batch: add NUM_OF_INDICES_IN_SHARD indices
3079        let first_batch_indices: Vec<u64> = (0..limit as u64).collect();
3080        let mut batch = provider.batch();
3081        batch.append_account_history_shard(address, first_batch_indices).unwrap();
3082        batch.commit().unwrap();
3083
3084        // Should have just a sentinel shard (exactly at limit, not over)
3085        let sentinel_key = ShardedKey::new(address, u64::MAX);
3086        let shard = provider.get::<tables::AccountsHistory>(sentinel_key.clone()).unwrap();
3087        assert!(shard.is_some());
3088        assert_eq!(shard.unwrap().len(), limit as u64);
3089
3090        // Second batch: add another NUM_OF_INDICES_IN_SHARD + 1 indices (causing 2 more shards)
3091        let second_batch_indices: Vec<u64> = (limit as u64..=(2 * limit) as u64).collect();
3092        let mut batch = provider.batch();
3093        batch.append_account_history_shard(address, second_batch_indices).unwrap();
3094        batch.commit().unwrap();
3095
3096        // Now we should have: 2 completed shards + 1 sentinel shard
3097        let first_completed = ShardedKey::new(address, (limit - 1) as u64);
3098        let second_completed = ShardedKey::new(address, (2 * limit - 1) as u64);
3099
3100        assert!(
3101            provider.get::<tables::AccountsHistory>(first_completed).unwrap().is_some(),
3102            "first completed shard should exist"
3103        );
3104        assert!(
3105            provider.get::<tables::AccountsHistory>(second_completed).unwrap().is_some(),
3106            "second completed shard should exist"
3107        );
3108        assert!(
3109            provider.get::<tables::AccountsHistory>(sentinel_key).unwrap().is_some(),
3110            "sentinel shard should exist"
3111        );
3112    }
3113
3114    #[test]
3115    fn test_storage_history_shard_split_at_boundary() {
3116        let temp_dir = TempDir::new().unwrap();
3117        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3118
3119        let address = Address::from([0x44; 20]);
3120        let slot = B256::from([0x55; 32]);
3121        let limit = NUM_OF_INDICES_IN_SHARD;
3122
3123        // Add exactly NUM_OF_INDICES_IN_SHARD + 1 indices to trigger a split
3124        let indices: Vec<u64> = (0..=(limit as u64)).collect();
3125        let mut batch = provider.batch();
3126        batch.append_storage_history_shard(address, slot, indices).unwrap();
3127        batch.commit().unwrap();
3128
3129        // Should have 2 shards: one completed shard and one sentinel shard
3130        let completed_key = StorageShardedKey::new(address, slot, (limit - 1) as u64);
3131        let sentinel_key = StorageShardedKey::new(address, slot, u64::MAX);
3132
3133        let completed_shard = provider.get::<tables::StoragesHistory>(completed_key).unwrap();
3134        let sentinel_shard = provider.get::<tables::StoragesHistory>(sentinel_key).unwrap();
3135
3136        assert!(completed_shard.is_some(), "completed shard should exist");
3137        assert!(sentinel_shard.is_some(), "sentinel shard should exist");
3138
3139        let completed_shard = completed_shard.unwrap();
3140        let sentinel_shard = sentinel_shard.unwrap();
3141
3142        assert_eq!(completed_shard.len(), limit as u64, "completed shard should be full");
3143        assert_eq!(sentinel_shard.len(), 1, "sentinel shard should have 1 element");
3144    }
3145
3146    #[test]
3147    fn test_storage_history_multiple_shard_splits() {
3148        let temp_dir = TempDir::new().unwrap();
3149        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3150
3151        let address = Address::from([0x46; 20]);
3152        let slot = B256::from([0x57; 32]);
3153        let limit = NUM_OF_INDICES_IN_SHARD;
3154
3155        // First batch: add NUM_OF_INDICES_IN_SHARD indices
3156        let first_batch_indices: Vec<u64> = (0..limit as u64).collect();
3157        let mut batch = provider.batch();
3158        batch.append_storage_history_shard(address, slot, first_batch_indices).unwrap();
3159        batch.commit().unwrap();
3160
3161        // Should have just a sentinel shard (exactly at limit, not over)
3162        let sentinel_key = StorageShardedKey::new(address, slot, u64::MAX);
3163        let shard = provider.get::<tables::StoragesHistory>(sentinel_key.clone()).unwrap();
3164        assert!(shard.is_some());
3165        assert_eq!(shard.unwrap().len(), limit as u64);
3166
3167        // Second batch: add another NUM_OF_INDICES_IN_SHARD + 1 indices (causing 2 more shards)
3168        let second_batch_indices: Vec<u64> = (limit as u64..=(2 * limit) as u64).collect();
3169        let mut batch = provider.batch();
3170        batch.append_storage_history_shard(address, slot, second_batch_indices).unwrap();
3171        batch.commit().unwrap();
3172
3173        // Now we should have: 2 completed shards + 1 sentinel shard
3174        let first_completed = StorageShardedKey::new(address, slot, (limit - 1) as u64);
3175        let second_completed = StorageShardedKey::new(address, slot, (2 * limit - 1) as u64);
3176
3177        assert!(
3178            provider.get::<tables::StoragesHistory>(first_completed).unwrap().is_some(),
3179            "first completed shard should exist"
3180        );
3181        assert!(
3182            provider.get::<tables::StoragesHistory>(second_completed).unwrap().is_some(),
3183            "second completed shard should exist"
3184        );
3185        assert!(
3186            provider.get::<tables::StoragesHistory>(sentinel_key).unwrap().is_some(),
3187            "sentinel shard should exist"
3188        );
3189    }
3190
3191    #[test]
3192    fn test_clear_table() {
3193        let temp_dir = TempDir::new().unwrap();
3194        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3195
3196        let address = Address::from([0x42; 20]);
3197        let key = ShardedKey::new(address, u64::MAX);
3198        let blocks = BlockNumberList::new_pre_sorted([1, 2, 3]);
3199
3200        provider.put::<tables::AccountsHistory>(key.clone(), &blocks).unwrap();
3201        assert!(provider.get::<tables::AccountsHistory>(key.clone()).unwrap().is_some());
3202
3203        provider.clear::<tables::AccountsHistory>().unwrap();
3204
3205        assert!(
3206            provider.get::<tables::AccountsHistory>(key).unwrap().is_none(),
3207            "table should be empty after clear"
3208        );
3209        assert!(
3210            provider.first::<tables::AccountsHistory>().unwrap().is_none(),
3211            "first() should return None after clear"
3212        );
3213    }
3214
3215    #[test]
3216    fn test_clear_empty_table() {
3217        let temp_dir = TempDir::new().unwrap();
3218        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3219
3220        assert!(provider.first::<tables::AccountsHistory>().unwrap().is_none());
3221
3222        provider.clear::<tables::AccountsHistory>().unwrap();
3223
3224        assert!(provider.first::<tables::AccountsHistory>().unwrap().is_none());
3225    }
3226
3227    #[test]
3228    fn test_unwind_account_history_to_basic() {
3229        let temp_dir = TempDir::new().unwrap();
3230        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3231
3232        let address = Address::from([0x42; 20]);
3233
3234        // Add blocks 0-10
3235        let mut batch = provider.batch();
3236        batch.append_account_history_shard(address, 0..=10).unwrap();
3237        batch.commit().unwrap();
3238
3239        // Verify we have blocks 0-10
3240        let key = ShardedKey::new(address, u64::MAX);
3241        let result = provider.get::<tables::AccountsHistory>(key.clone()).unwrap();
3242        assert!(result.is_some());
3243        let blocks: Vec<u64> = result.unwrap().iter().collect();
3244        assert_eq!(blocks, (0..=10).collect::<Vec<_>>());
3245
3246        // Unwind to block 5 (keep blocks 0-5, remove 6-10)
3247        let mut batch = provider.batch();
3248        batch.unwind_account_history_to(address, 5).unwrap();
3249        batch.commit().unwrap();
3250
3251        // Verify only blocks 0-5 remain
3252        let result = provider.get::<tables::AccountsHistory>(key).unwrap();
3253        assert!(result.is_some());
3254        let blocks: Vec<u64> = result.unwrap().iter().collect();
3255        assert_eq!(blocks, (0..=5).collect::<Vec<_>>());
3256    }
3257
3258    #[test]
3259    fn test_unwind_account_history_to_removes_all() {
3260        let temp_dir = TempDir::new().unwrap();
3261        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3262
3263        let address = Address::from([0x42; 20]);
3264
3265        // Add blocks 5-10
3266        let mut batch = provider.batch();
3267        batch.append_account_history_shard(address, 5..=10).unwrap();
3268        batch.commit().unwrap();
3269
3270        // Unwind to block 4 (removes all blocks since they're all > 4)
3271        let mut batch = provider.batch();
3272        batch.unwind_account_history_to(address, 4).unwrap();
3273        batch.commit().unwrap();
3274
3275        // Verify no data remains for this address
3276        let key = ShardedKey::new(address, u64::MAX);
3277        let result = provider.get::<tables::AccountsHistory>(key).unwrap();
3278        assert!(result.is_none(), "Should have no data after full unwind");
3279    }
3280
3281    #[test]
3282    fn test_unwind_account_history_to_no_op() {
3283        let temp_dir = TempDir::new().unwrap();
3284        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3285
3286        let address = Address::from([0x42; 20]);
3287
3288        // Add blocks 0-5
3289        let mut batch = provider.batch();
3290        batch.append_account_history_shard(address, 0..=5).unwrap();
3291        batch.commit().unwrap();
3292
3293        // Unwind to block 10 (no-op since all blocks are <= 10)
3294        let mut batch = provider.batch();
3295        batch.unwind_account_history_to(address, 10).unwrap();
3296        batch.commit().unwrap();
3297
3298        // Verify blocks 0-5 still remain
3299        let key = ShardedKey::new(address, u64::MAX);
3300        let result = provider.get::<tables::AccountsHistory>(key).unwrap();
3301        assert!(result.is_some());
3302        let blocks: Vec<u64> = result.unwrap().iter().collect();
3303        assert_eq!(blocks, (0..=5).collect::<Vec<_>>());
3304    }
3305
3306    #[test]
3307    fn test_unwind_account_history_to_block_zero() {
3308        let temp_dir = TempDir::new().unwrap();
3309        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3310
3311        let address = Address::from([0x42; 20]);
3312
3313        // Add blocks 0-5 (including block 0)
3314        let mut batch = provider.batch();
3315        batch.append_account_history_shard(address, 0..=5).unwrap();
3316        batch.commit().unwrap();
3317
3318        // Unwind to block 0 (keep only block 0, remove 1-5)
3319        // This simulates the caller doing: unwind_to = min_block.checked_sub(1) where min_block = 1
3320        let mut batch = provider.batch();
3321        batch.unwind_account_history_to(address, 0).unwrap();
3322        batch.commit().unwrap();
3323
3324        // Verify only block 0 remains
3325        let key = ShardedKey::new(address, u64::MAX);
3326        let result = provider.get::<tables::AccountsHistory>(key).unwrap();
3327        assert!(result.is_some());
3328        let blocks: Vec<u64> = result.unwrap().iter().collect();
3329        assert_eq!(blocks, vec![0]);
3330    }
3331
3332    #[test]
3333    fn test_unwind_account_history_to_multi_shard() {
3334        let temp_dir = TempDir::new().unwrap();
3335        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3336
3337        let address = Address::from([0x42; 20]);
3338
3339        // Create multiple shards by adding more than NUM_OF_INDICES_IN_SHARD entries
3340        // For testing, we'll manually create shards with specific keys
3341        let mut batch = provider.batch();
3342
3343        // First shard: blocks 1-50, keyed by 50
3344        let shard1 = BlockNumberList::new_pre_sorted(1..=50);
3345        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, 50), &shard1).unwrap();
3346
3347        // Second shard: blocks 51-100, keyed by MAX (sentinel)
3348        let shard2 = BlockNumberList::new_pre_sorted(51..=100);
3349        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, u64::MAX), &shard2).unwrap();
3350
3351        batch.commit().unwrap();
3352
3353        // Verify we have 2 shards
3354        let shards = provider.account_history_shards(address).unwrap();
3355        assert_eq!(shards.len(), 2);
3356
3357        // Unwind to block 75 (keep 1-75, remove 76-100)
3358        let mut batch = provider.batch();
3359        batch.unwind_account_history_to(address, 75).unwrap();
3360        batch.commit().unwrap();
3361
3362        // Verify: shard1 should be untouched, shard2 should be truncated
3363        let shards = provider.account_history_shards(address).unwrap();
3364        assert_eq!(shards.len(), 2);
3365
3366        // First shard unchanged
3367        assert_eq!(shards[0].0.highest_block_number, 50);
3368        assert_eq!(shards[0].1.iter().collect::<Vec<_>>(), (1..=50).collect::<Vec<_>>());
3369
3370        // Second shard truncated and re-keyed to MAX
3371        assert_eq!(shards[1].0.highest_block_number, u64::MAX);
3372        assert_eq!(shards[1].1.iter().collect::<Vec<_>>(), (51..=75).collect::<Vec<_>>());
3373    }
3374
3375    #[test]
3376    fn test_unwind_account_history_to_multi_shard_boundary_empty() {
3377        let temp_dir = TempDir::new().unwrap();
3378        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3379
3380        let address = Address::from([0x42; 20]);
3381
3382        // Create two shards
3383        let mut batch = provider.batch();
3384
3385        // First shard: blocks 1-50, keyed by 50
3386        let shard1 = BlockNumberList::new_pre_sorted(1..=50);
3387        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, 50), &shard1).unwrap();
3388
3389        // Second shard: blocks 75-100, keyed by MAX
3390        let shard2 = BlockNumberList::new_pre_sorted(75..=100);
3391        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, u64::MAX), &shard2).unwrap();
3392
3393        batch.commit().unwrap();
3394
3395        // Unwind to block 60 (removes all of shard2 since 75 > 60, promotes shard1 to MAX)
3396        let mut batch = provider.batch();
3397        batch.unwind_account_history_to(address, 60).unwrap();
3398        batch.commit().unwrap();
3399
3400        // Verify: only shard1 remains, now keyed as MAX
3401        let shards = provider.account_history_shards(address).unwrap();
3402        assert_eq!(shards.len(), 1);
3403        assert_eq!(shards[0].0.highest_block_number, u64::MAX);
3404        assert_eq!(shards[0].1.iter().collect::<Vec<_>>(), (1..=50).collect::<Vec<_>>());
3405    }
3406
3407    #[test]
3408    fn test_account_history_shards_iterator() {
3409        let temp_dir = TempDir::new().unwrap();
3410        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3411
3412        let address = Address::from([0x42; 20]);
3413        let other_address = Address::from([0x43; 20]);
3414
3415        // Add data for two addresses
3416        let mut batch = provider.batch();
3417        batch.append_account_history_shard(address, 0..=5).unwrap();
3418        batch.append_account_history_shard(other_address, 10..=15).unwrap();
3419        batch.commit().unwrap();
3420
3421        // Query shards for first address only
3422        let shards = provider.account_history_shards(address).unwrap();
3423        assert_eq!(shards.len(), 1);
3424        assert_eq!(shards[0].0.key, address);
3425
3426        // Query shards for second address only
3427        let shards = provider.account_history_shards(other_address).unwrap();
3428        assert_eq!(shards.len(), 1);
3429        assert_eq!(shards[0].0.key, other_address);
3430
3431        // Query shards for non-existent address
3432        let non_existent = Address::from([0x99; 20]);
3433        let shards = provider.account_history_shards(non_existent).unwrap();
3434        assert!(shards.is_empty());
3435    }
3436
3437    #[test]
3438    fn test_clear_account_history() {
3439        let temp_dir = TempDir::new().unwrap();
3440        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3441
3442        let address = Address::from([0x42; 20]);
3443
3444        // Add blocks 0-10
3445        let mut batch = provider.batch();
3446        batch.append_account_history_shard(address, 0..=10).unwrap();
3447        batch.commit().unwrap();
3448
3449        // Clear all history (simulates unwind from block 0)
3450        let mut batch = provider.batch();
3451        batch.clear_account_history(address).unwrap();
3452        batch.commit().unwrap();
3453
3454        // Verify no data remains
3455        let shards = provider.account_history_shards(address).unwrap();
3456        assert!(shards.is_empty(), "All shards should be deleted");
3457    }
3458
3459    #[test]
3460    fn test_unwind_non_sentinel_boundary() {
3461        let temp_dir = TempDir::new().unwrap();
3462        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3463
3464        let address = Address::from([0x42; 20]);
3465
3466        // Create three shards with non-sentinel boundary
3467        let mut batch = provider.batch();
3468
3469        // Shard 1: blocks 1-50, keyed by 50
3470        let shard1 = BlockNumberList::new_pre_sorted(1..=50);
3471        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, 50), &shard1).unwrap();
3472
3473        // Shard 2: blocks 51-100, keyed by 100 (non-sentinel, will be boundary)
3474        let shard2 = BlockNumberList::new_pre_sorted(51..=100);
3475        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, 100), &shard2).unwrap();
3476
3477        // Shard 3: blocks 101-150, keyed by MAX (will be deleted)
3478        let shard3 = BlockNumberList::new_pre_sorted(101..=150);
3479        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, u64::MAX), &shard3).unwrap();
3480
3481        batch.commit().unwrap();
3482
3483        // Verify 3 shards
3484        let shards = provider.account_history_shards(address).unwrap();
3485        assert_eq!(shards.len(), 3);
3486
3487        // Unwind to block 75 (truncates shard2, deletes shard3)
3488        let mut batch = provider.batch();
3489        batch.unwind_account_history_to(address, 75).unwrap();
3490        batch.commit().unwrap();
3491
3492        // Verify: shard1 unchanged, shard2 truncated and re-keyed to MAX, shard3 deleted
3493        let shards = provider.account_history_shards(address).unwrap();
3494        assert_eq!(shards.len(), 2);
3495
3496        // First shard unchanged
3497        assert_eq!(shards[0].0.highest_block_number, 50);
3498        assert_eq!(shards[0].1.iter().collect::<Vec<_>>(), (1..=50).collect::<Vec<_>>());
3499
3500        // Second shard truncated and re-keyed to MAX
3501        assert_eq!(shards[1].0.highest_block_number, u64::MAX);
3502        assert_eq!(shards[1].1.iter().collect::<Vec<_>>(), (51..=75).collect::<Vec<_>>());
3503    }
3504
3505    #[test]
3506    fn test_batch_auto_commit_on_threshold() {
3507        let temp_dir = TempDir::new().unwrap();
3508        let provider =
3509            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
3510
3511        // Create batch with tiny threshold (1KB) to force auto-commits
3512        let mut batch = RocksDBBatch {
3513            provider: &provider,
3514            inner: WriteBatchWithTransaction::<true>::default(),
3515            buf: Vec::new(),
3516            auto_commit_threshold: Some(1024), // 1KB
3517        };
3518
3519        // Write entries until we exceed threshold multiple times
3520        // Each entry is ~20 bytes, so 100 entries = ~2KB = 2 auto-commits
3521        for i in 0..100u64 {
3522            let value = format!("value_{i:04}").into_bytes();
3523            batch.put::<TestTable>(i, &value).unwrap();
3524        }
3525
3526        // Data should already be visible (auto-committed) even before final commit
3527        // At least some entries should be readable
3528        let first_visible = provider.get::<TestTable>(0).unwrap();
3529        assert!(first_visible.is_some(), "Auto-committed data should be visible");
3530
3531        // Final commit for remaining batch
3532        batch.commit().unwrap();
3533
3534        // All entries should now be visible
3535        for i in 0..100u64 {
3536            let value = format!("value_{i:04}").into_bytes();
3537            assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
3538        }
3539    }
3540
3541    // ==================== PARAMETERIZED PRUNE TESTS ====================
3542
    /// Test case for account history pruning
    struct AccountPruneCase {
        /// Case identifier used in assertion failure messages.
        name: &'static str,
        /// Shards present before pruning, as `(highest_block_number, blocks)` pairs.
        initial_shards: &'static [(u64, &'static [u64])],
        /// Block number passed to the prune call; entries at or below it are removed.
        prune_to: u64,
        /// Expected return value of the prune call.
        expected_outcome: PruneShardOutcome,
        /// Shards expected to remain afterwards, as `(highest_block_number, blocks)` pairs.
        expected_shards: &'static [(u64, &'static [u64])],
    }
3551
    /// Test case for storage history pruning
    struct StoragePruneCase {
        /// Case identifier used in assertion failure messages.
        name: &'static str,
        /// Shards present before pruning, as `(highest_block_number, blocks)` pairs.
        initial_shards: &'static [(u64, &'static [u64])],
        /// Block number passed to the prune call; entries at or below it are removed.
        prune_to: u64,
        /// Expected return value of the prune call.
        expected_outcome: PruneShardOutcome,
        /// Shards expected to remain afterwards, as `(highest_block_number, blocks)` pairs.
        expected_shards: &'static [(u64, &'static [u64])],
    }
3560
    #[test]
    fn test_prune_account_history_cases() {
        const MAX: u64 = u64::MAX;
        // Table-driven cases: each one seeds `initial_shards`, prunes everything at
        // or below `prune_to`, then checks both the reported outcome and the exact
        // set of surviving shards.
        const CASES: &[AccountPruneCase] = &[
            AccountPruneCase {
                name: "single_shard_truncate",
                initial_shards: &[(MAX, &[10, 20, 30, 40])],
                prune_to: 25,
                expected_outcome: PruneShardOutcome::Updated,
                expected_shards: &[(MAX, &[30, 40])],
            },
            AccountPruneCase {
                name: "single_shard_delete_all",
                initial_shards: &[(MAX, &[10, 20])],
                prune_to: 20,
                expected_outcome: PruneShardOutcome::Deleted,
                expected_shards: &[],
            },
            AccountPruneCase {
                name: "single_shard_noop",
                initial_shards: &[(MAX, &[10, 20])],
                prune_to: 5,
                expected_outcome: PruneShardOutcome::Unchanged,
                expected_shards: &[(MAX, &[10, 20])],
            },
            AccountPruneCase {
                name: "no_shards",
                initial_shards: &[],
                prune_to: 100,
                expected_outcome: PruneShardOutcome::Unchanged,
                expected_shards: &[],
            },
            AccountPruneCase {
                name: "multi_shard_truncate_first",
                initial_shards: &[(30, &[10, 20, 30]), (MAX, &[40, 50, 60])],
                prune_to: 25,
                expected_outcome: PruneShardOutcome::Updated,
                expected_shards: &[(30, &[30]), (MAX, &[40, 50, 60])],
            },
            AccountPruneCase {
                name: "delete_first_shard_sentinel_unchanged",
                initial_shards: &[(20, &[10, 20]), (MAX, &[30, 40])],
                prune_to: 20,
                expected_outcome: PruneShardOutcome::Deleted,
                expected_shards: &[(MAX, &[30, 40])],
            },
            AccountPruneCase {
                name: "multi_shard_delete_all_but_last",
                initial_shards: &[(10, &[5, 10]), (20, &[15, 20]), (MAX, &[25, 30])],
                prune_to: 22,
                expected_outcome: PruneShardOutcome::Deleted,
                expected_shards: &[(MAX, &[25, 30])],
            },
            AccountPruneCase {
                name: "mid_shard_preserves_key",
                initial_shards: &[(50, &[10, 20, 30, 40, 50]), (MAX, &[60, 70])],
                prune_to: 25,
                expected_outcome: PruneShardOutcome::Updated,
                expected_shards: &[(50, &[30, 40, 50]), (MAX, &[60, 70])],
            },
            // Equivalence tests
            AccountPruneCase {
                name: "equiv_delete_early_shards_keep_sentinel",
                initial_shards: &[(20, &[10, 15, 20]), (50, &[30, 40, 50]), (MAX, &[60, 70])],
                prune_to: 55,
                expected_outcome: PruneShardOutcome::Deleted,
                expected_shards: &[(MAX, &[60, 70])],
            },
            AccountPruneCase {
                name: "equiv_sentinel_becomes_empty_with_prev",
                initial_shards: &[(50, &[30, 40, 50]), (MAX, &[35])],
                prune_to: 40,
                expected_outcome: PruneShardOutcome::Deleted,
                expected_shards: &[(MAX, &[50])],
            },
            AccountPruneCase {
                name: "equiv_all_shards_become_empty",
                initial_shards: &[(50, &[30, 40, 50]), (MAX, &[51])],
                prune_to: 51,
                expected_outcome: PruneShardOutcome::Deleted,
                expected_shards: &[],
            },
            AccountPruneCase {
                name: "equiv_non_sentinel_last_shard_promoted",
                initial_shards: &[(100, &[50, 75, 100])],
                prune_to: 60,
                expected_outcome: PruneShardOutcome::Updated,
                expected_shards: &[(MAX, &[75, 100])],
            },
            AccountPruneCase {
                name: "equiv_filter_within_shard",
                initial_shards: &[(MAX, &[10, 20, 30, 40])],
                prune_to: 25,
                expected_outcome: PruneShardOutcome::Updated,
                expected_shards: &[(MAX, &[30, 40])],
            },
            AccountPruneCase {
                name: "equiv_multi_shard_partial_delete",
                initial_shards: &[(20, &[10, 20]), (50, &[30, 40, 50]), (MAX, &[60, 70])],
                prune_to: 35,
                expected_outcome: PruneShardOutcome::Deleted,
                expected_shards: &[(50, &[40, 50]), (MAX, &[60, 70])],
            },
        ];

        let address = Address::from([0x42; 20]);

        for case in CASES {
            // Fresh database per case so cases cannot contaminate each other.
            let temp_dir = TempDir::new().unwrap();
            let provider =
                RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

            // Setup initial shards
            let mut batch = provider.batch();
            for (highest, blocks) in case.initial_shards {
                let shard = BlockNumberList::new_pre_sorted(blocks.iter().copied());
                batch
                    .put::<tables::AccountsHistory>(ShardedKey::new(address, *highest), &shard)
                    .unwrap();
            }
            batch.commit().unwrap();

            // Prune
            let mut batch = provider.batch();
            let outcome = batch.prune_account_history_to(address, case.prune_to).unwrap();
            batch.commit().unwrap();

            // Assert outcome
            assert_eq!(outcome, case.expected_outcome, "case '{}': wrong outcome", case.name);

            // Assert final shards: same count, and each surviving shard matches the
            // expected (key, blocks) pair position by position.
            let shards = provider.account_history_shards(address).unwrap();
            assert_eq!(
                shards.len(),
                case.expected_shards.len(),
                "case '{}': wrong shard count",
                case.name
            );
            for (i, ((key, blocks), (exp_key, exp_blocks))) in
                shards.iter().zip(case.expected_shards.iter()).enumerate()
            {
                assert_eq!(
                    key.highest_block_number, *exp_key,
                    "case '{}': shard {} wrong key",
                    case.name, i
                );
                assert_eq!(
                    blocks.iter().collect::<Vec<_>>(),
                    *exp_blocks,
                    "case '{}': shard {} wrong blocks",
                    case.name,
                    i
                );
            }
        }
    }
3717
    #[test]
    fn test_prune_storage_history_cases() {
        // Table-driven test for `prune_storage_history_to`: each case seeds the
        // StoragesHistory shards for one (address, slot) pair, prunes all block
        // numbers <= `prune_to`, and verifies both the reported outcome and the
        // exact surviving shards. `MAX` denotes the sentinel shard key (the
        // open-ended, most recent shard).
        const MAX: u64 = u64::MAX;
        const CASES: &[StoragePruneCase] = &[
            StoragePruneCase {
                name: "single_shard_truncate",
                initial_shards: &[(MAX, &[10, 20, 30, 40])],
                prune_to: 25,
                expected_outcome: PruneShardOutcome::Updated,
                expected_shards: &[(MAX, &[30, 40])],
            },
            StoragePruneCase {
                name: "single_shard_delete_all",
                initial_shards: &[(MAX, &[10, 20])],
                prune_to: 20,
                expected_outcome: PruneShardOutcome::Deleted,
                expected_shards: &[],
            },
            StoragePruneCase {
                // Prune threshold below the lowest block: nothing changes.
                name: "noop",
                initial_shards: &[(MAX, &[10, 20])],
                prune_to: 5,
                expected_outcome: PruneShardOutcome::Unchanged,
                expected_shards: &[(MAX, &[10, 20])],
            },
            StoragePruneCase {
                // Pruning a slot with no history at all is a no-op.
                name: "no_shards",
                initial_shards: &[],
                prune_to: 100,
                expected_outcome: PruneShardOutcome::Unchanged,
                expected_shards: &[],
            },
            StoragePruneCase {
                // Truncating a non-sentinel shard keeps its highest-block key.
                name: "mid_shard_preserves_key",
                initial_shards: &[(50, &[10, 20, 30, 40, 50]), (MAX, &[60, 70])],
                prune_to: 25,
                expected_outcome: PruneShardOutcome::Updated,
                expected_shards: &[(50, &[30, 40, 50]), (MAX, &[60, 70])],
            },
            // Equivalence tests
            StoragePruneCase {
                // A truncated last shard must be re-keyed to the sentinel.
                name: "equiv_sentinel_promotion",
                initial_shards: &[(100, &[50, 75, 100])],
                prune_to: 60,
                expected_outcome: PruneShardOutcome::Updated,
                expected_shards: &[(MAX, &[75, 100])],
            },
            StoragePruneCase {
                name: "equiv_delete_early_shards_keep_sentinel",
                initial_shards: &[(20, &[10, 15, 20]), (50, &[30, 40, 50]), (MAX, &[60, 70])],
                prune_to: 55,
                expected_outcome: PruneShardOutcome::Deleted,
                expected_shards: &[(MAX, &[60, 70])],
            },
            StoragePruneCase {
                // The sentinel empties out but a previous shard survives and is
                // promoted to become the new sentinel.
                name: "equiv_sentinel_becomes_empty_with_prev",
                initial_shards: &[(50, &[30, 40, 50]), (MAX, &[35])],
                prune_to: 40,
                expected_outcome: PruneShardOutcome::Deleted,
                expected_shards: &[(MAX, &[50])],
            },
            StoragePruneCase {
                name: "equiv_all_shards_become_empty",
                initial_shards: &[(50, &[30, 40, 50]), (MAX, &[51])],
                prune_to: 51,
                expected_outcome: PruneShardOutcome::Deleted,
                expected_shards: &[],
            },
            StoragePruneCase {
                name: "equiv_filter_within_shard",
                initial_shards: &[(MAX, &[10, 20, 30, 40])],
                prune_to: 25,
                expected_outcome: PruneShardOutcome::Updated,
                expected_shards: &[(MAX, &[30, 40])],
            },
            StoragePruneCase {
                name: "equiv_multi_shard_partial_delete",
                initial_shards: &[(20, &[10, 20]), (50, &[30, 40, 50]), (MAX, &[60, 70])],
                prune_to: 35,
                expected_outcome: PruneShardOutcome::Deleted,
                expected_shards: &[(50, &[40, 50]), (MAX, &[60, 70])],
            },
        ];

        let address = Address::from([0x42; 20]);
        let storage_key = B256::from([0x01; 32]);

        // Each case runs against a fresh database so cases cannot interfere.
        for case in CASES {
            let temp_dir = TempDir::new().unwrap();
            let provider =
                RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

            // Setup initial shards
            let mut batch = provider.batch();
            for (highest, blocks) in case.initial_shards {
                let shard = BlockNumberList::new_pre_sorted(blocks.iter().copied());
                // The sentinel shard uses the dedicated `last` constructor.
                let key = if *highest == MAX {
                    StorageShardedKey::last(address, storage_key)
                } else {
                    StorageShardedKey::new(address, storage_key, *highest)
                };
                batch.put::<tables::StoragesHistory>(key, &shard).unwrap();
            }
            batch.commit().unwrap();

            // Prune
            let mut batch = provider.batch();
            let outcome =
                batch.prune_storage_history_to(address, storage_key, case.prune_to).unwrap();
            batch.commit().unwrap();

            // Assert outcome
            assert_eq!(outcome, case.expected_outcome, "case '{}': wrong outcome", case.name);

            // Assert final shards
            let shards = provider.storage_history_shards(address, storage_key).unwrap();
            assert_eq!(
                shards.len(),
                case.expected_shards.len(),
                "case '{}': wrong shard count",
                case.name
            );
            for (i, ((key, blocks), (exp_key, exp_blocks))) in
                shards.iter().zip(case.expected_shards.iter()).enumerate()
            {
                assert_eq!(
                    key.sharded_key.highest_block_number, *exp_key,
                    "case '{}': shard {} wrong key",
                    case.name, i
                );
                assert_eq!(
                    blocks.iter().collect::<Vec<_>>(),
                    *exp_blocks,
                    "case '{}': shard {} wrong blocks",
                    case.name,
                    i
                );
            }
        }
    }
3858
3859    #[test]
3860    fn test_prune_storage_history_does_not_affect_other_slots() {
3861        let temp_dir = TempDir::new().unwrap();
3862        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3863
3864        let address = Address::from([0x42; 20]);
3865        let slot1 = B256::from([0x01; 32]);
3866        let slot2 = B256::from([0x02; 32]);
3867
3868        // Two different storage slots
3869        let mut batch = provider.batch();
3870        batch
3871            .put::<tables::StoragesHistory>(
3872                StorageShardedKey::last(address, slot1),
3873                &BlockNumberList::new_pre_sorted([10u64, 20]),
3874            )
3875            .unwrap();
3876        batch
3877            .put::<tables::StoragesHistory>(
3878                StorageShardedKey::last(address, slot2),
3879                &BlockNumberList::new_pre_sorted([30u64, 40]),
3880            )
3881            .unwrap();
3882        batch.commit().unwrap();
3883
3884        // Prune slot1 to block 20 (deletes all)
3885        let mut batch = provider.batch();
3886        let outcome = batch.prune_storage_history_to(address, slot1, 20).unwrap();
3887        batch.commit().unwrap();
3888
3889        assert_eq!(outcome, PruneShardOutcome::Deleted);
3890
3891        // slot1 should be empty
3892        let shards1 = provider.storage_history_shards(address, slot1).unwrap();
3893        assert!(shards1.is_empty());
3894
3895        // slot2 should be unchanged
3896        let shards2 = provider.storage_history_shards(address, slot2).unwrap();
3897        assert_eq!(shards2.len(), 1);
3898        assert_eq!(shards2[0].1.iter().collect::<Vec<_>>(), vec![30, 40]);
3899    }
3900
3901    #[test]
3902    fn test_prune_invariants() {
3903        // Test invariants: no empty shards, sentinel is always last
3904        let address = Address::from([0x42; 20]);
3905        let storage_key = B256::from([0x01; 32]);
3906
3907        // Test cases that exercise invariants
3908        #[allow(clippy::type_complexity)]
3909        let invariant_cases: &[(&[(u64, &[u64])], u64)] = &[
3910            // Account: shards where middle becomes empty
3911            (&[(10, &[5, 10]), (20, &[15, 20]), (u64::MAX, &[25, 30])], 20),
3912            // Account: non-sentinel shard only, partial prune -> must become sentinel
3913            (&[(100, &[50, 100])], 60),
3914        ];
3915
3916        for (initial_shards, prune_to) in invariant_cases {
3917            // Test account history invariants
3918            {
3919                let temp_dir = TempDir::new().unwrap();
3920                let provider =
3921                    RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3922
3923                let mut batch = provider.batch();
3924                for (highest, blocks) in *initial_shards {
3925                    let shard = BlockNumberList::new_pre_sorted(blocks.iter().copied());
3926                    batch
3927                        .put::<tables::AccountsHistory>(ShardedKey::new(address, *highest), &shard)
3928                        .unwrap();
3929                }
3930                batch.commit().unwrap();
3931
3932                let mut batch = provider.batch();
3933                batch.prune_account_history_to(address, *prune_to).unwrap();
3934                batch.commit().unwrap();
3935
3936                let shards = provider.account_history_shards(address).unwrap();
3937
3938                // Invariant 1: no empty shards
3939                for (key, blocks) in &shards {
3940                    assert!(
3941                        !blocks.is_empty(),
3942                        "Account: empty shard at key {}",
3943                        key.highest_block_number
3944                    );
3945                }
3946
3947                // Invariant 2: last shard is sentinel
3948                if !shards.is_empty() {
3949                    let last = shards.last().unwrap();
3950                    assert_eq!(
3951                        last.0.highest_block_number,
3952                        u64::MAX,
3953                        "Account: last shard must be sentinel"
3954                    );
3955                }
3956            }
3957
3958            // Test storage history invariants
3959            {
3960                let temp_dir = TempDir::new().unwrap();
3961                let provider =
3962                    RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3963
3964                let mut batch = provider.batch();
3965                for (highest, blocks) in *initial_shards {
3966                    let shard = BlockNumberList::new_pre_sorted(blocks.iter().copied());
3967                    let key = if *highest == u64::MAX {
3968                        StorageShardedKey::last(address, storage_key)
3969                    } else {
3970                        StorageShardedKey::new(address, storage_key, *highest)
3971                    };
3972                    batch.put::<tables::StoragesHistory>(key, &shard).unwrap();
3973                }
3974                batch.commit().unwrap();
3975
3976                let mut batch = provider.batch();
3977                batch.prune_storage_history_to(address, storage_key, *prune_to).unwrap();
3978                batch.commit().unwrap();
3979
3980                let shards = provider.storage_history_shards(address, storage_key).unwrap();
3981
3982                // Invariant 1: no empty shards
3983                for (key, blocks) in &shards {
3984                    assert!(
3985                        !blocks.is_empty(),
3986                        "Storage: empty shard at key {}",
3987                        key.sharded_key.highest_block_number
3988                    );
3989                }
3990
3991                // Invariant 2: last shard is sentinel
3992                if !shards.is_empty() {
3993                    let last = shards.last().unwrap();
3994                    assert_eq!(
3995                        last.0.sharded_key.highest_block_number,
3996                        u64::MAX,
3997                        "Storage: last shard must be sentinel"
3998                    );
3999                }
4000            }
4001        }
4002    }
4003
4004    #[test]
4005    fn test_prune_account_history_batch_multiple_sorted_targets() {
4006        let temp_dir = TempDir::new().unwrap();
4007        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
4008
4009        let addr1 = Address::from([0x01; 20]);
4010        let addr2 = Address::from([0x02; 20]);
4011        let addr3 = Address::from([0x03; 20]);
4012
4013        // Setup shards for each address
4014        let mut batch = provider.batch();
4015        batch
4016            .put::<tables::AccountsHistory>(
4017                ShardedKey::new(addr1, u64::MAX),
4018                &BlockNumberList::new_pre_sorted([10, 20, 30]),
4019            )
4020            .unwrap();
4021        batch
4022            .put::<tables::AccountsHistory>(
4023                ShardedKey::new(addr2, u64::MAX),
4024                &BlockNumberList::new_pre_sorted([5, 10, 15]),
4025            )
4026            .unwrap();
4027        batch
4028            .put::<tables::AccountsHistory>(
4029                ShardedKey::new(addr3, u64::MAX),
4030                &BlockNumberList::new_pre_sorted([100, 200]),
4031            )
4032            .unwrap();
4033        batch.commit().unwrap();
4034
4035        // Prune all three (sorted by address)
4036        let mut targets = vec![(addr1, 15), (addr2, 10), (addr3, 50)];
4037        targets.sort_by_key(|(addr, _)| *addr);
4038
4039        let mut batch = provider.batch();
4040        let outcomes = batch.prune_account_history_batch(&targets).unwrap();
4041        batch.commit().unwrap();
4042
4043        // addr1: prune <=15, keep [20, 30] -> updated
4044        // addr2: prune <=10, keep [15] -> updated
4045        // addr3: prune <=50, keep [100, 200] -> unchanged
4046        assert_eq!(outcomes.updated, 2);
4047        assert_eq!(outcomes.unchanged, 1);
4048
4049        let shards1 = provider.account_history_shards(addr1).unwrap();
4050        assert_eq!(shards1[0].1.iter().collect::<Vec<_>>(), vec![20, 30]);
4051
4052        let shards2 = provider.account_history_shards(addr2).unwrap();
4053        assert_eq!(shards2[0].1.iter().collect::<Vec<_>>(), vec![15]);
4054
4055        let shards3 = provider.account_history_shards(addr3).unwrap();
4056        assert_eq!(shards3[0].1.iter().collect::<Vec<_>>(), vec![100, 200]);
4057    }
4058
4059    #[test]
4060    fn test_prune_account_history_batch_target_with_no_shards() {
4061        let temp_dir = TempDir::new().unwrap();
4062        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
4063
4064        let addr1 = Address::from([0x01; 20]);
4065        let addr2 = Address::from([0x02; 20]); // No shards for this one
4066        let addr3 = Address::from([0x03; 20]);
4067
4068        // Only setup shards for addr1 and addr3
4069        let mut batch = provider.batch();
4070        batch
4071            .put::<tables::AccountsHistory>(
4072                ShardedKey::new(addr1, u64::MAX),
4073                &BlockNumberList::new_pre_sorted([10, 20]),
4074            )
4075            .unwrap();
4076        batch
4077            .put::<tables::AccountsHistory>(
4078                ShardedKey::new(addr3, u64::MAX),
4079                &BlockNumberList::new_pre_sorted([30, 40]),
4080            )
4081            .unwrap();
4082        batch.commit().unwrap();
4083
4084        // Prune all three (addr2 has no shards - tests p > target_prefix case)
4085        let mut targets = vec![(addr1, 15), (addr2, 100), (addr3, 35)];
4086        targets.sort_by_key(|(addr, _)| *addr);
4087
4088        let mut batch = provider.batch();
4089        let outcomes = batch.prune_account_history_batch(&targets).unwrap();
4090        batch.commit().unwrap();
4091
4092        // addr1: updated (keep [20])
4093        // addr2: unchanged (no shards)
4094        // addr3: updated (keep [40])
4095        assert_eq!(outcomes.updated, 2);
4096        assert_eq!(outcomes.unchanged, 1);
4097
4098        let shards1 = provider.account_history_shards(addr1).unwrap();
4099        assert_eq!(shards1[0].1.iter().collect::<Vec<_>>(), vec![20]);
4100
4101        let shards3 = provider.account_history_shards(addr3).unwrap();
4102        assert_eq!(shards3[0].1.iter().collect::<Vec<_>>(), vec![40]);
4103    }
4104
4105    #[test]
4106    fn test_prune_storage_history_batch_multiple_sorted_targets() {
4107        let temp_dir = TempDir::new().unwrap();
4108        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
4109
4110        let addr = Address::from([0x42; 20]);
4111        let slot1 = B256::from([0x01; 32]);
4112        let slot2 = B256::from([0x02; 32]);
4113
4114        // Setup shards
4115        let mut batch = provider.batch();
4116        batch
4117            .put::<tables::StoragesHistory>(
4118                StorageShardedKey::new(addr, slot1, u64::MAX),
4119                &BlockNumberList::new_pre_sorted([10, 20, 30]),
4120            )
4121            .unwrap();
4122        batch
4123            .put::<tables::StoragesHistory>(
4124                StorageShardedKey::new(addr, slot2, u64::MAX),
4125                &BlockNumberList::new_pre_sorted([5, 15, 25]),
4126            )
4127            .unwrap();
4128        batch.commit().unwrap();
4129
4130        // Prune both (sorted)
4131        let mut targets = vec![((addr, slot1), 15), ((addr, slot2), 10)];
4132        targets.sort_by_key(|((a, s), _)| (*a, *s));
4133
4134        let mut batch = provider.batch();
4135        let outcomes = batch.prune_storage_history_batch(&targets).unwrap();
4136        batch.commit().unwrap();
4137
4138        assert_eq!(outcomes.updated, 2);
4139
4140        let shards1 = provider.storage_history_shards(addr, slot1).unwrap();
4141        assert_eq!(shards1[0].1.iter().collect::<Vec<_>>(), vec![20, 30]);
4142
4143        let shards2 = provider.storage_history_shards(addr, slot2).unwrap();
4144        assert_eq!(shards2[0].1.iter().collect::<Vec<_>>(), vec![15, 25]);
4145    }
4146}