// reth_provider/providers/rocksdb/provider.rs
1use super::metrics::{RocksDBMetrics, RocksDBOperation, ROCKSDB_TABLES};
2use crate::providers::{compute_history_rank, needs_prev_shard_check, HistoryInfo};
3use alloy_consensus::transaction::TxHashRef;
4use alloy_primitives::{
5    map::{AddressMap, HashMap},
6    Address, BlockNumber, TxNumber, B256,
7};
8use itertools::Itertools;
9use metrics::Label;
10use parking_lot::Mutex;
11use reth_chain_state::ExecutedBlock;
12use reth_db_api::{
13    database_metrics::DatabaseMetrics,
14    models::{
15        sharded_key::NUM_OF_INDICES_IN_SHARD, storage_sharded_key::StorageShardedKey, ShardedKey,
16        StorageSettings,
17    },
18    table::{Compress, Decode, Decompress, Encode, Table},
19    tables, BlockNumberList, DatabaseError,
20};
21use reth_primitives_traits::{BlockBody as _, FastInstant as Instant};
22use reth_prune_types::PruneMode;
23use reth_storage_errors::{
24    db::{DatabaseErrorInfo, DatabaseWriteError, DatabaseWriteOperation, LogLevel},
25    provider::{ProviderError, ProviderResult},
26};
27use rocksdb::{
28    BlockBasedOptions, Cache, ColumnFamilyDescriptor, CompactionPri, DBCompressionType,
29    DBRawIteratorWithThreadMode, IteratorMode, OptimisticTransactionDB,
30    OptimisticTransactionOptions, Options, Transaction, WriteBatchWithTransaction, WriteOptions,
31    DB,
32};
33use std::{
34    collections::BTreeMap,
35    fmt,
36    path::{Path, PathBuf},
37    sync::Arc,
38};
39use tracing::instrument;
40
/// Pending `RocksDB` batches type alias.
///
/// A shared, lock-protected list of write batches; [`RocksDBWriteCtx`] holds one so
/// batches produced while writing blocks can be pushed and committed later.
pub(crate) type PendingRocksDBBatches = Arc<Mutex<Vec<WriteBatchWithTransaction<true>>>>;

/// Raw key-value result from a `RocksDB` iterator.
///
/// Keys and values are returned as owned boxed byte slices, exactly as the
/// underlying `rocksdb` iterator yields them.
type RawKVResult = Result<(Box<[u8]>, Box<[u8]>), rocksdb::Error>;
46
/// Statistics for a single `RocksDB` table (column family).
///
/// All sizes are best-effort estimates read from `RocksDB` internal properties.
#[derive(Debug, Clone)]
pub struct RocksDBTableStats {
    /// Size of SST files on disk in bytes.
    pub sst_size_bytes: u64,
    /// Size of memtables in memory in bytes.
    pub memtable_size_bytes: u64,
    /// Name of the table/column family.
    pub name: String,
    /// Estimated number of keys in the table.
    pub estimated_num_keys: u64,
    /// Estimated size of live data in bytes (SST files + memtables).
    pub estimated_size_bytes: u64,
    /// Estimated bytes pending compaction (reclaimable space).
    pub pending_compaction_bytes: u64,
}
63
/// Database-level statistics for `RocksDB`.
///
/// Contains both per-table statistics and DB-level metrics like WAL size.
#[derive(Debug, Clone)]
pub struct RocksDBStats {
    /// Statistics for each table (column family).
    pub tables: Vec<RocksDBTableStats>,
    /// Total size of WAL (Write-Ahead Log) files in bytes.
    ///
    /// WAL is shared across all tables and not included in per-table metrics,
    /// so summing table sizes alone undercounts total disk usage.
    pub wal_size_bytes: u64,
}
76
/// Context for `RocksDB` block writes.
///
/// Bundles the parameters a writer needs when persisting a run of blocks,
/// plus the shared sink for the resulting write batches.
#[derive(Clone)]
pub(crate) struct RocksDBWriteCtx {
    /// The first block number being written.
    pub first_block_number: BlockNumber,
    /// The prune mode for transaction lookup, if any.
    pub prune_tx_lookup: Option<PruneMode>,
    /// Storage settings determining what goes to `RocksDB`.
    pub storage_settings: StorageSettings,
    /// Pending batches to push to after writing.
    pub pending_batches: PendingRocksDBBatches,
}
89
90impl fmt::Debug for RocksDBWriteCtx {
91    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
92        f.debug_struct("RocksDBWriteCtx")
93            .field("first_block_number", &self.first_block_number)
94            .field("prune_tx_lookup", &self.prune_tx_lookup)
95            .field("storage_settings", &self.storage_settings)
96            .field("pending_batches", &"<pending batches>")
97            .finish()
98    }
99}
100
/// Default cache size for `RocksDB` block cache (128 MB).
///
/// The cache is shared across all column families (see `default_table_options`).
const DEFAULT_CACHE_SIZE: usize = 128 << 20;

/// Default block size for `RocksDB` tables (16 KB).
const DEFAULT_BLOCK_SIZE: usize = 16 * 1024;

/// Default max background jobs for `RocksDB` compaction and flushing.
const DEFAULT_MAX_BACKGROUND_JOBS: i32 = 6;

/// Default max open file descriptors for `RocksDB`.
///
/// Caps the number of SST file handles `RocksDB` keeps open simultaneously.
/// Set to 512 to stay within the common default OS `ulimit -n` of 1024,
/// leaving headroom for MDBX, static files, and other I/O.
/// `RocksDB` uses an internal table cache and re-opens files on demand,
/// so this has negligible performance impact on read-heavy workloads.
const DEFAULT_MAX_OPEN_FILES: i32 = 512;

/// Default bytes per sync for `RocksDB` WAL writes (1 MB).
const DEFAULT_BYTES_PER_SYNC: u64 = 1_048_576;

/// Default write buffer size for `RocksDB` memtables (128 MB).
///
/// Larger memtables reduce flush frequency during burst writes, providing more consistent
/// tail latency. Benchmarks showed 128 MB reduces p99 latency variance by ~80% compared
/// to 64 MB default, with negligible impact on mean throughput.
const DEFAULT_WRITE_BUFFER_SIZE: usize = 128 << 20;

/// Default buffer capacity for compression in batches.
/// 4 KiB matches common block/page sizes and comfortably holds typical history values,
/// reducing the first few reallocations without over-allocating.
const DEFAULT_COMPRESS_BUF_CAPACITY: usize = 4096;

/// Default auto-commit threshold for batch writes (4 GiB).
///
/// When a batch exceeds this size, it is automatically committed to prevent OOM
/// during large bulk writes. The consistency check on startup heals any crash
/// that occurs between auto-commits.
const DEFAULT_AUTO_COMMIT_THRESHOLD: usize = 4 * 1024 * 1024 * 1024;
140
/// Builder for [`RocksDBProvider`].
pub struct RocksDBBuilder {
    /// Filesystem path of the database directory.
    path: PathBuf,
    /// Names of the column families to open or create.
    column_families: Vec<String>,
    /// Whether operation metrics are recorded.
    enable_metrics: bool,
    /// Whether `RocksDB` internal statistics collection is enabled.
    enable_statistics: bool,
    /// `RocksDB` log verbosity.
    log_level: rocksdb::LogLevel,
    /// Block cache shared by all column families.
    block_cache: Cache,
    /// Whether to open the database in read-only mode.
    read_only: bool,
}
151
152impl fmt::Debug for RocksDBBuilder {
153    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
154        f.debug_struct("RocksDBBuilder")
155            .field("path", &self.path)
156            .field("column_families", &self.column_families)
157            .field("enable_metrics", &self.enable_metrics)
158            .finish()
159    }
160}
161
162impl RocksDBBuilder {
163    /// Creates a new builder with optimized default options.
164    pub fn new(path: impl AsRef<Path>) -> Self {
165        let cache = Cache::new_lru_cache(DEFAULT_CACHE_SIZE);
166        Self {
167            path: path.as_ref().to_path_buf(),
168            column_families: Vec::new(),
169            enable_metrics: false,
170            enable_statistics: false,
171            log_level: rocksdb::LogLevel::Info,
172            block_cache: cache,
173            read_only: false,
174        }
175    }
176
177    /// Creates default table options with shared block cache.
178    fn default_table_options(cache: &Cache) -> BlockBasedOptions {
179        let mut table_options = BlockBasedOptions::default();
180        table_options.set_block_size(DEFAULT_BLOCK_SIZE);
181        table_options.set_cache_index_and_filter_blocks(true);
182        table_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
183        // Shared block cache for all column families.
184        table_options.set_block_cache(cache);
185        table_options
186    }
187
188    /// Creates optimized `RocksDB` options per `RocksDB` wiki recommendations.
189    fn default_options(
190        log_level: rocksdb::LogLevel,
191        cache: &Cache,
192        enable_statistics: bool,
193    ) -> Options {
194        // Follow recommend tuning guide from RocksDB wiki, see https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning
195        let table_options = Self::default_table_options(cache);
196
197        let mut options = Options::default();
198        options.set_block_based_table_factory(&table_options);
199        options.create_if_missing(true);
200        options.create_missing_column_families(true);
201        options.set_max_background_jobs(DEFAULT_MAX_BACKGROUND_JOBS);
202        options.set_bytes_per_sync(DEFAULT_BYTES_PER_SYNC);
203
204        options.set_bottommost_compression_type(DBCompressionType::Zstd);
205        options.set_bottommost_zstd_max_train_bytes(0, true);
206        options.set_compression_type(DBCompressionType::Lz4);
207        options.set_compaction_pri(CompactionPri::MinOverlappingRatio);
208
209        options.set_log_level(log_level);
210
211        options.set_max_open_files(DEFAULT_MAX_OPEN_FILES);
212
213        // Delete obsolete WAL files immediately after all column families have flushed.
214        // Both set to 0 means "delete ASAP, no archival".
215        options.set_wal_ttl_seconds(0);
216        options.set_wal_size_limit_mb(0);
217
218        // Statistics can view from RocksDB log file
219        if enable_statistics {
220            options.enable_statistics();
221        }
222
223        options
224    }
225
226    /// Creates optimized column family options.
227    fn default_column_family_options(cache: &Cache) -> Options {
228        // Follow recommend tuning guide from RocksDB wiki, see https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning
229        let table_options = Self::default_table_options(cache);
230
231        let mut cf_options = Options::default();
232        cf_options.set_block_based_table_factory(&table_options);
233        cf_options.set_level_compaction_dynamic_level_bytes(true);
234        // Recommend to use Zstd for bottommost compression and Lz4 for other levels, see https://github.com/facebook/rocksdb/wiki/Compression#configuration
235        cf_options.set_compression_type(DBCompressionType::Lz4);
236        cf_options.set_bottommost_compression_type(DBCompressionType::Zstd);
237        // Only use Zstd compression, disable dictionary training
238        cf_options.set_bottommost_zstd_max_train_bytes(0, true);
239        cf_options.set_write_buffer_size(DEFAULT_WRITE_BUFFER_SIZE);
240
241        cf_options
242    }
243
244    /// Creates optimized column family options for `TransactionHashNumbers`.
245    ///
246    /// This table stores `B256 -> TxNumber` mappings where:
247    /// - Keys are incompressible 32-byte hashes (compression wastes CPU for zero benefit)
248    /// - Values are varint-encoded `u64` (a few bytes - too small to benefit from compression)
249    /// - Every lookup expects a hit (bloom filters only help when checking non-existent keys)
250    fn tx_hash_numbers_column_family_options(cache: &Cache) -> Options {
251        let mut table_options = BlockBasedOptions::default();
252        table_options.set_block_size(DEFAULT_BLOCK_SIZE);
253        table_options.set_cache_index_and_filter_blocks(true);
254        table_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
255        table_options.set_block_cache(cache);
256        // Disable bloom filter: every lookup expects a hit, so bloom filters provide no benefit
257        // and waste memory
258
259        let mut cf_options = Options::default();
260        cf_options.set_block_based_table_factory(&table_options);
261        cf_options.set_level_compaction_dynamic_level_bytes(true);
262        // Disable compression: B256 keys are incompressible hashes, TxNumber values are
263        // varint-encoded u64 (a few bytes). Compression wastes CPU cycles for zero space savings.
264        cf_options.set_compression_type(DBCompressionType::None);
265        cf_options.set_bottommost_compression_type(DBCompressionType::None);
266
267        cf_options
268    }
269
270    /// Adds a column family for a specific table type.
271    pub fn with_table<T: Table>(mut self) -> Self {
272        self.column_families.push(T::NAME.to_string());
273        self
274    }
275
276    /// Registers the default tables used by reth for `RocksDB` storage.
277    ///
278    /// This registers:
279    /// - [`tables::TransactionHashNumbers`] - Transaction hash to number mapping
280    /// - [`tables::AccountsHistory`] - Account history index
281    /// - [`tables::StoragesHistory`] - Storage history index
282    pub fn with_default_tables(self) -> Self {
283        self.with_table::<tables::TransactionHashNumbers>()
284            .with_table::<tables::AccountsHistory>()
285            .with_table::<tables::StoragesHistory>()
286    }
287
288    /// Enables metrics.
289    pub const fn with_metrics(mut self) -> Self {
290        self.enable_metrics = true;
291        self
292    }
293
294    /// Enables `RocksDB` internal statistics collection.
295    pub const fn with_statistics(mut self) -> Self {
296        self.enable_statistics = true;
297        self
298    }
299
300    /// Sets the log level from `DatabaseArgs` configuration.
301    pub const fn with_database_log_level(mut self, log_level: Option<LogLevel>) -> Self {
302        if let Some(level) = log_level {
303            self.log_level = convert_log_level(level);
304        }
305        self
306    }
307
308    /// Sets a custom block cache size.
309    pub fn with_block_cache_size(mut self, capacity_bytes: usize) -> Self {
310        self.block_cache = Cache::new_lru_cache(capacity_bytes);
311        self
312    }
313
314    /// Sets read-only mode.
315    ///
316    /// Note: Write operations on a read-only provider will panic at runtime.
317    pub const fn with_read_only(mut self, read_only: bool) -> Self {
318        self.read_only = read_only;
319        self
320    }
321
322    /// Builds the [`RocksDBProvider`].
323    pub fn build(self) -> ProviderResult<RocksDBProvider> {
324        let options =
325            Self::default_options(self.log_level, &self.block_cache, self.enable_statistics);
326
327        let cf_descriptors: Vec<ColumnFamilyDescriptor> = self
328            .column_families
329            .iter()
330            .map(|name| {
331                let cf_options = if name == tables::TransactionHashNumbers::NAME {
332                    Self::tx_hash_numbers_column_family_options(&self.block_cache)
333                } else {
334                    Self::default_column_family_options(&self.block_cache)
335                };
336                ColumnFamilyDescriptor::new(name.clone(), cf_options)
337            })
338            .collect();
339
340        let metrics = self.enable_metrics.then(RocksDBMetrics::default);
341
342        if self.read_only {
343            let db = DB::open_cf_descriptors_read_only(&options, &self.path, cf_descriptors, false)
344                .map_err(|e| {
345                    ProviderError::Database(DatabaseError::Open(DatabaseErrorInfo {
346                        message: e.to_string().into(),
347                        code: -1,
348                    }))
349                })?;
350            Ok(RocksDBProvider(Arc::new(RocksDBProviderInner::ReadOnly { db, metrics })))
351        } else {
352            // Use OptimisticTransactionDB for MDBX-like transaction semantics (read-your-writes,
353            // rollback) OptimisticTransactionDB uses optimistic concurrency control (conflict
354            // detection at commit) and is backed by DBCommon, giving us access to
355            // cancel_all_background_work for clean shutdown.
356            let db =
357                OptimisticTransactionDB::open_cf_descriptors(&options, &self.path, cf_descriptors)
358                    .map_err(|e| {
359                        ProviderError::Database(DatabaseError::Open(DatabaseErrorInfo {
360                            message: e.to_string().into(),
361                            code: -1,
362                        }))
363                    })?;
364            Ok(RocksDBProvider(Arc::new(RocksDBProviderInner::ReadWrite { db, metrics })))
365        }
366    }
367}
368
/// Some types don't support compression (eg. B256), and we don't want to be copying them to the
/// allocated buffer when we can just use their reference.
///
/// Evaluates to `Some(&[u8])` when `$value` exposes an uncompressable raw reference;
/// otherwise compresses `$value` into `$buf` (clearing it first) and evaluates to `None`,
/// signalling that the caller should take the bytes from `$buf`.
macro_rules! compress_to_buf_or_ref {
    ($buf:expr, $value:expr) => {
        if let Some(value) = $value.uncompressable_ref() {
            Some(value)
        } else {
            $buf.clear();
            $value.compress_to_buf(&mut $buf);
            None
        }
    };
}
382
/// `RocksDB` provider for auxiliary storage layer beside main database MDBX.
///
/// Cheap to clone: the inner state is behind an [`Arc`].
#[derive(Debug)]
pub struct RocksDBProvider(Arc<RocksDBProviderInner>);
386
/// Inner state for `RocksDB` provider.
///
/// The mode is fixed at build time by [`RocksDBBuilder`]: read-write wraps an
/// `OptimisticTransactionDB`, read-only wraps a plain `DB` handle.
enum RocksDBProviderInner {
    /// Read-write mode using `OptimisticTransactionDB`.
    ReadWrite {
        /// `RocksDB` database instance with optimistic transaction support.
        db: OptimisticTransactionDB,
        /// Metrics latency & operations.
        metrics: Option<RocksDBMetrics>,
    },
    /// Read-only mode using `DB` opened with `open_cf_descriptors_read_only`.
    /// This doesn't acquire an exclusive lock, allowing concurrent reads.
    ReadOnly {
        /// Read-only `RocksDB` database instance.
        db: DB,
        /// Metrics latency & operations.
        metrics: Option<RocksDBMetrics>,
    },
}
405
406impl RocksDBProviderInner {
407    /// Returns the metrics for this provider.
408    const fn metrics(&self) -> Option<&RocksDBMetrics> {
409        match self {
410            Self::ReadWrite { metrics, .. } | Self::ReadOnly { metrics, .. } => metrics.as_ref(),
411        }
412    }
413
414    /// Returns the read-write database, panicking if in read-only mode.
415    fn db_rw(&self) -> &OptimisticTransactionDB {
416        match self {
417            Self::ReadWrite { db, .. } => db,
418            Self::ReadOnly { .. } => {
419                panic!("Cannot perform write operation on read-only RocksDB provider")
420            }
421        }
422    }
423
424    /// Gets the column family handle for a table.
425    fn cf_handle<T: Table>(&self) -> Result<&rocksdb::ColumnFamily, DatabaseError> {
426        let cf = match self {
427            Self::ReadWrite { db, .. } => db.cf_handle(T::NAME),
428            Self::ReadOnly { db, .. } => db.cf_handle(T::NAME),
429        };
430        cf.ok_or_else(|| DatabaseError::Other(format!("Column family '{}' not found", T::NAME)))
431    }
432
433    /// Gets the column family handle for a table from the read-write database.
434    ///
435    /// # Panics
436    /// Panics if in read-only mode.
437    fn cf_handle_rw(&self, name: &str) -> Result<&rocksdb::ColumnFamily, DatabaseError> {
438        self.db_rw()
439            .cf_handle(name)
440            .ok_or_else(|| DatabaseError::Other(format!("Column family '{}' not found", name)))
441    }
442
443    /// Gets a value from a column family.
444    fn get_cf(
445        &self,
446        cf: &rocksdb::ColumnFamily,
447        key: impl AsRef<[u8]>,
448    ) -> Result<Option<Vec<u8>>, rocksdb::Error> {
449        match self {
450            Self::ReadWrite { db, .. } => db.get_cf(cf, key),
451            Self::ReadOnly { db, .. } => db.get_cf(cf, key),
452        }
453    }
454
455    /// Puts a value into a column family.
456    fn put_cf(
457        &self,
458        cf: &rocksdb::ColumnFamily,
459        key: impl AsRef<[u8]>,
460        value: impl AsRef<[u8]>,
461    ) -> Result<(), rocksdb::Error> {
462        self.db_rw().put_cf(cf, key, value)
463    }
464
465    /// Deletes a value from a column family.
466    fn delete_cf(
467        &self,
468        cf: &rocksdb::ColumnFamily,
469        key: impl AsRef<[u8]>,
470    ) -> Result<(), rocksdb::Error> {
471        self.db_rw().delete_cf(cf, key)
472    }
473
474    /// Deletes a range of values from a column family.
475    fn delete_range_cf<K: AsRef<[u8]>>(
476        &self,
477        cf: &rocksdb::ColumnFamily,
478        from: K,
479        to: K,
480    ) -> Result<(), rocksdb::Error> {
481        self.db_rw().delete_range_cf(cf, from, to)
482    }
483
484    /// Returns an iterator over a column family.
485    fn iterator_cf(
486        &self,
487        cf: &rocksdb::ColumnFamily,
488        mode: IteratorMode<'_>,
489    ) -> RocksDBIterEnum<'_> {
490        match self {
491            Self::ReadWrite { db, .. } => RocksDBIterEnum::ReadWrite(db.iterator_cf(cf, mode)),
492            Self::ReadOnly { db, .. } => RocksDBIterEnum::ReadOnly(db.iterator_cf(cf, mode)),
493        }
494    }
495
496    /// Returns a raw iterator over a column family.
497    ///
498    /// Unlike [`Self::iterator_cf`], raw iterators support `seek()` for efficient
499    /// repositioning without creating a new iterator.
500    fn raw_iterator_cf(&self, cf: &rocksdb::ColumnFamily) -> RocksDBRawIterEnum<'_> {
501        match self {
502            Self::ReadWrite { db, .. } => RocksDBRawIterEnum::ReadWrite(db.raw_iterator_cf(cf)),
503            Self::ReadOnly { db, .. } => RocksDBRawIterEnum::ReadOnly(db.raw_iterator_cf(cf)),
504        }
505    }
506
507    /// Returns the path to the database directory.
508    fn path(&self) -> &Path {
509        match self {
510            Self::ReadWrite { db, .. } => db.path(),
511            Self::ReadOnly { db, .. } => db.path(),
512        }
513    }
514
515    /// Returns the total size of WAL (Write-Ahead Log) files in bytes.
516    ///
517    /// WAL files have a `.log` extension in the `RocksDB` directory.
518    fn wal_size_bytes(&self) -> u64 {
519        let path = self.path();
520
521        match std::fs::read_dir(path) {
522            Ok(entries) => entries
523                .filter_map(|e| e.ok())
524                .filter(|e| e.path().extension().is_some_and(|ext| ext == "log"))
525                .filter_map(|e| e.metadata().ok())
526                .map(|m| m.len())
527                .sum(),
528            Err(_) => 0,
529        }
530    }
531
532    /// Returns statistics for all column families in the database.
533    fn table_stats(&self) -> Vec<RocksDBTableStats> {
534        let mut stats = Vec::new();
535
536        macro_rules! collect_stats {
537            ($db:expr) => {
538                for cf_name in ROCKSDB_TABLES {
539                    if let Some(cf) = $db.cf_handle(cf_name) {
540                        let estimated_num_keys = $db
541                            .property_int_value_cf(cf, rocksdb::properties::ESTIMATE_NUM_KEYS)
542                            .ok()
543                            .flatten()
544                            .unwrap_or(0);
545
546                        // SST files size (on-disk) + memtable size (in-memory)
547                        let sst_size = $db
548                            .property_int_value_cf(cf, rocksdb::properties::LIVE_SST_FILES_SIZE)
549                            .ok()
550                            .flatten()
551                            .unwrap_or(0);
552
553                        let memtable_size = $db
554                            .property_int_value_cf(cf, rocksdb::properties::SIZE_ALL_MEM_TABLES)
555                            .ok()
556                            .flatten()
557                            .unwrap_or(0);
558
559                        let estimated_size_bytes = sst_size + memtable_size;
560
561                        let pending_compaction_bytes = $db
562                            .property_int_value_cf(
563                                cf,
564                                rocksdb::properties::ESTIMATE_PENDING_COMPACTION_BYTES,
565                            )
566                            .ok()
567                            .flatten()
568                            .unwrap_or(0);
569
570                        stats.push(RocksDBTableStats {
571                            sst_size_bytes: sst_size,
572                            memtable_size_bytes: memtable_size,
573                            name: cf_name.to_string(),
574                            estimated_num_keys,
575                            estimated_size_bytes,
576                            pending_compaction_bytes,
577                        });
578                    }
579                }
580            };
581        }
582
583        match self {
584            Self::ReadWrite { db, .. } => collect_stats!(db),
585            Self::ReadOnly { db, .. } => collect_stats!(db),
586        }
587
588        stats
589    }
590
591    /// Returns database-level statistics including per-table stats and WAL size.
592    fn db_stats(&self) -> RocksDBStats {
593        RocksDBStats { tables: self.table_stats(), wal_size_bytes: self.wal_size_bytes() }
594    }
595}
596
597impl fmt::Debug for RocksDBProviderInner {
598    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
599        match self {
600            Self::ReadWrite { metrics, .. } => f
601                .debug_struct("RocksDBProviderInner::ReadWrite")
602                .field("db", &"<OptimisticTransactionDB>")
603                .field("metrics", metrics)
604                .finish(),
605            Self::ReadOnly { metrics, .. } => f
606                .debug_struct("RocksDBProviderInner::ReadOnly")
607                .field("db", &"<DB (read-only)>")
608                .field("metrics", metrics)
609                .finish(),
610        }
611    }
612}
613
614impl Drop for RocksDBProviderInner {
615    fn drop(&mut self) {
616        match self {
617            Self::ReadWrite { db, .. } => {
618                // Flush all memtables if possible. If not, they will be rebuilt from the WAL on
619                // restart
620                if let Err(e) = db.flush_wal(true) {
621                    tracing::warn!(target: "providers::rocksdb", ?e, "Failed to flush WAL on drop");
622                }
623                for cf_name in ROCKSDB_TABLES {
624                    if let Some(cf) = db.cf_handle(cf_name) &&
625                        let Err(e) = db.flush_cf(&cf)
626                    {
627                        tracing::warn!(target: "providers::rocksdb", cf = cf_name, ?e, "Failed to flush CF on drop");
628                    }
629                }
630                db.cancel_all_background_work(true);
631            }
632            Self::ReadOnly { db, .. } => db.cancel_all_background_work(true),
633        }
634    }
635}
636
637impl Clone for RocksDBProvider {
638    fn clone(&self) -> Self {
639        Self(self.0.clone())
640    }
641}
642
643impl DatabaseMetrics for RocksDBProvider {
644    fn gauge_metrics(&self) -> Vec<(&'static str, f64, Vec<Label>)> {
645        let mut metrics = Vec::new();
646
647        for stat in self.table_stats() {
648            metrics.push((
649                "rocksdb.table_size",
650                stat.estimated_size_bytes as f64,
651                vec![Label::new("table", stat.name.clone())],
652            ));
653            metrics.push((
654                "rocksdb.table_entries",
655                stat.estimated_num_keys as f64,
656                vec![Label::new("table", stat.name.clone())],
657            ));
658            metrics.push((
659                "rocksdb.pending_compaction_bytes",
660                stat.pending_compaction_bytes as f64,
661                vec![Label::new("table", stat.name.clone())],
662            ));
663            metrics.push((
664                "rocksdb.sst_size",
665                stat.sst_size_bytes as f64,
666                vec![Label::new("table", stat.name.clone())],
667            ));
668            metrics.push((
669                "rocksdb.memtable_size",
670                stat.memtable_size_bytes as f64,
671                vec![Label::new("table", stat.name)],
672            ));
673        }
674
675        // WAL size (DB-level, shared across all tables)
676        metrics.push(("rocksdb.wal_size", self.wal_size_bytes() as f64, vec![]));
677
678        metrics
679    }
680}
681
682impl RocksDBProvider {
683    /// Creates a new `RocksDB` provider.
684    pub fn new(path: impl AsRef<Path>) -> ProviderResult<Self> {
685        RocksDBBuilder::new(path).build()
686    }
687
    /// Creates a new `RocksDB` provider builder.
    ///
    /// Equivalent to [`RocksDBBuilder::new`]; use the builder to configure tables,
    /// caching, metrics, log level, and read-only mode before building.
    pub fn builder(path: impl AsRef<Path>) -> RocksDBBuilder {
        RocksDBBuilder::new(path)
    }
692
693    /// Returns `true` if a `RocksDB` database exists at the given path.
694    ///
695    /// Checks for the presence of the `CURRENT` file, which `RocksDB` creates
696    /// when initializing a database.
697    pub fn exists(path: impl AsRef<Path>) -> bool {
698        path.as_ref().join("CURRENT").exists()
699    }
700
701    /// Returns `true` if this provider is in read-only mode.
702    pub fn is_read_only(&self) -> bool {
703        matches!(self.0.as_ref(), RocksDBProviderInner::ReadOnly { .. })
704    }
705
706    /// Creates a new transaction with MDBX-like semantics (read-your-writes, rollback).
707    ///
708    /// Note: With `OptimisticTransactionDB`, commits may fail if there are conflicts.
709    /// Conflict detection happens at commit time, not at write time.
710    ///
711    /// # Panics
712    /// Panics if the provider is in read-only mode.
713    pub fn tx(&self) -> RocksTx<'_> {
714        let write_options = WriteOptions::default();
715        let txn_options = OptimisticTransactionOptions::default();
716        let inner = self.0.db_rw().transaction_opt(&write_options, &txn_options);
717        RocksTx { inner, provider: self }
718    }
719
720    /// Creates a new batch for atomic writes.
721    ///
722    /// Use [`Self::write_batch`] for closure-based atomic writes.
723    /// Use this method when the batch needs to be held by [`crate::EitherWriter`].
724    ///
725    /// # Panics
726    /// Panics if the provider is in read-only mode when attempting to commit.
727    pub fn batch(&self) -> RocksDBBatch<'_> {
728        RocksDBBatch {
729            provider: self,
730            inner: WriteBatchWithTransaction::<true>::default(),
731            buf: Vec::with_capacity(DEFAULT_COMPRESS_BUF_CAPACITY),
732            auto_commit_threshold: None,
733        }
734    }
735
736    /// Creates a new batch with auto-commit enabled.
737    ///
738    /// When the batch size exceeds the threshold (4 GiB), the batch is automatically
739    /// committed and reset. This prevents OOM during large bulk writes while maintaining
740    /// crash-safety via the consistency check on startup.
741    pub fn batch_with_auto_commit(&self) -> RocksDBBatch<'_> {
742        RocksDBBatch {
743            provider: self,
744            inner: WriteBatchWithTransaction::<true>::default(),
745            buf: Vec::with_capacity(DEFAULT_COMPRESS_BUF_CAPACITY),
746            auto_commit_threshold: Some(DEFAULT_AUTO_COMMIT_THRESHOLD),
747        }
748    }
749
    /// Gets the column family handle for a table.
    ///
    /// Thin delegation to [`RocksDBProviderInner::cf_handle`]; works in both
    /// read-write and read-only mode.
    fn get_cf_handle<T: Table>(&self) -> Result<&rocksdb::ColumnFamily, DatabaseError> {
        self.0.cf_handle::<T>()
    }
754
755    /// Executes a function and records metrics with the given operation and table name.
756    fn execute_with_operation_metric<R>(
757        &self,
758        operation: RocksDBOperation,
759        table: &'static str,
760        f: impl FnOnce(&Self) -> R,
761    ) -> R {
762        let start = self.0.metrics().map(|_| Instant::now());
763        let res = f(self);
764
765        if let (Some(start), Some(metrics)) = (start, self.0.metrics()) {
766            metrics.record_operation(operation, table, start.elapsed());
767        }
768
769        res
770    }
771
772    /// Gets a value from the specified table.
773    pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
774        self.get_encoded::<T>(&key.encode())
775    }
776
777    /// Gets a value from the specified table using pre-encoded key.
778    pub fn get_encoded<T: Table>(
779        &self,
780        key: &<T::Key as Encode>::Encoded,
781    ) -> ProviderResult<Option<T::Value>> {
782        self.execute_with_operation_metric(RocksDBOperation::Get, T::NAME, |this| {
783            let result = this.0.get_cf(this.get_cf_handle::<T>()?, key.as_ref()).map_err(|e| {
784                ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
785                    message: e.to_string().into(),
786                    code: -1,
787                }))
788            })?;
789
790            Ok(result.and_then(|value| T::Value::decompress(&value).ok()))
791        })
792    }
793
794    /// Puts upsert a value into the specified table with the given key.
795    ///
796    /// # Panics
797    /// Panics if the provider is in read-only mode.
798    pub fn put<T: Table>(&self, key: T::Key, value: &T::Value) -> ProviderResult<()> {
799        let encoded_key = key.encode();
800        self.put_encoded::<T>(&encoded_key, value)
801    }
802
    /// Puts a value into the specified table using pre-encoded key.
    ///
    /// # Panics
    /// Panics if the provider is in read-only mode.
    pub fn put_encoded<T: Table>(
        &self,
        key: &<T::Key as Encode>::Encoded,
        value: &T::Value,
    ) -> ProviderResult<()> {
        self.execute_with_operation_metric(RocksDBOperation::Put, T::NAME, |this| {
            // A fresh compression buffer is allocated on every call so that
            // `RocksDBProvider` stays thread-safe without interior mutability.
            // Callers that want to amortize this allocation should go through
            // the write-batch API, which reuses a single buffer.
            let mut buf = Vec::new();
            let value_bytes = compress_to_buf_or_ref!(buf, value).unwrap_or(&buf);

            this.0.put_cf(this.get_cf_handle::<T>()?, key, value_bytes).map_err(|e| {
                ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
                    info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
                    operation: DatabaseWriteOperation::PutUpsert,
                    table_name: T::NAME,
                    key: key.as_ref().to_vec(),
                })))
            })
        })
    }
829
830    /// Deletes a value from the specified table.
831    ///
832    /// # Panics
833    /// Panics if the provider is in read-only mode.
834    pub fn delete<T: Table>(&self, key: T::Key) -> ProviderResult<()> {
835        self.execute_with_operation_metric(RocksDBOperation::Delete, T::NAME, |this| {
836            this.0.delete_cf(this.get_cf_handle::<T>()?, key.encode().as_ref()).map_err(|e| {
837                ProviderError::Database(DatabaseError::Delete(DatabaseErrorInfo {
838                    message: e.to_string().into(),
839                    code: -1,
840                }))
841            })
842        })
843    }
844
845    /// Clears all entries from the specified table.
846    ///
847    /// Uses `delete_range_cf` from empty key to a max key (256 bytes of 0xFF).
848    /// This end key must exceed the maximum encoded key size for any table.
849    /// Current max is ~60 bytes (`StorageShardedKey` = 20 + 32 + 8).
850    pub fn clear<T: Table>(&self) -> ProviderResult<()> {
851        let cf = self.get_cf_handle::<T>()?;
852
853        self.0.delete_range_cf(cf, &[] as &[u8], &[0xFF; 256]).map_err(|e| {
854            ProviderError::Database(DatabaseError::Delete(DatabaseErrorInfo {
855                message: e.to_string().into(),
856                code: -1,
857            }))
858        })?;
859
860        Ok(())
861    }
862
863    /// Retrieves the first or last entry from a table based on the iterator mode.
864    fn get_boundary<T: Table>(
865        &self,
866        mode: IteratorMode<'_>,
867    ) -> ProviderResult<Option<(T::Key, T::Value)>> {
868        self.execute_with_operation_metric(RocksDBOperation::Get, T::NAME, |this| {
869            let cf = this.get_cf_handle::<T>()?;
870            let mut iter = this.0.iterator_cf(cf, mode);
871
872            match iter.next() {
873                Some(Ok((key_bytes, value_bytes))) => {
874                    let key = <T::Key as reth_db_api::table::Decode>::decode(&key_bytes)
875                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
876                    let value = T::Value::decompress(&value_bytes)
877                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
878                    Ok(Some((key, value)))
879                }
880                Some(Err(e)) => {
881                    Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
882                        message: e.to_string().into(),
883                        code: -1,
884                    })))
885                }
886                None => Ok(None),
887            }
888        })
889    }
890
    /// Gets the first (smallest key) entry from the specified table.
    ///
    /// Returns `Ok(None)` when the table is empty.
    #[inline]
    pub fn first<T: Table>(&self) -> ProviderResult<Option<(T::Key, T::Value)>> {
        self.get_boundary::<T>(IteratorMode::Start)
    }
896
    /// Gets the last (largest key) entry from the specified table.
    ///
    /// Returns `Ok(None)` when the table is empty.
    #[inline]
    pub fn last<T: Table>(&self) -> ProviderResult<Option<(T::Key, T::Value)>> {
        self.get_boundary::<T>(IteratorMode::End)
    }
902
903    /// Creates an iterator over all entries in the specified table.
904    ///
905    /// Returns decoded `(Key, Value)` pairs in key order.
906    pub fn iter<T: Table>(&self) -> ProviderResult<RocksDBIter<'_, T>> {
907        let cf = self.get_cf_handle::<T>()?;
908        let iter = self.0.iterator_cf(cf, IteratorMode::Start);
909        Ok(RocksDBIter { inner: iter, _marker: std::marker::PhantomData })
910    }
911
    /// Returns statistics for all column families in the database.
    ///
    /// Returns one [`RocksDBTableStats`] entry per column family (table), covering
    /// on-disk SST size and in-memory memtable size.
    pub fn table_stats(&self) -> Vec<RocksDBTableStats> {
        self.0.table_stats()
    }
918
    /// Returns the total size of WAL (Write-Ahead Log) files in bytes.
    ///
    /// This scans the `RocksDB` directory for `.log` files and sums their sizes.
    /// WAL files can be significant (e.g., 2.7GB observed) and are not included
    /// in `table_size`, `sst_size`, or `memtable_size` metrics.
    pub fn wal_size_bytes(&self) -> u64 {
        // Delegates to the inner handle, which performs the directory scan.
        self.0.wal_size_bytes()
    }
927
    /// Returns database-level statistics including per-table stats and WAL size.
    ///
    /// This combines [`Self::table_stats`] and [`Self::wal_size_bytes`] into a single struct,
    /// giving a complete picture of the database footprint in one call.
    pub fn db_stats(&self) -> RocksDBStats {
        self.0.db_stats()
    }
934
935    /// Flushes pending writes for the specified tables to disk.
936    ///
937    /// This performs a flush of:
938    /// 1. The column family memtables for the specified table names to SST files
939    /// 2. The Write-Ahead Log (WAL) with sync
940    ///
941    /// After this call completes, all data for the specified tables is durably persisted to disk.
942    ///
943    /// # Panics
944    /// Panics if the provider is in read-only mode.
945    #[instrument(level = "debug", target = "providers::rocksdb", skip_all, fields(tables = ?tables))]
946    pub fn flush(&self, tables: &[&'static str]) -> ProviderResult<()> {
947        let db = self.0.db_rw();
948
949        for cf_name in tables {
950            if let Some(cf) = db.cf_handle(cf_name) {
951                db.flush_cf(&cf).map_err(|e| {
952                    ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
953                        info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
954                        operation: DatabaseWriteOperation::Flush,
955                        table_name: cf_name,
956                        key: Vec::new(),
957                    })))
958                })?;
959            }
960        }
961
962        db.flush_wal(true).map_err(|e| {
963            ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
964                info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
965                operation: DatabaseWriteOperation::Flush,
966                table_name: "WAL",
967                key: Vec::new(),
968            })))
969        })?;
970
971        Ok(())
972    }
973
974    /// Flushes and compacts all tables in `RocksDB`.
975    ///
976    /// This:
977    /// 1. Flushes all column family memtables to SST files
978    /// 2. Flushes the Write-Ahead Log (WAL) with sync
979    /// 3. Triggers manual compaction on all column families to reclaim disk space
980    ///
981    /// Use this after large delete operations (like pruning) to reclaim disk space.
982    ///
983    /// # Panics
984    /// Panics if the provider is in read-only mode.
985    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
986    pub fn flush_and_compact(&self) -> ProviderResult<()> {
987        self.flush(ROCKSDB_TABLES)?;
988
989        let db = self.0.db_rw();
990
991        for cf_name in ROCKSDB_TABLES {
992            if let Some(cf) = db.cf_handle(cf_name) {
993                db.compact_range_cf(&cf, None::<&[u8]>, None::<&[u8]>);
994            }
995        }
996
997        Ok(())
998    }
999
1000    /// Creates a raw iterator over all entries in the specified table.
1001    ///
1002    /// Returns raw `(key_bytes, value_bytes)` pairs without decoding.
1003    pub fn raw_iter<T: Table>(&self) -> ProviderResult<RocksDBRawIter<'_>> {
1004        let cf = self.get_cf_handle::<T>()?;
1005        let iter = self.0.iterator_cf(cf, IteratorMode::Start);
1006        Ok(RocksDBRawIter { inner: iter })
1007    }
1008
1009    /// Returns all account history shards for the given address in ascending key order.
1010    ///
1011    /// This is used for unwind operations where we need to scan all shards for an address
1012    /// and potentially delete or truncate them.
1013    pub fn account_history_shards(
1014        &self,
1015        address: Address,
1016    ) -> ProviderResult<Vec<(ShardedKey<Address>, BlockNumberList)>> {
1017        // Get the column family handle for the AccountsHistory table.
1018        let cf = self.get_cf_handle::<tables::AccountsHistory>()?;
1019
1020        // Build a seek key starting at the first shard (highest_block_number = 0) for this address.
1021        // ShardedKey is (address, highest_block_number) so this positions us at the beginning.
1022        let start_key = ShardedKey::new(address, 0u64);
1023        let start_bytes = start_key.encode();
1024
1025        // Create a forward iterator starting from our seek position.
1026        let iter = self
1027            .0
1028            .iterator_cf(cf, IteratorMode::From(start_bytes.as_ref(), rocksdb::Direction::Forward));
1029
1030        let mut result = Vec::new();
1031        for item in iter {
1032            match item {
1033                Ok((key_bytes, value_bytes)) => {
1034                    // Decode the sharded key to check if we're still on the same address.
1035                    let key = ShardedKey::<Address>::decode(&key_bytes)
1036                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
1037
1038                    // Stop when we reach a different address (keys are sorted by address first).
1039                    if key.key != address {
1040                        break;
1041                    }
1042
1043                    // Decompress the block number list stored in this shard.
1044                    let value = BlockNumberList::decompress(&value_bytes)
1045                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
1046
1047                    result.push((key, value));
1048                }
1049                Err(e) => {
1050                    return Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
1051                        message: e.to_string().into(),
1052                        code: -1,
1053                    })));
1054                }
1055            }
1056        }
1057
1058        Ok(result)
1059    }
1060
1061    /// Returns all storage history shards for the given `(address, storage_key)` pair.
1062    ///
1063    /// Iterates through all shards in ascending `highest_block_number` order until
1064    /// a different `(address, storage_key)` is encountered.
1065    pub fn storage_history_shards(
1066        &self,
1067        address: Address,
1068        storage_key: B256,
1069    ) -> ProviderResult<Vec<(StorageShardedKey, BlockNumberList)>> {
1070        let cf = self.get_cf_handle::<tables::StoragesHistory>()?;
1071
1072        let start_key = StorageShardedKey::new(address, storage_key, 0u64);
1073        let start_bytes = start_key.encode();
1074
1075        let iter = self
1076            .0
1077            .iterator_cf(cf, IteratorMode::From(start_bytes.as_ref(), rocksdb::Direction::Forward));
1078
1079        let mut result = Vec::new();
1080        for item in iter {
1081            match item {
1082                Ok((key_bytes, value_bytes)) => {
1083                    let key = StorageShardedKey::decode(&key_bytes)
1084                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
1085
1086                    if key.address != address || key.sharded_key.key != storage_key {
1087                        break;
1088                    }
1089
1090                    let value = BlockNumberList::decompress(&value_bytes)
1091                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
1092
1093                    result.push((key, value));
1094                }
1095                Err(e) => {
1096                    return Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
1097                        message: e.to_string().into(),
1098                        code: -1,
1099                    })));
1100                }
1101            }
1102        }
1103
1104        Ok(result)
1105    }
1106
1107    /// Unwinds account history indices for the given `(address, block_number)` pairs.
1108    ///
1109    /// Groups addresses by their minimum block number and calls the appropriate unwind
1110    /// operations. For each address, keeps only blocks less than the minimum block
1111    /// (i.e., removes the minimum block and all higher blocks).
1112    ///
1113    /// Returns a `WriteBatchWithTransaction` that can be committed later.
1114    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
1115    pub fn unwind_account_history_indices(
1116        &self,
1117        last_indices: &[(Address, BlockNumber)],
1118    ) -> ProviderResult<WriteBatchWithTransaction<true>> {
1119        let mut address_min_block: AddressMap<BlockNumber> =
1120            AddressMap::with_capacity_and_hasher(last_indices.len(), Default::default());
1121        for &(address, block_number) in last_indices {
1122            address_min_block
1123                .entry(address)
1124                .and_modify(|min| *min = (*min).min(block_number))
1125                .or_insert(block_number);
1126        }
1127
1128        let mut batch = self.batch();
1129        for (address, min_block) in address_min_block {
1130            match min_block.checked_sub(1) {
1131                Some(keep_to) => batch.unwind_account_history_to(address, keep_to)?,
1132                None => batch.clear_account_history(address)?,
1133            }
1134        }
1135
1136        Ok(batch.into_inner())
1137    }
1138
1139    /// Unwinds storage history indices for the given `(address, storage_key, block_number)` tuples.
1140    ///
1141    /// Groups by `(address, storage_key)` and finds the minimum block number for each.
1142    /// For each key, keeps only blocks less than the minimum block
1143    /// (i.e., removes the minimum block and all higher blocks).
1144    ///
1145    /// Returns a `WriteBatchWithTransaction` that can be committed later.
1146    pub fn unwind_storage_history_indices(
1147        &self,
1148        storage_changesets: &[(Address, B256, BlockNumber)],
1149    ) -> ProviderResult<WriteBatchWithTransaction<true>> {
1150        let mut key_min_block: HashMap<(Address, B256), BlockNumber> =
1151            HashMap::with_capacity_and_hasher(storage_changesets.len(), Default::default());
1152        for &(address, storage_key, block_number) in storage_changesets {
1153            key_min_block
1154                .entry((address, storage_key))
1155                .and_modify(|min| *min = (*min).min(block_number))
1156                .or_insert(block_number);
1157        }
1158
1159        let mut batch = self.batch();
1160        for ((address, storage_key), min_block) in key_min_block {
1161            match min_block.checked_sub(1) {
1162                Some(keep_to) => batch.unwind_storage_history_to(address, storage_key, keep_to)?,
1163                None => batch.clear_storage_history(address, storage_key)?,
1164            }
1165        }
1166
1167        Ok(batch.into_inner())
1168    }
1169
1170    /// Writes a batch of operations atomically.
1171    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
1172    pub fn write_batch<F>(&self, f: F) -> ProviderResult<()>
1173    where
1174        F: FnOnce(&mut RocksDBBatch<'_>) -> ProviderResult<()>,
1175    {
1176        self.execute_with_operation_metric(RocksDBOperation::BatchWrite, "Batch", |this| {
1177            let mut batch_handle = this.batch();
1178            f(&mut batch_handle)?;
1179            batch_handle.commit()
1180        })
1181    }
1182
1183    /// Commits a raw `WriteBatchWithTransaction` to `RocksDB`.
1184    ///
1185    /// This is used when the batch was extracted via [`RocksDBBatch::into_inner`]
1186    /// and needs to be committed at a later point (e.g., at provider commit time).
1187    ///
1188    /// # Panics
1189    /// Panics if the provider is in read-only mode.
1190    #[instrument(level = "debug", target = "providers::rocksdb", skip_all, fields(batch_len = batch.len(), batch_size = batch.size_in_bytes()))]
1191    pub fn commit_batch(&self, batch: WriteBatchWithTransaction<true>) -> ProviderResult<()> {
1192        self.0.db_rw().write_opt(batch, &WriteOptions::default()).map_err(|e| {
1193            ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
1194                message: e.to_string().into(),
1195                code: -1,
1196            }))
1197        })
1198    }
1199
    /// Writes all `RocksDB` data for multiple blocks in parallel.
    ///
    /// This handles transaction hash numbers, account history, and storage history based on
    /// the provided storage settings. Each operation runs in parallel with its own batch,
    /// pushing to `ctx.pending_batches` for later commit.
    ///
    /// No-op unless `storage_v2` is enabled in the storage settings.
    #[instrument(level = "debug", target = "providers::rocksdb", skip_all, fields(num_blocks = blocks.len(), first_block = ctx.first_block_number))]
    pub(crate) fn write_blocks_data<N: reth_node_types::NodePrimitives>(
        &self,
        blocks: &[ExecutedBlock<N>],
        tx_nums: &[TxNumber],
        ctx: RocksDBWriteCtx,
        runtime: &reth_tasks::Runtime,
    ) -> ProviderResult<()> {
        if !ctx.storage_settings.storage_v2 {
            return Ok(());
        }

        // Result slots filled by the spawned tasks; `None` after the scope ends
        // means the corresponding task panicked before storing its result.
        let mut r_tx_hash = None;
        let mut r_account_history = None;
        let mut r_storage_history = None;

        // NOTE(review): `storage_v2` is always true past the early return above,
        // so these re-checks are redundant (but harmless); tx-hash writes are
        // additionally gated on the tx-lookup prune mode not being "full".
        let write_tx_hash =
            ctx.storage_settings.storage_v2 && ctx.prune_tx_lookup.is_none_or(|m| !m.is_full());
        let write_account_history = ctx.storage_settings.storage_v2;
        let write_storage_history = ctx.storage_settings.storage_v2;

        // Propagate tracing context into rayon-spawned threads so that RocksDB
        // write spans appear as children of write_blocks_data in traces.
        let span = tracing::Span::current();
        runtime.storage_pool().in_place_scope(|s| {
            if write_tx_hash {
                s.spawn(|_| {
                    let _guard = span.enter();
                    r_tx_hash = Some(self.write_tx_hash_numbers(blocks, tx_nums, &ctx));
                });
            }

            if write_account_history {
                s.spawn(|_| {
                    let _guard = span.enter();
                    r_account_history = Some(self.write_account_history(blocks, &ctx));
                });
            }

            if write_storage_history {
                s.spawn(|_| {
                    let _guard = span.enter();
                    r_storage_history = Some(self.write_storage_history(blocks, &ctx));
                });
            }
        });

        // Convert a missing result (task panic) into an error, then propagate
        // the task's own ProviderResult via the second `?`.
        if write_tx_hash {
            r_tx_hash.ok_or_else(|| {
                ProviderError::Database(DatabaseError::Other(
                    "rocksdb tx-hash write thread panicked".into(),
                ))
            })??;
        }
        if write_account_history {
            r_account_history.ok_or_else(|| {
                ProviderError::Database(DatabaseError::Other(
                    "rocksdb account-history write thread panicked".into(),
                ))
            })??;
        }
        if write_storage_history {
            r_storage_history.ok_or_else(|| {
                ProviderError::Database(DatabaseError::Other(
                    "rocksdb storage-history write thread panicked".into(),
                ))
            })??;
        }

        Ok(())
    }
1276
1277    /// Writes transaction hash to number mappings for the given blocks.
1278    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
1279    fn write_tx_hash_numbers<N: reth_node_types::NodePrimitives>(
1280        &self,
1281        blocks: &[ExecutedBlock<N>],
1282        tx_nums: &[TxNumber],
1283        ctx: &RocksDBWriteCtx,
1284    ) -> ProviderResult<()> {
1285        let mut batch = self.batch();
1286        for (block, &first_tx_num) in blocks.iter().zip(tx_nums) {
1287            let body = block.recovered_block().body();
1288            for (tx_num, transaction) in (first_tx_num..).zip(body.transactions_iter()) {
1289                batch.put::<tables::TransactionHashNumbers>(*transaction.tx_hash(), &tx_num)?;
1290            }
1291        }
1292        ctx.pending_batches.lock().push(batch.into_inner());
1293        Ok(())
1294    }
1295
1296    /// Writes account history indices for the given blocks.
1297    ///
1298    /// Derives history indices from reverts (same source as changesets) to ensure consistency.
1299    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
1300    fn write_account_history<N: reth_node_types::NodePrimitives>(
1301        &self,
1302        blocks: &[ExecutedBlock<N>],
1303        ctx: &RocksDBWriteCtx,
1304    ) -> ProviderResult<()> {
1305        let mut batch = self.batch();
1306        let mut account_history: BTreeMap<Address, Vec<u64>> = BTreeMap::new();
1307
1308        for (block_idx, block) in blocks.iter().enumerate() {
1309            let block_number = ctx.first_block_number + block_idx as u64;
1310            let reverts = block.execution_outcome().state.reverts.to_plain_state_reverts();
1311
1312            // Iterate through account reverts - these are exactly the accounts that have
1313            // changesets written, ensuring history indices match changeset entries.
1314            for account_block_reverts in reverts.accounts {
1315                for (address, _) in account_block_reverts {
1316                    account_history.entry(address).or_default().push(block_number);
1317                }
1318            }
1319        }
1320
1321        // Write account history using proper shard append logic
1322        for (address, indices) in account_history {
1323            batch.append_account_history_shard(address, indices)?;
1324        }
1325        ctx.pending_batches.lock().push(batch.into_inner());
1326        Ok(())
1327    }
1328
1329    /// Writes storage history indices for the given blocks.
1330    ///
1331    /// Derives history indices from reverts (same source as changesets) to ensure consistency.
1332    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
1333    fn write_storage_history<N: reth_node_types::NodePrimitives>(
1334        &self,
1335        blocks: &[ExecutedBlock<N>],
1336        ctx: &RocksDBWriteCtx,
1337    ) -> ProviderResult<()> {
1338        let mut batch = self.batch();
1339        let mut storage_history: BTreeMap<(Address, B256), Vec<u64>> = BTreeMap::new();
1340
1341        for (block_idx, block) in blocks.iter().enumerate() {
1342            let block_number = ctx.first_block_number + block_idx as u64;
1343            let reverts = block.execution_outcome().state.reverts.to_plain_state_reverts();
1344
1345            // Iterate through storage reverts - these are exactly the slots that have
1346            // changesets written, ensuring history indices match changeset entries.
1347            for storage_block_reverts in reverts.storage {
1348                for revert in storage_block_reverts {
1349                    for (slot, _) in revert.storage_revert {
1350                        let plain_key = B256::new(slot.to_be_bytes());
1351                        storage_history
1352                            .entry((revert.address, plain_key))
1353                            .or_default()
1354                            .push(block_number);
1355                    }
1356                }
1357            }
1358        }
1359
1360        // Write storage history using proper shard append logic
1361        for ((address, slot), indices) in storage_history {
1362            batch.append_storage_history_shard(address, slot, indices)?;
1363        }
1364        ctx.pending_batches.lock().push(batch.into_inner());
1365        Ok(())
1366    }
1367}
1368
/// Outcome of pruning a history shard in `RocksDB`.
///
/// Aggregated across a batch operation via [`PrunedIndices`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PruneShardOutcome {
    /// Shard was deleted entirely (every block number was pruned).
    Deleted,
    /// Shard was updated with filtered block numbers.
    Updated,
    /// Shard was unchanged (no blocks <= `to_block`).
    Unchanged,
}
1379
/// Tracks pruning outcomes for batch operations.
///
/// Counts how many shards fell into each [`PruneShardOutcome`] category.
#[derive(Debug, Default, Clone, Copy)]
pub struct PrunedIndices {
    /// Number of shards completely deleted.
    pub deleted: usize,
    /// Number of shards that were updated (filtered but still have entries).
    pub updated: usize,
    /// Number of shards that were unchanged.
    pub unchanged: usize,
}
1390
/// Handle for building a batch of operations atomically.
///
/// Uses `WriteBatchWithTransaction` for atomic writes without full transaction overhead.
/// Unlike [`RocksTx`], this does NOT support read-your-writes. Use for write-only flows
/// where you don't need to read back uncommitted data within the same operation
/// (e.g., history index writes).
///
/// When `auto_commit_threshold` is set, the batch will automatically commit and reset
/// when the batch size exceeds the threshold. This prevents OOM during large bulk writes.
#[must_use = "batch must be committed"]
pub struct RocksDBBatch<'a> {
    /// Provider that created this batch; supplies column-family handles and the
    /// writable DB handle at commit time.
    provider: &'a RocksDBProvider,
    /// Accumulated pending write operations.
    inner: WriteBatchWithTransaction<true>,
    /// Reusable scratch buffer for value compression across `put` calls.
    buf: Vec<u8>,
    /// If set, batch auto-commits when size exceeds this threshold (in bytes).
    auto_commit_threshold: Option<usize>,
}
1408
1409impl fmt::Debug for RocksDBBatch<'_> {
1410    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1411        f.debug_struct("RocksDBBatch")
1412            .field("provider", &self.provider)
1413            .field("batch", &"<WriteBatchWithTransaction>")
1414            // Number of operations in this batch
1415            .field("length", &self.inner.len())
1416            // Total serialized size (encoded key + compressed value + metadata) of this batch
1417            // in bytes
1418            .field("size_in_bytes", &self.inner.size_in_bytes())
1419            .finish()
1420    }
1421}
1422
1423impl<'a> RocksDBBatch<'a> {
1424    /// Puts a value into the batch.
1425    ///
1426    /// If auto-commit is enabled and the batch exceeds the threshold, commits and resets.
1427    pub fn put<T: Table>(&mut self, key: T::Key, value: &T::Value) -> ProviderResult<()> {
1428        let encoded_key = key.encode();
1429        self.put_encoded::<T>(&encoded_key, value)
1430    }
1431
    /// Puts a value into the batch using pre-encoded key.
    ///
    /// If auto-commit is enabled and the batch exceeds the threshold, commits and resets.
    pub fn put_encoded<T: Table>(
        &mut self,
        key: &<T::Key as Encode>::Encoded,
        value: &T::Value,
    ) -> ProviderResult<()> {
        // Reuses `self.buf` as the compression scratch buffer across calls; the
        // macro either compresses into `buf` or yields a direct reference to the
        // value's bytes. NOTE(review): presumably the macro resets/overwrites
        // `buf` on each invocation — macro defined elsewhere, confirm.
        let value_bytes = compress_to_buf_or_ref!(self.buf, value).unwrap_or(&self.buf);
        self.inner.put_cf(self.provider.get_cf_handle::<T>()?, key, value_bytes);
        self.maybe_auto_commit()?;
        Ok(())
    }
1445
1446    /// Deletes a value from the batch.
1447    ///
1448    /// If auto-commit is enabled and the batch exceeds the threshold, commits and resets.
1449    pub fn delete<T: Table>(&mut self, key: T::Key) -> ProviderResult<()> {
1450        self.inner.delete_cf(self.provider.get_cf_handle::<T>()?, key.encode().as_ref());
1451        self.maybe_auto_commit()?;
1452        Ok(())
1453    }
1454
1455    /// Commits and resets the batch if it exceeds the auto-commit threshold.
1456    ///
1457    /// This is called after each `put` or `delete` operation to prevent unbounded memory growth.
1458    /// Returns immediately if auto-commit is disabled or threshold not reached.
1459    fn maybe_auto_commit(&mut self) -> ProviderResult<()> {
1460        if let Some(threshold) = self.auto_commit_threshold &&
1461            self.inner.size_in_bytes() >= threshold
1462        {
1463            tracing::debug!(
1464                target: "providers::rocksdb",
1465                batch_size = self.inner.size_in_bytes(),
1466                threshold,
1467                "Auto-committing RocksDB batch"
1468            );
1469            let old_batch = std::mem::take(&mut self.inner);
1470            self.provider.0.db_rw().write_opt(old_batch, &WriteOptions::default()).map_err(
1471                |e| {
1472                    ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
1473                        message: e.to_string().into(),
1474                        code: -1,
1475                    }))
1476                },
1477            )?;
1478        }
1479        Ok(())
1480    }
1481
1482    /// Commits the batch to the database.
1483    ///
1484    /// This consumes the batch and writes all operations atomically to `RocksDB`.
1485    ///
1486    /// # Panics
1487    /// Panics if the provider is in read-only mode.
1488    #[instrument(level = "debug", target = "providers::rocksdb", skip_all, fields(batch_len = self.inner.len(), batch_size = self.inner.size_in_bytes()))]
1489    pub fn commit(self) -> ProviderResult<()> {
1490        self.provider.0.db_rw().write_opt(self.inner, &WriteOptions::default()).map_err(|e| {
1491            ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
1492                message: e.to_string().into(),
1493                code: -1,
1494            }))
1495        })
1496    }
1497
    /// Returns the number of write operations (puts + deletes) queued in this batch.
    ///
    /// Resets to 0 after an auto-commit.
    pub fn len(&self) -> usize {
        self.inner.len()
    }
1502
1503    /// Returns `true` if the batch contains no operations.
1504    pub fn is_empty(&self) -> bool {
1505        self.inner.is_empty()
1506    }
1507
1508    /// Returns the size of the batch in bytes.
1509    pub fn size_in_bytes(&self) -> usize {
1510        self.inner.size_in_bytes()
1511    }
1512
1513    /// Returns a reference to the underlying `RocksDB` provider.
1514    pub const fn provider(&self) -> &RocksDBProvider {
1515        self.provider
1516    }
1517
1518    /// Consumes the batch and returns the underlying `WriteBatchWithTransaction`.
1519    ///
1520    /// This is used to defer commits to the provider level.
1521    pub fn into_inner(self) -> WriteBatchWithTransaction<true> {
1522        self.inner
1523    }
1524
1525    /// Gets a value from the database.
1526    ///
1527    /// **Important constraint:** This reads only committed state, not pending writes in this
1528    /// batch or other pending batches in `pending_rocksdb_batches`.
1529    pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
1530        self.provider.get::<T>(key)
1531    }
1532
1533    /// Appends indices to an account history shard with proper shard management.
1534    ///
1535    /// Loads the existing shard (if any), appends new indices, and rechunks into
1536    /// multiple shards if needed (respecting `NUM_OF_INDICES_IN_SHARD` limit).
1537    ///
1538    /// # Requirements
1539    ///
1540    /// - The `indices` MUST be strictly increasing and contain no duplicates.
1541    /// - This method MUST only be called once per address per batch. The batch reads existing
1542    ///   shards from committed DB state, not from pending writes. Calling twice for the same
1543    ///   address will cause the second call to overwrite the first.
1544    pub fn append_account_history_shard(
1545        &mut self,
1546        address: Address,
1547        indices: impl IntoIterator<Item = u64>,
1548    ) -> ProviderResult<()> {
1549        let indices: Vec<u64> = indices.into_iter().collect();
1550
1551        if indices.is_empty() {
1552            return Ok(());
1553        }
1554
1555        debug_assert!(
1556            indices.windows(2).all(|w| w[0] < w[1]),
1557            "indices must be strictly increasing: {:?}",
1558            indices
1559        );
1560
1561        let last_key = ShardedKey::new(address, u64::MAX);
1562        let last_shard_opt = self.provider.get::<tables::AccountsHistory>(last_key.clone())?;
1563        let mut last_shard = last_shard_opt.unwrap_or_else(BlockNumberList::empty);
1564
1565        last_shard.append(indices).map_err(ProviderError::other)?;
1566
1567        // Fast path: all indices fit in one shard
1568        if last_shard.len() <= NUM_OF_INDICES_IN_SHARD as u64 {
1569            self.put::<tables::AccountsHistory>(last_key, &last_shard)?;
1570            return Ok(());
1571        }
1572
1573        // Slow path: rechunk into multiple shards
1574        let chunks = last_shard.iter().chunks(NUM_OF_INDICES_IN_SHARD);
1575        let mut chunks_peekable = chunks.into_iter().peekable();
1576
1577        while let Some(chunk) = chunks_peekable.next() {
1578            let shard = BlockNumberList::new_pre_sorted(chunk);
1579            let highest_block_number = if chunks_peekable.peek().is_some() {
1580                shard.iter().next_back().expect("`chunks` does not return empty list")
1581            } else {
1582                u64::MAX
1583            };
1584
1585            self.put::<tables::AccountsHistory>(
1586                ShardedKey::new(address, highest_block_number),
1587                &shard,
1588            )?;
1589        }
1590
1591        Ok(())
1592    }
1593
1594    /// Appends indices to a storage history shard with proper shard management.
1595    ///
1596    /// Loads the existing shard (if any), appends new indices, and rechunks into
1597    /// multiple shards if needed (respecting `NUM_OF_INDICES_IN_SHARD` limit).
1598    ///
1599    /// # Requirements
1600    ///
1601    /// - The `indices` MUST be strictly increasing and contain no duplicates.
1602    /// - This method MUST only be called once per (address, `storage_key`) pair per batch. The
1603    ///   batch reads existing shards from committed DB state, not from pending writes. Calling
1604    ///   twice for the same key will cause the second call to overwrite the first.
1605    pub fn append_storage_history_shard(
1606        &mut self,
1607        address: Address,
1608        storage_key: B256,
1609        indices: impl IntoIterator<Item = u64>,
1610    ) -> ProviderResult<()> {
1611        let indices: Vec<u64> = indices.into_iter().collect();
1612
1613        if indices.is_empty() {
1614            return Ok(());
1615        }
1616
1617        debug_assert!(
1618            indices.windows(2).all(|w| w[0] < w[1]),
1619            "indices must be strictly increasing: {:?}",
1620            indices
1621        );
1622
1623        let last_key = StorageShardedKey::last(address, storage_key);
1624        let last_shard_opt = self.provider.get::<tables::StoragesHistory>(last_key.clone())?;
1625        let mut last_shard = last_shard_opt.unwrap_or_else(BlockNumberList::empty);
1626
1627        last_shard.append(indices).map_err(ProviderError::other)?;
1628
1629        // Fast path: all indices fit in one shard
1630        if last_shard.len() <= NUM_OF_INDICES_IN_SHARD as u64 {
1631            self.put::<tables::StoragesHistory>(last_key, &last_shard)?;
1632            return Ok(());
1633        }
1634
1635        // Slow path: rechunk into multiple shards
1636        let chunks = last_shard.iter().chunks(NUM_OF_INDICES_IN_SHARD);
1637        let mut chunks_peekable = chunks.into_iter().peekable();
1638
1639        while let Some(chunk) = chunks_peekable.next() {
1640            let shard = BlockNumberList::new_pre_sorted(chunk);
1641            let highest_block_number = if chunks_peekable.peek().is_some() {
1642                shard.iter().next_back().expect("`chunks` does not return empty list")
1643            } else {
1644                u64::MAX
1645            };
1646
1647            self.put::<tables::StoragesHistory>(
1648                StorageShardedKey::new(address, storage_key, highest_block_number),
1649                &shard,
1650            )?;
1651        }
1652
1653        Ok(())
1654    }
1655
    /// Unwinds account history for the given address, keeping only blocks <= `keep_to`.
    ///
    /// Mirrors MDBX `unwind_history_shards` behavior:
    /// - Deletes shards entirely above `keep_to`
    /// - Truncates boundary shards and re-keys to `u64::MAX` sentinel
    /// - Preserves shards entirely below `keep_to`
    ///
    /// Shards are read from committed DB state, not from pending writes in this batch.
    pub fn unwind_account_history_to(
        &mut self,
        address: Address,
        keep_to: BlockNumber,
    ) -> ProviderResult<()> {
        let shards = self.provider.account_history_shards(address)?;
        if shards.is_empty() {
            return Ok(());
        }

        // Find the first shard that might contain blocks > keep_to.
        // A shard is affected if it's the sentinel (u64::MAX) or its highest_block_number > keep_to
        let boundary_idx = shards.iter().position(|(key, _)| {
            key.highest_block_number == u64::MAX || key.highest_block_number > keep_to
        });

        // Repair path: no shards affected means all blocks <= keep_to, just ensure sentinel exists
        // (invariant: the last shard for an address is keyed u64::MAX).
        let Some(boundary_idx) = boundary_idx else {
            let (last_key, last_value) = shards.last().expect("shards is non-empty");
            if last_key.highest_block_number != u64::MAX {
                self.delete::<tables::AccountsHistory>(last_key.clone())?;
                self.put::<tables::AccountsHistory>(
                    ShardedKey::new(address, u64::MAX),
                    last_value,
                )?;
            }
            return Ok(());
        };

        // Delete all shards strictly after the boundary (they are entirely > keep_to)
        for (key, _) in shards.iter().skip(boundary_idx + 1) {
            self.delete::<tables::AccountsHistory>(key.clone())?;
        }

        // Process the boundary shard: filter out blocks > keep_to
        let (boundary_key, boundary_list) = &shards[boundary_idx];

        // Delete the boundary shard (we'll either drop it or rewrite at u64::MAX)
        self.delete::<tables::AccountsHistory>(boundary_key.clone())?;

        // Build truncated list once; check emptiness directly (avoids double iteration)
        let new_last =
            BlockNumberList::new_pre_sorted(boundary_list.iter().take_while(|&b| b <= keep_to));

        if new_last.is_empty() {
            // Boundary shard is now empty. Previous shard becomes the last and must be keyed
            // u64::MAX.
            if boundary_idx == 0 {
                // Nothing left for this address
                return Ok(());
            }

            let (prev_key, prev_value) = &shards[boundary_idx - 1];
            if prev_key.highest_block_number != u64::MAX {
                self.delete::<tables::AccountsHistory>(prev_key.clone())?;
                self.put::<tables::AccountsHistory>(
                    ShardedKey::new(address, u64::MAX),
                    prev_value,
                )?;
            }
            return Ok(());
        }

        // Boundary shard still has entries: it becomes the new last shard under the sentinel.
        self.put::<tables::AccountsHistory>(ShardedKey::new(address, u64::MAX), &new_last)?;

        Ok(())
    }
1729
    /// Prunes history shards, removing blocks <= `to_block`.
    ///
    /// Generic implementation for both account and storage history pruning.
    /// Mirrors MDBX `prune_shard` semantics. After pruning, the last remaining shard
    /// (if any) will have the sentinel key (`u64::MAX`).
    ///
    /// Table-specific behavior is injected via closures:
    /// - `get_highest`: extracts the highest block number from a shard key
    /// - `is_sentinel`: whether a key is the `u64::MAX` sentinel key
    /// - `delete_shard` / `put_shard`: table-specific batch delete/put operations
    /// - `create_sentinel`: builds the sentinel key for the target address/slot
    #[allow(clippy::too_many_arguments)]
    fn prune_history_shards_inner<K>(
        &mut self,
        shards: Vec<(K, BlockNumberList)>,
        to_block: BlockNumber,
        get_highest: impl Fn(&K) -> u64,
        is_sentinel: impl Fn(&K) -> bool,
        delete_shard: impl Fn(&mut Self, K) -> ProviderResult<()>,
        put_shard: impl Fn(&mut Self, K, &BlockNumberList) -> ProviderResult<()>,
        create_sentinel: impl Fn() -> K,
    ) -> ProviderResult<PruneShardOutcome>
    where
        K: Clone,
    {
        if shards.is_empty() {
            return Ok(PruneShardOutcome::Unchanged);
        }

        let mut deleted = false;
        let mut updated = false;
        // Last shard that survives pruning; re-keyed to the sentinel at the end if needed.
        let mut last_remaining: Option<(K, BlockNumberList)> = None;

        for (key, block_list) in shards {
            // Non-sentinel shard entirely <= to_block: drop it wholesale, no filtering needed.
            if !is_sentinel(&key) && get_highest(&key) <= to_block {
                delete_shard(self, key)?;
                deleted = true;
            } else {
                let original_len = block_list.len();
                let filtered =
                    BlockNumberList::new_pre_sorted(block_list.iter().filter(|&b| b > to_block));

                if filtered.is_empty() {
                    // Every block in this shard was pruned.
                    delete_shard(self, key)?;
                    deleted = true;
                } else if filtered.len() < original_len {
                    // Partially pruned: rewrite in place (may be re-keyed to the sentinel below).
                    put_shard(self, key.clone(), &filtered)?;
                    last_remaining = Some((key, filtered));
                    updated = true;
                } else {
                    // Untouched; tracked only so the sentinel re-key below can see it.
                    last_remaining = Some((key, block_list));
                }
            }
        }

        // Restore the invariant that the last surviving shard is keyed u64::MAX.
        if let Some((last_key, last_value)) = last_remaining &&
            !is_sentinel(&last_key)
        {
            delete_shard(self, last_key)?;
            put_shard(self, create_sentinel(), &last_value)?;
            updated = true;
        }

        // Deletion takes reporting precedence over update.
        if deleted {
            Ok(PruneShardOutcome::Deleted)
        } else if updated {
            Ok(PruneShardOutcome::Updated)
        } else {
            Ok(PruneShardOutcome::Unchanged)
        }
    }
1795
1796    /// Prunes account history for the given address, removing blocks <= `to_block`.
1797    ///
1798    /// Mirrors MDBX `prune_shard` semantics. After pruning, the last remaining shard
1799    /// (if any) will have the sentinel key (`u64::MAX`).
1800    pub fn prune_account_history_to(
1801        &mut self,
1802        address: Address,
1803        to_block: BlockNumber,
1804    ) -> ProviderResult<PruneShardOutcome> {
1805        let shards = self.provider.account_history_shards(address)?;
1806        self.prune_history_shards_inner(
1807            shards,
1808            to_block,
1809            |key| key.highest_block_number,
1810            |key| key.highest_block_number == u64::MAX,
1811            |batch, key| batch.delete::<tables::AccountsHistory>(key),
1812            |batch, key, value| batch.put::<tables::AccountsHistory>(key, value),
1813            || ShardedKey::new(address, u64::MAX),
1814        )
1815    }
1816
    /// Prunes account history for multiple addresses in a single iterator pass.
    ///
    /// This is more efficient than calling [`Self::prune_account_history_to`] repeatedly
    /// because it reuses a single raw iterator and skips seeks when the iterator is already
    /// positioned correctly (which happens when targets are sorted and adjacent in key order).
    ///
    /// `targets` MUST be sorted by address for correctness and optimal performance
    /// (matches on-disk key order).
    ///
    /// Returns per-target outcome counts (deleted / updated / unchanged).
    pub fn prune_account_history_batch(
        &mut self,
        targets: &[(Address, BlockNumber)],
    ) -> ProviderResult<PrunedIndices> {
        if targets.is_empty() {
            return Ok(PrunedIndices::default());
        }

        debug_assert!(
            targets.windows(2).all(|w| w[0].0 <= w[1].0),
            "prune_account_history_batch: targets must be sorted by address"
        );

        // ShardedKey<Address> layout: [address: 20][block: 8] = 28 bytes
        // The first 20 bytes are the "prefix" that identifies the address
        const PREFIX_LEN: usize = 20;

        let cf = self.provider.get_cf_handle::<tables::AccountsHistory>()?;
        let mut iter = self.provider.0.raw_iterator_cf(cf);
        let mut outcomes = PrunedIndices::default();

        for (address, to_block) in targets {
            // Build the target prefix (first 20 bytes = address)
            let start_key = ShardedKey::new(*address, 0u64).encode();
            let target_prefix = &start_key[..PREFIX_LEN];

            // Check if we need to seek or if the iterator is already positioned correctly.
            // After processing the previous target, the iterator is either:
            // 1. Positioned at a key with a different prefix (we iterated past our shards)
            // 2. Invalid (no more keys)
            // If the current key's prefix >= our target prefix, we may be able to skip the seek.
            let needs_seek = if iter.valid() {
                if let Some(current_key) = iter.key() {
                    // If current key's prefix < target prefix, we need to seek forward
                    // If current key's prefix > target prefix, this target has no shards (skip)
                    // If current key's prefix == target prefix, we're already positioned
                    current_key.get(..PREFIX_LEN).is_none_or(|p| p < target_prefix)
                } else {
                    true
                }
            } else {
                true
            };

            if needs_seek {
                iter.seek(start_key);
                // Surface iterator-level I/O errors; an invalid-but-ok iterator just means
                // no key >= start_key exists.
                iter.status().map_err(|e| {
                    ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
                        message: e.to_string().into(),
                        code: -1,
                    }))
                })?;
            }

            // Collect all shards for this address using raw prefix comparison
            let mut shards = Vec::new();
            while iter.valid() {
                let Some(key_bytes) = iter.key() else { break };

                // Use raw prefix comparison instead of full decode for the prefix check
                let current_prefix = key_bytes.get(..PREFIX_LEN);
                if current_prefix != Some(target_prefix) {
                    break;
                }

                // Now decode the full key (we need the block number)
                let key = ShardedKey::<Address>::decode(key_bytes)
                    .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;

                let Some(value_bytes) = iter.value() else { break };
                let value = BlockNumberList::decompress(value_bytes)
                    .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;

                shards.push((key, value));
                iter.next();
            }

            // Delegate the actual shard filtering/re-keying to the shared implementation.
            match self.prune_history_shards_inner(
                shards,
                *to_block,
                |key| key.highest_block_number,
                |key| key.highest_block_number == u64::MAX,
                |batch, key| batch.delete::<tables::AccountsHistory>(key),
                |batch, key, value| batch.put::<tables::AccountsHistory>(key, value),
                || ShardedKey::new(*address, u64::MAX),
            )? {
                PruneShardOutcome::Deleted => outcomes.deleted += 1,
                PruneShardOutcome::Updated => outcomes.updated += 1,
                PruneShardOutcome::Unchanged => outcomes.unchanged += 1,
            }
        }

        Ok(outcomes)
    }
1919
1920    /// Prunes storage history for the given address and storage key, removing blocks <=
1921    /// `to_block`.
1922    ///
1923    /// Mirrors MDBX `prune_shard` semantics. After pruning, the last remaining shard
1924    /// (if any) will have the sentinel key (`u64::MAX`).
1925    pub fn prune_storage_history_to(
1926        &mut self,
1927        address: Address,
1928        storage_key: B256,
1929        to_block: BlockNumber,
1930    ) -> ProviderResult<PruneShardOutcome> {
1931        let shards = self.provider.storage_history_shards(address, storage_key)?;
1932        self.prune_history_shards_inner(
1933            shards,
1934            to_block,
1935            |key| key.sharded_key.highest_block_number,
1936            |key| key.sharded_key.highest_block_number == u64::MAX,
1937            |batch, key| batch.delete::<tables::StoragesHistory>(key),
1938            |batch, key, value| batch.put::<tables::StoragesHistory>(key, value),
1939            || StorageShardedKey::last(address, storage_key),
1940        )
1941    }
1942
    /// Prunes storage history for multiple (address, `storage_key`) pairs in a single iterator
    /// pass.
    ///
    /// This is more efficient than calling [`Self::prune_storage_history_to`] repeatedly
    /// because it reuses a single raw iterator and skips seeks when the iterator is already
    /// positioned correctly (which happens when targets are sorted and adjacent in key order).
    ///
    /// `targets` MUST be sorted by (address, `storage_key`) for correctness and optimal
    /// performance (matches on-disk key order).
    ///
    /// Returns per-target outcome counts (deleted / updated / unchanged).
    pub fn prune_storage_history_batch(
        &mut self,
        targets: &[((Address, B256), BlockNumber)],
    ) -> ProviderResult<PrunedIndices> {
        if targets.is_empty() {
            return Ok(PrunedIndices::default());
        }

        debug_assert!(
            targets.windows(2).all(|w| w[0].0 <= w[1].0),
            "prune_storage_history_batch: targets must be sorted by (address, storage_key)"
        );

        // StorageShardedKey layout: [address: 20][storage_key: 32][block: 8] = 60 bytes
        // The first 52 bytes are the "prefix" that identifies (address, storage_key)
        const PREFIX_LEN: usize = 52;

        let cf = self.provider.get_cf_handle::<tables::StoragesHistory>()?;
        let mut iter = self.provider.0.raw_iterator_cf(cf);
        let mut outcomes = PrunedIndices::default();

        for ((address, storage_key), to_block) in targets {
            // Build the target prefix (first 52 bytes of encoded key)
            let start_key = StorageShardedKey::new(*address, *storage_key, 0u64).encode();
            let target_prefix = &start_key[..PREFIX_LEN];

            // Check if we need to seek or if the iterator is already positioned correctly.
            // After processing the previous target, the iterator is either:
            // 1. Positioned at a key with a different prefix (we iterated past our shards)
            // 2. Invalid (no more keys)
            // If the current key's prefix >= our target prefix, we may be able to skip the seek.
            let needs_seek = if iter.valid() {
                if let Some(current_key) = iter.key() {
                    // If current key's prefix < target prefix, we need to seek forward
                    // If current key's prefix > target prefix, this target has no shards (skip)
                    // If current key's prefix == target prefix, we're already positioned
                    current_key.get(..PREFIX_LEN).is_none_or(|p| p < target_prefix)
                } else {
                    true
                }
            } else {
                true
            };

            if needs_seek {
                iter.seek(start_key);
                // Surface iterator-level I/O errors; an invalid-but-ok iterator just means
                // no key >= start_key exists.
                iter.status().map_err(|e| {
                    ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
                        message: e.to_string().into(),
                        code: -1,
                    }))
                })?;
            }

            // Collect all shards for this (address, storage_key) pair using prefix comparison
            let mut shards = Vec::new();
            while iter.valid() {
                let Some(key_bytes) = iter.key() else { break };

                // Use raw prefix comparison instead of full decode for the prefix check
                let current_prefix = key_bytes.get(..PREFIX_LEN);
                if current_prefix != Some(target_prefix) {
                    break;
                }

                // Now decode the full key (we need the block number)
                let key = StorageShardedKey::decode(key_bytes)
                    .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;

                let Some(value_bytes) = iter.value() else { break };
                let value = BlockNumberList::decompress(value_bytes)
                    .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;

                shards.push((key, value));
                iter.next();
            }

            // Use existing prune_history_shards_inner logic
            match self.prune_history_shards_inner(
                shards,
                *to_block,
                |key| key.sharded_key.highest_block_number,
                |key| key.sharded_key.highest_block_number == u64::MAX,
                |batch, key| batch.delete::<tables::StoragesHistory>(key),
                |batch, key, value| batch.put::<tables::StoragesHistory>(key, value),
                || StorageShardedKey::last(*address, *storage_key),
            )? {
                PruneShardOutcome::Deleted => outcomes.deleted += 1,
                PruneShardOutcome::Updated => outcomes.updated += 1,
                PruneShardOutcome::Unchanged => outcomes.unchanged += 1,
            }
        }

        Ok(outcomes)
    }
2047
    /// Unwinds storage history to keep only blocks `<= keep_to`.
    ///
    /// Handles multi-shard scenarios by:
    /// 1. Loading all shards for the `(address, storage_key)` pair
    /// 2. Finding the boundary shard containing `keep_to`
    /// 3. Deleting all shards after the boundary
    /// 4. Truncating the boundary shard to keep only indices `<= keep_to`
    /// 5. Ensuring the last shard is keyed with `u64::MAX`
    ///
    /// Shards are read from committed DB state, not from pending writes in this batch.
    pub fn unwind_storage_history_to(
        &mut self,
        address: Address,
        storage_key: B256,
        keep_to: BlockNumber,
    ) -> ProviderResult<()> {
        let shards = self.provider.storage_history_shards(address, storage_key)?;
        if shards.is_empty() {
            return Ok(());
        }

        // Find the first shard that might contain blocks > keep_to.
        // A shard is affected if it's the sentinel (u64::MAX) or its highest_block_number > keep_to
        let boundary_idx = shards.iter().position(|(key, _)| {
            key.sharded_key.highest_block_number == u64::MAX ||
                key.sharded_key.highest_block_number > keep_to
        });

        // Repair path: no shards affected means all blocks <= keep_to, just ensure sentinel exists
        // (invariant: the last shard for a slot is keyed u64::MAX).
        let Some(boundary_idx) = boundary_idx else {
            let (last_key, last_value) = shards.last().expect("shards is non-empty");
            if last_key.sharded_key.highest_block_number != u64::MAX {
                self.delete::<tables::StoragesHistory>(last_key.clone())?;
                self.put::<tables::StoragesHistory>(
                    StorageShardedKey::last(address, storage_key),
                    last_value,
                )?;
            }
            return Ok(());
        };

        // Delete all shards strictly after the boundary (they are entirely > keep_to)
        for (key, _) in shards.iter().skip(boundary_idx + 1) {
            self.delete::<tables::StoragesHistory>(key.clone())?;
        }

        // Process the boundary shard: filter out blocks > keep_to
        let (boundary_key, boundary_list) = &shards[boundary_idx];

        // Delete the boundary shard (we'll either drop it or rewrite at u64::MAX)
        self.delete::<tables::StoragesHistory>(boundary_key.clone())?;

        // Build truncated list once; check emptiness directly (avoids double iteration)
        let new_last =
            BlockNumberList::new_pre_sorted(boundary_list.iter().take_while(|&b| b <= keep_to));

        if new_last.is_empty() {
            // Boundary shard is now empty. Previous shard becomes the last and must be keyed
            // u64::MAX.
            if boundary_idx == 0 {
                // Nothing left for this (address, storage_key) pair
                return Ok(());
            }

            let (prev_key, prev_value) = &shards[boundary_idx - 1];
            if prev_key.sharded_key.highest_block_number != u64::MAX {
                self.delete::<tables::StoragesHistory>(prev_key.clone())?;
                self.put::<tables::StoragesHistory>(
                    StorageShardedKey::last(address, storage_key),
                    prev_value,
                )?;
            }
            return Ok(());
        }

        // Boundary shard still has entries: it becomes the new last shard under the sentinel.
        self.put::<tables::StoragesHistory>(
            StorageShardedKey::last(address, storage_key),
            &new_last,
        )?;

        Ok(())
    }
2128
2129    /// Clears all account history shards for the given address.
2130    ///
2131    /// Used when unwinding from block 0 (i.e., removing all history).
2132    pub fn clear_account_history(&mut self, address: Address) -> ProviderResult<()> {
2133        let shards = self.provider.account_history_shards(address)?;
2134        for (key, _) in shards {
2135            self.delete::<tables::AccountsHistory>(key)?;
2136        }
2137        Ok(())
2138    }
2139
2140    /// Clears all storage history shards for the given `(address, storage_key)` pair.
2141    ///
2142    /// Used when unwinding from block 0 (i.e., removing all history for this storage slot).
2143    pub fn clear_storage_history(
2144        &mut self,
2145        address: Address,
2146        storage_key: B256,
2147    ) -> ProviderResult<()> {
2148        let shards = self.provider.storage_history_shards(address, storage_key)?;
2149        for (key, _) in shards {
2150            self.delete::<tables::StoragesHistory>(key)?;
2151        }
2152        Ok(())
2153    }
2154}
2155
/// `RocksDB` transaction wrapper providing MDBX-like semantics.
///
/// Supports:
/// - Read-your-writes: reads see uncommitted writes within the same transaction
/// - Atomic commit/rollback
/// - Iteration over uncommitted data
///
/// Note: `Transaction` is `Send` but NOT `Sync`. This wrapper does not implement
/// `DbTx`/`DbTxMut` traits directly; use RocksDB-specific methods instead.
pub struct RocksTx<'db> {
    /// The underlying optimistic `RocksDB` transaction.
    inner: Transaction<'db, OptimisticTransactionDB>,
    /// Provider owning the database; used to resolve column-family handles.
    provider: &'db RocksDBProvider,
}
2169
2170impl fmt::Debug for RocksTx<'_> {
2171    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2172        f.debug_struct("RocksTx").field("provider", &self.provider).finish_non_exhaustive()
2173    }
2174}
2175
2176impl<'db> RocksTx<'db> {
2177    /// Gets a value from the specified table. Sees uncommitted writes in this transaction.
2178    pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
2179        let encoded_key = key.encode();
2180        self.get_encoded::<T>(&encoded_key)
2181    }
2182
2183    /// Gets a value using pre-encoded key. Sees uncommitted writes in this transaction.
2184    pub fn get_encoded<T: Table>(
2185        &self,
2186        key: &<T::Key as Encode>::Encoded,
2187    ) -> ProviderResult<Option<T::Value>> {
2188        let cf = self.provider.get_cf_handle::<T>()?;
2189        let result = self.inner.get_cf(cf, key.as_ref()).map_err(|e| {
2190            ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
2191                message: e.to_string().into(),
2192                code: -1,
2193            }))
2194        })?;
2195
2196        Ok(result.and_then(|value| T::Value::decompress(&value).ok()))
2197    }
2198
2199    /// Puts a value into the specified table.
2200    pub fn put<T: Table>(&self, key: T::Key, value: &T::Value) -> ProviderResult<()> {
2201        let encoded_key = key.encode();
2202        self.put_encoded::<T>(&encoded_key, value)
2203    }
2204
    /// Puts a value using pre-encoded key.
    ///
    /// The value is compressed before the write. NOTE(review): `compress_to_buf_or_ref!`
    /// appears to either compress into `buf` (returning `None`, so `buf` is used) or
    /// return a reference to bytes needing no buffer — confirm against the macro
    /// definition.
    pub fn put_encoded<T: Table>(
        &self,
        key: &<T::Key as Encode>::Encoded,
        value: &T::Value,
    ) -> ProviderResult<()> {
        let cf = self.provider.get_cf_handle::<T>()?;
        let mut buf = Vec::new();
        let value_bytes = compress_to_buf_or_ref!(buf, value).unwrap_or(&buf);

        // On failure, report a structured write error carrying the table name and key
        // so callers can pinpoint the offending entry.
        self.inner.put_cf(cf, key.as_ref(), value_bytes).map_err(|e| {
            ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
                info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
                operation: DatabaseWriteOperation::PutUpsert,
                table_name: T::NAME,
                key: key.as_ref().to_vec(),
            })))
        })
    }
2224
2225    /// Deletes a value from the specified table.
2226    pub fn delete<T: Table>(&self, key: T::Key) -> ProviderResult<()> {
2227        let cf = self.provider.get_cf_handle::<T>()?;
2228        self.inner.delete_cf(cf, key.encode().as_ref()).map_err(|e| {
2229            ProviderError::Database(DatabaseError::Delete(DatabaseErrorInfo {
2230                message: e.to_string().into(),
2231                code: -1,
2232            }))
2233        })
2234    }
2235
2236    /// Creates an iterator for the specified table. Sees uncommitted writes in this transaction.
2237    ///
2238    /// Returns an iterator that yields `(encoded_key, compressed_value)` pairs.
2239    pub fn iter<T: Table>(&self) -> ProviderResult<RocksTxIter<'_, T>> {
2240        let cf = self.provider.get_cf_handle::<T>()?;
2241        let iter = self.inner.iterator_cf(cf, IteratorMode::Start);
2242        Ok(RocksTxIter { inner: iter, _marker: std::marker::PhantomData })
2243    }
2244
2245    /// Creates an iterator starting from the given key (inclusive).
2246    pub fn iter_from<T: Table>(&self, key: T::Key) -> ProviderResult<RocksTxIter<'_, T>> {
2247        let cf = self.provider.get_cf_handle::<T>()?;
2248        let encoded_key = key.encode();
2249        let iter = self
2250            .inner
2251            .iterator_cf(cf, IteratorMode::From(encoded_key.as_ref(), rocksdb::Direction::Forward));
2252        Ok(RocksTxIter { inner: iter, _marker: std::marker::PhantomData })
2253    }
2254
2255    /// Commits the transaction, persisting all changes.
2256    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
2257    pub fn commit(self) -> ProviderResult<()> {
2258        self.inner.commit().map_err(|e| {
2259            ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
2260                message: e.to_string().into(),
2261                code: -1,
2262            }))
2263        })
2264    }
2265
2266    /// Rolls back the transaction, discarding all changes.
2267    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
2268    pub fn rollback(self) -> ProviderResult<()> {
2269        self.inner.rollback().map_err(|e| {
2270            ProviderError::Database(DatabaseError::Other(format!("rollback failed: {e}")))
2271        })
2272    }
2273
2274    /// Lookup account history and return [`HistoryInfo`] directly.
2275    ///
2276    /// This is a thin wrapper around `history_info` that:
2277    /// - Builds the `ShardedKey` for the address + target block.
2278    /// - Validates that the found shard belongs to the same address.
2279    pub fn account_history_info(
2280        &self,
2281        address: Address,
2282        block_number: BlockNumber,
2283        lowest_available_block_number: Option<BlockNumber>,
2284    ) -> ProviderResult<HistoryInfo> {
2285        let key = ShardedKey::new(address, block_number);
2286        self.history_info::<tables::AccountsHistory>(
2287            key.encode().as_ref(),
2288            block_number,
2289            lowest_available_block_number,
2290            |key_bytes| Ok(<ShardedKey<Address> as Decode>::decode(key_bytes)?.key == address),
2291            |prev_bytes| {
2292                <ShardedKey<Address> as Decode>::decode(prev_bytes)
2293                    .map(|k| k.key == address)
2294                    .unwrap_or(false)
2295            },
2296        )
2297    }
2298
2299    /// Lookup storage history and return [`HistoryInfo`] directly.
2300    ///
2301    /// This is a thin wrapper around `history_info` that:
2302    /// - Builds the `StorageShardedKey` for address + storage key + target block.
2303    /// - Validates that the found shard belongs to the same address and storage slot.
2304    pub fn storage_history_info(
2305        &self,
2306        address: Address,
2307        storage_key: B256,
2308        block_number: BlockNumber,
2309        lowest_available_block_number: Option<BlockNumber>,
2310    ) -> ProviderResult<HistoryInfo> {
2311        let key = StorageShardedKey::new(address, storage_key, block_number);
2312        self.history_info::<tables::StoragesHistory>(
2313            key.encode().as_ref(),
2314            block_number,
2315            lowest_available_block_number,
2316            |key_bytes| {
2317                let k = <StorageShardedKey as Decode>::decode(key_bytes)?;
2318                Ok(k.address == address && k.sharded_key.key == storage_key)
2319            },
2320            |prev_bytes| {
2321                <StorageShardedKey as Decode>::decode(prev_bytes)
2322                    .map(|k| k.address == address && k.sharded_key.key == storage_key)
2323                    .unwrap_or(false)
2324            },
2325        )
2326    }
2327
2328    /// Generic history lookup for sharded history tables.
2329    ///
2330    /// Seeks to the shard containing `block_number`, checks if the key matches via `key_matches`,
2331    /// and uses `prev_key_matches` to detect if a previous shard exists for the same key.
2332    fn history_info<T>(
2333        &self,
2334        encoded_key: &[u8],
2335        block_number: BlockNumber,
2336        lowest_available_block_number: Option<BlockNumber>,
2337        key_matches: impl FnOnce(&[u8]) -> Result<bool, reth_db_api::DatabaseError>,
2338        prev_key_matches: impl Fn(&[u8]) -> bool,
2339    ) -> ProviderResult<HistoryInfo>
2340    where
2341        T: Table<Value = BlockNumberList>,
2342    {
2343        // History may be pruned if a lowest available block is set.
2344        let is_maybe_pruned = lowest_available_block_number.is_some();
2345        let fallback = || {
2346            Ok(if is_maybe_pruned {
2347                HistoryInfo::MaybeInPlainState
2348            } else {
2349                HistoryInfo::NotYetWritten
2350            })
2351        };
2352
2353        let cf = self.provider.0.cf_handle_rw(T::NAME)?;
2354
2355        // Create a raw iterator to access key bytes directly.
2356        let mut iter: DBRawIteratorWithThreadMode<'_, Transaction<'_, OptimisticTransactionDB>> =
2357            self.inner.raw_iterator_cf(&cf);
2358
2359        // Seek to the smallest key >= encoded_key.
2360        iter.seek(encoded_key);
2361        Self::raw_iter_status_ok(&iter)?;
2362
2363        if !iter.valid() {
2364            // No shard found at or after target block.
2365            //
2366            // (MaybeInPlainState) The key may have been written, but due to pruning we may not have
2367            // changesets and history, so we need to make a plain state lookup.
2368            // (HistoryInfo::NotYetWritten) The key has not been written to at all.
2369            return fallback();
2370        }
2371
2372        // Check if the found key matches our target entity.
2373        let Some(key_bytes) = iter.key() else {
2374            return fallback();
2375        };
2376        if !key_matches(key_bytes)? {
2377            // The found key is for a different entity.
2378            return fallback();
2379        }
2380
2381        // Decompress the block list for this shard.
2382        let Some(value_bytes) = iter.value() else {
2383            return fallback();
2384        };
2385        let chunk = BlockNumberList::decompress(value_bytes)?;
2386        let (rank, found_block) = compute_history_rank(&chunk, block_number);
2387
2388        // Lazy check for previous shard - only called when needed.
2389        // If we can step to a previous shard for this same key, history already exists,
2390        // so the target block is not before the first write.
2391        let is_before_first_write = if needs_prev_shard_check(rank, found_block, block_number) {
2392            iter.prev();
2393            Self::raw_iter_status_ok(&iter)?;
2394            let has_prev = iter.valid() && iter.key().is_some_and(&prev_key_matches);
2395            !has_prev
2396        } else {
2397            false
2398        };
2399
2400        Ok(HistoryInfo::from_lookup(
2401            found_block,
2402            is_before_first_write,
2403            lowest_available_block_number,
2404        ))
2405    }
2406
2407    /// Returns an error if the raw iterator is in an invalid state due to an I/O error.
2408    fn raw_iter_status_ok(
2409        iter: &DBRawIteratorWithThreadMode<'_, Transaction<'_, OptimisticTransactionDB>>,
2410    ) -> ProviderResult<()> {
2411        iter.status().map_err(|e| {
2412            ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
2413                message: e.to_string().into(),
2414                code: -1,
2415            }))
2416        })
2417    }
2418}
2419
/// Wrapper enum for `RocksDB` iterators that works in both read-write and read-only modes.
///
/// Each call is dispatched to the wrapped iterator; both variants yield raw
/// `(key, value)` byte pairs.
enum RocksDBIterEnum<'db> {
    /// Iterator from read-write `OptimisticTransactionDB`.
    ReadWrite(rocksdb::DBIteratorWithThreadMode<'db, OptimisticTransactionDB>),
    /// Iterator from read-only `DB`.
    ReadOnly(rocksdb::DBIteratorWithThreadMode<'db, DB>),
}
2427
2428impl Iterator for RocksDBIterEnum<'_> {
2429    type Item = Result<(Box<[u8]>, Box<[u8]>), rocksdb::Error>;
2430
2431    fn next(&mut self) -> Option<Self::Item> {
2432        match self {
2433            Self::ReadWrite(iter) => iter.next(),
2434            Self::ReadOnly(iter) => iter.next(),
2435        }
2436    }
2437}
2438
/// Wrapper enum for raw `RocksDB` iterators that works in both read-write and read-only modes.
///
/// Unlike [`RocksDBIterEnum`], raw iterators expose `seek()` for efficient repositioning
/// without reinitializing the iterator. The caller drives positioning explicitly via
/// `seek`/`next` and reads the current entry with `key`/`value`.
enum RocksDBRawIterEnum<'db> {
    /// Raw iterator from read-write `OptimisticTransactionDB`.
    ReadWrite(DBRawIteratorWithThreadMode<'db, OptimisticTransactionDB>),
    /// Raw iterator from read-only `DB`.
    ReadOnly(DBRawIteratorWithThreadMode<'db, DB>),
}
2449
2450impl RocksDBRawIterEnum<'_> {
2451    /// Positions the iterator at the first key >= `key`.
2452    fn seek(&mut self, key: impl AsRef<[u8]>) {
2453        match self {
2454            Self::ReadWrite(iter) => iter.seek(key),
2455            Self::ReadOnly(iter) => iter.seek(key),
2456        }
2457    }
2458
2459    /// Returns true if the iterator is positioned at a valid key-value pair.
2460    fn valid(&self) -> bool {
2461        match self {
2462            Self::ReadWrite(iter) => iter.valid(),
2463            Self::ReadOnly(iter) => iter.valid(),
2464        }
2465    }
2466
2467    /// Returns the current key, if valid.
2468    fn key(&self) -> Option<&[u8]> {
2469        match self {
2470            Self::ReadWrite(iter) => iter.key(),
2471            Self::ReadOnly(iter) => iter.key(),
2472        }
2473    }
2474
2475    /// Returns the current value, if valid.
2476    fn value(&self) -> Option<&[u8]> {
2477        match self {
2478            Self::ReadWrite(iter) => iter.value(),
2479            Self::ReadOnly(iter) => iter.value(),
2480        }
2481    }
2482
2483    /// Advances the iterator to the next key.
2484    fn next(&mut self) {
2485        match self {
2486            Self::ReadWrite(iter) => iter.next(),
2487            Self::ReadOnly(iter) => iter.next(),
2488        }
2489    }
2490
2491    /// Returns the status of the iterator.
2492    fn status(&self) -> Result<(), rocksdb::Error> {
2493        match self {
2494            Self::ReadWrite(iter) => iter.status(),
2495            Self::ReadOnly(iter) => iter.status(),
2496        }
2497    }
2498}
2499
/// Iterator over a `RocksDB` table (non-transactional).
///
/// Yields decoded `(Key, Value)` pairs in key order.
pub struct RocksDBIter<'db, T: Table> {
    // Underlying raw iterator; works for both read-write and read-only backends.
    inner: RocksDBIterEnum<'db>,
    // Ties the iterator to table `T` without storing any `T` data.
    _marker: std::marker::PhantomData<T>,
}
2507
2508impl<T: Table> fmt::Debug for RocksDBIter<'_, T> {
2509    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2510        f.debug_struct("RocksDBIter").field("table", &T::NAME).finish_non_exhaustive()
2511    }
2512}
2513
2514impl<T: Table> Iterator for RocksDBIter<'_, T> {
2515    type Item = ProviderResult<(T::Key, T::Value)>;
2516
2517    fn next(&mut self) -> Option<Self::Item> {
2518        Some(decode_iter_item::<T>(self.inner.next()?))
2519    }
2520}
2521
/// Raw iterator over a `RocksDB` table (non-transactional).
///
/// Yields raw `(key_bytes, value_bytes)` pairs without decoding.
pub struct RocksDBRawIter<'db> {
    // Underlying iterator; works for both read-write and read-only backends.
    inner: RocksDBIterEnum<'db>,
}
2528
impl fmt::Debug for RocksDBRawIter<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // No fields worth printing; emit just the type name with a `..` marker.
        f.debug_struct("RocksDBRawIter").finish_non_exhaustive()
    }
}
2534
2535impl Iterator for RocksDBRawIter<'_> {
2536    type Item = ProviderResult<(Box<[u8]>, Box<[u8]>)>;
2537
2538    fn next(&mut self) -> Option<Self::Item> {
2539        match self.inner.next()? {
2540            Ok(kv) => Some(Ok(kv)),
2541            Err(e) => Some(Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
2542                message: e.to_string().into(),
2543                code: -1,
2544            })))),
2545        }
2546    }
2547}
2548
/// Iterator over a `RocksDB` table within a transaction.
///
/// Yields decoded `(Key, Value)` pairs. Sees uncommitted writes.
pub struct RocksTxIter<'tx, T: Table> {
    // Iterator bound to the transaction, so it observes the transaction's own writes.
    inner: rocksdb::DBIteratorWithThreadMode<'tx, Transaction<'tx, OptimisticTransactionDB>>,
    // Ties the iterator to table `T` without storing any `T` data.
    _marker: std::marker::PhantomData<T>,
}
2556
2557impl<T: Table> fmt::Debug for RocksTxIter<'_, T> {
2558    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2559        f.debug_struct("RocksTxIter").field("table", &T::NAME).finish_non_exhaustive()
2560    }
2561}
2562
2563impl<T: Table> Iterator for RocksTxIter<'_, T> {
2564    type Item = ProviderResult<(T::Key, T::Value)>;
2565
2566    fn next(&mut self) -> Option<Self::Item> {
2567        Some(decode_iter_item::<T>(self.inner.next()?))
2568    }
2569}
2570
2571/// Decodes a raw key-value pair from a `RocksDB` iterator into typed table entries.
2572///
2573/// Handles both error propagation from the underlying iterator and
2574/// decoding/decompression of the key and value bytes.
2575fn decode_iter_item<T: Table>(result: RawKVResult) -> ProviderResult<(T::Key, T::Value)> {
2576    let (key_bytes, value_bytes) = result.map_err(|e| {
2577        ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
2578            message: e.to_string().into(),
2579            code: -1,
2580        }))
2581    })?;
2582
2583    let key = <T::Key as reth_db_api::table::Decode>::decode(&key_bytes)
2584        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
2585
2586    let value = T::Value::decompress(&value_bytes)
2587        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
2588
2589    Ok((key, value))
2590}
2591
2592/// Converts Reth's [`LogLevel`] to `RocksDB`'s [`rocksdb::LogLevel`].
2593const fn convert_log_level(level: LogLevel) -> rocksdb::LogLevel {
2594    match level {
2595        LogLevel::Fatal => rocksdb::LogLevel::Fatal,
2596        LogLevel::Error => rocksdb::LogLevel::Error,
2597        LogLevel::Warn => rocksdb::LogLevel::Warn,
2598        LogLevel::Notice | LogLevel::Verbose => rocksdb::LogLevel::Info,
2599        LogLevel::Debug | LogLevel::Trace | LogLevel::Extra => rocksdb::LogLevel::Debug,
2600    }
2601}
2602
2603#[cfg(test)]
2604mod tests {
2605    use super::*;
2606    use crate::providers::HistoryInfo;
2607    use alloy_primitives::{Address, TxHash, B256};
2608    use reth_db_api::{
2609        models::{
2610            sharded_key::{ShardedKey, NUM_OF_INDICES_IN_SHARD},
2611            storage_sharded_key::StorageShardedKey,
2612            IntegerList,
2613        },
2614        table::Table,
2615        tables,
2616    };
2617    use tempfile::TempDir;
2618
2619    #[test]
2620    fn test_with_default_tables_registers_required_column_families() {
2621        let temp_dir = TempDir::new().unwrap();
2622
2623        // Build with default tables
2624        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
2625
2626        // Should be able to write/read TransactionHashNumbers
2627        let tx_hash = TxHash::from(B256::from([1u8; 32]));
2628        provider.put::<tables::TransactionHashNumbers>(tx_hash, &100).unwrap();
2629        assert_eq!(provider.get::<tables::TransactionHashNumbers>(tx_hash).unwrap(), Some(100));
2630
2631        // Should be able to write/read AccountsHistory
2632        let key = ShardedKey::new(Address::ZERO, 100);
2633        let value = IntegerList::default();
2634        provider.put::<tables::AccountsHistory>(key.clone(), &value).unwrap();
2635        assert!(provider.get::<tables::AccountsHistory>(key).unwrap().is_some());
2636
2637        // Should be able to write/read StoragesHistory
2638        let key = StorageShardedKey::new(Address::ZERO, B256::ZERO, 100);
2639        provider.put::<tables::StoragesHistory>(key.clone(), &value).unwrap();
2640        assert!(provider.get::<tables::StoragesHistory>(key).unwrap().is_some());
2641    }
2642
    /// Minimal `u64 -> Vec<u8>` table used only by the unit tests in this module.
    #[derive(Debug)]
    struct TestTable;

    impl Table for TestTable {
        const NAME: &'static str = "TestTable";
        // Plain key-value table; no duplicate-key semantics needed for these tests.
        const DUPSORT: bool = false;
        type Key = u64;
        type Value = Vec<u8>;
    }
2652
2653    #[test]
2654    fn test_basic_operations() {
2655        let temp_dir = TempDir::new().unwrap();
2656
2657        let provider = RocksDBBuilder::new(temp_dir.path())
2658            .with_table::<TestTable>() // Type-safe!
2659            .build()
2660            .unwrap();
2661
2662        let key = 42u64;
2663        let value = b"test_value".to_vec();
2664
2665        // Test write
2666        provider.put::<TestTable>(key, &value).unwrap();
2667
2668        // Test read
2669        let result = provider.get::<TestTable>(key).unwrap();
2670        assert_eq!(result, Some(value));
2671
2672        // Test delete
2673        provider.delete::<TestTable>(key).unwrap();
2674
2675        // Verify deletion
2676        assert_eq!(provider.get::<TestTable>(key).unwrap(), None);
2677    }
2678
2679    #[test]
2680    fn test_batch_operations() {
2681        let temp_dir = TempDir::new().unwrap();
2682        let provider =
2683            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
2684
2685        // Write multiple entries in a batch
2686        provider
2687            .write_batch(|batch| {
2688                for i in 0..10u64 {
2689                    let value = format!("value_{i}").into_bytes();
2690                    batch.put::<TestTable>(i, &value)?;
2691                }
2692                Ok(())
2693            })
2694            .unwrap();
2695
2696        // Read all entries
2697        for i in 0..10u64 {
2698            let value = format!("value_{i}").into_bytes();
2699            assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
2700        }
2701
2702        // Delete all entries in a batch
2703        provider
2704            .write_batch(|batch| {
2705                for i in 0..10u64 {
2706                    batch.delete::<TestTable>(i)?;
2707                }
2708                Ok(())
2709            })
2710            .unwrap();
2711
2712        // Verify all deleted
2713        for i in 0..10u64 {
2714            assert_eq!(provider.get::<TestTable>(i).unwrap(), None);
2715        }
2716    }
2717
2718    #[test]
2719    fn test_with_real_table() {
2720        let temp_dir = TempDir::new().unwrap();
2721        let provider = RocksDBBuilder::new(temp_dir.path())
2722            .with_table::<tables::TransactionHashNumbers>()
2723            .with_metrics()
2724            .build()
2725            .unwrap();
2726
2727        let tx_hash = TxHash::from(B256::from([1u8; 32]));
2728
2729        // Insert and retrieve
2730        provider.put::<tables::TransactionHashNumbers>(tx_hash, &100).unwrap();
2731        assert_eq!(provider.get::<tables::TransactionHashNumbers>(tx_hash).unwrap(), Some(100));
2732
2733        // Batch insert multiple transactions
2734        provider
2735            .write_batch(|batch| {
2736                for i in 0..10u64 {
2737                    let hash = TxHash::from(B256::from([i as u8; 32]));
2738                    let value = i * 100;
2739                    batch.put::<tables::TransactionHashNumbers>(hash, &value)?;
2740                }
2741                Ok(())
2742            })
2743            .unwrap();
2744
2745        // Verify batch insertions
2746        for i in 0..10u64 {
2747            let hash = TxHash::from(B256::from([i as u8; 32]));
2748            assert_eq!(
2749                provider.get::<tables::TransactionHashNumbers>(hash).unwrap(),
2750                Some(i * 100)
2751            );
2752        }
2753    }
2754    #[test]
2755    fn test_statistics_enabled() {
2756        let temp_dir = TempDir::new().unwrap();
2757        // Just verify that building with statistics doesn't panic
2758        let provider = RocksDBBuilder::new(temp_dir.path())
2759            .with_table::<TestTable>()
2760            .with_statistics()
2761            .build()
2762            .unwrap();
2763
2764        // Do operations - data should be immediately readable with OptimisticTransactionDB
2765        for i in 0..10 {
2766            let value = vec![i as u8];
2767            provider.put::<TestTable>(i, &value).unwrap();
2768            // Verify write is visible
2769            assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
2770        }
2771    }
2772
2773    #[test]
2774    fn test_data_persistence() {
2775        let temp_dir = TempDir::new().unwrap();
2776        let provider =
2777            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
2778
2779        // Insert data - OptimisticTransactionDB writes are immediately visible
2780        let value = vec![42u8; 1000];
2781        for i in 0..100 {
2782            provider.put::<TestTable>(i, &value).unwrap();
2783        }
2784
2785        // Verify data is readable
2786        for i in 0..100 {
2787            assert!(provider.get::<TestTable>(i).unwrap().is_some(), "Data should be readable");
2788        }
2789    }
2790
2791    #[test]
2792    fn test_transaction_read_your_writes() {
2793        let temp_dir = TempDir::new().unwrap();
2794        let provider =
2795            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
2796
2797        // Create a transaction
2798        let tx = provider.tx();
2799
2800        // Write data within the transaction
2801        let key = 42u64;
2802        let value = b"test_value".to_vec();
2803        tx.put::<TestTable>(key, &value).unwrap();
2804
2805        // Read-your-writes: should see uncommitted data in same transaction
2806        let result = tx.get::<TestTable>(key).unwrap();
2807        assert_eq!(
2808            result,
2809            Some(value.clone()),
2810            "Transaction should see its own uncommitted writes"
2811        );
2812
2813        // Data should NOT be visible via provider (outside transaction)
2814        let provider_result = provider.get::<TestTable>(key).unwrap();
2815        assert_eq!(provider_result, None, "Uncommitted data should not be visible outside tx");
2816
2817        // Commit the transaction
2818        tx.commit().unwrap();
2819
2820        // Now data should be visible via provider
2821        let committed_result = provider.get::<TestTable>(key).unwrap();
2822        assert_eq!(committed_result, Some(value), "Committed data should be visible");
2823    }
2824
2825    #[test]
2826    fn test_transaction_rollback() {
2827        let temp_dir = TempDir::new().unwrap();
2828        let provider =
2829            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
2830
2831        // First, put some initial data
2832        let key = 100u64;
2833        let initial_value = b"initial".to_vec();
2834        provider.put::<TestTable>(key, &initial_value).unwrap();
2835
2836        // Create a transaction and modify data
2837        let tx = provider.tx();
2838        let new_value = b"modified".to_vec();
2839        tx.put::<TestTable>(key, &new_value).unwrap();
2840
2841        // Verify modification is visible within transaction
2842        assert_eq!(tx.get::<TestTable>(key).unwrap(), Some(new_value));
2843
2844        // Rollback instead of commit
2845        tx.rollback().unwrap();
2846
2847        // Data should be unchanged (initial value)
2848        let result = provider.get::<TestTable>(key).unwrap();
2849        assert_eq!(result, Some(initial_value), "Rollback should preserve original data");
2850    }
2851
2852    #[test]
2853    fn test_transaction_iterator() {
2854        let temp_dir = TempDir::new().unwrap();
2855        let provider =
2856            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
2857
2858        // Create a transaction
2859        let tx = provider.tx();
2860
2861        // Write multiple entries
2862        for i in 0..5u64 {
2863            let value = format!("value_{i}").into_bytes();
2864            tx.put::<TestTable>(i, &value).unwrap();
2865        }
2866
2867        // Iterate - should see uncommitted writes
2868        let mut count = 0;
2869        for result in tx.iter::<TestTable>().unwrap() {
2870            let (key, value) = result.unwrap();
2871            assert_eq!(value, format!("value_{key}").into_bytes());
2872            count += 1;
2873        }
2874        assert_eq!(count, 5, "Iterator should see all uncommitted writes");
2875
2876        // Commit
2877        tx.commit().unwrap();
2878    }
2879
2880    #[test]
2881    fn test_batch_manual_commit() {
2882        let temp_dir = TempDir::new().unwrap();
2883        let provider =
2884            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
2885
2886        // Create a batch via provider.batch()
2887        let mut batch = provider.batch();
2888
2889        // Add entries
2890        for i in 0..10u64 {
2891            let value = format!("batch_value_{i}").into_bytes();
2892            batch.put::<TestTable>(i, &value).unwrap();
2893        }
2894
2895        // Verify len/is_empty
2896        assert_eq!(batch.len(), 10);
2897        assert!(!batch.is_empty());
2898
2899        // Data should NOT be visible before commit
2900        assert_eq!(provider.get::<TestTable>(0).unwrap(), None);
2901
2902        // Commit the batch
2903        batch.commit().unwrap();
2904
2905        // Now data should be visible
2906        for i in 0..10u64 {
2907            let value = format!("batch_value_{i}").into_bytes();
2908            assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
2909        }
2910    }
2911
2912    #[test]
2913    fn test_first_and_last_entry() {
2914        let temp_dir = TempDir::new().unwrap();
2915        let provider =
2916            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
2917
2918        // Empty table should return None for both
2919        assert_eq!(provider.first::<TestTable>().unwrap(), None);
2920        assert_eq!(provider.last::<TestTable>().unwrap(), None);
2921
2922        // Insert some entries
2923        provider.put::<TestTable>(10, &b"value_10".to_vec()).unwrap();
2924        provider.put::<TestTable>(20, &b"value_20".to_vec()).unwrap();
2925        provider.put::<TestTable>(5, &b"value_5".to_vec()).unwrap();
2926
2927        // First should return the smallest key
2928        let first = provider.first::<TestTable>().unwrap();
2929        assert_eq!(first, Some((5, b"value_5".to_vec())));
2930
2931        // Last should return the largest key
2932        let last = provider.last::<TestTable>().unwrap();
2933        assert_eq!(last, Some((20, b"value_20".to_vec())));
2934    }
2935
    /// Tests the edge case where block < `lowest_available_block_number`.
    /// This case cannot be tested via `HistoricalStateProviderRef` (which errors before lookup),
    /// so we keep this RocksDB-specific test to verify the low-level behavior.
    #[test]
    fn test_account_history_info_pruned_before_first_entry() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);

        // Create a single shard starting at block 100.
        // `u64::MAX` is the highest-block sentinel key under which the open shard is stored.
        let chunk = IntegerList::new([100, 200, 300]).unwrap();
        let shard_key = ShardedKey::new(address, u64::MAX);
        provider.put::<tables::AccountsHistory>(shard_key, &chunk).unwrap();

        let tx = provider.tx();

        // Query for block 50 with lowest_available_block_number = 100
        // This simulates a pruned state where data before block 100 is not available.
        // Since we're before the first write AND pruning boundary is set, we need to
        // check the changeset at the first write block.
        let result = tx.account_history_info(address, 50, Some(100)).unwrap();
        assert_eq!(result, HistoryInfo::InChangeset(100));

        // No writes were made in the transaction; discard it.
        tx.rollback().unwrap();
    }
2962
    #[test]
    fn test_account_history_shard_split_at_boundary() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);
        let limit = NUM_OF_INDICES_IN_SHARD;

        // Add exactly NUM_OF_INDICES_IN_SHARD + 1 indices to trigger a split
        let indices: Vec<u64> = (0..=(limit as u64)).collect();
        let mut batch = provider.batch();
        batch.append_account_history_shard(address, indices).unwrap();
        batch.commit().unwrap();

        // Should have 2 shards: one completed shard and one sentinel shard.
        // The completed shard is keyed by the highest block it contains (limit - 1);
        // the open shard always lives under the u64::MAX sentinel key.
        let completed_key = ShardedKey::new(address, (limit - 1) as u64);
        let sentinel_key = ShardedKey::new(address, u64::MAX);

        let completed_shard = provider.get::<tables::AccountsHistory>(completed_key).unwrap();
        let sentinel_shard = provider.get::<tables::AccountsHistory>(sentinel_key).unwrap();

        assert!(completed_shard.is_some(), "completed shard should exist");
        assert!(sentinel_shard.is_some(), "sentinel shard should exist");

        let completed_shard = completed_shard.unwrap();
        let sentinel_shard = sentinel_shard.unwrap();

        // The overflow element (block `limit`) spills into the sentinel shard.
        assert_eq!(completed_shard.len(), limit as u64, "completed shard should be full");
        assert_eq!(sentinel_shard.len(), 1, "sentinel shard should have 1 element");
    }
2993
    #[test]
    fn test_account_history_multiple_shard_splits() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x43; 20]);
        let limit = NUM_OF_INDICES_IN_SHARD;

        // First batch: add NUM_OF_INDICES_IN_SHARD indices
        let first_batch_indices: Vec<u64> = (0..limit as u64).collect();
        let mut batch = provider.batch();
        batch.append_account_history_shard(address, first_batch_indices).unwrap();
        batch.commit().unwrap();

        // Should have just a sentinel shard (exactly at limit, not over):
        // a split only happens once the shard would exceed the limit.
        let sentinel_key = ShardedKey::new(address, u64::MAX);
        let shard = provider.get::<tables::AccountsHistory>(sentinel_key.clone()).unwrap();
        assert!(shard.is_some());
        assert_eq!(shard.unwrap().len(), limit as u64);

        // Second batch: add another NUM_OF_INDICES_IN_SHARD + 1 indices (causing 2 more shards)
        let second_batch_indices: Vec<u64> = (limit as u64..=(2 * limit) as u64).collect();
        let mut batch = provider.batch();
        batch.append_account_history_shard(address, second_batch_indices).unwrap();
        batch.commit().unwrap();

        // Now we should have: 2 completed shards + 1 sentinel shard.
        // Completed shards are keyed by the highest block each one contains.
        let first_completed = ShardedKey::new(address, (limit - 1) as u64);
        let second_completed = ShardedKey::new(address, (2 * limit - 1) as u64);

        assert!(
            provider.get::<tables::AccountsHistory>(first_completed).unwrap().is_some(),
            "first completed shard should exist"
        );
        assert!(
            provider.get::<tables::AccountsHistory>(second_completed).unwrap().is_some(),
            "second completed shard should exist"
        );
        assert!(
            provider.get::<tables::AccountsHistory>(sentinel_key).unwrap().is_some(),
            "sentinel shard should exist"
        );
    }
3037
3038    #[test]
3039    fn test_storage_history_shard_split_at_boundary() {
3040        let temp_dir = TempDir::new().unwrap();
3041        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3042
3043        let address = Address::from([0x44; 20]);
3044        let slot = B256::from([0x55; 32]);
3045        let limit = NUM_OF_INDICES_IN_SHARD;
3046
3047        // Add exactly NUM_OF_INDICES_IN_SHARD + 1 indices to trigger a split
3048        let indices: Vec<u64> = (0..=(limit as u64)).collect();
3049        let mut batch = provider.batch();
3050        batch.append_storage_history_shard(address, slot, indices).unwrap();
3051        batch.commit().unwrap();
3052
3053        // Should have 2 shards: one completed shard and one sentinel shard
3054        let completed_key = StorageShardedKey::new(address, slot, (limit - 1) as u64);
3055        let sentinel_key = StorageShardedKey::new(address, slot, u64::MAX);
3056
3057        let completed_shard = provider.get::<tables::StoragesHistory>(completed_key).unwrap();
3058        let sentinel_shard = provider.get::<tables::StoragesHistory>(sentinel_key).unwrap();
3059
3060        assert!(completed_shard.is_some(), "completed shard should exist");
3061        assert!(sentinel_shard.is_some(), "sentinel shard should exist");
3062
3063        let completed_shard = completed_shard.unwrap();
3064        let sentinel_shard = sentinel_shard.unwrap();
3065
3066        assert_eq!(completed_shard.len(), limit as u64, "completed shard should be full");
3067        assert_eq!(sentinel_shard.len(), 1, "sentinel shard should have 1 element");
3068    }
3069
3070    #[test]
3071    fn test_storage_history_multiple_shard_splits() {
3072        let temp_dir = TempDir::new().unwrap();
3073        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3074
3075        let address = Address::from([0x46; 20]);
3076        let slot = B256::from([0x57; 32]);
3077        let limit = NUM_OF_INDICES_IN_SHARD;
3078
3079        // First batch: add NUM_OF_INDICES_IN_SHARD indices
3080        let first_batch_indices: Vec<u64> = (0..limit as u64).collect();
3081        let mut batch = provider.batch();
3082        batch.append_storage_history_shard(address, slot, first_batch_indices).unwrap();
3083        batch.commit().unwrap();
3084
3085        // Should have just a sentinel shard (exactly at limit, not over)
3086        let sentinel_key = StorageShardedKey::new(address, slot, u64::MAX);
3087        let shard = provider.get::<tables::StoragesHistory>(sentinel_key.clone()).unwrap();
3088        assert!(shard.is_some());
3089        assert_eq!(shard.unwrap().len(), limit as u64);
3090
3091        // Second batch: add another NUM_OF_INDICES_IN_SHARD + 1 indices (causing 2 more shards)
3092        let second_batch_indices: Vec<u64> = (limit as u64..=(2 * limit) as u64).collect();
3093        let mut batch = provider.batch();
3094        batch.append_storage_history_shard(address, slot, second_batch_indices).unwrap();
3095        batch.commit().unwrap();
3096
3097        // Now we should have: 2 completed shards + 1 sentinel shard
3098        let first_completed = StorageShardedKey::new(address, slot, (limit - 1) as u64);
3099        let second_completed = StorageShardedKey::new(address, slot, (2 * limit - 1) as u64);
3100
3101        assert!(
3102            provider.get::<tables::StoragesHistory>(first_completed).unwrap().is_some(),
3103            "first completed shard should exist"
3104        );
3105        assert!(
3106            provider.get::<tables::StoragesHistory>(second_completed).unwrap().is_some(),
3107            "second completed shard should exist"
3108        );
3109        assert!(
3110            provider.get::<tables::StoragesHistory>(sentinel_key).unwrap().is_some(),
3111            "sentinel shard should exist"
3112        );
3113    }
3114
3115    #[test]
3116    fn test_clear_table() {
3117        let temp_dir = TempDir::new().unwrap();
3118        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3119
3120        let address = Address::from([0x42; 20]);
3121        let key = ShardedKey::new(address, u64::MAX);
3122        let blocks = BlockNumberList::new_pre_sorted([1, 2, 3]);
3123
3124        provider.put::<tables::AccountsHistory>(key.clone(), &blocks).unwrap();
3125        assert!(provider.get::<tables::AccountsHistory>(key.clone()).unwrap().is_some());
3126
3127        provider.clear::<tables::AccountsHistory>().unwrap();
3128
3129        assert!(
3130            provider.get::<tables::AccountsHistory>(key).unwrap().is_none(),
3131            "table should be empty after clear"
3132        );
3133        assert!(
3134            provider.first::<tables::AccountsHistory>().unwrap().is_none(),
3135            "first() should return None after clear"
3136        );
3137    }
3138
3139    #[test]
3140    fn test_clear_empty_table() {
3141        let temp_dir = TempDir::new().unwrap();
3142        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3143
3144        assert!(provider.first::<tables::AccountsHistory>().unwrap().is_none());
3145
3146        provider.clear::<tables::AccountsHistory>().unwrap();
3147
3148        assert!(provider.first::<tables::AccountsHistory>().unwrap().is_none());
3149    }
3150
3151    #[test]
3152    fn test_unwind_account_history_to_basic() {
3153        let temp_dir = TempDir::new().unwrap();
3154        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3155
3156        let address = Address::from([0x42; 20]);
3157
3158        // Add blocks 0-10
3159        let mut batch = provider.batch();
3160        batch.append_account_history_shard(address, 0..=10).unwrap();
3161        batch.commit().unwrap();
3162
3163        // Verify we have blocks 0-10
3164        let key = ShardedKey::new(address, u64::MAX);
3165        let result = provider.get::<tables::AccountsHistory>(key.clone()).unwrap();
3166        assert!(result.is_some());
3167        let blocks: Vec<u64> = result.unwrap().iter().collect();
3168        assert_eq!(blocks, (0..=10).collect::<Vec<_>>());
3169
3170        // Unwind to block 5 (keep blocks 0-5, remove 6-10)
3171        let mut batch = provider.batch();
3172        batch.unwind_account_history_to(address, 5).unwrap();
3173        batch.commit().unwrap();
3174
3175        // Verify only blocks 0-5 remain
3176        let result = provider.get::<tables::AccountsHistory>(key).unwrap();
3177        assert!(result.is_some());
3178        let blocks: Vec<u64> = result.unwrap().iter().collect();
3179        assert_eq!(blocks, (0..=5).collect::<Vec<_>>());
3180    }
3181
3182    #[test]
3183    fn test_unwind_account_history_to_removes_all() {
3184        let temp_dir = TempDir::new().unwrap();
3185        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3186
3187        let address = Address::from([0x42; 20]);
3188
3189        // Add blocks 5-10
3190        let mut batch = provider.batch();
3191        batch.append_account_history_shard(address, 5..=10).unwrap();
3192        batch.commit().unwrap();
3193
3194        // Unwind to block 4 (removes all blocks since they're all > 4)
3195        let mut batch = provider.batch();
3196        batch.unwind_account_history_to(address, 4).unwrap();
3197        batch.commit().unwrap();
3198
3199        // Verify no data remains for this address
3200        let key = ShardedKey::new(address, u64::MAX);
3201        let result = provider.get::<tables::AccountsHistory>(key).unwrap();
3202        assert!(result.is_none(), "Should have no data after full unwind");
3203    }
3204
3205    #[test]
3206    fn test_unwind_account_history_to_no_op() {
3207        let temp_dir = TempDir::new().unwrap();
3208        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3209
3210        let address = Address::from([0x42; 20]);
3211
3212        // Add blocks 0-5
3213        let mut batch = provider.batch();
3214        batch.append_account_history_shard(address, 0..=5).unwrap();
3215        batch.commit().unwrap();
3216
3217        // Unwind to block 10 (no-op since all blocks are <= 10)
3218        let mut batch = provider.batch();
3219        batch.unwind_account_history_to(address, 10).unwrap();
3220        batch.commit().unwrap();
3221
3222        // Verify blocks 0-5 still remain
3223        let key = ShardedKey::new(address, u64::MAX);
3224        let result = provider.get::<tables::AccountsHistory>(key).unwrap();
3225        assert!(result.is_some());
3226        let blocks: Vec<u64> = result.unwrap().iter().collect();
3227        assert_eq!(blocks, (0..=5).collect::<Vec<_>>());
3228    }
3229
3230    #[test]
3231    fn test_unwind_account_history_to_block_zero() {
3232        let temp_dir = TempDir::new().unwrap();
3233        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3234
3235        let address = Address::from([0x42; 20]);
3236
3237        // Add blocks 0-5 (including block 0)
3238        let mut batch = provider.batch();
3239        batch.append_account_history_shard(address, 0..=5).unwrap();
3240        batch.commit().unwrap();
3241
3242        // Unwind to block 0 (keep only block 0, remove 1-5)
3243        // This simulates the caller doing: unwind_to = min_block.checked_sub(1) where min_block = 1
3244        let mut batch = provider.batch();
3245        batch.unwind_account_history_to(address, 0).unwrap();
3246        batch.commit().unwrap();
3247
3248        // Verify only block 0 remains
3249        let key = ShardedKey::new(address, u64::MAX);
3250        let result = provider.get::<tables::AccountsHistory>(key).unwrap();
3251        assert!(result.is_some());
3252        let blocks: Vec<u64> = result.unwrap().iter().collect();
3253        assert_eq!(blocks, vec![0]);
3254    }
3255
3256    #[test]
3257    fn test_unwind_account_history_to_multi_shard() {
3258        let temp_dir = TempDir::new().unwrap();
3259        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3260
3261        let address = Address::from([0x42; 20]);
3262
3263        // Create multiple shards by adding more than NUM_OF_INDICES_IN_SHARD entries
3264        // For testing, we'll manually create shards with specific keys
3265        let mut batch = provider.batch();
3266
3267        // First shard: blocks 1-50, keyed by 50
3268        let shard1 = BlockNumberList::new_pre_sorted(1..=50);
3269        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, 50), &shard1).unwrap();
3270
3271        // Second shard: blocks 51-100, keyed by MAX (sentinel)
3272        let shard2 = BlockNumberList::new_pre_sorted(51..=100);
3273        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, u64::MAX), &shard2).unwrap();
3274
3275        batch.commit().unwrap();
3276
3277        // Verify we have 2 shards
3278        let shards = provider.account_history_shards(address).unwrap();
3279        assert_eq!(shards.len(), 2);
3280
3281        // Unwind to block 75 (keep 1-75, remove 76-100)
3282        let mut batch = provider.batch();
3283        batch.unwind_account_history_to(address, 75).unwrap();
3284        batch.commit().unwrap();
3285
3286        // Verify: shard1 should be untouched, shard2 should be truncated
3287        let shards = provider.account_history_shards(address).unwrap();
3288        assert_eq!(shards.len(), 2);
3289
3290        // First shard unchanged
3291        assert_eq!(shards[0].0.highest_block_number, 50);
3292        assert_eq!(shards[0].1.iter().collect::<Vec<_>>(), (1..=50).collect::<Vec<_>>());
3293
3294        // Second shard truncated and re-keyed to MAX
3295        assert_eq!(shards[1].0.highest_block_number, u64::MAX);
3296        assert_eq!(shards[1].1.iter().collect::<Vec<_>>(), (51..=75).collect::<Vec<_>>());
3297    }
3298
3299    #[test]
3300    fn test_unwind_account_history_to_multi_shard_boundary_empty() {
3301        let temp_dir = TempDir::new().unwrap();
3302        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3303
3304        let address = Address::from([0x42; 20]);
3305
3306        // Create two shards
3307        let mut batch = provider.batch();
3308
3309        // First shard: blocks 1-50, keyed by 50
3310        let shard1 = BlockNumberList::new_pre_sorted(1..=50);
3311        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, 50), &shard1).unwrap();
3312
3313        // Second shard: blocks 75-100, keyed by MAX
3314        let shard2 = BlockNumberList::new_pre_sorted(75..=100);
3315        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, u64::MAX), &shard2).unwrap();
3316
3317        batch.commit().unwrap();
3318
3319        // Unwind to block 60 (removes all of shard2 since 75 > 60, promotes shard1 to MAX)
3320        let mut batch = provider.batch();
3321        batch.unwind_account_history_to(address, 60).unwrap();
3322        batch.commit().unwrap();
3323
3324        // Verify: only shard1 remains, now keyed as MAX
3325        let shards = provider.account_history_shards(address).unwrap();
3326        assert_eq!(shards.len(), 1);
3327        assert_eq!(shards[0].0.highest_block_number, u64::MAX);
3328        assert_eq!(shards[0].1.iter().collect::<Vec<_>>(), (1..=50).collect::<Vec<_>>());
3329    }
3330
3331    #[test]
3332    fn test_account_history_shards_iterator() {
3333        let temp_dir = TempDir::new().unwrap();
3334        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3335
3336        let address = Address::from([0x42; 20]);
3337        let other_address = Address::from([0x43; 20]);
3338
3339        // Add data for two addresses
3340        let mut batch = provider.batch();
3341        batch.append_account_history_shard(address, 0..=5).unwrap();
3342        batch.append_account_history_shard(other_address, 10..=15).unwrap();
3343        batch.commit().unwrap();
3344
3345        // Query shards for first address only
3346        let shards = provider.account_history_shards(address).unwrap();
3347        assert_eq!(shards.len(), 1);
3348        assert_eq!(shards[0].0.key, address);
3349
3350        // Query shards for second address only
3351        let shards = provider.account_history_shards(other_address).unwrap();
3352        assert_eq!(shards.len(), 1);
3353        assert_eq!(shards[0].0.key, other_address);
3354
3355        // Query shards for non-existent address
3356        let non_existent = Address::from([0x99; 20]);
3357        let shards = provider.account_history_shards(non_existent).unwrap();
3358        assert!(shards.is_empty());
3359    }
3360
3361    #[test]
3362    fn test_clear_account_history() {
3363        let temp_dir = TempDir::new().unwrap();
3364        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3365
3366        let address = Address::from([0x42; 20]);
3367
3368        // Add blocks 0-10
3369        let mut batch = provider.batch();
3370        batch.append_account_history_shard(address, 0..=10).unwrap();
3371        batch.commit().unwrap();
3372
3373        // Clear all history (simulates unwind from block 0)
3374        let mut batch = provider.batch();
3375        batch.clear_account_history(address).unwrap();
3376        batch.commit().unwrap();
3377
3378        // Verify no data remains
3379        let shards = provider.account_history_shards(address).unwrap();
3380        assert!(shards.is_empty(), "All shards should be deleted");
3381    }
3382
3383    #[test]
3384    fn test_unwind_non_sentinel_boundary() {
3385        let temp_dir = TempDir::new().unwrap();
3386        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3387
3388        let address = Address::from([0x42; 20]);
3389
3390        // Create three shards with non-sentinel boundary
3391        let mut batch = provider.batch();
3392
3393        // Shard 1: blocks 1-50, keyed by 50
3394        let shard1 = BlockNumberList::new_pre_sorted(1..=50);
3395        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, 50), &shard1).unwrap();
3396
3397        // Shard 2: blocks 51-100, keyed by 100 (non-sentinel, will be boundary)
3398        let shard2 = BlockNumberList::new_pre_sorted(51..=100);
3399        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, 100), &shard2).unwrap();
3400
3401        // Shard 3: blocks 101-150, keyed by MAX (will be deleted)
3402        let shard3 = BlockNumberList::new_pre_sorted(101..=150);
3403        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, u64::MAX), &shard3).unwrap();
3404
3405        batch.commit().unwrap();
3406
3407        // Verify 3 shards
3408        let shards = provider.account_history_shards(address).unwrap();
3409        assert_eq!(shards.len(), 3);
3410
3411        // Unwind to block 75 (truncates shard2, deletes shard3)
3412        let mut batch = provider.batch();
3413        batch.unwind_account_history_to(address, 75).unwrap();
3414        batch.commit().unwrap();
3415
3416        // Verify: shard1 unchanged, shard2 truncated and re-keyed to MAX, shard3 deleted
3417        let shards = provider.account_history_shards(address).unwrap();
3418        assert_eq!(shards.len(), 2);
3419
3420        // First shard unchanged
3421        assert_eq!(shards[0].0.highest_block_number, 50);
3422        assert_eq!(shards[0].1.iter().collect::<Vec<_>>(), (1..=50).collect::<Vec<_>>());
3423
3424        // Second shard truncated and re-keyed to MAX
3425        assert_eq!(shards[1].0.highest_block_number, u64::MAX);
3426        assert_eq!(shards[1].1.iter().collect::<Vec<_>>(), (51..=75).collect::<Vec<_>>());
3427    }
3428
3429    #[test]
3430    fn test_batch_auto_commit_on_threshold() {
3431        let temp_dir = TempDir::new().unwrap();
3432        let provider =
3433            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
3434
3435        // Create batch with tiny threshold (1KB) to force auto-commits
3436        let mut batch = RocksDBBatch {
3437            provider: &provider,
3438            inner: WriteBatchWithTransaction::<true>::default(),
3439            buf: Vec::new(),
3440            auto_commit_threshold: Some(1024), // 1KB
3441        };
3442
3443        // Write entries until we exceed threshold multiple times
3444        // Each entry is ~20 bytes, so 100 entries = ~2KB = 2 auto-commits
3445        for i in 0..100u64 {
3446            let value = format!("value_{i:04}").into_bytes();
3447            batch.put::<TestTable>(i, &value).unwrap();
3448        }
3449
3450        // Data should already be visible (auto-committed) even before final commit
3451        // At least some entries should be readable
3452        let first_visible = provider.get::<TestTable>(0).unwrap();
3453        assert!(first_visible.is_some(), "Auto-committed data should be visible");
3454
3455        // Final commit for remaining batch
3456        batch.commit().unwrap();
3457
3458        // All entries should now be visible
3459        for i in 0..100u64 {
3460            let value = format!("value_{i:04}").into_bytes();
3461            assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
3462        }
3463    }
3464
3465    // ==================== PARAMETERIZED PRUNE TESTS ====================
3466
    /// Table-driven test case for account history pruning.
    struct AccountPruneCase {
        // Human-readable case identifier used in assertion messages.
        name: &'static str,
        // Shards seeded before pruning, as `(highest_block_number, blocks)` pairs;
        // `u64::MAX` denotes the sentinel shard.
        initial_shards: &'static [(u64, &'static [u64])],
        // Block number passed to `prune_account_history_to`.
        prune_to: u64,
        // Outcome the prune call is expected to report.
        expected_outcome: PruneShardOutcome,
        // Shards expected to remain after pruning, in key order.
        expected_shards: &'static [(u64, &'static [u64])],
    }
3475
    /// Table-driven test case for storage history pruning.
    struct StoragePruneCase {
        // Human-readable case identifier used in assertion messages.
        name: &'static str,
        // Shards seeded before pruning, as `(highest_block_number, blocks)` pairs;
        // `u64::MAX` denotes the sentinel shard.
        initial_shards: &'static [(u64, &'static [u64])],
        // Block number passed to `prune_storage_history_to`.
        prune_to: u64,
        // Outcome the prune call is expected to report.
        expected_outcome: PruneShardOutcome,
        // Shards expected to remain after pruning, in key order.
        expected_shards: &'static [(u64, &'static [u64])],
    }
3484
    /// Table-driven coverage for `prune_account_history_to`.
    ///
    /// Each case seeds a set of shards for a single address, prunes, and checks
    /// both the reported outcome and the exact shard layout left behind. The
    /// cases show that pruning removes block numbers `<= prune_to` (e.g.
    /// `[10, 20, 30, 40]` pruned to 25 leaves `[30, 40]`).
    #[test]
    fn test_prune_account_history_cases() {
        const MAX: u64 = u64::MAX;
        const CASES: &[AccountPruneCase] = &[
            AccountPruneCase {
                name: "single_shard_truncate",
                initial_shards: &[(MAX, &[10, 20, 30, 40])],
                prune_to: 25,
                expected_outcome: PruneShardOutcome::Updated,
                expected_shards: &[(MAX, &[30, 40])],
            },
            AccountPruneCase {
                name: "single_shard_delete_all",
                initial_shards: &[(MAX, &[10, 20])],
                prune_to: 20,
                expected_outcome: PruneShardOutcome::Deleted,
                expected_shards: &[],
            },
            AccountPruneCase {
                name: "single_shard_noop",
                initial_shards: &[(MAX, &[10, 20])],
                prune_to: 5,
                expected_outcome: PruneShardOutcome::Unchanged,
                expected_shards: &[(MAX, &[10, 20])],
            },
            AccountPruneCase {
                name: "no_shards",
                initial_shards: &[],
                prune_to: 100,
                expected_outcome: PruneShardOutcome::Unchanged,
                expected_shards: &[],
            },
            AccountPruneCase {
                name: "multi_shard_truncate_first",
                initial_shards: &[(30, &[10, 20, 30]), (MAX, &[40, 50, 60])],
                prune_to: 25,
                expected_outcome: PruneShardOutcome::Updated,
                expected_shards: &[(30, &[30]), (MAX, &[40, 50, 60])],
            },
            AccountPruneCase {
                name: "delete_first_shard_sentinel_unchanged",
                initial_shards: &[(20, &[10, 20]), (MAX, &[30, 40])],
                prune_to: 20,
                expected_outcome: PruneShardOutcome::Deleted,
                expected_shards: &[(MAX, &[30, 40])],
            },
            AccountPruneCase {
                name: "multi_shard_delete_all_but_last",
                initial_shards: &[(10, &[5, 10]), (20, &[15, 20]), (MAX, &[25, 30])],
                prune_to: 22,
                expected_outcome: PruneShardOutcome::Deleted,
                expected_shards: &[(MAX, &[25, 30])],
            },
            AccountPruneCase {
                name: "mid_shard_preserves_key",
                initial_shards: &[(50, &[10, 20, 30, 40, 50]), (MAX, &[60, 70])],
                prune_to: 25,
                expected_outcome: PruneShardOutcome::Updated,
                expected_shards: &[(50, &[30, 40, 50]), (MAX, &[60, 70])],
            },
            // Equivalence tests
            AccountPruneCase {
                name: "equiv_delete_early_shards_keep_sentinel",
                initial_shards: &[(20, &[10, 15, 20]), (50, &[30, 40, 50]), (MAX, &[60, 70])],
                prune_to: 55,
                expected_outcome: PruneShardOutcome::Deleted,
                expected_shards: &[(MAX, &[60, 70])],
            },
            AccountPruneCase {
                name: "equiv_sentinel_becomes_empty_with_prev",
                initial_shards: &[(50, &[30, 40, 50]), (MAX, &[35])],
                prune_to: 40,
                expected_outcome: PruneShardOutcome::Deleted,
                expected_shards: &[(MAX, &[50])],
            },
            AccountPruneCase {
                name: "equiv_all_shards_become_empty",
                initial_shards: &[(50, &[30, 40, 50]), (MAX, &[51])],
                prune_to: 51,
                expected_outcome: PruneShardOutcome::Deleted,
                expected_shards: &[],
            },
            AccountPruneCase {
                name: "equiv_non_sentinel_last_shard_promoted",
                initial_shards: &[(100, &[50, 75, 100])],
                prune_to: 60,
                expected_outcome: PruneShardOutcome::Updated,
                expected_shards: &[(MAX, &[75, 100])],
            },
            AccountPruneCase {
                name: "equiv_filter_within_shard",
                initial_shards: &[(MAX, &[10, 20, 30, 40])],
                prune_to: 25,
                expected_outcome: PruneShardOutcome::Updated,
                expected_shards: &[(MAX, &[30, 40])],
            },
            AccountPruneCase {
                name: "equiv_multi_shard_partial_delete",
                initial_shards: &[(20, &[10, 20]), (50, &[30, 40, 50]), (MAX, &[60, 70])],
                prune_to: 35,
                expected_outcome: PruneShardOutcome::Deleted,
                expected_shards: &[(50, &[40, 50]), (MAX, &[60, 70])],
            },
        ];

        let address = Address::from([0x42; 20]);

        // Each case runs against a fresh database so cases cannot interfere.
        for case in CASES {
            let temp_dir = TempDir::new().unwrap();
            let provider =
                RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

            // Setup initial shards
            let mut batch = provider.batch();
            for (highest, blocks) in case.initial_shards {
                let shard = BlockNumberList::new_pre_sorted(blocks.iter().copied());
                batch
                    .put::<tables::AccountsHistory>(ShardedKey::new(address, *highest), &shard)
                    .unwrap();
            }
            batch.commit().unwrap();

            // Prune
            let mut batch = provider.batch();
            let outcome = batch.prune_account_history_to(address, case.prune_to).unwrap();
            batch.commit().unwrap();

            // Assert outcome
            assert_eq!(outcome, case.expected_outcome, "case '{}': wrong outcome", case.name);

            // Assert final shards: both the per-shard keys and their contents.
            let shards = provider.account_history_shards(address).unwrap();
            assert_eq!(
                shards.len(),
                case.expected_shards.len(),
                "case '{}': wrong shard count",
                case.name
            );
            for (i, ((key, blocks), (exp_key, exp_blocks))) in
                shards.iter().zip(case.expected_shards.iter()).enumerate()
            {
                assert_eq!(
                    key.highest_block_number, *exp_key,
                    "case '{}': shard {} wrong key",
                    case.name, i
                );
                assert_eq!(
                    blocks.iter().collect::<Vec<_>>(),
                    *exp_blocks,
                    "case '{}': shard {} wrong blocks",
                    case.name,
                    i
                );
            }
        }
    }
3641
3642    #[test]
3643    fn test_prune_storage_history_cases() {
3644        const MAX: u64 = u64::MAX;
3645        const CASES: &[StoragePruneCase] = &[
3646            StoragePruneCase {
3647                name: "single_shard_truncate",
3648                initial_shards: &[(MAX, &[10, 20, 30, 40])],
3649                prune_to: 25,
3650                expected_outcome: PruneShardOutcome::Updated,
3651                expected_shards: &[(MAX, &[30, 40])],
3652            },
3653            StoragePruneCase {
3654                name: "single_shard_delete_all",
3655                initial_shards: &[(MAX, &[10, 20])],
3656                prune_to: 20,
3657                expected_outcome: PruneShardOutcome::Deleted,
3658                expected_shards: &[],
3659            },
3660            StoragePruneCase {
3661                name: "noop",
3662                initial_shards: &[(MAX, &[10, 20])],
3663                prune_to: 5,
3664                expected_outcome: PruneShardOutcome::Unchanged,
3665                expected_shards: &[(MAX, &[10, 20])],
3666            },
3667            StoragePruneCase {
3668                name: "no_shards",
3669                initial_shards: &[],
3670                prune_to: 100,
3671                expected_outcome: PruneShardOutcome::Unchanged,
3672                expected_shards: &[],
3673            },
3674            StoragePruneCase {
3675                name: "mid_shard_preserves_key",
3676                initial_shards: &[(50, &[10, 20, 30, 40, 50]), (MAX, &[60, 70])],
3677                prune_to: 25,
3678                expected_outcome: PruneShardOutcome::Updated,
3679                expected_shards: &[(50, &[30, 40, 50]), (MAX, &[60, 70])],
3680            },
3681            // Equivalence tests
3682            StoragePruneCase {
3683                name: "equiv_sentinel_promotion",
3684                initial_shards: &[(100, &[50, 75, 100])],
3685                prune_to: 60,
3686                expected_outcome: PruneShardOutcome::Updated,
3687                expected_shards: &[(MAX, &[75, 100])],
3688            },
3689            StoragePruneCase {
3690                name: "equiv_delete_early_shards_keep_sentinel",
3691                initial_shards: &[(20, &[10, 15, 20]), (50, &[30, 40, 50]), (MAX, &[60, 70])],
3692                prune_to: 55,
3693                expected_outcome: PruneShardOutcome::Deleted,
3694                expected_shards: &[(MAX, &[60, 70])],
3695            },
3696            StoragePruneCase {
3697                name: "equiv_sentinel_becomes_empty_with_prev",
3698                initial_shards: &[(50, &[30, 40, 50]), (MAX, &[35])],
3699                prune_to: 40,
3700                expected_outcome: PruneShardOutcome::Deleted,
3701                expected_shards: &[(MAX, &[50])],
3702            },
3703            StoragePruneCase {
3704                name: "equiv_all_shards_become_empty",
3705                initial_shards: &[(50, &[30, 40, 50]), (MAX, &[51])],
3706                prune_to: 51,
3707                expected_outcome: PruneShardOutcome::Deleted,
3708                expected_shards: &[],
3709            },
3710            StoragePruneCase {
3711                name: "equiv_filter_within_shard",
3712                initial_shards: &[(MAX, &[10, 20, 30, 40])],
3713                prune_to: 25,
3714                expected_outcome: PruneShardOutcome::Updated,
3715                expected_shards: &[(MAX, &[30, 40])],
3716            },
3717            StoragePruneCase {
3718                name: "equiv_multi_shard_partial_delete",
3719                initial_shards: &[(20, &[10, 20]), (50, &[30, 40, 50]), (MAX, &[60, 70])],
3720                prune_to: 35,
3721                expected_outcome: PruneShardOutcome::Deleted,
3722                expected_shards: &[(50, &[40, 50]), (MAX, &[60, 70])],
3723            },
3724        ];
3725
3726        let address = Address::from([0x42; 20]);
3727        let storage_key = B256::from([0x01; 32]);
3728
3729        for case in CASES {
3730            let temp_dir = TempDir::new().unwrap();
3731            let provider =
3732                RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3733
3734            // Setup initial shards
3735            let mut batch = provider.batch();
3736            for (highest, blocks) in case.initial_shards {
3737                let shard = BlockNumberList::new_pre_sorted(blocks.iter().copied());
3738                let key = if *highest == MAX {
3739                    StorageShardedKey::last(address, storage_key)
3740                } else {
3741                    StorageShardedKey::new(address, storage_key, *highest)
3742                };
3743                batch.put::<tables::StoragesHistory>(key, &shard).unwrap();
3744            }
3745            batch.commit().unwrap();
3746
3747            // Prune
3748            let mut batch = provider.batch();
3749            let outcome =
3750                batch.prune_storage_history_to(address, storage_key, case.prune_to).unwrap();
3751            batch.commit().unwrap();
3752
3753            // Assert outcome
3754            assert_eq!(outcome, case.expected_outcome, "case '{}': wrong outcome", case.name);
3755
3756            // Assert final shards
3757            let shards = provider.storage_history_shards(address, storage_key).unwrap();
3758            assert_eq!(
3759                shards.len(),
3760                case.expected_shards.len(),
3761                "case '{}': wrong shard count",
3762                case.name
3763            );
3764            for (i, ((key, blocks), (exp_key, exp_blocks))) in
3765                shards.iter().zip(case.expected_shards.iter()).enumerate()
3766            {
3767                assert_eq!(
3768                    key.sharded_key.highest_block_number, *exp_key,
3769                    "case '{}': shard {} wrong key",
3770                    case.name, i
3771                );
3772                assert_eq!(
3773                    blocks.iter().collect::<Vec<_>>(),
3774                    *exp_blocks,
3775                    "case '{}': shard {} wrong blocks",
3776                    case.name,
3777                    i
3778                );
3779            }
3780        }
3781    }
3782
3783    #[test]
3784    fn test_prune_storage_history_does_not_affect_other_slots() {
3785        let temp_dir = TempDir::new().unwrap();
3786        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3787
3788        let address = Address::from([0x42; 20]);
3789        let slot1 = B256::from([0x01; 32]);
3790        let slot2 = B256::from([0x02; 32]);
3791
3792        // Two different storage slots
3793        let mut batch = provider.batch();
3794        batch
3795            .put::<tables::StoragesHistory>(
3796                StorageShardedKey::last(address, slot1),
3797                &BlockNumberList::new_pre_sorted([10u64, 20]),
3798            )
3799            .unwrap();
3800        batch
3801            .put::<tables::StoragesHistory>(
3802                StorageShardedKey::last(address, slot2),
3803                &BlockNumberList::new_pre_sorted([30u64, 40]),
3804            )
3805            .unwrap();
3806        batch.commit().unwrap();
3807
3808        // Prune slot1 to block 20 (deletes all)
3809        let mut batch = provider.batch();
3810        let outcome = batch.prune_storage_history_to(address, slot1, 20).unwrap();
3811        batch.commit().unwrap();
3812
3813        assert_eq!(outcome, PruneShardOutcome::Deleted);
3814
3815        // slot1 should be empty
3816        let shards1 = provider.storage_history_shards(address, slot1).unwrap();
3817        assert!(shards1.is_empty());
3818
3819        // slot2 should be unchanged
3820        let shards2 = provider.storage_history_shards(address, slot2).unwrap();
3821        assert_eq!(shards2.len(), 1);
3822        assert_eq!(shards2[0].1.iter().collect::<Vec<_>>(), vec![30, 40]);
3823    }
3824
3825    #[test]
3826    fn test_prune_invariants() {
3827        // Test invariants: no empty shards, sentinel is always last
3828        let address = Address::from([0x42; 20]);
3829        let storage_key = B256::from([0x01; 32]);
3830
3831        // Test cases that exercise invariants
3832        #[allow(clippy::type_complexity)]
3833        let invariant_cases: &[(&[(u64, &[u64])], u64)] = &[
3834            // Account: shards where middle becomes empty
3835            (&[(10, &[5, 10]), (20, &[15, 20]), (u64::MAX, &[25, 30])], 20),
3836            // Account: non-sentinel shard only, partial prune -> must become sentinel
3837            (&[(100, &[50, 100])], 60),
3838        ];
3839
3840        for (initial_shards, prune_to) in invariant_cases {
3841            // Test account history invariants
3842            {
3843                let temp_dir = TempDir::new().unwrap();
3844                let provider =
3845                    RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3846
3847                let mut batch = provider.batch();
3848                for (highest, blocks) in *initial_shards {
3849                    let shard = BlockNumberList::new_pre_sorted(blocks.iter().copied());
3850                    batch
3851                        .put::<tables::AccountsHistory>(ShardedKey::new(address, *highest), &shard)
3852                        .unwrap();
3853                }
3854                batch.commit().unwrap();
3855
3856                let mut batch = provider.batch();
3857                batch.prune_account_history_to(address, *prune_to).unwrap();
3858                batch.commit().unwrap();
3859
3860                let shards = provider.account_history_shards(address).unwrap();
3861
3862                // Invariant 1: no empty shards
3863                for (key, blocks) in &shards {
3864                    assert!(
3865                        !blocks.is_empty(),
3866                        "Account: empty shard at key {}",
3867                        key.highest_block_number
3868                    );
3869                }
3870
3871                // Invariant 2: last shard is sentinel
3872                if !shards.is_empty() {
3873                    let last = shards.last().unwrap();
3874                    assert_eq!(
3875                        last.0.highest_block_number,
3876                        u64::MAX,
3877                        "Account: last shard must be sentinel"
3878                    );
3879                }
3880            }
3881
3882            // Test storage history invariants
3883            {
3884                let temp_dir = TempDir::new().unwrap();
3885                let provider =
3886                    RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3887
3888                let mut batch = provider.batch();
3889                for (highest, blocks) in *initial_shards {
3890                    let shard = BlockNumberList::new_pre_sorted(blocks.iter().copied());
3891                    let key = if *highest == u64::MAX {
3892                        StorageShardedKey::last(address, storage_key)
3893                    } else {
3894                        StorageShardedKey::new(address, storage_key, *highest)
3895                    };
3896                    batch.put::<tables::StoragesHistory>(key, &shard).unwrap();
3897                }
3898                batch.commit().unwrap();
3899
3900                let mut batch = provider.batch();
3901                batch.prune_storage_history_to(address, storage_key, *prune_to).unwrap();
3902                batch.commit().unwrap();
3903
3904                let shards = provider.storage_history_shards(address, storage_key).unwrap();
3905
3906                // Invariant 1: no empty shards
3907                for (key, blocks) in &shards {
3908                    assert!(
3909                        !blocks.is_empty(),
3910                        "Storage: empty shard at key {}",
3911                        key.sharded_key.highest_block_number
3912                    );
3913                }
3914
3915                // Invariant 2: last shard is sentinel
3916                if !shards.is_empty() {
3917                    let last = shards.last().unwrap();
3918                    assert_eq!(
3919                        last.0.sharded_key.highest_block_number,
3920                        u64::MAX,
3921                        "Storage: last shard must be sentinel"
3922                    );
3923                }
3924            }
3925        }
3926    }
3927
3928    #[test]
3929    fn test_prune_account_history_batch_multiple_sorted_targets() {
3930        let temp_dir = TempDir::new().unwrap();
3931        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3932
3933        let addr1 = Address::from([0x01; 20]);
3934        let addr2 = Address::from([0x02; 20]);
3935        let addr3 = Address::from([0x03; 20]);
3936
3937        // Setup shards for each address
3938        let mut batch = provider.batch();
3939        batch
3940            .put::<tables::AccountsHistory>(
3941                ShardedKey::new(addr1, u64::MAX),
3942                &BlockNumberList::new_pre_sorted([10, 20, 30]),
3943            )
3944            .unwrap();
3945        batch
3946            .put::<tables::AccountsHistory>(
3947                ShardedKey::new(addr2, u64::MAX),
3948                &BlockNumberList::new_pre_sorted([5, 10, 15]),
3949            )
3950            .unwrap();
3951        batch
3952            .put::<tables::AccountsHistory>(
3953                ShardedKey::new(addr3, u64::MAX),
3954                &BlockNumberList::new_pre_sorted([100, 200]),
3955            )
3956            .unwrap();
3957        batch.commit().unwrap();
3958
3959        // Prune all three (sorted by address)
3960        let mut targets = vec![(addr1, 15), (addr2, 10), (addr3, 50)];
3961        targets.sort_by_key(|(addr, _)| *addr);
3962
3963        let mut batch = provider.batch();
3964        let outcomes = batch.prune_account_history_batch(&targets).unwrap();
3965        batch.commit().unwrap();
3966
3967        // addr1: prune <=15, keep [20, 30] -> updated
3968        // addr2: prune <=10, keep [15] -> updated
3969        // addr3: prune <=50, keep [100, 200] -> unchanged
3970        assert_eq!(outcomes.updated, 2);
3971        assert_eq!(outcomes.unchanged, 1);
3972
3973        let shards1 = provider.account_history_shards(addr1).unwrap();
3974        assert_eq!(shards1[0].1.iter().collect::<Vec<_>>(), vec![20, 30]);
3975
3976        let shards2 = provider.account_history_shards(addr2).unwrap();
3977        assert_eq!(shards2[0].1.iter().collect::<Vec<_>>(), vec![15]);
3978
3979        let shards3 = provider.account_history_shards(addr3).unwrap();
3980        assert_eq!(shards3[0].1.iter().collect::<Vec<_>>(), vec![100, 200]);
3981    }
3982
3983    #[test]
3984    fn test_prune_account_history_batch_target_with_no_shards() {
3985        let temp_dir = TempDir::new().unwrap();
3986        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3987
3988        let addr1 = Address::from([0x01; 20]);
3989        let addr2 = Address::from([0x02; 20]); // No shards for this one
3990        let addr3 = Address::from([0x03; 20]);
3991
3992        // Only setup shards for addr1 and addr3
3993        let mut batch = provider.batch();
3994        batch
3995            .put::<tables::AccountsHistory>(
3996                ShardedKey::new(addr1, u64::MAX),
3997                &BlockNumberList::new_pre_sorted([10, 20]),
3998            )
3999            .unwrap();
4000        batch
4001            .put::<tables::AccountsHistory>(
4002                ShardedKey::new(addr3, u64::MAX),
4003                &BlockNumberList::new_pre_sorted([30, 40]),
4004            )
4005            .unwrap();
4006        batch.commit().unwrap();
4007
4008        // Prune all three (addr2 has no shards - tests p > target_prefix case)
4009        let mut targets = vec![(addr1, 15), (addr2, 100), (addr3, 35)];
4010        targets.sort_by_key(|(addr, _)| *addr);
4011
4012        let mut batch = provider.batch();
4013        let outcomes = batch.prune_account_history_batch(&targets).unwrap();
4014        batch.commit().unwrap();
4015
4016        // addr1: updated (keep [20])
4017        // addr2: unchanged (no shards)
4018        // addr3: updated (keep [40])
4019        assert_eq!(outcomes.updated, 2);
4020        assert_eq!(outcomes.unchanged, 1);
4021
4022        let shards1 = provider.account_history_shards(addr1).unwrap();
4023        assert_eq!(shards1[0].1.iter().collect::<Vec<_>>(), vec![20]);
4024
4025        let shards3 = provider.account_history_shards(addr3).unwrap();
4026        assert_eq!(shards3[0].1.iter().collect::<Vec<_>>(), vec![40]);
4027    }
4028
4029    #[test]
4030    fn test_prune_storage_history_batch_multiple_sorted_targets() {
4031        let temp_dir = TempDir::new().unwrap();
4032        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
4033
4034        let addr = Address::from([0x42; 20]);
4035        let slot1 = B256::from([0x01; 32]);
4036        let slot2 = B256::from([0x02; 32]);
4037
4038        // Setup shards
4039        let mut batch = provider.batch();
4040        batch
4041            .put::<tables::StoragesHistory>(
4042                StorageShardedKey::new(addr, slot1, u64::MAX),
4043                &BlockNumberList::new_pre_sorted([10, 20, 30]),
4044            )
4045            .unwrap();
4046        batch
4047            .put::<tables::StoragesHistory>(
4048                StorageShardedKey::new(addr, slot2, u64::MAX),
4049                &BlockNumberList::new_pre_sorted([5, 15, 25]),
4050            )
4051            .unwrap();
4052        batch.commit().unwrap();
4053
4054        // Prune both (sorted)
4055        let mut targets = vec![((addr, slot1), 15), ((addr, slot2), 10)];
4056        targets.sort_by_key(|((a, s), _)| (*a, *s));
4057
4058        let mut batch = provider.batch();
4059        let outcomes = batch.prune_storage_history_batch(&targets).unwrap();
4060        batch.commit().unwrap();
4061
4062        assert_eq!(outcomes.updated, 2);
4063
4064        let shards1 = provider.storage_history_shards(addr, slot1).unwrap();
4065        assert_eq!(shards1[0].1.iter().collect::<Vec<_>>(), vec![20, 30]);
4066
4067        let shards2 = provider.storage_history_shards(addr, slot2).unwrap();
4068        assert_eq!(shards2[0].1.iter().collect::<Vec<_>>(), vec![15, 25]);
4069    }
4070}