reth_provider/providers/rocksdb/provider.rs

1use super::metrics::{RocksDBMetrics, RocksDBOperation, ROCKSDB_TABLES};
2use crate::providers::{compute_history_rank, needs_prev_shard_check, HistoryInfo};
3use alloy_consensus::transaction::TxHashRef;
4use alloy_primitives::{
5    keccak256,
6    map::{AddressMap, HashMap},
7    Address, BlockNumber, TxNumber, B256,
8};
9use itertools::Itertools;
10use metrics::Label;
11use parking_lot::Mutex;
12use reth_chain_state::ExecutedBlock;
13use reth_db_api::{
14    database_metrics::DatabaseMetrics,
15    models::{
16        sharded_key::NUM_OF_INDICES_IN_SHARD, storage_sharded_key::StorageShardedKey, ShardedKey,
17        StorageSettings,
18    },
19    table::{Compress, Decode, Decompress, Encode, Table},
20    tables, BlockNumberList, DatabaseError,
21};
22use reth_primitives_traits::BlockBody as _;
23use reth_prune_types::PruneMode;
24use reth_storage_errors::{
25    db::{DatabaseErrorInfo, DatabaseWriteError, DatabaseWriteOperation, LogLevel},
26    provider::{ProviderError, ProviderResult},
27};
28use rocksdb::{
29    BlockBasedOptions, Cache, ColumnFamilyDescriptor, CompactionPri, DBCompressionType,
30    DBRawIteratorWithThreadMode, IteratorMode, OptimisticTransactionDB,
31    OptimisticTransactionOptions, Options, Transaction, WriteBatchWithTransaction, WriteOptions,
32    DB,
33};
34use std::{
35    collections::BTreeMap,
36    fmt,
37    path::{Path, PathBuf},
38    sync::Arc,
39    time::Instant,
40};
41use tracing::instrument;
42
43/// Pending `RocksDB` batches type alias.
44pub(crate) type PendingRocksDBBatches = Arc<Mutex<Vec<WriteBatchWithTransaction<true>>>>;
45
46/// Raw key-value result from a `RocksDB` iterator.
47type RawKVResult = Result<(Box<[u8]>, Box<[u8]>), rocksdb::Error>;
48
49/// Statistics for a single `RocksDB` table (column family).
50#[derive(Debug, Clone)]
51pub struct RocksDBTableStats {
52    /// Size of SST files on disk in bytes.
53    pub sst_size_bytes: u64,
54    /// Size of memtables in memory in bytes.
55    pub memtable_size_bytes: u64,
56    /// Name of the table/column family.
57    pub name: String,
58    /// Estimated number of keys in the table.
59    pub estimated_num_keys: u64,
60    /// Estimated size of live data in bytes (SST files + memtables).
61    pub estimated_size_bytes: u64,
62    /// Estimated bytes pending compaction (reclaimable space).
63    pub pending_compaction_bytes: u64,
64}
65
66/// Database-level statistics for `RocksDB`.
67///
68/// Contains both per-table statistics and DB-level metrics like WAL size.
69#[derive(Debug, Clone)]
70pub struct RocksDBStats {
71    /// Statistics for each table (column family).
72    pub tables: Vec<RocksDBTableStats>,
73    /// Total size of WAL (Write-Ahead Log) files in bytes.
74    ///
75    /// WAL is shared across all tables and not included in per-table metrics.
76    pub wal_size_bytes: u64,
77}
78
79/// Context for `RocksDB` block writes.
80#[derive(Clone)]
81pub(crate) struct RocksDBWriteCtx {
82    /// The first block number being written.
83    pub first_block_number: BlockNumber,
84    /// The prune mode for transaction lookup, if any.
85    pub prune_tx_lookup: Option<PruneMode>,
86    /// Storage settings determining what goes to `RocksDB`.
87    pub storage_settings: StorageSettings,
88    /// Pending batches that completed writes are pushed to, for later commit.
89    pub pending_batches: PendingRocksDBBatches,
90}
91
92impl fmt::Debug for RocksDBWriteCtx {
93    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
94        f.debug_struct("RocksDBWriteCtx")
95            .field("first_block_number", &self.first_block_number)
96            .field("prune_tx_lookup", &self.prune_tx_lookup)
97            .field("storage_settings", &self.storage_settings)
98            .field("pending_batches", &"<pending batches>")
99            .finish()
100    }
101}
102
103/// Default cache size for `RocksDB` block cache (128 MB).
104const DEFAULT_CACHE_SIZE: usize = 128 << 20;
105
106/// Default block size for `RocksDB` tables (16 KB).
107const DEFAULT_BLOCK_SIZE: usize = 16 * 1024;
108
109/// Default max background jobs for `RocksDB` compaction and flushing.
110const DEFAULT_MAX_BACKGROUND_JOBS: i32 = 6;
111
112/// Default max open file descriptors for `RocksDB`.
113///
114/// Caps the number of SST file handles `RocksDB` keeps open simultaneously.
115/// Set to 512 to stay within the common default OS `ulimit -n` of 1024,
116/// leaving headroom for MDBX, static files, and other I/O.
117/// `RocksDB` uses an internal table cache and re-opens files on demand,
118/// so this has negligible performance impact on read-heavy workloads.
119const DEFAULT_MAX_OPEN_FILES: i32 = 512;
120
121/// Default bytes per sync for `RocksDB` WAL writes (1 MB).
122const DEFAULT_BYTES_PER_SYNC: u64 = 1_048_576;
123
124/// Default write buffer size for `RocksDB` memtables (128 MB).
125///
126/// Larger memtables reduce flush frequency during burst writes, providing more consistent
127/// tail latency. Benchmarks showed 128 MB reduces p99 latency variance by ~80% compared
128/// to the 64 MB default, with negligible impact on mean throughput.
129const DEFAULT_WRITE_BUFFER_SIZE: usize = 128 << 20;
130
131/// Default buffer capacity for compression in batches.
132/// 4 KiB matches common block/page sizes and comfortably holds typical history values,
133/// reducing the first few reallocations without over-allocating.
134const DEFAULT_COMPRESS_BUF_CAPACITY: usize = 4096;
135
136/// Default auto-commit threshold for batch writes (4 GiB).
137///
138/// When a batch exceeds this size, it is automatically committed to prevent OOM
139/// during large bulk writes. The consistency check on startup heals any crash
140/// that occurs between auto-commits.
141const DEFAULT_AUTO_COMMIT_THRESHOLD: usize = 4 * 1024 * 1024 * 1024;
142
143/// Builder for [`RocksDBProvider`].
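///
/// # Example
///
/// A minimal construction sketch (illustrative only; assumes a writable `path`):
///
/// ```ignore
/// let provider = RocksDBBuilder::new(path)
///     .with_default_tables()
///     .with_metrics()
///     .build()?;
/// ```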
144pub struct RocksDBBuilder {
145    path: PathBuf,
146    column_families: Vec<String>,
147    enable_metrics: bool,
148    enable_statistics: bool,
149    log_level: rocksdb::LogLevel,
150    block_cache: Cache,
151    read_only: bool,
152}
153
154impl fmt::Debug for RocksDBBuilder {
155    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
156        f.debug_struct("RocksDBBuilder")
157            .field("path", &self.path)
158            .field("column_families", &self.column_families)
159            .field("enable_metrics", &self.enable_metrics)
160            .finish()
161    }
162}
163
164impl RocksDBBuilder {
165    /// Creates a new builder with optimized default options.
166    pub fn new(path: impl AsRef<Path>) -> Self {
167        let cache = Cache::new_lru_cache(DEFAULT_CACHE_SIZE);
168        Self {
169            path: path.as_ref().to_path_buf(),
170            column_families: Vec::new(),
171            enable_metrics: false,
172            enable_statistics: false,
173            log_level: rocksdb::LogLevel::Info,
174            block_cache: cache,
175            read_only: false,
176        }
177    }
178
179    /// Creates default table options with shared block cache.
180    fn default_table_options(cache: &Cache) -> BlockBasedOptions {
181        let mut table_options = BlockBasedOptions::default();
182        table_options.set_block_size(DEFAULT_BLOCK_SIZE);
183        table_options.set_cache_index_and_filter_blocks(true);
184        table_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
185        // Shared block cache for all column families.
186        table_options.set_block_cache(cache);
187        table_options
188    }
189
190    /// Creates optimized `RocksDB` options per `RocksDB` wiki recommendations.
191    fn default_options(
192        log_level: rocksdb::LogLevel,
193        cache: &Cache,
194        enable_statistics: bool,
195    ) -> Options {
196        // Follow the recommended tuning guide from the RocksDB wiki, see https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning
197        let table_options = Self::default_table_options(cache);
198
199        let mut options = Options::default();
200        options.set_block_based_table_factory(&table_options);
201        options.create_if_missing(true);
202        options.create_missing_column_families(true);
203        options.set_max_background_jobs(DEFAULT_MAX_BACKGROUND_JOBS);
204        options.set_bytes_per_sync(DEFAULT_BYTES_PER_SYNC);
205
206        options.set_bottommost_compression_type(DBCompressionType::Zstd);
207        options.set_bottommost_zstd_max_train_bytes(0, true);
208        options.set_compression_type(DBCompressionType::Lz4);
209        options.set_compaction_pri(CompactionPri::MinOverlappingRatio);
210
211        options.set_log_level(log_level);
212
213        options.set_max_open_files(DEFAULT_MAX_OPEN_FILES);
214
215        // Delete obsolete WAL files immediately after all column families have flushed.
216        // Setting both to 0 means "delete ASAP, no archival".
217        options.set_wal_ttl_seconds(0);
218        options.set_wal_size_limit_mb(0);
219
220        // When enabled, statistics can be viewed in the RocksDB log file
221        if enable_statistics {
222            options.enable_statistics();
223        }
224
225        options
226    }
227
228    /// Creates optimized column family options.
229    fn default_column_family_options(cache: &Cache) -> Options {
230        // Follow the recommended tuning guide from the RocksDB wiki, see https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning
231        let table_options = Self::default_table_options(cache);
232
233        let mut cf_options = Options::default();
234        cf_options.set_block_based_table_factory(&table_options);
235        cf_options.set_level_compaction_dynamic_level_bytes(true);
236        // Zstd is recommended for bottommost compression and Lz4 for the other levels, see https://github.com/facebook/rocksdb/wiki/Compression#configuration
237        cf_options.set_compression_type(DBCompressionType::Lz4);
238        cf_options.set_bottommost_compression_type(DBCompressionType::Zstd);
239        // Only use Zstd compression, disable dictionary training
240        cf_options.set_bottommost_zstd_max_train_bytes(0, true);
241        cf_options.set_write_buffer_size(DEFAULT_WRITE_BUFFER_SIZE);
242
243        cf_options
244    }
245
246    /// Creates optimized column family options for `TransactionHashNumbers`.
247    ///
248    /// This table stores `B256 -> TxNumber` mappings where:
249    /// - Keys are incompressible 32-byte hashes (compression wastes CPU for zero benefit)
250    /// - Values are varint-encoded `u64` (a few bytes - too small to benefit from compression)
251    /// - Every lookup expects a hit (bloom filters only help when checking non-existent keys)
252    fn tx_hash_numbers_column_family_options(cache: &Cache) -> Options {
253        let mut table_options = BlockBasedOptions::default();
254        table_options.set_block_size(DEFAULT_BLOCK_SIZE);
255        table_options.set_cache_index_and_filter_blocks(true);
256        table_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
257        table_options.set_block_cache(cache);
258        // No bloom filter is configured: every lookup expects a hit, so bloom filters provide
259        // no benefit and would only waste memory
260
261        let mut cf_options = Options::default();
262        cf_options.set_block_based_table_factory(&table_options);
263        cf_options.set_level_compaction_dynamic_level_bytes(true);
264        // Disable compression: B256 keys are incompressible hashes, TxNumber values are
265        // varint-encoded u64 (a few bytes). Compression wastes CPU cycles for zero space savings.
266        cf_options.set_compression_type(DBCompressionType::None);
267        cf_options.set_bottommost_compression_type(DBCompressionType::None);
268
269        cf_options
270    }
271
272    /// Adds a column family for a specific table type.
273    pub fn with_table<T: Table>(mut self) -> Self {
274        self.column_families.push(T::NAME.to_string());
275        self
276    }
277
278    /// Registers the default tables used by reth for `RocksDB` storage.
279    ///
280    /// This registers:
281    /// - [`tables::TransactionHashNumbers`] - Transaction hash to number mapping
282    /// - [`tables::AccountsHistory`] - Account history index
283    /// - [`tables::StoragesHistory`] - Storage history index
284    pub fn with_default_tables(self) -> Self {
285        self.with_table::<tables::TransactionHashNumbers>()
286            .with_table::<tables::AccountsHistory>()
287            .with_table::<tables::StoragesHistory>()
288    }
289
290    /// Enables metrics.
291    pub const fn with_metrics(mut self) -> Self {
292        self.enable_metrics = true;
293        self
294    }
295
296    /// Enables `RocksDB` internal statistics collection.
297    pub const fn with_statistics(mut self) -> Self {
298        self.enable_statistics = true;
299        self
300    }
301
302    /// Sets the log level from `DatabaseArgs` configuration.
303    pub const fn with_database_log_level(mut self, log_level: Option<LogLevel>) -> Self {
304        if let Some(level) = log_level {
305            self.log_level = convert_log_level(level);
306        }
307        self
308    }
309
310    /// Sets a custom block cache size.
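    ///
    /// # Example
    ///
    /// Illustrative 256 MiB cache:
    ///
    /// ```ignore
    /// let builder = RocksDBBuilder::new(path).with_block_cache_size(256 << 20);
    /// ```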
311    pub fn with_block_cache_size(mut self, capacity_bytes: usize) -> Self {
312        self.block_cache = Cache::new_lru_cache(capacity_bytes);
313        self
314    }
315
316    /// Sets read-only mode.
317    ///
318    /// Note: Write operations on a read-only provider will panic at runtime.
319    pub const fn with_read_only(mut self, read_only: bool) -> Self {
320        self.read_only = read_only;
321        self
322    }
323
324    /// Builds the [`RocksDBProvider`].
325    pub fn build(self) -> ProviderResult<RocksDBProvider> {
326        let options =
327            Self::default_options(self.log_level, &self.block_cache, self.enable_statistics);
328
329        let cf_descriptors: Vec<ColumnFamilyDescriptor> = self
330            .column_families
331            .iter()
332            .map(|name| {
333                let cf_options = if name == tables::TransactionHashNumbers::NAME {
334                    Self::tx_hash_numbers_column_family_options(&self.block_cache)
335                } else {
336                    Self::default_column_family_options(&self.block_cache)
337                };
338                ColumnFamilyDescriptor::new(name.clone(), cf_options)
339            })
340            .collect();
341
342        let metrics = self.enable_metrics.then(RocksDBMetrics::default);
343
344        if self.read_only {
345            let db = DB::open_cf_descriptors_read_only(&options, &self.path, cf_descriptors, false)
346                .map_err(|e| {
347                    ProviderError::Database(DatabaseError::Open(DatabaseErrorInfo {
348                        message: e.to_string().into(),
349                        code: -1,
350                    }))
351                })?;
352            Ok(RocksDBProvider(Arc::new(RocksDBProviderInner::ReadOnly { db, metrics })))
353        } else {
354            // Use OptimisticTransactionDB for MDBX-like transaction semantics (read-your-writes,
355            // rollback). OptimisticTransactionDB uses optimistic concurrency control (conflict
356            // detection at commit) and is backed by DBCommon, giving us access to
357            // cancel_all_background_work for clean shutdown.
358            let db =
359                OptimisticTransactionDB::open_cf_descriptors(&options, &self.path, cf_descriptors)
360                    .map_err(|e| {
361                        ProviderError::Database(DatabaseError::Open(DatabaseErrorInfo {
362                            message: e.to_string().into(),
363                            code: -1,
364                        }))
365                    })?;
366            Ok(RocksDBProvider(Arc::new(RocksDBProviderInner::ReadWrite { db, metrics })))
367        }
368    }
369}
370
371/// Some types don't support compression (e.g. B256), and we don't want to copy them into the
372/// allocated buffer when we can just use their reference.
373macro_rules! compress_to_buf_or_ref {
374    ($buf:expr, $value:expr) => {
375        if let Some(value) = $value.uncompressable_ref() {
376            Some(value)
377        } else {
378            $buf.clear();
379            $value.compress_to_buf(&mut $buf);
380            None
381        }
382    };
383}
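// Typical call site (see `put_encoded` below): callers pass a reusable buffer and write
// `compress_to_buf_or_ref!(buf, value).unwrap_or(&buf)` to obtain the bytes to store.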
384
385/// `RocksDB` provider for auxiliary storage layer beside main database MDBX.
386#[derive(Debug)]
387pub struct RocksDBProvider(Arc<RocksDBProviderInner>);
388
389/// Inner state for `RocksDB` provider.
390enum RocksDBProviderInner {
391    /// Read-write mode using `OptimisticTransactionDB`.
392    ReadWrite {
393        /// `RocksDB` database instance with optimistic transaction support.
394        db: OptimisticTransactionDB,
395        /// Metrics for operations & latency.
396        metrics: Option<RocksDBMetrics>,
397    },
398    /// Read-only mode using `DB` opened with `open_cf_descriptors_read_only`.
399    /// This doesn't acquire an exclusive lock, allowing concurrent reads.
400    ReadOnly {
401        /// Read-only `RocksDB` database instance.
402        db: DB,
403        /// Metrics for operations & latency.
404        metrics: Option<RocksDBMetrics>,
405    },
406}
407
408impl RocksDBProviderInner {
409    /// Returns the metrics for this provider.
410    const fn metrics(&self) -> Option<&RocksDBMetrics> {
411        match self {
412            Self::ReadWrite { metrics, .. } | Self::ReadOnly { metrics, .. } => metrics.as_ref(),
413        }
414    }
415
416    /// Returns the read-write database, panicking if in read-only mode.
417    fn db_rw(&self) -> &OptimisticTransactionDB {
418        match self {
419            Self::ReadWrite { db, .. } => db,
420            Self::ReadOnly { .. } => {
421                panic!("Cannot perform write operation on read-only RocksDB provider")
422            }
423        }
424    }
425
426    /// Gets the column family handle for a table.
427    fn cf_handle<T: Table>(&self) -> Result<&rocksdb::ColumnFamily, DatabaseError> {
428        let cf = match self {
429            Self::ReadWrite { db, .. } => db.cf_handle(T::NAME),
430            Self::ReadOnly { db, .. } => db.cf_handle(T::NAME),
431        };
432        cf.ok_or_else(|| DatabaseError::Other(format!("Column family '{}' not found", T::NAME)))
433    }
434
435    /// Gets the column family handle for a table from the read-write database.
436    ///
437    /// # Panics
438    /// Panics if in read-only mode.
439    fn cf_handle_rw(&self, name: &str) -> Result<&rocksdb::ColumnFamily, DatabaseError> {
440        self.db_rw()
441            .cf_handle(name)
442            .ok_or_else(|| DatabaseError::Other(format!("Column family '{}' not found", name)))
443    }
444
445    /// Gets a value from a column family.
446    fn get_cf(
447        &self,
448        cf: &rocksdb::ColumnFamily,
449        key: impl AsRef<[u8]>,
450    ) -> Result<Option<Vec<u8>>, rocksdb::Error> {
451        match self {
452            Self::ReadWrite { db, .. } => db.get_cf(cf, key),
453            Self::ReadOnly { db, .. } => db.get_cf(cf, key),
454        }
455    }
456
457    /// Puts a value into a column family.
458    fn put_cf(
459        &self,
460        cf: &rocksdb::ColumnFamily,
461        key: impl AsRef<[u8]>,
462        value: impl AsRef<[u8]>,
463    ) -> Result<(), rocksdb::Error> {
464        self.db_rw().put_cf(cf, key, value)
465    }
466
467    /// Deletes a value from a column family.
468    fn delete_cf(
469        &self,
470        cf: &rocksdb::ColumnFamily,
471        key: impl AsRef<[u8]>,
472    ) -> Result<(), rocksdb::Error> {
473        self.db_rw().delete_cf(cf, key)
474    }
475
476    /// Deletes a range of values from a column family.
477    fn delete_range_cf<K: AsRef<[u8]>>(
478        &self,
479        cf: &rocksdb::ColumnFamily,
480        from: K,
481        to: K,
482    ) -> Result<(), rocksdb::Error> {
483        self.db_rw().delete_range_cf(cf, from, to)
484    }
485
486    /// Returns an iterator over a column family.
487    fn iterator_cf(
488        &self,
489        cf: &rocksdb::ColumnFamily,
490        mode: IteratorMode<'_>,
491    ) -> RocksDBIterEnum<'_> {
492        match self {
493            Self::ReadWrite { db, .. } => RocksDBIterEnum::ReadWrite(db.iterator_cf(cf, mode)),
494            Self::ReadOnly { db, .. } => RocksDBIterEnum::ReadOnly(db.iterator_cf(cf, mode)),
495        }
496    }
497
498    /// Returns a raw iterator over a column family.
499    ///
500    /// Unlike [`Self::iterator_cf`], raw iterators support `seek()` for efficient
501    /// repositioning without creating a new iterator.
502    fn raw_iterator_cf(&self, cf: &rocksdb::ColumnFamily) -> RocksDBRawIterEnum<'_> {
503        match self {
504            Self::ReadWrite { db, .. } => RocksDBRawIterEnum::ReadWrite(db.raw_iterator_cf(cf)),
505            Self::ReadOnly { db, .. } => RocksDBRawIterEnum::ReadOnly(db.raw_iterator_cf(cf)),
506        }
507    }
508
509    /// Returns the path to the database directory.
510    fn path(&self) -> &Path {
511        match self {
512            Self::ReadWrite { db, .. } => db.path(),
513            Self::ReadOnly { db, .. } => db.path(),
514        }
515    }
516
517    /// Returns the total size of WAL (Write-Ahead Log) files in bytes.
518    ///
519    /// WAL files have a `.log` extension in the `RocksDB` directory.
520    fn wal_size_bytes(&self) -> u64 {
521        let path = self.path();
522
523        match std::fs::read_dir(path) {
524            Ok(entries) => entries
525                .filter_map(|e| e.ok())
526                .filter(|e| e.path().extension().is_some_and(|ext| ext == "log"))
527                .filter_map(|e| e.metadata().ok())
528                .map(|m| m.len())
529                .sum(),
530            Err(_) => 0,
531        }
532    }
533
534    /// Returns statistics for all column families in the database.
535    fn table_stats(&self) -> Vec<RocksDBTableStats> {
536        let mut stats = Vec::new();
537
538        macro_rules! collect_stats {
539            ($db:expr) => {
540                for cf_name in ROCKSDB_TABLES {
541                    if let Some(cf) = $db.cf_handle(cf_name) {
542                        let estimated_num_keys = $db
543                            .property_int_value_cf(cf, rocksdb::properties::ESTIMATE_NUM_KEYS)
544                            .ok()
545                            .flatten()
546                            .unwrap_or(0);
547
548                        // SST files size (on-disk) + memtable size (in-memory)
549                        let sst_size = $db
550                            .property_int_value_cf(cf, rocksdb::properties::LIVE_SST_FILES_SIZE)
551                            .ok()
552                            .flatten()
553                            .unwrap_or(0);
554
555                        let memtable_size = $db
556                            .property_int_value_cf(cf, rocksdb::properties::SIZE_ALL_MEM_TABLES)
557                            .ok()
558                            .flatten()
559                            .unwrap_or(0);
560
561                        let estimated_size_bytes = sst_size + memtable_size;
562
563                        let pending_compaction_bytes = $db
564                            .property_int_value_cf(
565                                cf,
566                                rocksdb::properties::ESTIMATE_PENDING_COMPACTION_BYTES,
567                            )
568                            .ok()
569                            .flatten()
570                            .unwrap_or(0);
571
572                        stats.push(RocksDBTableStats {
573                            sst_size_bytes: sst_size,
574                            memtable_size_bytes: memtable_size,
575                            name: cf_name.to_string(),
576                            estimated_num_keys,
577                            estimated_size_bytes,
578                            pending_compaction_bytes,
579                        });
580                    }
581                }
582            };
583        }
584
585        match self {
586            Self::ReadWrite { db, .. } => collect_stats!(db),
587            Self::ReadOnly { db, .. } => collect_stats!(db),
588        }
589
590        stats
591    }
592
593    /// Returns database-level statistics including per-table stats and WAL size.
594    fn db_stats(&self) -> RocksDBStats {
595        RocksDBStats { tables: self.table_stats(), wal_size_bytes: self.wal_size_bytes() }
596    }
597}
598
599impl fmt::Debug for RocksDBProviderInner {
600    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
601        match self {
602            Self::ReadWrite { metrics, .. } => f
603                .debug_struct("RocksDBProviderInner::ReadWrite")
604                .field("db", &"<OptimisticTransactionDB>")
605                .field("metrics", metrics)
606                .finish(),
607            Self::ReadOnly { metrics, .. } => f
608                .debug_struct("RocksDBProviderInner::ReadOnly")
609                .field("db", &"<DB (read-only)>")
610                .field("metrics", metrics)
611                .finish(),
612        }
613    }
614}
615
616impl Drop for RocksDBProviderInner {
617    fn drop(&mut self) {
618        match self {
619            Self::ReadWrite { db, .. } => {
620                // Flush all memtables if possible. If not, they will be rebuilt from the WAL on
621                // restart
622                if let Err(e) = db.flush_wal(true) {
623                    tracing::warn!(target: "providers::rocksdb", ?e, "Failed to flush WAL on drop");
624                }
625                for cf_name in ROCKSDB_TABLES {
626                    if let Some(cf) = db.cf_handle(cf_name) &&
627                        let Err(e) = db.flush_cf(&cf)
628                    {
629                        tracing::warn!(target: "providers::rocksdb", cf = cf_name, ?e, "Failed to flush CF on drop");
630                    }
631                }
632                db.cancel_all_background_work(true);
633            }
634            Self::ReadOnly { db, .. } => db.cancel_all_background_work(true),
635        }
636    }
637}
638
639impl Clone for RocksDBProvider {
640    fn clone(&self) -> Self {
641        Self(self.0.clone())
642    }
643}
644
645impl DatabaseMetrics for RocksDBProvider {
646    fn gauge_metrics(&self) -> Vec<(&'static str, f64, Vec<Label>)> {
647        let mut metrics = Vec::new();
648
649        for stat in self.table_stats() {
650            metrics.push((
651                "rocksdb.table_size",
652                stat.estimated_size_bytes as f64,
653                vec![Label::new("table", stat.name.clone())],
654            ));
655            metrics.push((
656                "rocksdb.table_entries",
657                stat.estimated_num_keys as f64,
658                vec![Label::new("table", stat.name.clone())],
659            ));
660            metrics.push((
661                "rocksdb.pending_compaction_bytes",
662                stat.pending_compaction_bytes as f64,
663                vec![Label::new("table", stat.name.clone())],
664            ));
665            metrics.push((
666                "rocksdb.sst_size",
667                stat.sst_size_bytes as f64,
668                vec![Label::new("table", stat.name.clone())],
669            ));
670            metrics.push((
671                "rocksdb.memtable_size",
672                stat.memtable_size_bytes as f64,
673                vec![Label::new("table", stat.name)],
674            ));
675        }
676
677        // WAL size (DB-level, shared across all tables)
678        metrics.push(("rocksdb.wal_size", self.wal_size_bytes() as f64, vec![]));
679
680        metrics
681    }
682}
683
684impl RocksDBProvider {
685    /// Creates a new `RocksDB` provider.
686    pub fn new(path: impl AsRef<Path>) -> ProviderResult<Self> {
687        RocksDBBuilder::new(path).build()
688    }
689
690    /// Creates a new `RocksDB` provider builder.
691    pub fn builder(path: impl AsRef<Path>) -> RocksDBBuilder {
692        RocksDBBuilder::new(path)
693    }
694
695    /// Returns `true` if this provider is in read-only mode.
696    pub fn is_read_only(&self) -> bool {
697        matches!(self.0.as_ref(), RocksDBProviderInner::ReadOnly { .. })
698    }
699
700    /// Creates a new transaction with MDBX-like semantics (read-your-writes, rollback).
701    ///
702    /// Note: With `OptimisticTransactionDB`, commits may fail if there are conflicts.
703    /// Conflict detection happens at commit time, not at write time.
704    ///
705    /// # Panics
706    /// Panics if the provider is in read-only mode.
707    pub fn tx(&self) -> RocksTx<'_> {
708        let write_options = WriteOptions::default();
709        let txn_options = OptimisticTransactionOptions::default();
710        let inner = self.0.db_rw().transaction_opt(&write_options, &txn_options);
711        RocksTx { inner, provider: self }
712    }
713
714    /// Creates a new batch for atomic writes.
715    ///
716    /// Use [`Self::write_batch`] for closure-based atomic writes.
717    /// Use this method when the batch needs to be held by [`crate::EitherWriter`].
718    ///
719    /// # Panics
720    /// Panics if the provider is in read-only mode when attempting to commit.
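    ///
    /// # Example
    ///
    /// Manual batch flow (illustrative sketch; `tx_hash`/`tx_number` are placeholders):
    ///
    /// ```ignore
    /// let mut batch = provider.batch();
    /// batch.put::<tables::TransactionHashNumbers>(tx_hash, &tx_number)?;
    /// batch.commit()?;
    /// ```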
721    pub fn batch(&self) -> RocksDBBatch<'_> {
722        RocksDBBatch {
723            provider: self,
724            inner: WriteBatchWithTransaction::<true>::default(),
725            buf: Vec::with_capacity(DEFAULT_COMPRESS_BUF_CAPACITY),
726            auto_commit_threshold: None,
727        }
728    }
729
730    /// Creates a new batch with auto-commit enabled.
731    ///
732    /// When the batch size exceeds the threshold (4 GiB), the batch is automatically
733    /// committed and reset. This prevents OOM during large bulk writes while maintaining
734    /// crash-safety via the consistency check on startup.
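    ///
    /// # Example
    ///
    /// Bulk-write sketch (illustrative; `mappings` is a placeholder iterator of
    /// `(B256, TxNumber)` pairs, and intermediate commits happen transparently once the
    /// threshold is exceeded):
    ///
    /// ```ignore
    /// let mut batch = provider.batch_with_auto_commit();
    /// for (hash, tx_number) in mappings {
    ///     batch.put::<tables::TransactionHashNumbers>(hash, &tx_number)?;
    /// }
    /// batch.commit()?;
    /// ```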
735    pub fn batch_with_auto_commit(&self) -> RocksDBBatch<'_> {
736        RocksDBBatch {
737            provider: self,
738            inner: WriteBatchWithTransaction::<true>::default(),
739            buf: Vec::with_capacity(DEFAULT_COMPRESS_BUF_CAPACITY),
740            auto_commit_threshold: Some(DEFAULT_AUTO_COMMIT_THRESHOLD),
741        }
742    }
743
744    /// Gets the column family handle for a table.
745    fn get_cf_handle<T: Table>(&self) -> Result<&rocksdb::ColumnFamily, DatabaseError> {
746        self.0.cf_handle::<T>()
747    }
748
749    /// Executes a function and records metrics with the given operation and table name.
750    fn execute_with_operation_metric<R>(
751        &self,
752        operation: RocksDBOperation,
753        table: &'static str,
754        f: impl FnOnce(&Self) -> R,
755    ) -> R {
756        let start = self.0.metrics().map(|_| Instant::now());
757        let res = f(self);
758
759        if let (Some(start), Some(metrics)) = (start, self.0.metrics()) {
760            metrics.record_operation(operation, table, start.elapsed());
761        }
762
763        res
764    }
765
766    /// Gets a value from the specified table.
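    ///
    /// # Example
    ///
    /// Illustrative round trip through [`Self::put`] on a read-write provider
    /// (`tx_hash`/`tx_number` are placeholders):
    ///
    /// ```ignore
    /// provider.put::<tables::TransactionHashNumbers>(tx_hash, &tx_number)?;
    /// let found = provider.get::<tables::TransactionHashNumbers>(tx_hash)?;
    /// assert_eq!(found, Some(tx_number));
    /// ```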
767    pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
768        self.get_encoded::<T>(&key.encode())
769    }
770
771    /// Gets a value from the specified table using a pre-encoded key.
772    pub fn get_encoded<T: Table>(
773        &self,
774        key: &<T::Key as Encode>::Encoded,
775    ) -> ProviderResult<Option<T::Value>> {
776        self.execute_with_operation_metric(RocksDBOperation::Get, T::NAME, |this| {
777            let result = this.0.get_cf(this.get_cf_handle::<T>()?, key.as_ref()).map_err(|e| {
778                ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
779                    message: e.to_string().into(),
780                    code: -1,
781                }))
782            })?;
783
784            Ok(result.and_then(|value| T::Value::decompress(&value).ok()))
785        })
786    }
787
788    /// Puts (upserts) a value into the specified table with the given key.
789    ///
790    /// # Panics
791    /// Panics if the provider is in read-only mode.
792    pub fn put<T: Table>(&self, key: T::Key, value: &T::Value) -> ProviderResult<()> {
793        let encoded_key = key.encode();
794        self.put_encoded::<T>(&encoded_key, value)
795    }
796
797    /// Puts a value into the specified table using a pre-encoded key.
798    ///
799    /// # Panics
800    /// Panics if the provider is in read-only mode.
801    pub fn put_encoded<T: Table>(
802        &self,
803        key: &<T::Key as Encode>::Encoded,
804        value: &T::Value,
805    ) -> ProviderResult<()> {
806        self.execute_with_operation_metric(RocksDBOperation::Put, T::NAME, |this| {
807            // To keep the code simple, we allocate a buffer on each call because `RocksDBProvider`
808            // is thread-safe. Callers that want to avoid the per-call allocation can use the
809            // write_batch API instead.
810            let mut buf = Vec::new();
811            let value_bytes = compress_to_buf_or_ref!(buf, value).unwrap_or(&buf);
812
813            this.0.put_cf(this.get_cf_handle::<T>()?, key, value_bytes).map_err(|e| {
814                ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
815                    info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
816                    operation: DatabaseWriteOperation::PutUpsert,
817                    table_name: T::NAME,
818                    key: key.as_ref().to_vec(),
819                })))
820            })
821        })
822    }
823
824    /// Deletes a value from the specified table.
825    ///
826    /// # Panics
827    /// Panics if the provider is in read-only mode.
828    pub fn delete<T: Table>(&self, key: T::Key) -> ProviderResult<()> {
829        self.execute_with_operation_metric(RocksDBOperation::Delete, T::NAME, |this| {
830            this.0.delete_cf(this.get_cf_handle::<T>()?, key.encode().as_ref()).map_err(|e| {
831                ProviderError::Database(DatabaseError::Delete(DatabaseErrorInfo {
832                    message: e.to_string().into(),
833                    code: -1,
834                }))
835            })
836        })
837    }
838
839    /// Clears all entries from the specified table.
840    ///
841    /// Uses `delete_range_cf` from the empty key to a max key (256 bytes of 0xFF).
842    /// This end key must exceed the maximum encoded key size for any table.
843    /// Current max is ~60 bytes (`StorageShardedKey` = 20 + 32 + 8).
844    pub fn clear<T: Table>(&self) -> ProviderResult<()> {
845        let cf = self.get_cf_handle::<T>()?;
846
847        self.0.delete_range_cf(cf, &[] as &[u8], &[0xFF; 256]).map_err(|e| {
848            ProviderError::Database(DatabaseError::Delete(DatabaseErrorInfo {
849                message: e.to_string().into(),
850                code: -1,
851            }))
852        })?;
853
854        Ok(())
855    }
856
857    /// Retrieves the first or last entry from a table based on the iterator mode.
858    fn get_boundary<T: Table>(
859        &self,
860        mode: IteratorMode<'_>,
861    ) -> ProviderResult<Option<(T::Key, T::Value)>> {
862        self.execute_with_operation_metric(RocksDBOperation::Get, T::NAME, |this| {
863            let cf = this.get_cf_handle::<T>()?;
864            let mut iter = this.0.iterator_cf(cf, mode);
865
866            match iter.next() {
867                Some(Ok((key_bytes, value_bytes))) => {
868                    let key = <T::Key as reth_db_api::table::Decode>::decode(&key_bytes)
869                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
870                    let value = T::Value::decompress(&value_bytes)
871                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
872                    Ok(Some((key, value)))
873                }
874                Some(Err(e)) => {
875                    Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
876                        message: e.to_string().into(),
877                        code: -1,
878                    })))
879                }
880                None => Ok(None),
881            }
882        })
883    }
884
885    /// Gets the first (smallest key) entry from the specified table.
886    #[inline]
887    pub fn first<T: Table>(&self) -> ProviderResult<Option<(T::Key, T::Value)>> {
888        self.get_boundary::<T>(IteratorMode::Start)
889    }
890
891    /// Gets the last (largest key) entry from the specified table.
892    #[inline]
893    pub fn last<T: Table>(&self) -> ProviderResult<Option<(T::Key, T::Value)>> {
894        self.get_boundary::<T>(IteratorMode::End)
895    }
896
897    /// Creates an iterator over all entries in the specified table.
898    ///
899    /// Returns decoded `(Key, Value)` pairs in key order.
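    ///
    /// # Example
    ///
    /// Illustrative full scan (assuming the iterator yields fallible `(key, value)` items):
    ///
    /// ```ignore
    /// for entry in provider.iter::<tables::TransactionHashNumbers>()? {
    ///     let (tx_hash, tx_number) = entry?;
    ///     // ...
    /// }
    /// ```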
900    pub fn iter<T: Table>(&self) -> ProviderResult<RocksDBIter<'_, T>> {
901        let cf = self.get_cf_handle::<T>()?;
902        let iter = self.0.iterator_cf(cf, IteratorMode::Start);
903        Ok(RocksDBIter { inner: iter, _marker: std::marker::PhantomData })
904    }
905
906    /// Returns statistics for all column families in the database.
907    ///
908    /// Returns one [`RocksDBTableStats`] entry per column family.
909    pub fn table_stats(&self) -> Vec<RocksDBTableStats> {
910        self.0.table_stats()
911    }
912
913    /// Returns the total size of WAL (Write-Ahead Log) files in bytes.
914    ///
915    /// This scans the `RocksDB` directory for `.log` files and sums their sizes.
916    /// WAL files can be significant (e.g., 2.7GB observed) and are not included
917    /// in `table_size`, `sst_size`, or `memtable_size` metrics.
918    pub fn wal_size_bytes(&self) -> u64 {
919        self.0.wal_size_bytes()
920    }
921
922    /// Returns database-level statistics including per-table stats and WAL size.
923    ///
924    /// This combines [`Self::table_stats`] and [`Self::wal_size_bytes`] into a single struct.
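    ///
    /// # Example
    ///
    /// Illustrative stats dump:
    ///
    /// ```ignore
    /// let stats = provider.db_stats();
    /// for table in &stats.tables {
    ///     println!("{}: ~{} keys, {} bytes", table.name, table.estimated_num_keys, table.estimated_size_bytes);
    /// }
    /// println!("WAL: {} bytes", stats.wal_size_bytes);
    /// ```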
925    pub fn db_stats(&self) -> RocksDBStats {
926        self.0.db_stats()
927    }
928
929    /// Flushes pending writes for the specified tables to disk.
930    ///
931    /// This performs a flush of:
932    /// 1. The column family memtables for the specified table names to SST files
933    /// 2. The Write-Ahead Log (WAL) with sync
934    ///
935    /// After this call completes, all data for the specified tables is durably persisted to disk.
936    ///
937    /// # Panics
938    /// Panics if the provider is in read-only mode.
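    ///
    /// # Example
    ///
    /// Illustrative targeted flush of a single table:
    ///
    /// ```ignore
    /// provider.flush(&[tables::TransactionHashNumbers::NAME])?;
    /// ```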
939    #[instrument(level = "debug", target = "providers::rocksdb", skip_all, fields(tables = ?tables))]
940    pub fn flush(&self, tables: &[&'static str]) -> ProviderResult<()> {
941        let db = self.0.db_rw();
942
943        for cf_name in tables {
944            if let Some(cf) = db.cf_handle(cf_name) {
945                db.flush_cf(&cf).map_err(|e| {
946                    ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
947                        info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
948                        operation: DatabaseWriteOperation::Flush,
949                        table_name: cf_name,
950                        key: Vec::new(),
951                    })))
952                })?;
953            }
954        }
955
956        db.flush_wal(true).map_err(|e| {
957            ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
958                info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
959                operation: DatabaseWriteOperation::Flush,
960                table_name: "WAL",
961                key: Vec::new(),
962            })))
963        })?;
964
965        Ok(())
966    }
967
968    /// Flushes and compacts all tables in `RocksDB`.
969    ///
970    /// This:
971    /// 1. Flushes all column family memtables to SST files
972    /// 2. Flushes the Write-Ahead Log (WAL) with sync
973    /// 3. Triggers manual compaction on all column families to reclaim disk space
974    ///
975    /// Use this after large delete operations (like pruning) to reclaim disk space.
976    ///
977    /// # Panics
978    /// Panics if the provider is in read-only mode.
979    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
980    pub fn flush_and_compact(&self) -> ProviderResult<()> {
981        self.flush(ROCKSDB_TABLES)?;
982
983        let db = self.0.db_rw();
984
985        for cf_name in ROCKSDB_TABLES {
986            if let Some(cf) = db.cf_handle(cf_name) {
987                db.compact_range_cf(&cf, None::<&[u8]>, None::<&[u8]>);
988            }
989        }
990
991        Ok(())
992    }
993
994    /// Creates a raw iterator over all entries in the specified table.
995    ///
996    /// Returns raw `(key_bytes, value_bytes)` pairs without decoding.
997    pub fn raw_iter<T: Table>(&self) -> ProviderResult<RocksDBRawIter<'_>> {
998        let cf = self.get_cf_handle::<T>()?;
999        let iter = self.0.iterator_cf(cf, IteratorMode::Start);
1000        Ok(RocksDBRawIter { inner: iter })
1001    }
1002
1003    /// Returns all account history shards for the given address in ascending key order.
1004    ///
1005    /// This is used for unwind operations where we need to scan all shards for an address
1006    /// and potentially delete or truncate them.
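    ///
    /// # Example
    ///
    /// Illustrative scan of all shards indexed for one address:
    ///
    /// ```ignore
    /// for (key, blocks) in provider.account_history_shards(address)? {
    ///     // `key.highest_block_number` bounds the block numbers contained in `blocks`
    /// }
    /// ```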
1007    pub fn account_history_shards(
1008        &self,
1009        address: Address,
1010    ) -> ProviderResult<Vec<(ShardedKey<Address>, BlockNumberList)>> {
1011        // Get the column family handle for the AccountsHistory table.
1012        let cf = self.get_cf_handle::<tables::AccountsHistory>()?;
1013
1014        // Build a seek key starting at the first shard (highest_block_number = 0) for this address.
1015        // ShardedKey is (address, highest_block_number) so this positions us at the beginning.
1016        let start_key = ShardedKey::new(address, 0u64);
1017        let start_bytes = start_key.encode();
1018
1019        // Create a forward iterator starting from our seek position.
1020        let iter = self
1021            .0
1022            .iterator_cf(cf, IteratorMode::From(start_bytes.as_ref(), rocksdb::Direction::Forward));
1023
1024        let mut result = Vec::new();
1025        for item in iter {
1026            match item {
1027                Ok((key_bytes, value_bytes)) => {
1028                    // Decode the sharded key to check if we're still on the same address.
1029                    let key = ShardedKey::<Address>::decode(&key_bytes)
1030                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
1031
1032                    // Stop when we reach a different address (keys are sorted by address first).
1033                    if key.key != address {
1034                        break;
1035                    }
1036
1037                    // Decompress the block number list stored in this shard.
1038                    let value = BlockNumberList::decompress(&value_bytes)
1039                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
1040
1041                    result.push((key, value));
1042                }
1043                Err(e) => {
1044                    return Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
1045                        message: e.to_string().into(),
1046                        code: -1,
1047                    })));
1048                }
1049            }
1050        }
1051
1052        Ok(result)
1053    }
1054
1055    /// Returns all storage history shards for the given `(address, storage_key)` pair.
1056    ///
1057    /// Iterates through all shards in ascending `highest_block_number` order until
1058    /// a different `(address, storage_key)` is encountered.
1059    pub fn storage_history_shards(
1060        &self,
1061        address: Address,
1062        storage_key: B256,
1063    ) -> ProviderResult<Vec<(StorageShardedKey, BlockNumberList)>> {
1064        let cf = self.get_cf_handle::<tables::StoragesHistory>()?;
1065
1066        let start_key = StorageShardedKey::new(address, storage_key, 0u64);
1067        let start_bytes = start_key.encode();
1068
1069        let iter = self
1070            .0
1071            .iterator_cf(cf, IteratorMode::From(start_bytes.as_ref(), rocksdb::Direction::Forward));
1072
1073        let mut result = Vec::new();
1074        for item in iter {
1075            match item {
1076                Ok((key_bytes, value_bytes)) => {
1077                    let key = StorageShardedKey::decode(&key_bytes)
1078                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
1079
1080                    if key.address != address || key.sharded_key.key != storage_key {
1081                        break;
1082                    }
1083
1084                    let value = BlockNumberList::decompress(&value_bytes)
1085                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
1086
1087                    result.push((key, value));
1088                }
1089                Err(e) => {
1090                    return Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
1091                        message: e.to_string().into(),
1092                        code: -1,
1093                    })));
1094                }
1095            }
1096        }
1097
1098        Ok(result)
1099    }
1100
1101    /// Unwinds account history indices for the given `(address, block_number)` pairs.
1102    ///
1103    /// Groups addresses by their minimum block number and calls the appropriate unwind
1104    /// operations. For each address, keeps only blocks less than the minimum block
1105    /// (i.e., removes the minimum block and all higher blocks).
1106    ///
1107    /// Returns a `WriteBatchWithTransaction` that can be committed later.
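    ///
    /// # Example
    ///
    /// Illustrative unwind of two addresses back past blocks 100 and 105; the returned batch
    /// still has to be committed, e.g. via [`Self::commit_batch`]:
    ///
    /// ```ignore
    /// let batch = provider.unwind_account_history_indices(&[(addr_a, 100), (addr_b, 105)])?;
    /// provider.commit_batch(batch)?;
    /// ```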
1108    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
1109    pub fn unwind_account_history_indices(
1110        &self,
1111        last_indices: &[(Address, BlockNumber)],
1112    ) -> ProviderResult<WriteBatchWithTransaction<true>> {
1113        let mut address_min_block: AddressMap<BlockNumber> =
1114            AddressMap::with_capacity_and_hasher(last_indices.len(), Default::default());
1115        for &(address, block_number) in last_indices {
1116            address_min_block
1117                .entry(address)
1118                .and_modify(|min| *min = (*min).min(block_number))
1119                .or_insert(block_number);
1120        }
1121
1122        let mut batch = self.batch();
1123        for (address, min_block) in address_min_block {
1124            match min_block.checked_sub(1) {
1125                Some(keep_to) => batch.unwind_account_history_to(address, keep_to)?,
1126                None => batch.clear_account_history(address)?,
1127            }
1128        }
1129
1130        Ok(batch.into_inner())
1131    }
1132
1133    /// Unwinds storage history indices for the given `(address, storage_key, block_number)` tuples.
1134    ///
1135    /// Groups by `(address, storage_key)` and finds the minimum block number for each.
1136    /// For each key, keeps only blocks less than the minimum block
1137    /// (i.e., removes the minimum block and all higher blocks).
1138    ///
1139    /// Returns a `WriteBatchWithTransaction` that can be committed later.
1140    pub fn unwind_storage_history_indices(
1141        &self,
1142        storage_changesets: &[(Address, B256, BlockNumber)],
1143    ) -> ProviderResult<WriteBatchWithTransaction<true>> {
1144        let mut key_min_block: HashMap<(Address, B256), BlockNumber> =
1145            HashMap::with_capacity_and_hasher(storage_changesets.len(), Default::default());
1146        for &(address, storage_key, block_number) in storage_changesets {
1147            key_min_block
1148                .entry((address, storage_key))
1149                .and_modify(|min| *min = (*min).min(block_number))
1150                .or_insert(block_number);
1151        }
1152
1153        let mut batch = self.batch();
1154        for ((address, storage_key), min_block) in key_min_block {
1155            match min_block.checked_sub(1) {
1156                Some(keep_to) => batch.unwind_storage_history_to(address, storage_key, keep_to)?,
1157                None => batch.clear_storage_history(address, storage_key)?,
1158            }
1159        }
1160
1161        Ok(batch.into_inner())
1162    }
1163
1164    /// Writes a batch of operations atomically.
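    ///
    /// # Example
    ///
    /// Closure-based atomic write (illustrative; `tx_hash`/`tx_number` are placeholders):
    ///
    /// ```ignore
    /// provider.write_batch(|batch| {
    ///     batch.put::<tables::TransactionHashNumbers>(tx_hash, &tx_number)?;
    ///     Ok(())
    /// })?;
    /// ```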
1165    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
1166    pub fn write_batch<F>(&self, f: F) -> ProviderResult<()>
1167    where
1168        F: FnOnce(&mut RocksDBBatch<'_>) -> ProviderResult<()>,
1169    {
1170        self.execute_with_operation_metric(RocksDBOperation::BatchWrite, "Batch", |this| {
1171            let mut batch_handle = this.batch();
1172            f(&mut batch_handle)?;
1173            batch_handle.commit()
1174        })
1175    }
1176
1177    /// Commits a raw `WriteBatchWithTransaction` to `RocksDB`.
1178    ///
1179    /// This is used when the batch was extracted via [`RocksDBBatch::into_inner`]
1180    /// and needs to be committed at a later point (e.g., at provider commit time).
1181    ///
1182    /// # Panics
1183    /// Panics if the provider is in read-only mode.
1184    #[instrument(level = "debug", target = "providers::rocksdb", skip_all, fields(batch_len = batch.len(), batch_size = batch.size_in_bytes()))]
1185    pub fn commit_batch(&self, batch: WriteBatchWithTransaction<true>) -> ProviderResult<()> {
1186        self.0.db_rw().write_opt(batch, &WriteOptions::default()).map_err(|e| {
1187            ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
1188                message: e.to_string().into(),
1189                code: -1,
1190            }))
1191        })
1192    }
1193
1194    /// Writes all `RocksDB` data for multiple blocks in parallel.
1195    ///
1196    /// This handles transaction hash numbers, account history, and storage history based on
1197    /// the provided storage settings. Each operation runs in parallel with its own batch,
1198    /// pushing to `ctx.pending_batches` for later commit.
1199    #[instrument(level = "debug", target = "providers::rocksdb", skip_all, fields(num_blocks = blocks.len(), first_block = ctx.first_block_number))]
1200    pub(crate) fn write_blocks_data<N: reth_node_types::NodePrimitives>(
1201        &self,
1202        blocks: &[ExecutedBlock<N>],
1203        tx_nums: &[TxNumber],
1204        ctx: RocksDBWriteCtx,
1205        runtime: &reth_tasks::Runtime,
1206    ) -> ProviderResult<()> {
1207        if !ctx.storage_settings.storage_v2 {
1208            return Ok(());
1209        }
1210
1211        let mut r_tx_hash = None;
1212        let mut r_account_history = None;
1213        let mut r_storage_history = None;
1214
1215        let write_tx_hash =
1216            ctx.storage_settings.storage_v2 && ctx.prune_tx_lookup.is_none_or(|m| !m.is_full());
1217        let write_account_history = ctx.storage_settings.storage_v2;
1218        let write_storage_history = ctx.storage_settings.storage_v2;
1219
1220        runtime.storage_pool().in_place_scope(|s| {
1221            if write_tx_hash {
1222                s.spawn(|_| {
1223                    r_tx_hash = Some(self.write_tx_hash_numbers(blocks, tx_nums, &ctx));
1224                });
1225            }
1226
1227            if write_account_history {
1228                s.spawn(|_| {
1229                    r_account_history = Some(self.write_account_history(blocks, &ctx));
1230                });
1231            }
1232
1233            if write_storage_history {
1234                s.spawn(|_| {
1235                    r_storage_history = Some(self.write_storage_history(blocks, &ctx));
1236                });
1237            }
1238        });
1239
1240        if write_tx_hash {
1241            r_tx_hash.ok_or_else(|| {
1242                ProviderError::Database(DatabaseError::Other(
1243                    "rocksdb tx-hash write thread panicked".into(),
1244                ))
1245            })??;
1246        }
1247        if write_account_history {
1248            r_account_history.ok_or_else(|| {
1249                ProviderError::Database(DatabaseError::Other(
1250                    "rocksdb account-history write thread panicked".into(),
1251                ))
1252            })??;
1253        }
1254        if write_storage_history {
1255            r_storage_history.ok_or_else(|| {
1256                ProviderError::Database(DatabaseError::Other(
1257                    "rocksdb storage-history write thread panicked".into(),
1258                ))
1259            })??;
1260        }
1261
1262        Ok(())
1263    }
1264
1265    /// Writes transaction hash to number mappings for the given blocks.
1266    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
1267    fn write_tx_hash_numbers<N: reth_node_types::NodePrimitives>(
1268        &self,
1269        blocks: &[ExecutedBlock<N>],
1270        tx_nums: &[TxNumber],
1271        ctx: &RocksDBWriteCtx,
1272    ) -> ProviderResult<()> {
1273        let mut batch = self.batch();
1274        for (block, &first_tx_num) in blocks.iter().zip(tx_nums) {
1275            let body = block.recovered_block().body();
1276            let mut tx_num = first_tx_num;
1277            for transaction in body.transactions_iter() {
1278                batch.put::<tables::TransactionHashNumbers>(*transaction.tx_hash(), &tx_num)?;
1279                tx_num += 1;
1280            }
1281        }
1282        ctx.pending_batches.lock().push(batch.into_inner());
1283        Ok(())
1284    }
1285
1286    /// Writes account history indices for the given blocks.
1287    ///
1288    /// Derives history indices from reverts (same source as changesets) to ensure consistency.
1289    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
1290    fn write_account_history<N: reth_node_types::NodePrimitives>(
1291        &self,
1292        blocks: &[ExecutedBlock<N>],
1293        ctx: &RocksDBWriteCtx,
1294    ) -> ProviderResult<()> {
1295        let mut batch = self.batch();
1296        let mut account_history: BTreeMap<Address, Vec<u64>> = BTreeMap::new();
1297
1298        for (block_idx, block) in blocks.iter().enumerate() {
1299            let block_number = ctx.first_block_number + block_idx as u64;
1300            let reverts = block.execution_outcome().state.reverts.to_plain_state_reverts();
1301
1302            // Iterate through account reverts - these are exactly the accounts that have
1303            // changesets written, ensuring history indices match changeset entries.
1304            for account_block_reverts in reverts.accounts {
1305                for (address, _) in account_block_reverts {
1306                    account_history.entry(address).or_default().push(block_number);
1307                }
1308            }
1309        }
1310
1311        // Write account history using proper shard append logic
1312        for (address, indices) in account_history {
1313            batch.append_account_history_shard(address, indices)?;
1314        }
1315        ctx.pending_batches.lock().push(batch.into_inner());
1316        Ok(())
1317    }
1318
1319    /// Writes storage history indices for the given blocks.
1320    ///
1321    /// Derives history indices from reverts (same source as changesets) to ensure consistency.
1322    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
1323    fn write_storage_history<N: reth_node_types::NodePrimitives>(
1324        &self,
1325        blocks: &[ExecutedBlock<N>],
1326        ctx: &RocksDBWriteCtx,
1327    ) -> ProviderResult<()> {
1328        let mut batch = self.batch();
1329        let mut storage_history: BTreeMap<(Address, B256), Vec<u64>> = BTreeMap::new();
1330
1331        for (block_idx, block) in blocks.iter().enumerate() {
1332            let block_number = ctx.first_block_number + block_idx as u64;
1333            let reverts = block.execution_outcome().state.reverts.to_plain_state_reverts();
1334
1335            // Iterate through storage reverts - these are exactly the slots that have
1336            // changesets written, ensuring history indices match changeset entries.
1337            for storage_block_reverts in reverts.storage {
1338                for revert in storage_block_reverts {
1339                    for (slot, _) in revert.storage_revert {
1340                        let plain_key = B256::new(slot.to_be_bytes());
1341                        let key = keccak256(plain_key);
1342                        storage_history
1343                            .entry((revert.address, key))
1344                            .or_default()
1345                            .push(block_number);
1346                    }
1347                }
1348            }
1349        }
1350
1351        // Write storage history using proper shard append logic
1352        for ((address, slot), indices) in storage_history {
1353            batch.append_storage_history_shard(address, slot, indices)?;
1354        }
1355        ctx.pending_batches.lock().push(batch.into_inner());
1356        Ok(())
1357    }
1358}
1359
1360/// Outcome of pruning a history shard in `RocksDB`.
1361#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1362pub enum PruneShardOutcome {
1363    /// Shard was deleted entirely.
1364    Deleted,
1365    /// Shard was updated with filtered block numbers.
1366    Updated,
1367    /// Shard was unchanged (it contained no blocks at or below `to_block`).
1368    Unchanged,
1369}
1370
1371/// Tracks pruning outcomes for batch operations.
1372#[derive(Debug, Default, Clone, Copy)]
1373pub struct PrunedIndices {
1374    /// Number of shards completely deleted.
1375    pub deleted: usize,
1376    /// Number of shards that were updated (filtered but still have entries).
1377    pub updated: usize,
1378    /// Number of shards that were unchanged.
1379    pub unchanged: usize,
1380}
1381
1382/// Handle for building a batch of operations atomically.
1383///
1384/// Uses `WriteBatchWithTransaction` for atomic writes without full transaction overhead.
1385/// Unlike [`RocksTx`], this does NOT support read-your-writes. Use for write-only flows
1386/// where you don't need to read back uncommitted data within the same operation
1387/// (e.g., history index writes).
1388///
1389/// When `auto_commit_threshold` is set, the batch automatically commits and resets once its
1390/// size exceeds the threshold, preventing unbounded memory growth during large bulk writes.
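///
/// # Example
///
/// A minimal sketch, assuming a provider built with the `TransactionHashNumbers` column family
/// registered (see the tests below) and hypothetical `hash`/`number` bindings:
///
/// ```ignore
/// provider.write_batch(|batch| {
///     batch.put::<tables::TransactionHashNumbers>(hash, &number)?;
///     Ok(())
/// })?;
/// ```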
1391#[must_use = "batch must be committed"]
1392pub struct RocksDBBatch<'a> {
1393    provider: &'a RocksDBProvider,
1394    inner: WriteBatchWithTransaction<true>,
1395    buf: Vec<u8>,
1396    /// If set, batch auto-commits when size exceeds this threshold (in bytes).
1397    auto_commit_threshold: Option<usize>,
1398}
1399
1400impl fmt::Debug for RocksDBBatch<'_> {
1401    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1402        f.debug_struct("RocksDBBatch")
1403            .field("provider", &self.provider)
1404            .field("batch", &"<WriteBatchWithTransaction>")
1405            // Number of operations in this batch
1406            .field("length", &self.inner.len())
1407            // Total serialized size (encoded key + compressed value + metadata) of this batch
1408            // in bytes
1409            .field("size_in_bytes", &self.inner.size_in_bytes())
1410            .finish()
1411    }
1412}
1413
1414impl<'a> RocksDBBatch<'a> {
1415    /// Puts a value into the batch.
1416    ///
1417    /// If auto-commit is enabled and the batch exceeds the threshold, commits and resets.
1418    pub fn put<T: Table>(&mut self, key: T::Key, value: &T::Value) -> ProviderResult<()> {
1419        let encoded_key = key.encode();
1420        self.put_encoded::<T>(&encoded_key, value)
1421    }
1422
1423    /// Puts a value into the batch using pre-encoded key.
1424    ///
1425    /// If auto-commit is enabled and the batch exceeds the threshold, commits and resets.
1426    pub fn put_encoded<T: Table>(
1427        &mut self,
1428        key: &<T::Key as Encode>::Encoded,
1429        value: &T::Value,
1430    ) -> ProviderResult<()> {
1431        let value_bytes = compress_to_buf_or_ref!(self.buf, value).unwrap_or(&self.buf);
1432        self.inner.put_cf(self.provider.get_cf_handle::<T>()?, key, value_bytes);
1433        self.maybe_auto_commit()?;
1434        Ok(())
1435    }
1436
1437    /// Deletes a value from the batch.
1438    ///
1439    /// If auto-commit is enabled and the batch exceeds the threshold, commits and resets.
1440    pub fn delete<T: Table>(&mut self, key: T::Key) -> ProviderResult<()> {
1441        self.inner.delete_cf(self.provider.get_cf_handle::<T>()?, key.encode().as_ref());
1442        self.maybe_auto_commit()?;
1443        Ok(())
1444    }
1445
1446    /// Commits and resets the batch if it exceeds the auto-commit threshold.
1447    ///
1448    /// This is called after each `put` or `delete` operation to prevent unbounded memory growth.
1449    /// Returns immediately if auto-commit is disabled or threshold not reached.
1450    fn maybe_auto_commit(&mut self) -> ProviderResult<()> {
1451        if let Some(threshold) = self.auto_commit_threshold &&
1452            self.inner.size_in_bytes() >= threshold
1453        {
1454            tracing::debug!(
1455                target: "providers::rocksdb",
1456                batch_size = self.inner.size_in_bytes(),
1457                threshold,
1458                "Auto-committing RocksDB batch"
1459            );
1460            let old_batch = std::mem::take(&mut self.inner);
1461            self.provider.0.db_rw().write_opt(old_batch, &WriteOptions::default()).map_err(
1462                |e| {
1463                    ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
1464                        message: e.to_string().into(),
1465                        code: -1,
1466                    }))
1467                },
1468            )?;
1469        }
1470        Ok(())
1471    }
1472
1473    /// Commits the batch to the database.
1474    ///
1475    /// This consumes the batch and writes all operations atomically to `RocksDB`.
1476    ///
1477    /// # Panics
1478    /// Panics if the provider is in read-only mode.
1479    #[instrument(level = "debug", target = "providers::rocksdb", skip_all, fields(batch_len = self.inner.len(), batch_size = self.inner.size_in_bytes()))]
1480    pub fn commit(self) -> ProviderResult<()> {
1481        self.provider.0.db_rw().write_opt(self.inner, &WriteOptions::default()).map_err(|e| {
1482            ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
1483                message: e.to_string().into(),
1484                code: -1,
1485            }))
1486        })
1487    }
1488
1489    /// Returns the number of write operations (puts + deletes) queued in this batch.
1490    pub fn len(&self) -> usize {
1491        self.inner.len()
1492    }
1493
1494    /// Returns `true` if the batch contains no operations.
1495    pub fn is_empty(&self) -> bool {
1496        self.inner.is_empty()
1497    }
1498
1499    /// Returns the size of the batch in bytes.
1500    pub fn size_in_bytes(&self) -> usize {
1501        self.inner.size_in_bytes()
1502    }
1503
1504    /// Returns a reference to the underlying `RocksDB` provider.
1505    pub const fn provider(&self) -> &RocksDBProvider {
1506        self.provider
1507    }
1508
1509    /// Consumes the batch and returns the underlying `WriteBatchWithTransaction`.
1510    ///
1511    /// This is used to defer commits to the provider level.
1512    pub fn into_inner(self) -> WriteBatchWithTransaction<true> {
1513        self.inner
1514    }
1515
1516    /// Gets a value from the database.
1517    ///
1518    /// **Important constraint:** This reads only committed state, not pending writes in this
1519    /// batch or other pending batches in `pending_rocksdb_batches`.
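    ///
    /// # Example
    ///
    /// A sketch of this constraint, assuming a hypothetical `hash` that has not been committed
    /// yet:
    ///
    /// ```ignore
    /// provider.write_batch(|batch| {
    ///     batch.put::<tables::TransactionHashNumbers>(hash, &7)?;
    ///     // The queued put is not visible here: `get` only sees committed state.
    ///     assert_eq!(batch.get::<tables::TransactionHashNumbers>(hash)?, None);
    ///     Ok(())
    /// })?;
    /// // After the batch is committed, the value is visible through the provider.
    /// assert_eq!(provider.get::<tables::TransactionHashNumbers>(hash)?, Some(7));
    /// ```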
1520    pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
1521        self.provider.get::<T>(key)
1522    }
1523
1524    /// Appends indices to an account history shard with proper shard management.
1525    ///
1526    /// Loads the existing shard (if any), appends new indices, and rechunks into
1527    /// multiple shards if needed (respecting `NUM_OF_INDICES_IN_SHARD` limit).
1528    ///
1529    /// # Requirements
1530    ///
1531    /// - The `indices` MUST be strictly increasing and contain no duplicates.
1532    /// - This method MUST only be called once per address per batch. The batch reads existing
1533    ///   shards from committed DB state, not from pending writes. Calling twice for the same
1534    ///   address will cause the second call to overwrite the first.
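    ///
    /// # Example
    ///
    /// A minimal sketch, assuming a hypothetical `address` and strictly increasing block
    /// numbers:
    ///
    /// ```ignore
    /// provider.write_batch(|batch| {
    ///     batch.append_account_history_shard(address, [100u64, 101, 105])?;
    ///     Ok(())
    /// })?;
    /// ```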
1535    pub fn append_account_history_shard(
1536        &mut self,
1537        address: Address,
1538        indices: impl IntoIterator<Item = u64>,
1539    ) -> ProviderResult<()> {
1540        let indices: Vec<u64> = indices.into_iter().collect();
1541
1542        if indices.is_empty() {
1543            return Ok(());
1544        }
1545
1546        debug_assert!(
1547            indices.windows(2).all(|w| w[0] < w[1]),
1548            "indices must be strictly increasing: {:?}",
1549            indices
1550        );
1551
1552        let last_key = ShardedKey::new(address, u64::MAX);
1553        let last_shard_opt = self.provider.get::<tables::AccountsHistory>(last_key.clone())?;
1554        let mut last_shard = last_shard_opt.unwrap_or_else(BlockNumberList::empty);
1555
1556        last_shard.append(indices).map_err(ProviderError::other)?;
1557
1558        // Fast path: all indices fit in one shard
1559        if last_shard.len() <= NUM_OF_INDICES_IN_SHARD as u64 {
1560            self.put::<tables::AccountsHistory>(last_key, &last_shard)?;
1561            return Ok(());
1562        }
1563
1564        // Slow path: rechunk into multiple shards
1565        let chunks = last_shard.iter().chunks(NUM_OF_INDICES_IN_SHARD);
1566        let mut chunks_peekable = chunks.into_iter().peekable();
1567
1568        while let Some(chunk) = chunks_peekable.next() {
1569            let shard = BlockNumberList::new_pre_sorted(chunk);
1570            let highest_block_number = if chunks_peekable.peek().is_some() {
1571                shard.iter().next_back().expect("`chunks` does not return empty list")
1572            } else {
1573                u64::MAX
1574            };
1575
1576            self.put::<tables::AccountsHistory>(
1577                ShardedKey::new(address, highest_block_number),
1578                &shard,
1579            )?;
1580        }
1581
1582        Ok(())
1583    }
1584
1585    /// Appends indices to a storage history shard with proper shard management.
1586    ///
1587    /// Loads the existing shard (if any), appends new indices, and rechunks into
1588    /// multiple shards if needed (respecting `NUM_OF_INDICES_IN_SHARD` limit).
1589    ///
1590    /// # Requirements
1591    ///
1592    /// - The `indices` MUST be strictly increasing and contain no duplicates.
1593    /// - This method MUST only be called once per (address, `storage_key`) pair per batch. The
1594    ///   batch reads existing shards from committed DB state, not from pending writes. Calling
1595    ///   twice for the same key will cause the second call to overwrite the first.
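    ///
    /// # Example
    ///
    /// Same contract as the account variant; `address` and `storage_key` are hypothetical:
    ///
    /// ```ignore
    /// provider.write_batch(|batch| {
    ///     batch.append_storage_history_shard(address, storage_key, [100u64, 101])?;
    ///     Ok(())
    /// })?;
    /// ```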
1596    pub fn append_storage_history_shard(
1597        &mut self,
1598        address: Address,
1599        storage_key: B256,
1600        indices: impl IntoIterator<Item = u64>,
1601    ) -> ProviderResult<()> {
1602        let indices: Vec<u64> = indices.into_iter().collect();
1603
1604        if indices.is_empty() {
1605            return Ok(());
1606        }
1607
1608        debug_assert!(
1609            indices.windows(2).all(|w| w[0] < w[1]),
1610            "indices must be strictly increasing: {:?}",
1611            indices
1612        );
1613
1614        let last_key = StorageShardedKey::last(address, storage_key);
1615        let last_shard_opt = self.provider.get::<tables::StoragesHistory>(last_key.clone())?;
1616        let mut last_shard = last_shard_opt.unwrap_or_else(BlockNumberList::empty);
1617
1618        last_shard.append(indices).map_err(ProviderError::other)?;
1619
1620        // Fast path: all indices fit in one shard
1621        if last_shard.len() <= NUM_OF_INDICES_IN_SHARD as u64 {
1622            self.put::<tables::StoragesHistory>(last_key, &last_shard)?;
1623            return Ok(());
1624        }
1625
1626        // Slow path: rechunk into multiple shards
1627        let chunks = last_shard.iter().chunks(NUM_OF_INDICES_IN_SHARD);
1628        let mut chunks_peekable = chunks.into_iter().peekable();
1629
1630        while let Some(chunk) = chunks_peekable.next() {
1631            let shard = BlockNumberList::new_pre_sorted(chunk);
1632            let highest_block_number = if chunks_peekable.peek().is_some() {
1633                shard.iter().next_back().expect("`chunks` does not return empty list")
1634            } else {
1635                u64::MAX
1636            };
1637
1638            self.put::<tables::StoragesHistory>(
1639                StorageShardedKey::new(address, storage_key, highest_block_number),
1640                &shard,
1641            )?;
1642        }
1643
1644        Ok(())
1645    }
1646
1647    /// Unwinds account history for the given address, keeping only blocks <= `keep_to`.
1648    ///
1649    /// Mirrors MDBX `unwind_history_shards` behavior:
1650    /// - Deletes shards that lie entirely above `keep_to`
1651    /// - Truncates the boundary shard and re-keys it to the `u64::MAX` sentinel
1652    /// - Preserves shards that lie entirely at or below `keep_to`
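    ///
    /// # Example
    ///
    /// A sketch that unwinds a hypothetical `address` back to block 1_000:
    ///
    /// ```ignore
    /// provider.write_batch(|batch| batch.unwind_account_history_to(address, 1_000))?;
    /// ```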
1653    pub fn unwind_account_history_to(
1654        &mut self,
1655        address: Address,
1656        keep_to: BlockNumber,
1657    ) -> ProviderResult<()> {
1658        let shards = self.provider.account_history_shards(address)?;
1659        if shards.is_empty() {
1660            return Ok(());
1661        }
1662
1663        // Find the first shard that might contain blocks > keep_to.
1664        // A shard is affected if it's the sentinel (u64::MAX) or its highest_block_number > keep_to
1665        let boundary_idx = shards.iter().position(|(key, _)| {
1666            key.highest_block_number == u64::MAX || key.highest_block_number > keep_to
1667        });
1668
1669            // Repair path: no shard is affected (all blocks <= keep_to); just ensure the sentinel exists
1670        let Some(boundary_idx) = boundary_idx else {
1671            let (last_key, last_value) = shards.last().expect("shards is non-empty");
1672            if last_key.highest_block_number != u64::MAX {
1673                self.delete::<tables::AccountsHistory>(last_key.clone())?;
1674                self.put::<tables::AccountsHistory>(
1675                    ShardedKey::new(address, u64::MAX),
1676                    last_value,
1677                )?;
1678            }
1679            return Ok(());
1680        };
1681
1682        // Delete all shards strictly after the boundary (they are entirely > keep_to)
1683        for (key, _) in shards.iter().skip(boundary_idx + 1) {
1684            self.delete::<tables::AccountsHistory>(key.clone())?;
1685        }
1686
1687        // Process the boundary shard: filter out blocks > keep_to
1688        let (boundary_key, boundary_list) = &shards[boundary_idx];
1689
1690        // Delete the boundary shard (we'll either drop it or rewrite at u64::MAX)
1691        self.delete::<tables::AccountsHistory>(boundary_key.clone())?;
1692
1693        // Build truncated list once; check emptiness directly (avoids double iteration)
1694        let new_last =
1695            BlockNumberList::new_pre_sorted(boundary_list.iter().take_while(|&b| b <= keep_to));
1696
1697        if new_last.is_empty() {
1698            // Boundary shard is now empty. Previous shard becomes the last and must be keyed
1699            // u64::MAX.
1700            if boundary_idx == 0 {
1701                // Nothing left for this address
1702                return Ok(());
1703            }
1704
1705            let (prev_key, prev_value) = &shards[boundary_idx - 1];
1706            if prev_key.highest_block_number != u64::MAX {
1707                self.delete::<tables::AccountsHistory>(prev_key.clone())?;
1708                self.put::<tables::AccountsHistory>(
1709                    ShardedKey::new(address, u64::MAX),
1710                    prev_value,
1711                )?;
1712            }
1713            return Ok(());
1714        }
1715
1716        self.put::<tables::AccountsHistory>(ShardedKey::new(address, u64::MAX), &new_last)?;
1717
1718        Ok(())
1719    }
1720
1721    /// Prunes history shards, removing blocks <= `to_block`.
1722    ///
1723    /// Generic implementation for both account and storage history pruning.
1724    /// Mirrors MDBX `prune_shard` semantics. After pruning, the last remaining shard
1725    /// (if any) will have the sentinel key (`u64::MAX`).
1726    #[allow(clippy::too_many_arguments)]
1727    fn prune_history_shards_inner<K>(
1728        &mut self,
1729        shards: Vec<(K, BlockNumberList)>,
1730        to_block: BlockNumber,
1731        get_highest: impl Fn(&K) -> u64,
1732        is_sentinel: impl Fn(&K) -> bool,
1733        delete_shard: impl Fn(&mut Self, K) -> ProviderResult<()>,
1734        put_shard: impl Fn(&mut Self, K, &BlockNumberList) -> ProviderResult<()>,
1735        create_sentinel: impl Fn() -> K,
1736    ) -> ProviderResult<PruneShardOutcome>
1737    where
1738        K: Clone,
1739    {
1740        if shards.is_empty() {
1741            return Ok(PruneShardOutcome::Unchanged);
1742        }
1743
1744        let mut deleted = false;
1745        let mut updated = false;
1746        let mut last_remaining: Option<(K, BlockNumberList)> = None;
1747
1748        for (key, block_list) in shards {
1749            if !is_sentinel(&key) && get_highest(&key) <= to_block {
1750                delete_shard(self, key)?;
1751                deleted = true;
1752            } else {
1753                let original_len = block_list.len();
1754                let filtered =
1755                    BlockNumberList::new_pre_sorted(block_list.iter().filter(|&b| b > to_block));
1756
1757                if filtered.is_empty() {
1758                    delete_shard(self, key)?;
1759                    deleted = true;
1760                } else if filtered.len() < original_len {
1761                    put_shard(self, key.clone(), &filtered)?;
1762                    last_remaining = Some((key, filtered));
1763                    updated = true;
1764                } else {
1765                    last_remaining = Some((key, block_list));
1766                }
1767            }
1768        }
1769
1770        if let Some((last_key, last_value)) = last_remaining &&
1771            !is_sentinel(&last_key)
1772        {
1773            delete_shard(self, last_key)?;
1774            put_shard(self, create_sentinel(), &last_value)?;
1775            updated = true;
1776        }
1777
1778        if deleted {
1779            Ok(PruneShardOutcome::Deleted)
1780        } else if updated {
1781            Ok(PruneShardOutcome::Updated)
1782        } else {
1783            Ok(PruneShardOutcome::Unchanged)
1784        }
1785    }
1786
1787    /// Prunes account history for the given address, removing blocks <= `to_block`.
1788    ///
1789    /// Mirrors MDBX `prune_shard` semantics. After pruning, the last remaining shard
1790    /// (if any) will have the sentinel key (`u64::MAX`).
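    ///
    /// # Example
    ///
    /// A sketch that prunes a hypothetical `address` up to block 500 and inspects the outcome:
    ///
    /// ```ignore
    /// provider.write_batch(|batch| {
    ///     match batch.prune_account_history_to(address, 500)? {
    ///         PruneShardOutcome::Deleted => { /* every shard was removed */ }
    ///         PruneShardOutcome::Updated => { /* a shard was truncated or re-keyed */ }
    ///         PruneShardOutcome::Unchanged => { /* no entries at or below block 500 */ }
    ///     }
    ///     Ok(())
    /// })?;
    /// ```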
1791    pub fn prune_account_history_to(
1792        &mut self,
1793        address: Address,
1794        to_block: BlockNumber,
1795    ) -> ProviderResult<PruneShardOutcome> {
1796        let shards = self.provider.account_history_shards(address)?;
1797        self.prune_history_shards_inner(
1798            shards,
1799            to_block,
1800            |key| key.highest_block_number,
1801            |key| key.highest_block_number == u64::MAX,
1802            |batch, key| batch.delete::<tables::AccountsHistory>(key),
1803            |batch, key, value| batch.put::<tables::AccountsHistory>(key, value),
1804            || ShardedKey::new(address, u64::MAX),
1805        )
1806    }
1807
1808    /// Prunes account history for multiple addresses in a single iterator pass.
1809    ///
1810    /// This is more efficient than calling [`Self::prune_account_history_to`] repeatedly
1811    /// because it reuses a single raw iterator and skips seeks when the iterator is already
1812    /// positioned correctly (which happens when targets are sorted and adjacent in key order).
1813    ///
1814    /// `targets` MUST be sorted by address for correctness and optimal performance
1815    /// (matches on-disk key order).
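    ///
    /// # Example
    ///
    /// A sketch with two hypothetical addresses, sorted before the call; each target yields
    /// exactly one outcome:
    ///
    /// ```ignore
    /// let mut targets = vec![(addr_b, 250u64), (addr_a, 100u64)];
    /// targets.sort_by_key(|(addr, _)| *addr);
    /// provider.write_batch(|batch| {
    ///     let pruned = batch.prune_account_history_batch(&targets)?;
    ///     assert_eq!(pruned.deleted + pruned.updated + pruned.unchanged, targets.len());
    ///     Ok(())
    /// })?;
    /// ```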
1816    pub fn prune_account_history_batch(
1817        &mut self,
1818        targets: &[(Address, BlockNumber)],
1819    ) -> ProviderResult<PrunedIndices> {
1820        if targets.is_empty() {
1821            return Ok(PrunedIndices::default());
1822        }
1823
1824        debug_assert!(
1825            targets.windows(2).all(|w| w[0].0 <= w[1].0),
1826            "prune_account_history_batch: targets must be sorted by address"
1827        );
1828
1829        // ShardedKey<Address> layout: [address: 20][block: 8] = 28 bytes
1830        // The first 20 bytes are the "prefix" that identifies the address
1831        const PREFIX_LEN: usize = 20;
1832
1833        let cf = self.provider.get_cf_handle::<tables::AccountsHistory>()?;
1834        let mut iter = self.provider.0.raw_iterator_cf(cf);
1835        let mut outcomes = PrunedIndices::default();
1836
1837        for (address, to_block) in targets {
1838            // Build the target prefix (first 20 bytes = address)
1839            let start_key = ShardedKey::new(*address, 0u64).encode();
1840            let target_prefix = &start_key[..PREFIX_LEN];
1841
1842            // Check if we need to seek or if the iterator is already positioned correctly.
1843            // After processing the previous target, the iterator is either:
1844            // 1. Positioned at a key with a different prefix (we iterated past our shards)
1845            // 2. Invalid (no more keys)
1846            // If the current key's prefix >= our target prefix, we may be able to skip the seek.
1847            let needs_seek = if iter.valid() {
1848                if let Some(current_key) = iter.key() {
1849                    // If current key's prefix < target prefix, we need to seek forward
1850                    // If current key's prefix > target prefix, this target has no shards (skip)
1851                    // If current key's prefix == target prefix, we're already positioned
1852                    current_key.get(..PREFIX_LEN).is_none_or(|p| p < target_prefix)
1853                } else {
1854                    true
1855                }
1856            } else {
1857                true
1858            };
1859
1860            if needs_seek {
1861                iter.seek(start_key);
1862                iter.status().map_err(|e| {
1863                    ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
1864                        message: e.to_string().into(),
1865                        code: -1,
1866                    }))
1867                })?;
1868            }
1869
1870            // Collect all shards for this address using raw prefix comparison
1871            let mut shards = Vec::new();
1872            while iter.valid() {
1873                let Some(key_bytes) = iter.key() else { break };
1874
1875                // Use raw prefix comparison instead of full decode for the prefix check
1876                let current_prefix = key_bytes.get(..PREFIX_LEN);
1877                if current_prefix != Some(target_prefix) {
1878                    break;
1879                }
1880
1881                // Now decode the full key (we need the block number)
1882                let key = ShardedKey::<Address>::decode(key_bytes)
1883                    .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
1884
1885                let Some(value_bytes) = iter.value() else { break };
1886                let value = BlockNumberList::decompress(value_bytes)
1887                    .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
1888
1889                shards.push((key, value));
1890                iter.next();
1891            }
1892
1893            match self.prune_history_shards_inner(
1894                shards,
1895                *to_block,
1896                |key| key.highest_block_number,
1897                |key| key.highest_block_number == u64::MAX,
1898                |batch, key| batch.delete::<tables::AccountsHistory>(key),
1899                |batch, key, value| batch.put::<tables::AccountsHistory>(key, value),
1900                || ShardedKey::new(*address, u64::MAX),
1901            )? {
1902                PruneShardOutcome::Deleted => outcomes.deleted += 1,
1903                PruneShardOutcome::Updated => outcomes.updated += 1,
1904                PruneShardOutcome::Unchanged => outcomes.unchanged += 1,
1905            }
1906        }
1907
1908        Ok(outcomes)
1909    }
1910
1911    /// Prunes storage history for the given address and storage key, removing blocks <=
1912    /// `to_block`.
1913    ///
1914    /// Mirrors MDBX `prune_shard` semantics. After pruning, the last remaining shard
1915    /// (if any) will have the sentinel key (`u64::MAX`).
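    ///
    /// # Example
    ///
    /// A sketch for a single hypothetical `(address, storage_key)` pair:
    ///
    /// ```ignore
    /// provider.write_batch(|batch| {
    ///     batch.prune_storage_history_to(address, storage_key, 500)?;
    ///     Ok(())
    /// })?;
    /// ```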
1916    pub fn prune_storage_history_to(
1917        &mut self,
1918        address: Address,
1919        storage_key: B256,
1920        to_block: BlockNumber,
1921    ) -> ProviderResult<PruneShardOutcome> {
1922        let shards = self.provider.storage_history_shards(address, storage_key)?;
1923        self.prune_history_shards_inner(
1924            shards,
1925            to_block,
1926            |key| key.sharded_key.highest_block_number,
1927            |key| key.sharded_key.highest_block_number == u64::MAX,
1928            |batch, key| batch.delete::<tables::StoragesHistory>(key),
1929            |batch, key, value| batch.put::<tables::StoragesHistory>(key, value),
1930            || StorageShardedKey::last(address, storage_key),
1931        )
1932    }
1933
1934    /// Prunes storage history for multiple (address, `storage_key`) pairs in a single iterator
1935    /// pass.
1936    ///
1937    /// This is more efficient than calling [`Self::prune_storage_history_to`] repeatedly
1938    /// because it reuses a single raw iterator and skips seeks when the iterator is already
1939    /// positioned correctly (which happens when targets are sorted and adjacent in key order).
1940    ///
1941    /// `targets` MUST be sorted by (address, `storage_key`) for correctness and optimal
1942    /// performance (matches on-disk key order).
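    ///
    /// # Example
    ///
    /// A sketch with hypothetical, already-sorted targets:
    ///
    /// ```ignore
    /// let targets = vec![((address, slot_a), 100u64), ((address, slot_b), 100u64)];
    /// provider.write_batch(|batch| {
    ///     batch.prune_storage_history_batch(&targets)?;
    ///     Ok(())
    /// })?;
    /// ```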
1943    pub fn prune_storage_history_batch(
1944        &mut self,
1945        targets: &[((Address, B256), BlockNumber)],
1946    ) -> ProviderResult<PrunedIndices> {
1947        if targets.is_empty() {
1948            return Ok(PrunedIndices::default());
1949        }
1950
1951        debug_assert!(
1952            targets.windows(2).all(|w| w[0].0 <= w[1].0),
1953            "prune_storage_history_batch: targets must be sorted by (address, storage_key)"
1954        );
1955
1956        // StorageShardedKey layout: [address: 20][storage_key: 32][block: 8] = 60 bytes
1957        // The first 52 bytes are the "prefix" that identifies (address, storage_key)
1958        const PREFIX_LEN: usize = 52;
1959
1960        let cf = self.provider.get_cf_handle::<tables::StoragesHistory>()?;
1961        let mut iter = self.provider.0.raw_iterator_cf(cf);
1962        let mut outcomes = PrunedIndices::default();
1963
1964        for ((address, storage_key), to_block) in targets {
1965            // Build the target prefix (first 52 bytes of encoded key)
1966            let start_key = StorageShardedKey::new(*address, *storage_key, 0u64).encode();
1967            let target_prefix = &start_key[..PREFIX_LEN];
1968
1969            // Check if we need to seek or if the iterator is already positioned correctly.
1970            // After processing the previous target, the iterator is either:
1971            // 1. Positioned at a key with a different prefix (we iterated past our shards)
1972            // 2. Invalid (no more keys)
1973            // If the current key's prefix >= our target prefix, we may be able to skip the seek.
1974            let needs_seek = if iter.valid() {
1975                if let Some(current_key) = iter.key() {
1976                    // If current key's prefix < target prefix, we need to seek forward
1977                    // If current key's prefix > target prefix, this target has no shards (skip)
1978                    // If current key's prefix == target prefix, we're already positioned
1979                    current_key.get(..PREFIX_LEN).is_none_or(|p| p < target_prefix)
1980                } else {
1981                    true
1982                }
1983            } else {
1984                true
1985            };
1986
1987            if needs_seek {
1988                iter.seek(start_key);
1989                iter.status().map_err(|e| {
1990                    ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
1991                        message: e.to_string().into(),
1992                        code: -1,
1993                    }))
1994                })?;
1995            }
1996
1997            // Collect all shards for this (address, storage_key) pair using prefix comparison
1998            let mut shards = Vec::new();
1999            while iter.valid() {
2000                let Some(key_bytes) = iter.key() else { break };
2001
2002                // Use raw prefix comparison instead of full decode for the prefix check
2003                let current_prefix = key_bytes.get(..PREFIX_LEN);
2004                if current_prefix != Some(target_prefix) {
2005                    break;
2006                }
2007
2008                // Now decode the full key (we need the block number)
2009                let key = StorageShardedKey::decode(key_bytes)
2010                    .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
2011
2012                let Some(value_bytes) = iter.value() else { break };
2013                let value = BlockNumberList::decompress(value_bytes)
2014                    .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
2015
2016                shards.push((key, value));
2017                iter.next();
2018            }
2019
2020            // Use existing prune_history_shards_inner logic
2021            match self.prune_history_shards_inner(
2022                shards,
2023                *to_block,
2024                |key| key.sharded_key.highest_block_number,
2025                |key| key.sharded_key.highest_block_number == u64::MAX,
2026                |batch, key| batch.delete::<tables::StoragesHistory>(key),
2027                |batch, key, value| batch.put::<tables::StoragesHistory>(key, value),
2028                || StorageShardedKey::last(*address, *storage_key),
2029            )? {
2030                PruneShardOutcome::Deleted => outcomes.deleted += 1,
2031                PruneShardOutcome::Updated => outcomes.updated += 1,
2032                PruneShardOutcome::Unchanged => outcomes.unchanged += 1,
2033            }
2034        }
2035
2036        Ok(outcomes)
2037    }
2038
2039    /// Unwinds storage history to keep only blocks `<= keep_to`.
2040    ///
2041    /// Handles multi-shard scenarios by:
2042    /// 1. Loading all shards for the `(address, storage_key)` pair
2043    /// 2. Finding the boundary shard containing `keep_to`
2044    /// 3. Deleting all shards after the boundary
2045    /// 4. Truncating the boundary shard to keep only indices `<= keep_to`
2046    /// 5. Ensuring the last shard is keyed with `u64::MAX`
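    ///
    /// # Example
    ///
    /// A sketch that keeps only blocks `<= 1_000` for a hypothetical slot:
    ///
    /// ```ignore
    /// provider
    ///     .write_batch(|batch| batch.unwind_storage_history_to(address, storage_key, 1_000))?;
    /// ```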
2047    pub fn unwind_storage_history_to(
2048        &mut self,
2049        address: Address,
2050        storage_key: B256,
2051        keep_to: BlockNumber,
2052    ) -> ProviderResult<()> {
2053        let shards = self.provider.storage_history_shards(address, storage_key)?;
2054        if shards.is_empty() {
2055            return Ok(());
2056        }
2057
2058        // Find the first shard that might contain blocks > keep_to.
2059        // A shard is affected if it's the sentinel (u64::MAX) or its highest_block_number > keep_to
2060        let boundary_idx = shards.iter().position(|(key, _)| {
2061            key.sharded_key.highest_block_number == u64::MAX ||
2062                key.sharded_key.highest_block_number > keep_to
2063        });
2064
2065            // Repair path: no shard is affected (all blocks <= keep_to); just ensure the sentinel exists
2066        let Some(boundary_idx) = boundary_idx else {
2067            let (last_key, last_value) = shards.last().expect("shards is non-empty");
2068            if last_key.sharded_key.highest_block_number != u64::MAX {
2069                self.delete::<tables::StoragesHistory>(last_key.clone())?;
2070                self.put::<tables::StoragesHistory>(
2071                    StorageShardedKey::last(address, storage_key),
2072                    last_value,
2073                )?;
2074            }
2075            return Ok(());
2076        };
2077
2078        // Delete all shards strictly after the boundary (they are entirely > keep_to)
2079        for (key, _) in shards.iter().skip(boundary_idx + 1) {
2080            self.delete::<tables::StoragesHistory>(key.clone())?;
2081        }
2082
2083        // Process the boundary shard: filter out blocks > keep_to
2084        let (boundary_key, boundary_list) = &shards[boundary_idx];
2085
2086        // Delete the boundary shard (we'll either drop it or rewrite at u64::MAX)
2087        self.delete::<tables::StoragesHistory>(boundary_key.clone())?;
2088
2089        // Build truncated list once; check emptiness directly (avoids double iteration)
2090        let new_last =
2091            BlockNumberList::new_pre_sorted(boundary_list.iter().take_while(|&b| b <= keep_to));
2092
2093        if new_last.is_empty() {
2094            // Boundary shard is now empty. Previous shard becomes the last and must be keyed
2095            // u64::MAX.
2096            if boundary_idx == 0 {
2097                // Nothing left for this (address, storage_key) pair
2098                return Ok(());
2099            }
2100
2101            let (prev_key, prev_value) = &shards[boundary_idx - 1];
2102            if prev_key.sharded_key.highest_block_number != u64::MAX {
2103                self.delete::<tables::StoragesHistory>(prev_key.clone())?;
2104                self.put::<tables::StoragesHistory>(
2105                    StorageShardedKey::last(address, storage_key),
2106                    prev_value,
2107                )?;
2108            }
2109            return Ok(());
2110        }
2111
2112        self.put::<tables::StoragesHistory>(
2113            StorageShardedKey::last(address, storage_key),
2114            &new_last,
2115        )?;
2116
2117        Ok(())
2118    }
2119
2120    /// Clears all account history shards for the given address.
2121    ///
2122    /// Used when unwinding from block 0 (i.e., removing all history).
2123    pub fn clear_account_history(&mut self, address: Address) -> ProviderResult<()> {
2124        let shards = self.provider.account_history_shards(address)?;
2125        for (key, _) in shards {
2126            self.delete::<tables::AccountsHistory>(key)?;
2127        }
2128        Ok(())
2129    }
2130
2131    /// Clears all storage history shards for the given `(address, storage_key)` pair.
2132    ///
2133    /// Used when unwinding from block 0 (i.e., removing all history for this storage slot).
2134    pub fn clear_storage_history(
2135        &mut self,
2136        address: Address,
2137        storage_key: B256,
2138    ) -> ProviderResult<()> {
2139        let shards = self.provider.storage_history_shards(address, storage_key)?;
2140        for (key, _) in shards {
2141            self.delete::<tables::StoragesHistory>(key)?;
2142        }
2143        Ok(())
2144    }
2145}
2146
2147/// `RocksDB` transaction wrapper providing MDBX-like semantics.
2148///
2149/// Supports:
2150/// - Read-your-writes: reads see uncommitted writes within the same transaction
2151/// - Atomic commit/rollback
2152/// - Iteration over uncommitted data
2153///
2154/// Note: `Transaction` is `Send` but NOT `Sync`. This wrapper does not implement
2155/// `DbTx`/`DbTxMut` traits directly; use RocksDB-specific methods instead.
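///
/// # Example
///
/// A read-your-writes sketch, assuming `tx` is a [`RocksTx`] obtained from the provider (the
/// constructor is not part of this excerpt) and `TransactionHashNumbers` is registered:
///
/// ```ignore
/// let hash = TxHash::from(B256::from([1u8; 32]));
/// tx.put::<tables::TransactionHashNumbers>(hash, &42)?;
/// // Uncommitted writes are visible within the same transaction.
/// assert_eq!(tx.get::<tables::TransactionHashNumbers>(hash)?, Some(42));
/// // Nothing persists until `commit`; `rollback` discards the write instead.
/// tx.rollback()?;
/// ```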
2156pub struct RocksTx<'db> {
2157    inner: Transaction<'db, OptimisticTransactionDB>,
2158    provider: &'db RocksDBProvider,
2159}
2160
2161impl fmt::Debug for RocksTx<'_> {
2162    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2163        f.debug_struct("RocksTx").field("provider", &self.provider).finish_non_exhaustive()
2164    }
2165}
2166
2167impl<'db> RocksTx<'db> {
2168    /// Gets a value from the specified table. Sees uncommitted writes in this transaction.
2169    pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
2170        let encoded_key = key.encode();
2171        self.get_encoded::<T>(&encoded_key)
2172    }
2173
2174    /// Gets a value using pre-encoded key. Sees uncommitted writes in this transaction.
2175    pub fn get_encoded<T: Table>(
2176        &self,
2177        key: &<T::Key as Encode>::Encoded,
2178    ) -> ProviderResult<Option<T::Value>> {
2179        let cf = self.provider.get_cf_handle::<T>()?;
2180        let result = self.inner.get_cf(cf, key.as_ref()).map_err(|e| {
2181            ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
2182                message: e.to_string().into(),
2183                code: -1,
2184            }))
2185        })?;
2186
2187        Ok(result.and_then(|value| T::Value::decompress(&value).ok()))
2188    }
2189
2190    /// Puts a value into the specified table.
2191    pub fn put<T: Table>(&self, key: T::Key, value: &T::Value) -> ProviderResult<()> {
2192        let encoded_key = key.encode();
2193        self.put_encoded::<T>(&encoded_key, value)
2194    }
2195
2196    /// Puts a value using pre-encoded key.
2197    pub fn put_encoded<T: Table>(
2198        &self,
2199        key: &<T::Key as Encode>::Encoded,
2200        value: &T::Value,
2201    ) -> ProviderResult<()> {
2202        let cf = self.provider.get_cf_handle::<T>()?;
2203        let mut buf = Vec::new();
2204        let value_bytes = compress_to_buf_or_ref!(buf, value).unwrap_or(&buf);
2205
2206        self.inner.put_cf(cf, key.as_ref(), value_bytes).map_err(|e| {
2207            ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
2208                info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
2209                operation: DatabaseWriteOperation::PutUpsert,
2210                table_name: T::NAME,
2211                key: key.as_ref().to_vec(),
2212            })))
2213        })
2214    }
2215
2216    /// Deletes a value from the specified table.
2217    pub fn delete<T: Table>(&self, key: T::Key) -> ProviderResult<()> {
2218        let cf = self.provider.get_cf_handle::<T>()?;
2219        self.inner.delete_cf(cf, key.encode().as_ref()).map_err(|e| {
2220            ProviderError::Database(DatabaseError::Delete(DatabaseErrorInfo {
2221                message: e.to_string().into(),
2222                code: -1,
2223            }))
2224        })
2225    }
2226
2227    /// Creates an iterator for the specified table. Sees uncommitted writes in this transaction.
2228    ///
2229    /// Returns an iterator that yields decoded `(Key, Value)` pairs in key order.
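    ///
    /// # Example
    ///
    /// A sketch, assuming an open transaction `tx`:
    ///
    /// ```ignore
    /// for entry in tx.iter::<tables::TransactionHashNumbers>()? {
    ///     let (hash, number) = entry?;
    ///     // ... use the decoded pair, including any uncommitted puts from `tx` ...
    /// }
    /// ```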
2230    pub fn iter<T: Table>(&self) -> ProviderResult<RocksTxIter<'_, T>> {
2231        let cf = self.provider.get_cf_handle::<T>()?;
2232        let iter = self.inner.iterator_cf(cf, IteratorMode::Start);
2233        Ok(RocksTxIter { inner: iter, _marker: std::marker::PhantomData })
2234    }
2235
2236    /// Creates an iterator starting from the given key (inclusive).
2237    pub fn iter_from<T: Table>(&self, key: T::Key) -> ProviderResult<RocksTxIter<'_, T>> {
2238        let cf = self.provider.get_cf_handle::<T>()?;
2239        let encoded_key = key.encode();
2240        let iter = self
2241            .inner
2242            .iterator_cf(cf, IteratorMode::From(encoded_key.as_ref(), rocksdb::Direction::Forward));
2243        Ok(RocksTxIter { inner: iter, _marker: std::marker::PhantomData })
2244    }
2245
2246    /// Commits the transaction, persisting all changes.
2247    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
2248    pub fn commit(self) -> ProviderResult<()> {
2249        self.inner.commit().map_err(|e| {
2250            ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
2251                message: e.to_string().into(),
2252                code: -1,
2253            }))
2254        })
2255    }
2256
2257    /// Rolls back the transaction, discarding all changes.
2258    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
2259    pub fn rollback(self) -> ProviderResult<()> {
2260        self.inner.rollback().map_err(|e| {
2261            ProviderError::Database(DatabaseError::Other(format!("rollback failed: {e}")))
2262        })
2263    }
2264
2265    /// Lookup account history and return [`HistoryInfo`] directly.
2266    ///
2267    /// This is a thin wrapper around `history_info` that:
2268    /// - Builds the `ShardedKey` for the address + target block.
2269    /// - Validates that the found shard belongs to the same address.
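    ///
    /// # Example
    ///
    /// A sketch, assuming an open transaction `tx` and a hypothetical `address`:
    ///
    /// ```ignore
    /// // With no pruning floor, `NotYetWritten` means no history shard exists for the address;
    /// // other variants are derived from the shard that covers `block_number`.
    /// let info = tx.account_history_info(address, 1_000, None)?;
    /// ```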
2270    pub fn account_history_info(
2271        &self,
2272        address: Address,
2273        block_number: BlockNumber,
2274        lowest_available_block_number: Option<BlockNumber>,
2275    ) -> ProviderResult<HistoryInfo> {
2276        let key = ShardedKey::new(address, block_number);
2277        self.history_info::<tables::AccountsHistory>(
2278            key.encode().as_ref(),
2279            block_number,
2280            lowest_available_block_number,
2281            |key_bytes| Ok(<ShardedKey<Address> as Decode>::decode(key_bytes)?.key == address),
2282            |prev_bytes| {
2283                <ShardedKey<Address> as Decode>::decode(prev_bytes)
2284                    .map(|k| k.key == address)
2285                    .unwrap_or(false)
2286            },
2287        )
2288    }
2289
2290    /// Lookup storage history and return [`HistoryInfo`] directly.
2291    ///
2292    /// This is a thin wrapper around `history_info` that:
2293    /// - Builds the `StorageShardedKey` for address + storage key + target block.
2294    /// - Validates that the found shard belongs to the same address and storage slot.
2295    pub fn storage_history_info(
2296        &self,
2297        address: Address,
2298        storage_key: B256,
2299        block_number: BlockNumber,
2300        lowest_available_block_number: Option<BlockNumber>,
2301    ) -> ProviderResult<HistoryInfo> {
2302        let key = StorageShardedKey::new(address, storage_key, block_number);
2303        self.history_info::<tables::StoragesHistory>(
2304            key.encode().as_ref(),
2305            block_number,
2306            lowest_available_block_number,
2307            |key_bytes| {
2308                let k = <StorageShardedKey as Decode>::decode(key_bytes)?;
2309                Ok(k.address == address && k.sharded_key.key == storage_key)
2310            },
2311            |prev_bytes| {
2312                <StorageShardedKey as Decode>::decode(prev_bytes)
2313                    .map(|k| k.address == address && k.sharded_key.key == storage_key)
2314                    .unwrap_or(false)
2315            },
2316        )
2317    }
2318
2319    /// Generic history lookup for sharded history tables.
2320    ///
2321    /// Seeks to the shard containing `block_number`, checks if the key matches via `key_matches`,
2322    /// and uses `prev_key_matches` to detect if a previous shard exists for the same key.
2323    fn history_info<T>(
2324        &self,
2325        encoded_key: &[u8],
2326        block_number: BlockNumber,
2327        lowest_available_block_number: Option<BlockNumber>,
2328        key_matches: impl FnOnce(&[u8]) -> Result<bool, reth_db_api::DatabaseError>,
2329        prev_key_matches: impl Fn(&[u8]) -> bool,
2330    ) -> ProviderResult<HistoryInfo>
2331    where
2332        T: Table<Value = BlockNumberList>,
2333    {
2334        // History may be pruned if a lowest available block is set.
2335        let is_maybe_pruned = lowest_available_block_number.is_some();
2336        let fallback = || {
2337            Ok(if is_maybe_pruned {
2338                HistoryInfo::MaybeInPlainState
2339            } else {
2340                HistoryInfo::NotYetWritten
2341            })
2342        };
2343
2344        let cf = self.provider.0.cf_handle_rw(T::NAME)?;
2345
2346        // Create a raw iterator to access key bytes directly.
2347        let mut iter: DBRawIteratorWithThreadMode<'_, Transaction<'_, OptimisticTransactionDB>> =
2348            self.inner.raw_iterator_cf(&cf);
2349
2350        // Seek to the smallest key >= encoded_key.
2351        iter.seek(encoded_key);
2352        Self::raw_iter_status_ok(&iter)?;
2353
2354        if !iter.valid() {
2355            // No shard found at or after target block.
2356            //
2357            // (HistoryInfo::MaybeInPlainState) The key may have been written, but pruning may
2358            // have removed the changesets and history, so a plain state lookup is needed.
2359            // (HistoryInfo::NotYetWritten) The key has never been written.
2360            return fallback();
2361        }
2362
2363        // Check if the found key matches our target entity.
2364        let Some(key_bytes) = iter.key() else {
2365            return fallback();
2366        };
2367        if !key_matches(key_bytes)? {
2368            // The found key is for a different entity.
2369            return fallback();
2370        }
2371
2372        // Decompress the block list for this shard.
2373        let Some(value_bytes) = iter.value() else {
2374            return fallback();
2375        };
2376        let chunk = BlockNumberList::decompress(value_bytes)?;
2377        let (rank, found_block) = compute_history_rank(&chunk, block_number);
2378
2379        // Lazy check for previous shard - only called when needed.
2380        // If we can step to a previous shard for this same key, history already exists,
2381        // so the target block is not before the first write.
2382        let is_before_first_write = if needs_prev_shard_check(rank, found_block, block_number) {
2383            iter.prev();
2384            Self::raw_iter_status_ok(&iter)?;
2385            let has_prev = iter.valid() && iter.key().is_some_and(&prev_key_matches);
2386            !has_prev
2387        } else {
2388            false
2389        };
2390
2391        Ok(HistoryInfo::from_lookup(
2392            found_block,
2393            is_before_first_write,
2394            lowest_available_block_number,
2395        ))
2396    }
2397
2398    /// Returns an error if the raw iterator is in an invalid state due to an I/O error.
2399    fn raw_iter_status_ok(
2400        iter: &DBRawIteratorWithThreadMode<'_, Transaction<'_, OptimisticTransactionDB>>,
2401    ) -> ProviderResult<()> {
2402        iter.status().map_err(|e| {
2403            ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
2404                message: e.to_string().into(),
2405                code: -1,
2406            }))
2407        })
2408    }
2409}
2410
2411/// Wrapper enum for `RocksDB` iterators that works in both read-write and read-only modes.
2412enum RocksDBIterEnum<'db> {
2413    /// Iterator from read-write `OptimisticTransactionDB`.
2414    ReadWrite(rocksdb::DBIteratorWithThreadMode<'db, OptimisticTransactionDB>),
2415    /// Iterator from read-only `DB`.
2416    ReadOnly(rocksdb::DBIteratorWithThreadMode<'db, DB>),
2417}
2418
2419impl Iterator for RocksDBIterEnum<'_> {
2420    type Item = Result<(Box<[u8]>, Box<[u8]>), rocksdb::Error>;
2421
2422    fn next(&mut self) -> Option<Self::Item> {
2423        match self {
2424            Self::ReadWrite(iter) => iter.next(),
2425            Self::ReadOnly(iter) => iter.next(),
2426        }
2427    }
2428}
2429
2430/// Wrapper enum for raw `RocksDB` iterators that works in both read-write and read-only modes.
2431///
2432/// Unlike [`RocksDBIterEnum`], raw iterators expose `seek()` for efficient repositioning
2433/// without reinitializing the iterator.
2434enum RocksDBRawIterEnum<'db> {
2435    /// Raw iterator from read-write `OptimisticTransactionDB`.
2436    ReadWrite(DBRawIteratorWithThreadMode<'db, OptimisticTransactionDB>),
2437    /// Raw iterator from read-only `DB`.
2438    ReadOnly(DBRawIteratorWithThreadMode<'db, DB>),
2439}
2440
2441impl RocksDBRawIterEnum<'_> {
2442    /// Positions the iterator at the first key >= `key`.
2443    fn seek(&mut self, key: impl AsRef<[u8]>) {
2444        match self {
2445            Self::ReadWrite(iter) => iter.seek(key),
2446            Self::ReadOnly(iter) => iter.seek(key),
2447        }
2448    }
2449
2450    /// Returns true if the iterator is positioned at a valid key-value pair.
2451    fn valid(&self) -> bool {
2452        match self {
2453            Self::ReadWrite(iter) => iter.valid(),
2454            Self::ReadOnly(iter) => iter.valid(),
2455        }
2456    }
2457
2458    /// Returns the current key, if valid.
2459    fn key(&self) -> Option<&[u8]> {
2460        match self {
2461            Self::ReadWrite(iter) => iter.key(),
2462            Self::ReadOnly(iter) => iter.key(),
2463        }
2464    }
2465
2466    /// Returns the current value, if valid.
2467    fn value(&self) -> Option<&[u8]> {
2468        match self {
2469            Self::ReadWrite(iter) => iter.value(),
2470            Self::ReadOnly(iter) => iter.value(),
2471        }
2472    }
2473
2474    /// Advances the iterator to the next key.
2475    fn next(&mut self) {
2476        match self {
2477            Self::ReadWrite(iter) => iter.next(),
2478            Self::ReadOnly(iter) => iter.next(),
2479        }
2480    }
2481
2482    /// Returns the status of the iterator.
2483    fn status(&self) -> Result<(), rocksdb::Error> {
2484        match self {
2485            Self::ReadWrite(iter) => iter.status(),
2486            Self::ReadOnly(iter) => iter.status(),
2487        }
2488    }
2489}
2490
2491/// Iterator over a `RocksDB` table (non-transactional).
2492///
2493/// Yields decoded `(Key, Value)` pairs in key order.
2494pub struct RocksDBIter<'db, T: Table> {
2495    inner: RocksDBIterEnum<'db>,
2496    _marker: std::marker::PhantomData<T>,
2497}
2498
2499impl<T: Table> fmt::Debug for RocksDBIter<'_, T> {
2500    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2501        f.debug_struct("RocksDBIter").field("table", &T::NAME).finish_non_exhaustive()
2502    }
2503}
2504
2505impl<T: Table> Iterator for RocksDBIter<'_, T> {
2506    type Item = ProviderResult<(T::Key, T::Value)>;
2507
2508    fn next(&mut self) -> Option<Self::Item> {
2509        Some(decode_iter_item::<T>(self.inner.next()?))
2510    }
2511}
2512
2513/// Raw iterator over a `RocksDB` table (non-transactional).
2514///
2515/// Yields raw `(key_bytes, value_bytes)` pairs without decoding.
2516pub struct RocksDBRawIter<'db> {
2517    inner: RocksDBIterEnum<'db>,
2518}
2519
2520impl fmt::Debug for RocksDBRawIter<'_> {
2521    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2522        f.debug_struct("RocksDBRawIter").finish_non_exhaustive()
2523    }
2524}
2525
2526impl Iterator for RocksDBRawIter<'_> {
2527    type Item = ProviderResult<(Box<[u8]>, Box<[u8]>)>;
2528
2529    fn next(&mut self) -> Option<Self::Item> {
2530        match self.inner.next()? {
2531            Ok(kv) => Some(Ok(kv)),
2532            Err(e) => Some(Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
2533                message: e.to_string().into(),
2534                code: -1,
2535            })))),
2536        }
2537    }
2538}
2539
2540/// Iterator over a `RocksDB` table within a transaction.
2541///
2542/// Yields decoded `(Key, Value)` pairs. Sees uncommitted writes.
2543pub struct RocksTxIter<'tx, T: Table> {
2544    inner: rocksdb::DBIteratorWithThreadMode<'tx, Transaction<'tx, OptimisticTransactionDB>>,
2545    _marker: std::marker::PhantomData<T>,
2546}
2547
2548impl<T: Table> fmt::Debug for RocksTxIter<'_, T> {
2549    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2550        f.debug_struct("RocksTxIter").field("table", &T::NAME).finish_non_exhaustive()
2551    }
2552}
2553
2554impl<T: Table> Iterator for RocksTxIter<'_, T> {
2555    type Item = ProviderResult<(T::Key, T::Value)>;
2556
2557    fn next(&mut self) -> Option<Self::Item> {
2558        Some(decode_iter_item::<T>(self.inner.next()?))
2559    }
2560}
2561
2562/// Decodes a raw key-value pair from a `RocksDB` iterator into typed table entries.
2563///
2564/// Handles both error propagation from the underlying iterator and
2565/// decoding/decompression of the key and value bytes.
2566fn decode_iter_item<T: Table>(result: RawKVResult) -> ProviderResult<(T::Key, T::Value)> {
2567    let (key_bytes, value_bytes) = result.map_err(|e| {
2568        ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
2569            message: e.to_string().into(),
2570            code: -1,
2571        }))
2572    })?;
2573
2574    let key = <T::Key as reth_db_api::table::Decode>::decode(&key_bytes)
2575        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
2576
2577    let value = T::Value::decompress(&value_bytes)
2578        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
2579
2580    Ok((key, value))
2581}
2582
2583/// Converts Reth's [`LogLevel`] to `RocksDB`'s [`rocksdb::LogLevel`].
2584const fn convert_log_level(level: LogLevel) -> rocksdb::LogLevel {
2585    match level {
2586        LogLevel::Fatal => rocksdb::LogLevel::Fatal,
2587        LogLevel::Error => rocksdb::LogLevel::Error,
2588        LogLevel::Warn => rocksdb::LogLevel::Warn,
2589        LogLevel::Notice | LogLevel::Verbose => rocksdb::LogLevel::Info,
2590        LogLevel::Debug | LogLevel::Trace | LogLevel::Extra => rocksdb::LogLevel::Debug,
2591    }
2592}
2593
2594#[cfg(test)]
2595mod tests {
2596    use super::*;
2597    use crate::providers::HistoryInfo;
2598    use alloy_primitives::{Address, TxHash, B256};
2599    use reth_db_api::{
2600        models::{
2601            sharded_key::{ShardedKey, NUM_OF_INDICES_IN_SHARD},
2602            storage_sharded_key::StorageShardedKey,
2603            IntegerList,
2604        },
2605        table::Table,
2606        tables,
2607    };
2608    use tempfile::TempDir;
2609
2610    #[test]
2611    fn test_with_default_tables_registers_required_column_families() {
2612        let temp_dir = TempDir::new().unwrap();
2613
2614        // Build with default tables
2615        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
2616
2617        // Should be able to write/read TransactionHashNumbers
2618        let tx_hash = TxHash::from(B256::from([1u8; 32]));
2619        provider.put::<tables::TransactionHashNumbers>(tx_hash, &100).unwrap();
2620        assert_eq!(provider.get::<tables::TransactionHashNumbers>(tx_hash).unwrap(), Some(100));
2621
2622        // Should be able to write/read AccountsHistory
2623        let key = ShardedKey::new(Address::ZERO, 100);
2624        let value = IntegerList::default();
2625        provider.put::<tables::AccountsHistory>(key.clone(), &value).unwrap();
2626        assert!(provider.get::<tables::AccountsHistory>(key).unwrap().is_some());
2627
2628        // Should be able to write/read StoragesHistory
2629        let key = StorageShardedKey::new(Address::ZERO, B256::ZERO, 100);
2630        provider.put::<tables::StoragesHistory>(key.clone(), &value).unwrap();
2631        assert!(provider.get::<tables::StoragesHistory>(key).unwrap().is_some());
2632    }
2633
2634    #[derive(Debug)]
2635    struct TestTable;
2636
2637    impl Table for TestTable {
2638        const NAME: &'static str = "TestTable";
2639        const DUPSORT: bool = false;
2640        type Key = u64;
2641        type Value = Vec<u8>;
2642    }
2643
2644    #[test]
2645    fn test_basic_operations() {
2646        let temp_dir = TempDir::new().unwrap();
2647
2648        let provider = RocksDBBuilder::new(temp_dir.path())
2649            .with_table::<TestTable>() // Type-safe!
2650            .build()
2651            .unwrap();
2652
2653        let key = 42u64;
2654        let value = b"test_value".to_vec();
2655
2656        // Test write
2657        provider.put::<TestTable>(key, &value).unwrap();
2658
2659        // Test read
2660        let result = provider.get::<TestTable>(key).unwrap();
2661        assert_eq!(result, Some(value));
2662
2663        // Test delete
2664        provider.delete::<TestTable>(key).unwrap();
2665
2666        // Verify deletion
2667        assert_eq!(provider.get::<TestTable>(key).unwrap(), None);
2668    }
2669
2670    #[test]
2671    fn test_batch_operations() {
2672        let temp_dir = TempDir::new().unwrap();
2673        let provider =
2674            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
2675
2676        // Write multiple entries in a batch
2677        provider
2678            .write_batch(|batch| {
2679                for i in 0..10u64 {
2680                    let value = format!("value_{i}").into_bytes();
2681                    batch.put::<TestTable>(i, &value)?;
2682                }
2683                Ok(())
2684            })
2685            .unwrap();
2686
2687        // Read all entries
2688        for i in 0..10u64 {
2689            let value = format!("value_{i}").into_bytes();
2690            assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
2691        }
2692
2693        // Delete all entries in a batch
2694        provider
2695            .write_batch(|batch| {
2696                for i in 0..10u64 {
2697                    batch.delete::<TestTable>(i)?;
2698                }
2699                Ok(())
2700            })
2701            .unwrap();
2702
2703        // Verify all deleted
2704        for i in 0..10u64 {
2705            assert_eq!(provider.get::<TestTable>(i).unwrap(), None);
2706        }
2707    }
2708
2709    #[test]
2710    fn test_with_real_table() {
2711        let temp_dir = TempDir::new().unwrap();
2712        let provider = RocksDBBuilder::new(temp_dir.path())
2713            .with_table::<tables::TransactionHashNumbers>()
2714            .with_metrics()
2715            .build()
2716            .unwrap();
2717
2718        let tx_hash = TxHash::from(B256::from([1u8; 32]));
2719
2720        // Insert and retrieve
2721        provider.put::<tables::TransactionHashNumbers>(tx_hash, &100).unwrap();
2722        assert_eq!(provider.get::<tables::TransactionHashNumbers>(tx_hash).unwrap(), Some(100));
2723
2724        // Batch insert multiple transactions
2725        provider
2726            .write_batch(|batch| {
2727                for i in 0..10u64 {
2728                    let hash = TxHash::from(B256::from([i as u8; 32]));
2729                    let value = i * 100;
2730                    batch.put::<tables::TransactionHashNumbers>(hash, &value)?;
2731                }
2732                Ok(())
2733            })
2734            .unwrap();
2735
2736        // Verify batch insertions
2737        for i in 0..10u64 {
2738            let hash = TxHash::from(B256::from([i as u8; 32]));
2739            assert_eq!(
2740                provider.get::<tables::TransactionHashNumbers>(hash).unwrap(),
2741                Some(i * 100)
2742            );
2743        }
2744    }

2745    #[test]
2746    fn test_statistics_enabled() {
2747        let temp_dir = TempDir::new().unwrap();
2748        // Just verify that building with statistics doesn't panic
2749        let provider = RocksDBBuilder::new(temp_dir.path())
2750            .with_table::<TestTable>()
2751            .with_statistics()
2752            .build()
2753            .unwrap();
2754
2755        // Do operations - data should be immediately readable with OptimisticTransactionDB
2756        for i in 0..10 {
2757            let value = vec![i as u8];
2758            provider.put::<TestTable>(i, &value).unwrap();
2759            // Verify write is visible
2760            assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
2761        }
2762    }
2763
2764    #[test]
2765    fn test_data_persistence() {
2766        let temp_dir = TempDir::new().unwrap();
2767        let provider =
2768            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
2769
2770        // Insert data - OptimisticTransactionDB writes are immediately visible
2771        let value = vec![42u8; 1000];
2772        for i in 0..100 {
2773            provider.put::<TestTable>(i, &value).unwrap();
2774        }
2775
2776        // Verify data is readable
2777        for i in 0..100 {
2778            assert!(provider.get::<TestTable>(i).unwrap().is_some(), "Data should be readable");
2779        }
2780    }
2781
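    // The provider wraps an `OptimisticTransactionDB` (see the imports at the top
    // of this file): a transaction buffers its writes locally, can read them back
    // before commit, and is validated for write conflicts only at commit time
    // rather than taking locks up front. The next few tests cover read-your-writes,
    // rollback, and iteration over uncommitted data.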
2782    #[test]
2783    fn test_transaction_read_your_writes() {
2784        let temp_dir = TempDir::new().unwrap();
2785        let provider =
2786            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
2787
2788        // Create a transaction
2789        let tx = provider.tx();
2790
2791        // Write data within the transaction
2792        let key = 42u64;
2793        let value = b"test_value".to_vec();
2794        tx.put::<TestTable>(key, &value).unwrap();
2795
2796        // Read-your-writes: should see uncommitted data in same transaction
2797        let result = tx.get::<TestTable>(key).unwrap();
2798        assert_eq!(
2799            result,
2800            Some(value.clone()),
2801            "Transaction should see its own uncommitted writes"
2802        );
2803
2804        // Data should NOT be visible via provider (outside transaction)
2805        let provider_result = provider.get::<TestTable>(key).unwrap();
2806        assert_eq!(provider_result, None, "Uncommitted data should not be visible outside tx");
2807
2808        // Commit the transaction
2809        tx.commit().unwrap();
2810
2811        // Now data should be visible via provider
2812        let committed_result = provider.get::<TestTable>(key).unwrap();
2813        assert_eq!(committed_result, Some(value), "Committed data should be visible");
2814    }
2815
2816    #[test]
2817    fn test_transaction_rollback() {
2818        let temp_dir = TempDir::new().unwrap();
2819        let provider =
2820            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
2821
2822        // First, put some initial data
2823        let key = 100u64;
2824        let initial_value = b"initial".to_vec();
2825        provider.put::<TestTable>(key, &initial_value).unwrap();
2826
2827        // Create a transaction and modify data
2828        let tx = provider.tx();
2829        let new_value = b"modified".to_vec();
2830        tx.put::<TestTable>(key, &new_value).unwrap();
2831
2832        // Verify modification is visible within transaction
2833        assert_eq!(tx.get::<TestTable>(key).unwrap(), Some(new_value));
2834
2835        // Rollback instead of commit
2836        tx.rollback().unwrap();
2837
2838        // Data should be unchanged (initial value)
2839        let result = provider.get::<TestTable>(key).unwrap();
2840        assert_eq!(result, Some(initial_value), "Rollback should preserve original data");
2841    }
2842
2843    #[test]
2844    fn test_transaction_iterator() {
2845        let temp_dir = TempDir::new().unwrap();
2846        let provider =
2847            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
2848
2849        // Create a transaction
2850        let tx = provider.tx();
2851
2852        // Write multiple entries
2853        for i in 0..5u64 {
2854            let value = format!("value_{i}").into_bytes();
2855            tx.put::<TestTable>(i, &value).unwrap();
2856        }
2857
2858        // Iterate - should see uncommitted writes
2859        let mut count = 0;
2860        for result in tx.iter::<TestTable>().unwrap() {
2861            let (key, value) = result.unwrap();
2862            assert_eq!(value, format!("value_{key}").into_bytes());
2863            count += 1;
2864        }
2865        assert_eq!(count, 5, "Iterator should see all uncommitted writes");
2866
2867        // Commit
2868        tx.commit().unwrap();
2869    }
2870
2871    #[test]
2872    fn test_batch_manual_commit() {
2873        let temp_dir = TempDir::new().unwrap();
2874        let provider =
2875            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
2876
2877        // Create a batch via provider.batch()
2878        let mut batch = provider.batch();
2879
2880        // Add entries
2881        for i in 0..10u64 {
2882            let value = format!("batch_value_{i}").into_bytes();
2883            batch.put::<TestTable>(i, &value).unwrap();
2884        }
2885
2886        // Verify len/is_empty
2887        assert_eq!(batch.len(), 10);
2888        assert!(!batch.is_empty());
2889
2890        // Data should NOT be visible before commit
2891        assert_eq!(provider.get::<TestTable>(0).unwrap(), None);
2892
2893        // Commit the batch
2894        batch.commit().unwrap();
2895
2896        // Now data should be visible
2897        for i in 0..10u64 {
2898            let value = format!("batch_value_{i}").into_bytes();
2899            assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
2900        }
2901    }
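
    // Illustrative sketch combining batch staging with a later transaction read,
    // using only APIs exercised above; assumes a transaction opened after
    // `commit()` observes the committed state.
    #[test]
    fn test_sketch_batch_commit_then_tx_read() {
        let temp_dir = TempDir::new().unwrap();
        let provider =
            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();

        // Stage a write in a batch and commit it.
        let mut batch = provider.batch();
        batch.put::<TestTable>(7, &b"seven".to_vec()).unwrap();
        batch.commit().unwrap();

        // A transaction opened afterwards should see the committed value.
        let tx = provider.tx();
        assert_eq!(tx.get::<TestTable>(7).unwrap(), Some(b"seven".to_vec()));
        tx.rollback().unwrap();
    }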
2902
2903    #[test]
2904    fn test_first_and_last_entry() {
2905        let temp_dir = TempDir::new().unwrap();
2906        let provider =
2907            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
2908
2909        // Empty table should return None for both
2910        assert_eq!(provider.first::<TestTable>().unwrap(), None);
2911        assert_eq!(provider.last::<TestTable>().unwrap(), None);
2912
2913        // Insert some entries
2914        provider.put::<TestTable>(10, &b"value_10".to_vec()).unwrap();
2915        provider.put::<TestTable>(20, &b"value_20".to_vec()).unwrap();
2916        provider.put::<TestTable>(5, &b"value_5".to_vec()).unwrap();
2917
2918        // First should return the smallest key
2919        let first = provider.first::<TestTable>().unwrap();
2920        assert_eq!(first, Some((5, b"value_5".to_vec())));
2921
2922        // Last should return the largest key
2923        let last = provider.last::<TestTable>().unwrap();
2924        assert_eq!(last, Some((20, b"value_20".to_vec())));
2925    }
2926
2927    /// Tests the edge case where block < `lowest_available_block_number`.
2928    /// This case cannot be tested via `HistoricalStateProviderRef` (which errors before lookup),
2929    /// so we keep this RocksDB-specific test to verify the low-level behavior.
2930    #[test]
2931    fn test_account_history_info_pruned_before_first_entry() {
2932        let temp_dir = TempDir::new().unwrap();
2933        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
2934
2935        let address = Address::from([0x42; 20]);
2936
2937        // Create a single shard starting at block 100
2938        let chunk = IntegerList::new([100, 200, 300]).unwrap();
2939        let shard_key = ShardedKey::new(address, u64::MAX);
2940        provider.put::<tables::AccountsHistory>(shard_key, &chunk).unwrap();
2941
2942        let tx = provider.tx();
2943
2944        // Query for block 50 with lowest_available_block_number = 100
2945        // This simulates a pruned state where data before block 100 is not available.
2946        // Since we're before the first write AND pruning boundary is set, we need to
2947        // check the changeset at the first write block.
2948        let result = tx.account_history_info(address, 50, Some(100)).unwrap();
2949        assert_eq!(result, HistoryInfo::InChangeset(100));
2950
2951        tx.rollback().unwrap();
2952    }
2953
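    // Shard layout exercised by the history tests below: each address (or
    // address/slot pair) stores its block indices in shards of at most
    // NUM_OF_INDICES_IN_SHARD entries. Completed shards are keyed by the highest
    // block number they contain, while the open, most recent shard is keyed by the
    // u64::MAX sentinel. Illustratively, with a shard limit of 3, the blocks
    // [1, 2, 3, 4] end up as:
    //
    //     ShardedKey::new(address, 3)        => [1, 2, 3]  // completed shard
    //     ShardedKey::new(address, u64::MAX) => [4]        // sentinel shard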
2954    #[test]
2955    fn test_account_history_shard_split_at_boundary() {
2956        let temp_dir = TempDir::new().unwrap();
2957        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
2958
2959        let address = Address::from([0x42; 20]);
2960        let limit = NUM_OF_INDICES_IN_SHARD;
2961
2962        // Add exactly NUM_OF_INDICES_IN_SHARD + 1 indices to trigger a split
2963        let indices: Vec<u64> = (0..=(limit as u64)).collect();
2964        let mut batch = provider.batch();
2965        batch.append_account_history_shard(address, indices).unwrap();
2966        batch.commit().unwrap();
2967
2968        // Should have 2 shards: one completed shard and one sentinel shard
2969        let completed_key = ShardedKey::new(address, (limit - 1) as u64);
2970        let sentinel_key = ShardedKey::new(address, u64::MAX);
2971
2972        let completed_shard = provider.get::<tables::AccountsHistory>(completed_key).unwrap();
2973        let sentinel_shard = provider.get::<tables::AccountsHistory>(sentinel_key).unwrap();
2974
2975        assert!(completed_shard.is_some(), "completed shard should exist");
2976        assert!(sentinel_shard.is_some(), "sentinel shard should exist");
2977
2978        let completed_shard = completed_shard.unwrap();
2979        let sentinel_shard = sentinel_shard.unwrap();
2980
2981        assert_eq!(completed_shard.len(), limit as u64, "completed shard should be full");
2982        assert_eq!(sentinel_shard.len(), 1, "sentinel shard should have 1 element");
2983    }
2984
2985    #[test]
2986    fn test_account_history_multiple_shard_splits() {
2987        let temp_dir = TempDir::new().unwrap();
2988        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
2989
2990        let address = Address::from([0x43; 20]);
2991        let limit = NUM_OF_INDICES_IN_SHARD;
2992
2993        // First batch: add NUM_OF_INDICES_IN_SHARD indices
2994        let first_batch_indices: Vec<u64> = (0..limit as u64).collect();
2995        let mut batch = provider.batch();
2996        batch.append_account_history_shard(address, first_batch_indices).unwrap();
2997        batch.commit().unwrap();
2998
2999        // Should have just a sentinel shard (exactly at limit, not over)
3000        let sentinel_key = ShardedKey::new(address, u64::MAX);
3001        let shard = provider.get::<tables::AccountsHistory>(sentinel_key.clone()).unwrap();
3002        assert!(shard.is_some());
3003        assert_eq!(shard.unwrap().len(), limit as u64);
3004
3005        // Second batch: add another NUM_OF_INDICES_IN_SHARD + 1 indices (causing 2 more shards)
3006        let second_batch_indices: Vec<u64> = (limit as u64..=(2 * limit) as u64).collect();
3007        let mut batch = provider.batch();
3008        batch.append_account_history_shard(address, second_batch_indices).unwrap();
3009        batch.commit().unwrap();
3010
3011        // Now we should have: 2 completed shards + 1 sentinel shard
3012        let first_completed = ShardedKey::new(address, (limit - 1) as u64);
3013        let second_completed = ShardedKey::new(address, (2 * limit - 1) as u64);
3014
3015        assert!(
3016            provider.get::<tables::AccountsHistory>(first_completed).unwrap().is_some(),
3017            "first completed shard should exist"
3018        );
3019        assert!(
3020            provider.get::<tables::AccountsHistory>(second_completed).unwrap().is_some(),
3021            "second completed shard should exist"
3022        );
3023        assert!(
3024            provider.get::<tables::AccountsHistory>(sentinel_key).unwrap().is_some(),
3025            "sentinel shard should exist"
3026        );
3027    }
3028
3029    #[test]
3030    fn test_storage_history_shard_split_at_boundary() {
3031        let temp_dir = TempDir::new().unwrap();
3032        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3033
3034        let address = Address::from([0x44; 20]);
3035        let slot = B256::from([0x55; 32]);
3036        let limit = NUM_OF_INDICES_IN_SHARD;
3037
3038        // Add exactly NUM_OF_INDICES_IN_SHARD + 1 indices to trigger a split
3039        let indices: Vec<u64> = (0..=(limit as u64)).collect();
3040        let mut batch = provider.batch();
3041        batch.append_storage_history_shard(address, slot, indices).unwrap();
3042        batch.commit().unwrap();
3043
3044        // Should have 2 shards: one completed shard and one sentinel shard
3045        let completed_key = StorageShardedKey::new(address, slot, (limit - 1) as u64);
3046        let sentinel_key = StorageShardedKey::new(address, slot, u64::MAX);
3047
3048        let completed_shard = provider.get::<tables::StoragesHistory>(completed_key).unwrap();
3049        let sentinel_shard = provider.get::<tables::StoragesHistory>(sentinel_key).unwrap();
3050
3051        assert!(completed_shard.is_some(), "completed shard should exist");
3052        assert!(sentinel_shard.is_some(), "sentinel shard should exist");
3053
3054        let completed_shard = completed_shard.unwrap();
3055        let sentinel_shard = sentinel_shard.unwrap();
3056
3057        assert_eq!(completed_shard.len(), limit as u64, "completed shard should be full");
3058        assert_eq!(sentinel_shard.len(), 1, "sentinel shard should have 1 element");
3059    }
3060
3061    #[test]
3062    fn test_storage_history_multiple_shard_splits() {
3063        let temp_dir = TempDir::new().unwrap();
3064        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3065
3066        let address = Address::from([0x46; 20]);
3067        let slot = B256::from([0x57; 32]);
3068        let limit = NUM_OF_INDICES_IN_SHARD;
3069
3070        // First batch: add NUM_OF_INDICES_IN_SHARD indices
3071        let first_batch_indices: Vec<u64> = (0..limit as u64).collect();
3072        let mut batch = provider.batch();
3073        batch.append_storage_history_shard(address, slot, first_batch_indices).unwrap();
3074        batch.commit().unwrap();
3075
3076        // Should have just a sentinel shard (exactly at limit, not over)
3077        let sentinel_key = StorageShardedKey::new(address, slot, u64::MAX);
3078        let shard = provider.get::<tables::StoragesHistory>(sentinel_key.clone()).unwrap();
3079        assert!(shard.is_some());
3080        assert_eq!(shard.unwrap().len(), limit as u64);
3081
3082        // Second batch: add another NUM_OF_INDICES_IN_SHARD + 1 indices (causing 2 more shards)
3083        let second_batch_indices: Vec<u64> = (limit as u64..=(2 * limit) as u64).collect();
3084        let mut batch = provider.batch();
3085        batch.append_storage_history_shard(address, slot, second_batch_indices).unwrap();
3086        batch.commit().unwrap();
3087
3088        // Now we should have: 2 completed shards + 1 sentinel shard
3089        let first_completed = StorageShardedKey::new(address, slot, (limit - 1) as u64);
3090        let second_completed = StorageShardedKey::new(address, slot, (2 * limit - 1) as u64);
3091
3092        assert!(
3093            provider.get::<tables::StoragesHistory>(first_completed).unwrap().is_some(),
3094            "first completed shard should exist"
3095        );
3096        assert!(
3097            provider.get::<tables::StoragesHistory>(second_completed).unwrap().is_some(),
3098            "second completed shard should exist"
3099        );
3100        assert!(
3101            provider.get::<tables::StoragesHistory>(sentinel_key).unwrap().is_some(),
3102            "sentinel shard should exist"
3103        );
3104    }
3105
3106    #[test]
3107    fn test_clear_table() {
3108        let temp_dir = TempDir::new().unwrap();
3109        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3110
3111        let address = Address::from([0x42; 20]);
3112        let key = ShardedKey::new(address, u64::MAX);
3113        let blocks = BlockNumberList::new_pre_sorted([1, 2, 3]);
3114
3115        provider.put::<tables::AccountsHistory>(key.clone(), &blocks).unwrap();
3116        assert!(provider.get::<tables::AccountsHistory>(key.clone()).unwrap().is_some());
3117
3118        provider.clear::<tables::AccountsHistory>().unwrap();
3119
3120        assert!(
3121            provider.get::<tables::AccountsHistory>(key).unwrap().is_none(),
3122            "table should be empty after clear"
3123        );
3124        assert!(
3125            provider.first::<tables::AccountsHistory>().unwrap().is_none(),
3126            "first() should return None after clear"
3127        );
3128    }
3129
3130    #[test]
3131    fn test_clear_empty_table() {
3132        let temp_dir = TempDir::new().unwrap();
3133        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3134
3135        assert!(provider.first::<tables::AccountsHistory>().unwrap().is_none());
3136
3137        provider.clear::<tables::AccountsHistory>().unwrap();
3138
3139        assert!(provider.first::<tables::AccountsHistory>().unwrap().is_none());
3140    }
3141
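    // Unwind semantics exercised by the tests below: `unwind_account_history_to(x)`
    // drops every index strictly greater than `x`. Shards entirely above the target
    // are deleted, the boundary shard is truncated and re-keyed to the u64::MAX
    // sentinel, and if the boundary shard becomes empty the previous shard is
    // promoted to the sentinel instead.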
3142    #[test]
3143    fn test_unwind_account_history_to_basic() {
3144        let temp_dir = TempDir::new().unwrap();
3145        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3146
3147        let address = Address::from([0x42; 20]);
3148
3149        // Add blocks 0-10
3150        let mut batch = provider.batch();
3151        batch.append_account_history_shard(address, 0..=10).unwrap();
3152        batch.commit().unwrap();
3153
3154        // Verify we have blocks 0-10
3155        let key = ShardedKey::new(address, u64::MAX);
3156        let result = provider.get::<tables::AccountsHistory>(key.clone()).unwrap();
3157        assert!(result.is_some());
3158        let blocks: Vec<u64> = result.unwrap().iter().collect();
3159        assert_eq!(blocks, (0..=10).collect::<Vec<_>>());
3160
3161        // Unwind to block 5 (keep blocks 0-5, remove 6-10)
3162        let mut batch = provider.batch();
3163        batch.unwind_account_history_to(address, 5).unwrap();
3164        batch.commit().unwrap();
3165
3166        // Verify only blocks 0-5 remain
3167        let result = provider.get::<tables::AccountsHistory>(key).unwrap();
3168        assert!(result.is_some());
3169        let blocks: Vec<u64> = result.unwrap().iter().collect();
3170        assert_eq!(blocks, (0..=5).collect::<Vec<_>>());
3171    }
3172
3173    #[test]
3174    fn test_unwind_account_history_to_removes_all() {
3175        let temp_dir = TempDir::new().unwrap();
3176        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3177
3178        let address = Address::from([0x42; 20]);
3179
3180        // Add blocks 5-10
3181        let mut batch = provider.batch();
3182        batch.append_account_history_shard(address, 5..=10).unwrap();
3183        batch.commit().unwrap();
3184
3185        // Unwind to block 4 (removes all blocks since they're all > 4)
3186        let mut batch = provider.batch();
3187        batch.unwind_account_history_to(address, 4).unwrap();
3188        batch.commit().unwrap();
3189
3190        // Verify no data remains for this address
3191        let key = ShardedKey::new(address, u64::MAX);
3192        let result = provider.get::<tables::AccountsHistory>(key).unwrap();
3193        assert!(result.is_none(), "Should have no data after full unwind");
3194    }
3195
3196    #[test]
3197    fn test_unwind_account_history_to_no_op() {
3198        let temp_dir = TempDir::new().unwrap();
3199        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3200
3201        let address = Address::from([0x42; 20]);
3202
3203        // Add blocks 0-5
3204        let mut batch = provider.batch();
3205        batch.append_account_history_shard(address, 0..=5).unwrap();
3206        batch.commit().unwrap();
3207
3208        // Unwind to block 10 (no-op since all blocks are <= 10)
3209        let mut batch = provider.batch();
3210        batch.unwind_account_history_to(address, 10).unwrap();
3211        batch.commit().unwrap();
3212
3213        // Verify blocks 0-5 still remain
3214        let key = ShardedKey::new(address, u64::MAX);
3215        let result = provider.get::<tables::AccountsHistory>(key).unwrap();
3216        assert!(result.is_some());
3217        let blocks: Vec<u64> = result.unwrap().iter().collect();
3218        assert_eq!(blocks, (0..=5).collect::<Vec<_>>());
3219    }
3220
3221    #[test]
3222    fn test_unwind_account_history_to_block_zero() {
3223        let temp_dir = TempDir::new().unwrap();
3224        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3225
3226        let address = Address::from([0x42; 20]);
3227
3228        // Add blocks 0-5 (including block 0)
3229        let mut batch = provider.batch();
3230        batch.append_account_history_shard(address, 0..=5).unwrap();
3231        batch.commit().unwrap();
3232
3233        // Unwind to block 0 (keep only block 0, remove 1-5)
3234        // This simulates the caller doing: unwind_to = min_block.checked_sub(1) where min_block = 1
3235        let mut batch = provider.batch();
3236        batch.unwind_account_history_to(address, 0).unwrap();
3237        batch.commit().unwrap();
3238
3239        // Verify only block 0 remains
3240        let key = ShardedKey::new(address, u64::MAX);
3241        let result = provider.get::<tables::AccountsHistory>(key).unwrap();
3242        assert!(result.is_some());
3243        let blocks: Vec<u64> = result.unwrap().iter().collect();
3244        assert_eq!(blocks, vec![0]);
3245    }
3246
3247    #[test]
3248    fn test_unwind_account_history_to_multi_shard() {
3249        let temp_dir = TempDir::new().unwrap();
3250        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3251
3252        let address = Address::from([0x42; 20]);
3253
3254        // Create multiple shards by adding more than NUM_OF_INDICES_IN_SHARD entries
3255        // For testing, we'll manually create shards with specific keys
3256        let mut batch = provider.batch();
3257
3258        // First shard: blocks 1-50, keyed by 50
3259        let shard1 = BlockNumberList::new_pre_sorted(1..=50);
3260        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, 50), &shard1).unwrap();
3261
3262        // Second shard: blocks 51-100, keyed by MAX (sentinel)
3263        let shard2 = BlockNumberList::new_pre_sorted(51..=100);
3264        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, u64::MAX), &shard2).unwrap();
3265
3266        batch.commit().unwrap();
3267
3268        // Verify we have 2 shards
3269        let shards = provider.account_history_shards(address).unwrap();
3270        assert_eq!(shards.len(), 2);
3271
3272        // Unwind to block 75 (keep 1-75, remove 76-100)
3273        let mut batch = provider.batch();
3274        batch.unwind_account_history_to(address, 75).unwrap();
3275        batch.commit().unwrap();
3276
3277        // Verify: shard1 should be untouched, shard2 should be truncated
3278        let shards = provider.account_history_shards(address).unwrap();
3279        assert_eq!(shards.len(), 2);
3280
3281        // First shard unchanged
3282        assert_eq!(shards[0].0.highest_block_number, 50);
3283        assert_eq!(shards[0].1.iter().collect::<Vec<_>>(), (1..=50).collect::<Vec<_>>());
3284
3285        // Second shard truncated and re-keyed to MAX
3286        assert_eq!(shards[1].0.highest_block_number, u64::MAX);
3287        assert_eq!(shards[1].1.iter().collect::<Vec<_>>(), (51..=75).collect::<Vec<_>>());
3288    }
3289
3290    #[test]
3291    fn test_unwind_account_history_to_multi_shard_boundary_empty() {
3292        let temp_dir = TempDir::new().unwrap();
3293        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3294
3295        let address = Address::from([0x42; 20]);
3296
3297        // Create two shards
3298        let mut batch = provider.batch();
3299
3300        // First shard: blocks 1-50, keyed by 50
3301        let shard1 = BlockNumberList::new_pre_sorted(1..=50);
3302        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, 50), &shard1).unwrap();
3303
3304        // Second shard: blocks 75-100, keyed by MAX
3305        let shard2 = BlockNumberList::new_pre_sorted(75..=100);
3306        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, u64::MAX), &shard2).unwrap();
3307
3308        batch.commit().unwrap();
3309
3310        // Unwind to block 60 (removes all of shard2 since 75 > 60, promotes shard1 to MAX)
3311        let mut batch = provider.batch();
3312        batch.unwind_account_history_to(address, 60).unwrap();
3313        batch.commit().unwrap();
3314
3315        // Verify: only shard1 remains, now keyed as MAX
3316        let shards = provider.account_history_shards(address).unwrap();
3317        assert_eq!(shards.len(), 1);
3318        assert_eq!(shards[0].0.highest_block_number, u64::MAX);
3319        assert_eq!(shards[0].1.iter().collect::<Vec<_>>(), (1..=50).collect::<Vec<_>>());
3320    }
3321
3322    #[test]
3323    fn test_account_history_shards_iterator() {
3324        let temp_dir = TempDir::new().unwrap();
3325        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3326
3327        let address = Address::from([0x42; 20]);
3328        let other_address = Address::from([0x43; 20]);
3329
3330        // Add data for two addresses
3331        let mut batch = provider.batch();
3332        batch.append_account_history_shard(address, 0..=5).unwrap();
3333        batch.append_account_history_shard(other_address, 10..=15).unwrap();
3334        batch.commit().unwrap();
3335
3336        // Query shards for first address only
3337        let shards = provider.account_history_shards(address).unwrap();
3338        assert_eq!(shards.len(), 1);
3339        assert_eq!(shards[0].0.key, address);
3340
3341        // Query shards for second address only
3342        let shards = provider.account_history_shards(other_address).unwrap();
3343        assert_eq!(shards.len(), 1);
3344        assert_eq!(shards[0].0.key, other_address);
3345
3346        // Query shards for non-existent address
3347        let non_existent = Address::from([0x99; 20]);
3348        let shards = provider.account_history_shards(non_existent).unwrap();
3349        assert!(shards.is_empty());
3350    }
3351
3352    #[test]
3353    fn test_clear_account_history() {
3354        let temp_dir = TempDir::new().unwrap();
3355        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3356
3357        let address = Address::from([0x42; 20]);
3358
3359        // Add blocks 0-10
3360        let mut batch = provider.batch();
3361        batch.append_account_history_shard(address, 0..=10).unwrap();
3362        batch.commit().unwrap();
3363
3364        // Clear all history (simulates unwind from block 0)
3365        let mut batch = provider.batch();
3366        batch.clear_account_history(address).unwrap();
3367        batch.commit().unwrap();
3368
3369        // Verify no data remains
3370        let shards = provider.account_history_shards(address).unwrap();
3371        assert!(shards.is_empty(), "All shards should be deleted");
3372    }
3373
3374    #[test]
3375    fn test_unwind_non_sentinel_boundary() {
3376        let temp_dir = TempDir::new().unwrap();
3377        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3378
3379        let address = Address::from([0x42; 20]);
3380
3381        // Create three shards with non-sentinel boundary
3382        let mut batch = provider.batch();
3383
3384        // Shard 1: blocks 1-50, keyed by 50
3385        let shard1 = BlockNumberList::new_pre_sorted(1..=50);
3386        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, 50), &shard1).unwrap();
3387
3388        // Shard 2: blocks 51-100, keyed by 100 (non-sentinel, will be boundary)
3389        let shard2 = BlockNumberList::new_pre_sorted(51..=100);
3390        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, 100), &shard2).unwrap();
3391
3392        // Shard 3: blocks 101-150, keyed by MAX (will be deleted)
3393        let shard3 = BlockNumberList::new_pre_sorted(101..=150);
3394        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, u64::MAX), &shard3).unwrap();
3395
3396        batch.commit().unwrap();
3397
3398        // Verify 3 shards
3399        let shards = provider.account_history_shards(address).unwrap();
3400        assert_eq!(shards.len(), 3);
3401
3402        // Unwind to block 75 (truncates shard2, deletes shard3)
3403        let mut batch = provider.batch();
3404        batch.unwind_account_history_to(address, 75).unwrap();
3405        batch.commit().unwrap();
3406
3407        // Verify: shard1 unchanged, shard2 truncated and re-keyed to MAX, shard3 deleted
3408        let shards = provider.account_history_shards(address).unwrap();
3409        assert_eq!(shards.len(), 2);
3410
3411        // First shard unchanged
3412        assert_eq!(shards[0].0.highest_block_number, 50);
3413        assert_eq!(shards[0].1.iter().collect::<Vec<_>>(), (1..=50).collect::<Vec<_>>());
3414
3415        // Second shard truncated and re-keyed to MAX
3416        assert_eq!(shards[1].0.highest_block_number, u64::MAX);
3417        assert_eq!(shards[1].1.iter().collect::<Vec<_>>(), (51..=75).collect::<Vec<_>>());
3418    }
3419
3420    #[test]
3421    fn test_batch_auto_commit_on_threshold() {
3422        let temp_dir = TempDir::new().unwrap();
3423        let provider =
3424            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
3425
3426        // Create batch with tiny threshold (1KB) to force auto-commits
3427        let mut batch = RocksDBBatch {
3428            provider: &provider,
3429            inner: WriteBatchWithTransaction::<true>::default(),
3430            buf: Vec::new(),
3431            auto_commit_threshold: Some(1024), // 1KB
3432        };
3433
3434        // Write entries until we exceed threshold multiple times
3435        // Each entry is ~20 bytes, so 100 entries = ~2KB = 2 auto-commits
3436        for i in 0..100u64 {
3437            let value = format!("value_{i:04}").into_bytes();
3438            batch.put::<TestTable>(i, &value).unwrap();
3439        }
3440
3441        // Data should already be visible (auto-committed) even before final commit
3442        // At least some entries should be readable
3443        let first_visible = provider.get::<TestTable>(0).unwrap();
3444        assert!(first_visible.is_some(), "Auto-committed data should be visible");
3445
3446        // Final commit for remaining batch
3447        batch.commit().unwrap();
3448
3449        // All entries should now be visible
3450        for i in 0..100u64 {
3451            let value = format!("value_{i:04}").into_bytes();
3452            assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
3453        }
3454    }
3455
3456    // ==================== PARAMETERIZED PRUNE TESTS ====================
3457
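    // Prune semantics exercised by the cases below: `prune_*_history_to(x)` drops
    // every index less than or equal to `x`. The reported `PruneShardOutcome` is
    // `Unchanged` when nothing was at or below the target, `Updated` when a shard
    // was rewritten (including a truncated last shard being re-keyed to the
    // sentinel), and `Deleted` when whole shards were removed, which the
    // multi-shard cases show takes precedence over `Updated`.
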
3458    /// Test case for account history pruning
3459    struct AccountPruneCase {
3460        name: &'static str,
3461        initial_shards: &'static [(u64, &'static [u64])],
3462        prune_to: u64,
3463        expected_outcome: PruneShardOutcome,
3464        expected_shards: &'static [(u64, &'static [u64])],
3465    }
3466
3467    /// Test case for storage history pruning
3468    struct StoragePruneCase {
3469        name: &'static str,
3470        initial_shards: &'static [(u64, &'static [u64])],
3471        prune_to: u64,
3472        expected_outcome: PruneShardOutcome,
3473        expected_shards: &'static [(u64, &'static [u64])],
3474    }
3475
3476    #[test]
3477    fn test_prune_account_history_cases() {
3478        const MAX: u64 = u64::MAX;
3479        const CASES: &[AccountPruneCase] = &[
3480            AccountPruneCase {
3481                name: "single_shard_truncate",
3482                initial_shards: &[(MAX, &[10, 20, 30, 40])],
3483                prune_to: 25,
3484                expected_outcome: PruneShardOutcome::Updated,
3485                expected_shards: &[(MAX, &[30, 40])],
3486            },
3487            AccountPruneCase {
3488                name: "single_shard_delete_all",
3489                initial_shards: &[(MAX, &[10, 20])],
3490                prune_to: 20,
3491                expected_outcome: PruneShardOutcome::Deleted,
3492                expected_shards: &[],
3493            },
3494            AccountPruneCase {
3495                name: "single_shard_noop",
3496                initial_shards: &[(MAX, &[10, 20])],
3497                prune_to: 5,
3498                expected_outcome: PruneShardOutcome::Unchanged,
3499                expected_shards: &[(MAX, &[10, 20])],
3500            },
3501            AccountPruneCase {
3502                name: "no_shards",
3503                initial_shards: &[],
3504                prune_to: 100,
3505                expected_outcome: PruneShardOutcome::Unchanged,
3506                expected_shards: &[],
3507            },
3508            AccountPruneCase {
3509                name: "multi_shard_truncate_first",
3510                initial_shards: &[(30, &[10, 20, 30]), (MAX, &[40, 50, 60])],
3511                prune_to: 25,
3512                expected_outcome: PruneShardOutcome::Updated,
3513                expected_shards: &[(30, &[30]), (MAX, &[40, 50, 60])],
3514            },
3515            AccountPruneCase {
3516                name: "delete_first_shard_sentinel_unchanged",
3517                initial_shards: &[(20, &[10, 20]), (MAX, &[30, 40])],
3518                prune_to: 20,
3519                expected_outcome: PruneShardOutcome::Deleted,
3520                expected_shards: &[(MAX, &[30, 40])],
3521            },
3522            AccountPruneCase {
3523                name: "multi_shard_delete_all_but_last",
3524                initial_shards: &[(10, &[5, 10]), (20, &[15, 20]), (MAX, &[25, 30])],
3525                prune_to: 22,
3526                expected_outcome: PruneShardOutcome::Deleted,
3527                expected_shards: &[(MAX, &[25, 30])],
3528            },
3529            AccountPruneCase {
3530                name: "mid_shard_preserves_key",
3531                initial_shards: &[(50, &[10, 20, 30, 40, 50]), (MAX, &[60, 70])],
3532                prune_to: 25,
3533                expected_outcome: PruneShardOutcome::Updated,
3534                expected_shards: &[(50, &[30, 40, 50]), (MAX, &[60, 70])],
3535            },
3536            // Equivalence tests
3537            AccountPruneCase {
3538                name: "equiv_delete_early_shards_keep_sentinel",
3539                initial_shards: &[(20, &[10, 15, 20]), (50, &[30, 40, 50]), (MAX, &[60, 70])],
3540                prune_to: 55,
3541                expected_outcome: PruneShardOutcome::Deleted,
3542                expected_shards: &[(MAX, &[60, 70])],
3543            },
3544            AccountPruneCase {
3545                name: "equiv_sentinel_becomes_empty_with_prev",
3546                initial_shards: &[(50, &[30, 40, 50]), (MAX, &[35])],
3547                prune_to: 40,
3548                expected_outcome: PruneShardOutcome::Deleted,
3549                expected_shards: &[(MAX, &[50])],
3550            },
3551            AccountPruneCase {
3552                name: "equiv_all_shards_become_empty",
3553                initial_shards: &[(50, &[30, 40, 50]), (MAX, &[51])],
3554                prune_to: 51,
3555                expected_outcome: PruneShardOutcome::Deleted,
3556                expected_shards: &[],
3557            },
3558            AccountPruneCase {
3559                name: "equiv_non_sentinel_last_shard_promoted",
3560                initial_shards: &[(100, &[50, 75, 100])],
3561                prune_to: 60,
3562                expected_outcome: PruneShardOutcome::Updated,
3563                expected_shards: &[(MAX, &[75, 100])],
3564            },
3565            AccountPruneCase {
3566                name: "equiv_filter_within_shard",
3567                initial_shards: &[(MAX, &[10, 20, 30, 40])],
3568                prune_to: 25,
3569                expected_outcome: PruneShardOutcome::Updated,
3570                expected_shards: &[(MAX, &[30, 40])],
3571            },
3572            AccountPruneCase {
3573                name: "equiv_multi_shard_partial_delete",
3574                initial_shards: &[(20, &[10, 20]), (50, &[30, 40, 50]), (MAX, &[60, 70])],
3575                prune_to: 35,
3576                expected_outcome: PruneShardOutcome::Deleted,
3577                expected_shards: &[(50, &[40, 50]), (MAX, &[60, 70])],
3578            },
3579        ];
3580
3581        let address = Address::from([0x42; 20]);
3582
3583        for case in CASES {
3584            let temp_dir = TempDir::new().unwrap();
3585            let provider =
3586                RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3587
3588            // Setup initial shards
3589            let mut batch = provider.batch();
3590            for (highest, blocks) in case.initial_shards {
3591                let shard = BlockNumberList::new_pre_sorted(blocks.iter().copied());
3592                batch
3593                    .put::<tables::AccountsHistory>(ShardedKey::new(address, *highest), &shard)
3594                    .unwrap();
3595            }
3596            batch.commit().unwrap();
3597
3598            // Prune
3599            let mut batch = provider.batch();
3600            let outcome = batch.prune_account_history_to(address, case.prune_to).unwrap();
3601            batch.commit().unwrap();
3602
3603            // Assert outcome
3604            assert_eq!(outcome, case.expected_outcome, "case '{}': wrong outcome", case.name);
3605
3606            // Assert final shards
3607            let shards = provider.account_history_shards(address).unwrap();
3608            assert_eq!(
3609                shards.len(),
3610                case.expected_shards.len(),
3611                "case '{}': wrong shard count",
3612                case.name
3613            );
3614            for (i, ((key, blocks), (exp_key, exp_blocks))) in
3615                shards.iter().zip(case.expected_shards.iter()).enumerate()
3616            {
3617                assert_eq!(
3618                    key.highest_block_number, *exp_key,
3619                    "case '{}': shard {} wrong key",
3620                    case.name, i
3621                );
3622                assert_eq!(
3623                    blocks.iter().collect::<Vec<_>>(),
3624                    *exp_blocks,
3625                    "case '{}': shard {} wrong blocks",
3626                    case.name,
3627                    i
3628                );
3629            }
3630        }
3631    }
3632
3633    #[test]
3634    fn test_prune_storage_history_cases() {
3635        const MAX: u64 = u64::MAX;
3636        const CASES: &[StoragePruneCase] = &[
3637            StoragePruneCase {
3638                name: "single_shard_truncate",
3639                initial_shards: &[(MAX, &[10, 20, 30, 40])],
3640                prune_to: 25,
3641                expected_outcome: PruneShardOutcome::Updated,
3642                expected_shards: &[(MAX, &[30, 40])],
3643            },
3644            StoragePruneCase {
3645                name: "single_shard_delete_all",
3646                initial_shards: &[(MAX, &[10, 20])],
3647                prune_to: 20,
3648                expected_outcome: PruneShardOutcome::Deleted,
3649                expected_shards: &[],
3650            },
3651            StoragePruneCase {
3652                name: "noop",
3653                initial_shards: &[(MAX, &[10, 20])],
3654                prune_to: 5,
3655                expected_outcome: PruneShardOutcome::Unchanged,
3656                expected_shards: &[(MAX, &[10, 20])],
3657            },
3658            StoragePruneCase {
3659                name: "no_shards",
3660                initial_shards: &[],
3661                prune_to: 100,
3662                expected_outcome: PruneShardOutcome::Unchanged,
3663                expected_shards: &[],
3664            },
3665            StoragePruneCase {
3666                name: "mid_shard_preserves_key",
3667                initial_shards: &[(50, &[10, 20, 30, 40, 50]), (MAX, &[60, 70])],
3668                prune_to: 25,
3669                expected_outcome: PruneShardOutcome::Updated,
3670                expected_shards: &[(50, &[30, 40, 50]), (MAX, &[60, 70])],
3671            },
3672            // Equivalence tests
3673            StoragePruneCase {
3674                name: "equiv_sentinel_promotion",
3675                initial_shards: &[(100, &[50, 75, 100])],
3676                prune_to: 60,
3677                expected_outcome: PruneShardOutcome::Updated,
3678                expected_shards: &[(MAX, &[75, 100])],
3679            },
3680            StoragePruneCase {
3681                name: "equiv_delete_early_shards_keep_sentinel",
3682                initial_shards: &[(20, &[10, 15, 20]), (50, &[30, 40, 50]), (MAX, &[60, 70])],
3683                prune_to: 55,
3684                expected_outcome: PruneShardOutcome::Deleted,
3685                expected_shards: &[(MAX, &[60, 70])],
3686            },
3687            StoragePruneCase {
3688                name: "equiv_sentinel_becomes_empty_with_prev",
3689                initial_shards: &[(50, &[30, 40, 50]), (MAX, &[35])],
3690                prune_to: 40,
3691                expected_outcome: PruneShardOutcome::Deleted,
3692                expected_shards: &[(MAX, &[50])],
3693            },
3694            StoragePruneCase {
3695                name: "equiv_all_shards_become_empty",
3696                initial_shards: &[(50, &[30, 40, 50]), (MAX, &[51])],
3697                prune_to: 51,
3698                expected_outcome: PruneShardOutcome::Deleted,
3699                expected_shards: &[],
3700            },
3701            StoragePruneCase {
3702                name: "equiv_filter_within_shard",
3703                initial_shards: &[(MAX, &[10, 20, 30, 40])],
3704                prune_to: 25,
3705                expected_outcome: PruneShardOutcome::Updated,
3706                expected_shards: &[(MAX, &[30, 40])],
3707            },
3708            StoragePruneCase {
3709                name: "equiv_multi_shard_partial_delete",
3710                initial_shards: &[(20, &[10, 20]), (50, &[30, 40, 50]), (MAX, &[60, 70])],
3711                prune_to: 35,
3712                expected_outcome: PruneShardOutcome::Deleted,
3713                expected_shards: &[(50, &[40, 50]), (MAX, &[60, 70])],
3714            },
3715        ];
3716
3717        let address = Address::from([0x42; 20]);
3718        let storage_key = B256::from([0x01; 32]);
3719
3720        for case in CASES {
3721            let temp_dir = TempDir::new().unwrap();
3722            let provider =
3723                RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3724
3725            // Setup initial shards
3726            let mut batch = provider.batch();
3727            for (highest, blocks) in case.initial_shards {
3728                let shard = BlockNumberList::new_pre_sorted(blocks.iter().copied());
3729                let key = if *highest == MAX {
3730                    StorageShardedKey::last(address, storage_key)
3731                } else {
3732                    StorageShardedKey::new(address, storage_key, *highest)
3733                };
3734                batch.put::<tables::StoragesHistory>(key, &shard).unwrap();
3735            }
3736            batch.commit().unwrap();
3737
3738            // Prune
3739            let mut batch = provider.batch();
3740            let outcome =
3741                batch.prune_storage_history_to(address, storage_key, case.prune_to).unwrap();
3742            batch.commit().unwrap();
3743
3744            // Assert outcome
3745            assert_eq!(outcome, case.expected_outcome, "case '{}': wrong outcome", case.name);
3746
3747            // Assert final shards
3748            let shards = provider.storage_history_shards(address, storage_key).unwrap();
3749            assert_eq!(
3750                shards.len(),
3751                case.expected_shards.len(),
3752                "case '{}': wrong shard count",
3753                case.name
3754            );
3755            for (i, ((key, blocks), (exp_key, exp_blocks))) in
3756                shards.iter().zip(case.expected_shards.iter()).enumerate()
3757            {
3758                assert_eq!(
3759                    key.sharded_key.highest_block_number, *exp_key,
3760                    "case '{}': shard {} wrong key",
3761                    case.name, i
3762                );
3763                assert_eq!(
3764                    blocks.iter().collect::<Vec<_>>(),
3765                    *exp_blocks,
3766                    "case '{}': shard {} wrong blocks",
3767                    case.name,
3768                    i
3769                );
3770            }
3771        }
3772    }
3773
3774    #[test]
3775    fn test_prune_storage_history_does_not_affect_other_slots() {
3776        let temp_dir = TempDir::new().unwrap();
3777        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3778
3779        let address = Address::from([0x42; 20]);
3780        let slot1 = B256::from([0x01; 32]);
3781        let slot2 = B256::from([0x02; 32]);
3782
3783        // Two different storage slots
3784        let mut batch = provider.batch();
3785        batch
3786            .put::<tables::StoragesHistory>(
3787                StorageShardedKey::last(address, slot1),
3788                &BlockNumberList::new_pre_sorted([10u64, 20]),
3789            )
3790            .unwrap();
3791        batch
3792            .put::<tables::StoragesHistory>(
3793                StorageShardedKey::last(address, slot2),
3794                &BlockNumberList::new_pre_sorted([30u64, 40]),
3795            )
3796            .unwrap();
3797        batch.commit().unwrap();
3798
3799        // Prune slot1 to block 20 (deletes all)
3800        let mut batch = provider.batch();
3801        let outcome = batch.prune_storage_history_to(address, slot1, 20).unwrap();
3802        batch.commit().unwrap();
3803
3804        assert_eq!(outcome, PruneShardOutcome::Deleted);
3805
3806        // slot1 should be empty
3807        let shards1 = provider.storage_history_shards(address, slot1).unwrap();
3808        assert!(shards1.is_empty());
3809
3810        // slot2 should be unchanged
3811        let shards2 = provider.storage_history_shards(address, slot2).unwrap();
3812        assert_eq!(shards2.len(), 1);
3813        assert_eq!(shards2[0].1.iter().collect::<Vec<_>>(), vec![30, 40]);
3814    }
3815
3816    #[test]
3817    fn test_prune_invariants() {
3818        // Test invariants: no empty shards, sentinel is always last
3819        let address = Address::from([0x42; 20]);
3820        let storage_key = B256::from([0x01; 32]);
3821
3822        // Test cases that exercise invariants
3823        #[allow(clippy::type_complexity)]
3824        let invariant_cases: &[(&[(u64, &[u64])], u64)] = &[
3825            // Account: shards where middle becomes empty
3826            (&[(10, &[5, 10]), (20, &[15, 20]), (u64::MAX, &[25, 30])], 20),
3827            // Account: non-sentinel shard only, partial prune -> must become sentinel
3828            (&[(100, &[50, 100])], 60),
3829        ];
3830
3831        for (initial_shards, prune_to) in invariant_cases {
3832            // Test account history invariants
3833            {
3834                let temp_dir = TempDir::new().unwrap();
3835                let provider =
3836                    RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3837
3838                let mut batch = provider.batch();
3839                for (highest, blocks) in *initial_shards {
3840                    let shard = BlockNumberList::new_pre_sorted(blocks.iter().copied());
3841                    batch
3842                        .put::<tables::AccountsHistory>(ShardedKey::new(address, *highest), &shard)
3843                        .unwrap();
3844                }
3845                batch.commit().unwrap();
3846
3847                let mut batch = provider.batch();
3848                batch.prune_account_history_to(address, *prune_to).unwrap();
3849                batch.commit().unwrap();
3850
3851                let shards = provider.account_history_shards(address).unwrap();
3852
3853                // Invariant 1: no empty shards
3854                for (key, blocks) in &shards {
3855                    assert!(
3856                        !blocks.is_empty(),
3857                        "Account: empty shard at key {}",
3858                        key.highest_block_number
3859                    );
3860                }
3861
3862                // Invariant 2: last shard is sentinel
3863                if !shards.is_empty() {
3864                    let last = shards.last().unwrap();
3865                    assert_eq!(
3866                        last.0.highest_block_number,
3867                        u64::MAX,
3868                        "Account: last shard must be sentinel"
3869                    );
3870                }
3871            }
3872
3873            // Test storage history invariants
3874            {
3875                let temp_dir = TempDir::new().unwrap();
3876                let provider =
3877                    RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3878
3879                let mut batch = provider.batch();
3880                for (highest, blocks) in *initial_shards {
3881                    let shard = BlockNumberList::new_pre_sorted(blocks.iter().copied());
3882                    let key = if *highest == u64::MAX {
3883                        StorageShardedKey::last(address, storage_key)
3884                    } else {
3885                        StorageShardedKey::new(address, storage_key, *highest)
3886                    };
3887                    batch.put::<tables::StoragesHistory>(key, &shard).unwrap();
3888                }
3889                batch.commit().unwrap();
3890
3891                let mut batch = provider.batch();
3892                batch.prune_storage_history_to(address, storage_key, *prune_to).unwrap();
3893                batch.commit().unwrap();
3894
3895                let shards = provider.storage_history_shards(address, storage_key).unwrap();
3896
3897                // Invariant 1: no empty shards
3898                for (key, blocks) in &shards {
3899                    assert!(
3900                        !blocks.is_empty(),
3901                        "Storage: empty shard at key {}",
3902                        key.sharded_key.highest_block_number
3903                    );
3904                }
3905
3906                // Invariant 2: last shard is sentinel
3907                if !shards.is_empty() {
3908                    let last = shards.last().unwrap();
3909                    assert_eq!(
3910                        last.0.sharded_key.highest_block_number,
3911                        u64::MAX,
3912                        "Storage: last shard must be sentinel"
3913                    );
3914                }
3915            }
3916        }
3917    }
3918
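    // The batch-prune tests below sort their targets by address (or address/slot)
    // before calling the `prune_*_history_batch` helpers; the "p > target_prefix"
    // note further down suggests the batch walker advances a single iterator over
    // sorted prefixes, so unsorted targets are not exercised here.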
    #[test]
    fn test_prune_account_history_batch_multiple_sorted_targets() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let addr1 = Address::from([0x01; 20]);
        let addr2 = Address::from([0x02; 20]);
        let addr3 = Address::from([0x03; 20]);

        // Set up shards for each address
        let mut batch = provider.batch();
        batch
            .put::<tables::AccountsHistory>(
                ShardedKey::new(addr1, u64::MAX),
                &BlockNumberList::new_pre_sorted([10, 20, 30]),
            )
            .unwrap();
        batch
            .put::<tables::AccountsHistory>(
                ShardedKey::new(addr2, u64::MAX),
                &BlockNumberList::new_pre_sorted([5, 10, 15]),
            )
            .unwrap();
        batch
            .put::<tables::AccountsHistory>(
                ShardedKey::new(addr3, u64::MAX),
                &BlockNumberList::new_pre_sorted([100, 200]),
            )
            .unwrap();
        batch.commit().unwrap();

        // Prune all three (sorted by address)
        let mut targets = vec![(addr1, 15), (addr2, 10), (addr3, 50)];
        targets.sort_by_key(|(addr, _)| *addr);

        let mut batch = provider.batch();
        let outcomes = batch.prune_account_history_batch(&targets).unwrap();
        batch.commit().unwrap();

        // addr1: prune <=15, keep [20, 30] -> updated
        // addr2: prune <=10, keep [15] -> updated
        // addr3: prune <=50, keep [100, 200] -> unchanged
        assert_eq!(outcomes.updated, 2);
        assert_eq!(outcomes.unchanged, 1);

        let shards1 = provider.account_history_shards(addr1).unwrap();
        assert_eq!(shards1[0].1.iter().collect::<Vec<_>>(), vec![20, 30]);

        let shards2 = provider.account_history_shards(addr2).unwrap();
        assert_eq!(shards2[0].1.iter().collect::<Vec<_>>(), vec![15]);

        let shards3 = provider.account_history_shards(addr3).unwrap();
        assert_eq!(shards3[0].1.iter().collect::<Vec<_>>(), vec![100, 200]);
    }

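    /// Prunes a batch of account history targets where one address has no shards at all; that
    /// target must be reported as unchanged while the others are pruned normally.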
    #[test]
    fn test_prune_account_history_batch_target_with_no_shards() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let addr1 = Address::from([0x01; 20]);
        let addr2 = Address::from([0x02; 20]); // No shards for this one
        let addr3 = Address::from([0x03; 20]);

        // Only set up shards for addr1 and addr3
        let mut batch = provider.batch();
        batch
            .put::<tables::AccountsHistory>(
                ShardedKey::new(addr1, u64::MAX),
                &BlockNumberList::new_pre_sorted([10, 20]),
            )
            .unwrap();
        batch
            .put::<tables::AccountsHistory>(
                ShardedKey::new(addr3, u64::MAX),
                &BlockNumberList::new_pre_sorted([30, 40]),
            )
            .unwrap();
        batch.commit().unwrap();

        // Prune all three (addr2 has no shards - tests p > target_prefix case)
        let mut targets = vec![(addr1, 15), (addr2, 100), (addr3, 35)];
        targets.sort_by_key(|(addr, _)| *addr);

        let mut batch = provider.batch();
        let outcomes = batch.prune_account_history_batch(&targets).unwrap();
        batch.commit().unwrap();

        // addr1: updated (keep [20])
        // addr2: unchanged (no shards)
        // addr3: updated (keep [40])
        assert_eq!(outcomes.updated, 2);
        assert_eq!(outcomes.unchanged, 1);

        let shards1 = provider.account_history_shards(addr1).unwrap();
        assert_eq!(shards1[0].1.iter().collect::<Vec<_>>(), vec![20]);

        let shards3 = provider.account_history_shards(addr3).unwrap();
        assert_eq!(shards3[0].1.iter().collect::<Vec<_>>(), vec![40]);
    }

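    /// Prunes history for two storage slots of the same address in one sorted batch and checks
    /// that each slot's shard keeps only the block numbers above its prune target.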
    #[test]
    fn test_prune_storage_history_batch_multiple_sorted_targets() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let addr = Address::from([0x42; 20]);
        let slot1 = B256::from([0x01; 32]);
        let slot2 = B256::from([0x02; 32]);

        // Set up shards
        let mut batch = provider.batch();
        batch
            .put::<tables::StoragesHistory>(
                StorageShardedKey::new(addr, slot1, u64::MAX),
                &BlockNumberList::new_pre_sorted([10, 20, 30]),
            )
            .unwrap();
        batch
            .put::<tables::StoragesHistory>(
                StorageShardedKey::new(addr, slot2, u64::MAX),
                &BlockNumberList::new_pre_sorted([5, 15, 25]),
            )
            .unwrap();
        batch.commit().unwrap();

        // Prune both slots (targets sorted by address and slot)
        let mut targets = vec![((addr, slot1), 15), ((addr, slot2), 10)];
        targets.sort_by_key(|((a, s), _)| (*a, *s));

        let mut batch = provider.batch();
        let outcomes = batch.prune_storage_history_batch(&targets).unwrap();
        batch.commit().unwrap();

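        // slot1: prune <=15, keep [20, 30] -> updated
        // slot2: prune <=10, keep [15, 25] -> updated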
        assert_eq!(outcomes.updated, 2);

        let shards1 = provider.storage_history_shards(addr, slot1).unwrap();
        assert_eq!(shards1[0].1.iter().collect::<Vec<_>>(), vec![20, 30]);

        let shards2 = provider.storage_history_shards(addr, slot2).unwrap();
        assert_eq!(shards2[0].1.iter().collect::<Vec<_>>(), vec![15, 25]);
    }
}