
reth_provider/providers/rocksdb/provider.rs

use super::metrics::{RocksDBMetrics, RocksDBOperation, ROCKSDB_TABLES};
use crate::providers::{compute_history_rank, needs_prev_shard_check, HistoryInfo};
use alloy_consensus::transaction::TxHashRef;
use alloy_primitives::{
    map::{AddressMap, HashMap},
    Address, BlockNumber, TxNumber, B256,
};
use itertools::Itertools;
use metrics::Label;
use parking_lot::Mutex;
use reth_chain_state::ExecutedBlock;
use reth_db_api::{
    database_metrics::DatabaseMetrics,
    models::{
        sharded_key::NUM_OF_INDICES_IN_SHARD, storage_sharded_key::StorageShardedKey, ShardedKey,
        StorageSettings,
    },
    table::{Compress, Decode, Decompress, Encode, Table},
    tables, BlockNumberList, DatabaseError,
};
use reth_primitives_traits::{BlockBody as _, FastInstant as Instant};
use reth_prune_types::PruneMode;
use reth_storage_errors::{
    db::{DatabaseErrorInfo, DatabaseWriteError, DatabaseWriteOperation, LogLevel},
    provider::{ProviderError, ProviderResult},
};
use rocksdb::{
    BlockBasedOptions, Cache, ColumnFamilyDescriptor, CompactionPri, DBCompressionType,
    DBRawIteratorWithThreadMode, IteratorMode, OptimisticTransactionDB,
    OptimisticTransactionOptions, Options, SnapshotWithThreadMode, Transaction,
    WriteBatchWithTransaction, WriteBufferManager, WriteOptions, DB,
};
use std::{
    collections::BTreeMap,
    fmt,
    path::{Path, PathBuf},
    sync::Arc,
};
use tracing::instrument;

/// Returns [`WriteOptions`] with WAL sync enabled for crash durability.
fn synced_write_options() -> WriteOptions {
    let mut opts = WriteOptions::default();
    opts.set_sync(true);
    opts
}

/// Pending `RocksDB` batches type alias.
pub(crate) type PendingRocksDBBatches = Arc<Mutex<Vec<WriteBatchWithTransaction<true>>>>;

/// Raw key-value result from a `RocksDB` iterator.
type RawKVResult = Result<(Box<[u8]>, Box<[u8]>), rocksdb::Error>;

/// Statistics for a single `RocksDB` table (column family).
#[derive(Debug, Clone)]
pub struct RocksDBTableStats {
    /// Size of SST files on disk in bytes.
    pub sst_size_bytes: u64,
    /// Size of memtables in memory in bytes.
    pub memtable_size_bytes: u64,
    /// Name of the table/column family.
    pub name: String,
    /// Estimated number of keys in the table.
    pub estimated_num_keys: u64,
    /// Estimated size of live data in bytes (SST files + memtables).
    pub estimated_size_bytes: u64,
    /// Estimated bytes pending compaction (reclaimable space).
    pub pending_compaction_bytes: u64,
}

/// Database-level statistics for `RocksDB`.
///
/// Contains both per-table statistics and DB-level metrics like WAL size.
#[derive(Debug, Clone)]
pub struct RocksDBStats {
    /// Statistics for each table (column family).
    pub tables: Vec<RocksDBTableStats>,
    /// Total size of WAL (Write-Ahead Log) files in bytes.
    ///
    /// WAL is shared across all tables and not included in per-table metrics.
    pub wal_size_bytes: u64,
}

/// Context for `RocksDB` block writes.
#[derive(Clone)]
pub(crate) struct RocksDBWriteCtx {
    /// The first block number being written.
    pub first_block_number: BlockNumber,
    /// The prune mode for transaction lookup, if any.
    pub prune_tx_lookup: Option<PruneMode>,
    /// Storage settings determining what goes to `RocksDB`.
    pub storage_settings: StorageSettings,
    /// Pending batches to push to after writing.
    pub pending_batches: PendingRocksDBBatches,
}

impl fmt::Debug for RocksDBWriteCtx {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RocksDBWriteCtx")
            .field("first_block_number", &self.first_block_number)
            .field("prune_tx_lookup", &self.prune_tx_lookup)
            .field("storage_settings", &self.storage_settings)
            .field("pending_batches", &"<pending batches>")
            .finish()
    }
}

/// Default cache size for `RocksDB` block cache (128 MB).
const DEFAULT_CACHE_SIZE: usize = 128 << 20;

/// Default block size for `RocksDB` tables (16 KB).
const DEFAULT_BLOCK_SIZE: usize = 16 * 1024;

/// Default max background jobs for `RocksDB` compaction and flushing.
const DEFAULT_MAX_BACKGROUND_JOBS: i32 = 6;

/// Default max open file descriptors for `RocksDB`.
///
/// Caps the number of SST file handles `RocksDB` keeps open simultaneously.
/// Set to 512 to stay within the common default OS `ulimit -n` of 1024,
/// leaving headroom for MDBX, static files, and other I/O.
/// `RocksDB` uses an internal table cache and re-opens files on demand,
/// so this has negligible performance impact on read-heavy workloads.
const DEFAULT_MAX_OPEN_FILES: i32 = 512;

/// Default bytes per sync for `RocksDB` WAL writes (1 MB).
const DEFAULT_BYTES_PER_SYNC: u64 = 1_048_576;

/// Default write buffer size for `RocksDB` memtables (128 MB).
///
/// Larger memtables reduce flush frequency during burst writes, providing more consistent
/// tail latency. Benchmarks showed 128 MB reduces p99 latency variance by ~80% compared
/// to the 64 MB default, with negligible impact on mean throughput.
const DEFAULT_WRITE_BUFFER_SIZE: usize = 128 << 20;

/// Default total `RocksDB` memtable memory budget across column families (4 GiB).
///
/// This is a soft limit; with write stalls enabled, `RocksDB` waits for flushes once
/// memtable arena usage exceeds the budget.
const DEFAULT_WRITE_BUFFER_MANAGER_SIZE: usize = 4 * 1024 * 1024 * 1024;

/// Default buffer capacity for compression in batches.
/// 4 KiB matches common block/page sizes and comfortably holds typical history values,
/// reducing the first few reallocations without over-allocating.
const DEFAULT_COMPRESS_BUF_CAPACITY: usize = 4096;

/// Default auto-commit threshold for batch writes (512 MiB).
///
/// When a batch exceeds this size, it is automatically committed to prevent OOM
/// during large bulk writes. Keep this below the `RocksDB` write buffer manager
/// budget so stalls can recover without waiting on a single large flush.
/// The consistency check on startup heals any crash that occurs between auto-commits.
const DEFAULT_AUTO_COMMIT_THRESHOLD: usize = 512 * 1024 * 1024;

/// Builder for [`RocksDBProvider`].
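///
/// # Example
///
/// A minimal usage sketch (the path is illustrative):
///
/// ```ignore
/// let provider = RocksDBBuilder::new("datadir/rocksdb")
///     .with_default_tables()
///     .with_metrics()
///     .build()?;
/// ```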
pub struct RocksDBBuilder {
    path: PathBuf,
    column_families: Vec<String>,
    enable_metrics: bool,
    enable_statistics: bool,
    log_level: rocksdb::LogLevel,
    block_cache: Cache,
    read_only: bool,
}

impl fmt::Debug for RocksDBBuilder {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RocksDBBuilder")
            .field("path", &self.path)
            .field("column_families", &self.column_families)
            .field("enable_metrics", &self.enable_metrics)
            .finish()
    }
}

impl RocksDBBuilder {
    /// Creates a new builder with optimized default options.
    pub fn new(path: impl AsRef<Path>) -> Self {
        let cache = Cache::new_lru_cache(DEFAULT_CACHE_SIZE);
        Self {
            path: path.as_ref().to_path_buf(),
            column_families: Vec::new(),
            enable_metrics: false,
            enable_statistics: false,
            log_level: rocksdb::LogLevel::Info,
            block_cache: cache,
            read_only: false,
        }
    }

    /// Creates default table options with shared block cache.
    fn default_table_options(cache: &Cache) -> BlockBasedOptions {
        let mut table_options = BlockBasedOptions::default();
        table_options.set_block_size(DEFAULT_BLOCK_SIZE);
        table_options.set_cache_index_and_filter_blocks(true);
        table_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
        // Shared block cache for all column families.
        table_options.set_block_cache(cache);
        table_options
    }

    /// Creates optimized `RocksDB` options per `RocksDB` wiki recommendations.
    fn default_options(
        log_level: rocksdb::LogLevel,
        cache: &Cache,
        enable_statistics: bool,
    ) -> Options {
        // Follows the recommended tuning guide from the RocksDB wiki, see https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning
        let table_options = Self::default_table_options(cache);

        let mut options = Options::default();
        options.set_block_based_table_factory(&table_options);
        options.create_if_missing(true);
        options.create_missing_column_families(true);
        options.set_max_background_jobs(DEFAULT_MAX_BACKGROUND_JOBS);
        options.set_bytes_per_sync(DEFAULT_BYTES_PER_SYNC);
        let write_buffer_manager =
            WriteBufferManager::new_write_buffer_manager(DEFAULT_WRITE_BUFFER_MANAGER_SIZE, true);
        options.set_write_buffer_manager(&write_buffer_manager);

        options.set_bottommost_compression_type(DBCompressionType::Zstd);
        options.set_bottommost_zstd_max_train_bytes(0, true);
        options.set_compression_type(DBCompressionType::Lz4);
        options.set_compaction_pri(CompactionPri::MinOverlappingRatio);

        options.set_log_level(log_level);

        options.set_max_open_files(DEFAULT_MAX_OPEN_FILES);

        // Delete obsolete WAL files immediately after all column families have flushed.
        // Setting both to 0 means "delete ASAP, no archival".
        options.set_wal_ttl_seconds(0);
        options.set_wal_size_limit_mb(0);

        // Statistics can be viewed in the RocksDB log file.
        if enable_statistics {
            options.enable_statistics();
        }

        options
    }

    /// Creates optimized column family options.
    fn default_column_family_options(cache: &Cache) -> Options {
        // Follows the recommended tuning guide from the RocksDB wiki, see https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning
        let table_options = Self::default_table_options(cache);

        let mut cf_options = Options::default();
        cf_options.set_block_based_table_factory(&table_options);
        cf_options.set_level_compaction_dynamic_level_bytes(true);
        // Zstd is recommended for bottommost compression and Lz4 for other levels, see https://github.com/facebook/rocksdb/wiki/Compression#configuration
        cf_options.set_compression_type(DBCompressionType::Lz4);
        cf_options.set_bottommost_compression_type(DBCompressionType::Zstd);
        // Only use Zstd compression, disable dictionary training
        cf_options.set_bottommost_zstd_max_train_bytes(0, true);
        cf_options.set_write_buffer_size(DEFAULT_WRITE_BUFFER_SIZE);

        cf_options
    }

    /// Creates optimized column family options for `TransactionHashNumbers`.
    ///
    /// This table stores `B256 -> TxNumber` mappings where:
    /// - Keys are incompressible 32-byte hashes (compression wastes CPU for zero benefit)
    /// - Values are varint-encoded `u64` (a few bytes - too small to benefit from compression)
    /// - Every lookup expects a hit (bloom filters only help when checking non-existent keys)
    fn tx_hash_numbers_column_family_options(cache: &Cache) -> Options {
        let mut table_options = BlockBasedOptions::default();
        table_options.set_block_size(DEFAULT_BLOCK_SIZE);
        table_options.set_cache_index_and_filter_blocks(true);
        table_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
        table_options.set_block_cache(cache);
        // Disable bloom filter: every lookup expects a hit, so bloom filters provide no benefit
        // and waste memory.

        let mut cf_options = Options::default();
        cf_options.set_block_based_table_factory(&table_options);
        cf_options.set_level_compaction_dynamic_level_bytes(true);
        // Disable compression: B256 keys are incompressible hashes, TxNumber values are
        // varint-encoded u64 (a few bytes). Compression wastes CPU cycles for zero space savings.
        cf_options.set_compression_type(DBCompressionType::None);
        cf_options.set_bottommost_compression_type(DBCompressionType::None);

        cf_options
    }

    /// Adds a column family for a specific table type.
    pub fn with_table<T: Table>(mut self) -> Self {
        self.column_families.push(T::NAME.to_string());
        self
    }

    /// Registers the default tables used by reth for `RocksDB` storage.
    ///
    /// This registers:
    /// - [`tables::TransactionHashNumbers`] - Transaction hash to number mapping
    /// - [`tables::AccountsHistory`] - Account history index
    /// - [`tables::StoragesHistory`] - Storage history index
    pub fn with_default_tables(self) -> Self {
        self.with_table::<tables::TransactionHashNumbers>()
            .with_table::<tables::AccountsHistory>()
            .with_table::<tables::StoragesHistory>()
    }

    /// Enables metrics.
    pub const fn with_metrics(mut self) -> Self {
        self.enable_metrics = true;
        self
    }

    /// Enables `RocksDB` internal statistics collection.
    pub const fn with_statistics(mut self) -> Self {
        self.enable_statistics = true;
        self
    }

    /// Sets the log level from `DatabaseArgs` configuration.
    pub const fn with_database_log_level(mut self, log_level: Option<LogLevel>) -> Self {
        if let Some(level) = log_level {
            self.log_level = convert_log_level(level);
        }
        self
    }

    /// Sets a custom block cache size.
    pub fn with_block_cache_size(mut self, capacity_bytes: usize) -> Self {
        self.block_cache = Cache::new_lru_cache(capacity_bytes);
        self
    }

    /// Sets read-only mode.
    ///
    /// Opens the database as a secondary instance, which supports catching up
    /// with the primary via [`RocksDBProvider::try_catch_up_with_primary`].
    /// A temporary directory is created automatically for the secondary's LOG files.
    ///
    /// Note: Write operations on a read-only provider will panic at runtime.
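    ///
    /// # Example
    ///
    /// A sketch of opening a secondary instance and syncing it (path illustrative):
    ///
    /// ```ignore
    /// let reader = RocksDBBuilder::new("datadir/rocksdb")
    ///     .with_default_tables()
    ///     .with_read_only(true)
    ///     .build()?;
    /// reader.try_catch_up_with_primary()?;
    /// ```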
    pub const fn with_read_only(mut self, read_only: bool) -> Self {
        self.read_only = read_only;
        self
    }

    /// Builds the [`RocksDBProvider`].
    pub fn build(self) -> ProviderResult<RocksDBProvider> {
        let options =
            Self::default_options(self.log_level, &self.block_cache, self.enable_statistics);

        let cf_descriptors: Vec<ColumnFamilyDescriptor> = self
            .column_families
            .iter()
            .map(|name| {
                let cf_options = if name == tables::TransactionHashNumbers::NAME {
                    Self::tx_hash_numbers_column_family_options(&self.block_cache)
                } else {
                    Self::default_column_family_options(&self.block_cache)
                };
                ColumnFamilyDescriptor::new(name.clone(), cf_options)
            })
            .collect();

        let metrics = self.enable_metrics.then(RocksDBMetrics::default);

        if self.read_only {
            // Open as secondary instance for catch-up capability.
            // Secondary needs max_open_files = -1 to keep all FDs open.
            let mut options = options;
            options.set_max_open_files(-1);

            let secondary_path = self
                .path
                .parent()
                .unwrap_or(&self.path)
                .join(format!("rocksdb-secondary-tmp-{}", std::process::id()));
            reth_fs_util::create_dir_all(&secondary_path).map_err(ProviderError::other)?;

            let db = DB::open_cf_descriptors_as_secondary(
                &options,
                &self.path,
                &secondary_path,
                cf_descriptors,
            )
            .map_err(|e| {
                ProviderError::Database(DatabaseError::Open(DatabaseErrorInfo {
                    message: e.to_string().into(),
                    code: -1,
                }))
            })?;
            Ok(RocksDBProvider(Arc::new(RocksDBProviderInner::Secondary {
                db,
                metrics,
                secondary_path,
            })))
        } else {
            // Use OptimisticTransactionDB for MDBX-like transaction semantics (read-your-writes,
            // rollback). OptimisticTransactionDB uses optimistic concurrency control (conflict
            // detection at commit) and is backed by DBCommon, giving us access to
            // cancel_all_background_work for clean shutdown.
            let db =
                OptimisticTransactionDB::open_cf_descriptors(&options, &self.path, cf_descriptors)
                    .map_err(|e| {
                        ProviderError::Database(DatabaseError::Open(DatabaseErrorInfo {
                            message: e.to_string().into(),
                            code: -1,
                        }))
                    })?;
            Ok(RocksDBProvider(Arc::new(RocksDBProviderInner::ReadWrite { db, metrics })))
        }
    }
}

/// Some types don't support compression (e.g. B256), and we don't want to copy them into the
/// allocated buffer when we can just use their reference.
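///
/// Mirrors the call pattern used in [`RocksDBProvider::put_encoded`]:
///
/// ```ignore
/// let mut buf = Vec::new();
/// let value_bytes = compress_to_buf_or_ref!(buf, value).unwrap_or(&buf);
/// ```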
macro_rules! compress_to_buf_or_ref {
    ($buf:expr, $value:expr) => {
        if let Some(value) = $value.uncompressable_ref() {
            Some(value)
        } else {
            $buf.clear();
            $value.compress_to_buf(&mut $buf);
            None
        }
    };
}

/// `RocksDB` provider for the auxiliary storage layer beside the main MDBX database.
#[derive(Debug)]
pub struct RocksDBProvider(Arc<RocksDBProviderInner>);

/// Inner state for `RocksDB` provider.
enum RocksDBProviderInner {
    /// Read-write mode using `OptimisticTransactionDB`.
    ReadWrite {
        /// `RocksDB` database instance with optimistic transaction support.
        db: OptimisticTransactionDB,
        /// Latency and operation metrics.
        metrics: Option<RocksDBMetrics>,
    },
    /// Secondary mode using `DB` opened with `open_cf_descriptors_as_secondary`.
    /// Supports catching up with the primary via `try_catch_up_with_primary`.
    /// Does not support snapshots; consistency is guaranteed externally.
    Secondary {
        /// Secondary `RocksDB` database instance.
        db: DB,
        /// Latency and operation metrics.
        metrics: Option<RocksDBMetrics>,
        /// Temporary directory for secondary LOG files, removed on drop.
        secondary_path: PathBuf,
    },
}

impl RocksDBProviderInner {
    /// Returns the metrics for this provider.
    const fn metrics(&self) -> Option<&RocksDBMetrics> {
        match self {
            Self::ReadWrite { metrics, .. } | Self::Secondary { metrics, .. } => metrics.as_ref(),
        }
    }

    /// Returns the read-write database, panicking if in read-only mode.
    fn db_rw(&self) -> &OptimisticTransactionDB {
        match self {
            Self::ReadWrite { db, .. } => db,
            Self::Secondary { .. } => {
                panic!("Cannot perform write operation on secondary RocksDB provider")
            }
        }
    }

    /// Gets the column family handle for a table.
    fn cf_handle<T: Table>(&self) -> Result<&rocksdb::ColumnFamily, DatabaseError> {
        let cf = match self {
            Self::ReadWrite { db, .. } => db.cf_handle(T::NAME),
            Self::Secondary { db, .. } => db.cf_handle(T::NAME),
        };
        cf.ok_or_else(|| DatabaseError::Other(format!("Column family '{}' not found", T::NAME)))
    }

    /// Gets a value from a column family.
    fn get_cf(
        &self,
        cf: &rocksdb::ColumnFamily,
        key: impl AsRef<[u8]>,
    ) -> Result<Option<Vec<u8>>, rocksdb::Error> {
        match self {
            Self::ReadWrite { db, .. } => db.get_cf(cf, key),
            Self::Secondary { db, .. } => db.get_cf(cf, key),
        }
    }

    /// Puts a value into a column family.
    fn put_cf(
        &self,
        cf: &rocksdb::ColumnFamily,
        key: impl AsRef<[u8]>,
        value: impl AsRef<[u8]>,
    ) -> Result<(), rocksdb::Error> {
        self.db_rw().put_cf(cf, key, value)
    }

    /// Deletes a value from a column family.
    fn delete_cf(
        &self,
        cf: &rocksdb::ColumnFamily,
        key: impl AsRef<[u8]>,
    ) -> Result<(), rocksdb::Error> {
        self.db_rw().delete_cf(cf, key)
    }

    /// Deletes a range of values from a column family.
    fn delete_range_cf<K: AsRef<[u8]>>(
        &self,
        cf: &rocksdb::ColumnFamily,
        from: K,
        to: K,
    ) -> Result<(), rocksdb::Error> {
        self.db_rw().delete_range_cf(cf, from, to)
    }

    /// Returns an iterator over a column family.
    fn iterator_cf(
        &self,
        cf: &rocksdb::ColumnFamily,
        mode: IteratorMode<'_>,
    ) -> RocksDBIterEnum<'_> {
        match self {
            Self::ReadWrite { db, .. } => RocksDBIterEnum::ReadWrite(db.iterator_cf(cf, mode)),
            Self::Secondary { db, .. } => RocksDBIterEnum::ReadOnly(db.iterator_cf(cf, mode)),
        }
    }

    /// Returns a raw iterator over a column family.
    ///
    /// Unlike [`Self::iterator_cf`], raw iterators support `seek()` for efficient
    /// repositioning without creating a new iterator.
    fn raw_iterator_cf(&self, cf: &rocksdb::ColumnFamily) -> RocksDBRawIterEnum<'_> {
        match self {
            Self::ReadWrite { db, .. } => RocksDBRawIterEnum::ReadWrite(db.raw_iterator_cf(cf)),
            Self::Secondary { db, .. } => RocksDBRawIterEnum::ReadOnly(db.raw_iterator_cf(cf)),
        }
    }

    /// Returns a read-only, point-in-time snapshot of the database.
    fn snapshot(&self) -> RocksReadSnapshotInner<'_> {
        match self {
            Self::ReadWrite { db, .. } => RocksReadSnapshotInner::ReadWrite(db.snapshot()),
            Self::Secondary { db, .. } => RocksReadSnapshotInner::Secondary(db),
        }
    }

    /// Returns the path to the database directory.
    fn path(&self) -> &Path {
        match self {
            Self::ReadWrite { db, .. } => db.path(),
            Self::Secondary { db, .. } => db.path(),
        }
    }

    /// Returns the total size of WAL (Write-Ahead Log) files in bytes.
    ///
    /// WAL files have a `.log` extension in the `RocksDB` directory.
    fn wal_size_bytes(&self) -> u64 {
        let path = self.path();

        match std::fs::read_dir(path) {
            Ok(entries) => entries
                .filter_map(|e| e.ok())
                .filter(|e| e.path().extension().is_some_and(|ext| ext == "log"))
                .filter_map(|e| e.metadata().ok())
                .map(|m| m.len())
                .sum(),
            Err(_) => 0,
        }
    }

    /// Returns statistics for all column families in the database.
    fn table_stats(&self) -> Vec<RocksDBTableStats> {
        let mut stats = Vec::new();

        macro_rules! collect_stats {
            ($db:expr) => {
                for cf_name in ROCKSDB_TABLES {
                    if let Some(cf) = $db.cf_handle(cf_name) {
                        let estimated_num_keys = $db
                            .property_int_value_cf(cf, rocksdb::properties::ESTIMATE_NUM_KEYS)
                            .ok()
                            .flatten()
                            .unwrap_or(0);

                        // SST files size (on-disk) + memtable size (in-memory)
                        let sst_size = $db
                            .property_int_value_cf(cf, rocksdb::properties::LIVE_SST_FILES_SIZE)
                            .ok()
                            .flatten()
                            .unwrap_or(0);

                        let memtable_size = $db
                            .property_int_value_cf(cf, rocksdb::properties::SIZE_ALL_MEM_TABLES)
                            .ok()
                            .flatten()
                            .unwrap_or(0);

                        let estimated_size_bytes = sst_size + memtable_size;

                        let pending_compaction_bytes = $db
                            .property_int_value_cf(
                                cf,
                                rocksdb::properties::ESTIMATE_PENDING_COMPACTION_BYTES,
                            )
                            .ok()
                            .flatten()
                            .unwrap_or(0);

                        stats.push(RocksDBTableStats {
                            sst_size_bytes: sst_size,
                            memtable_size_bytes: memtable_size,
                            name: cf_name.to_string(),
                            estimated_num_keys,
                            estimated_size_bytes,
                            pending_compaction_bytes,
                        });
                    }
                }
            };
        }

        match self {
            Self::ReadWrite { db, .. } => collect_stats!(db),
            Self::Secondary { db, .. } => collect_stats!(db),
        }

        stats
    }

    /// Returns database-level statistics including per-table stats and WAL size.
    fn db_stats(&self) -> RocksDBStats {
        RocksDBStats { tables: self.table_stats(), wal_size_bytes: self.wal_size_bytes() }
    }
}

impl fmt::Debug for RocksDBProviderInner {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::ReadWrite { metrics, .. } => f
                .debug_struct("RocksDBProviderInner::ReadWrite")
                .field("db", &"<OptimisticTransactionDB>")
                .field("metrics", metrics)
                .finish(),
            Self::Secondary { metrics, .. } => f
                .debug_struct("RocksDBProviderInner::Secondary")
                .field("db", &"<DB (secondary)>")
                .field("metrics", metrics)
                .finish(),
        }
    }
}

impl Drop for RocksDBProviderInner {
    fn drop(&mut self) {
        match self {
            Self::ReadWrite { db, .. } => {
                // Flush all memtables if possible. If not, they will be rebuilt from the WAL on
                // restart.
                if let Err(e) = db.flush_wal(true) {
                    tracing::warn!(target: "providers::rocksdb", ?e, "Failed to flush WAL on drop");
                }
                for cf_name in ROCKSDB_TABLES {
                    if let Some(cf) = db.cf_handle(cf_name) &&
                        let Err(e) = db.flush_cf(&cf)
                    {
                        tracing::warn!(target: "providers::rocksdb", cf = cf_name, ?e, "Failed to flush CF on drop");
                    }
                }
                db.cancel_all_background_work(true);
            }
            Self::Secondary { db, secondary_path, .. } => {
                db.cancel_all_background_work(true);
                let _ = std::fs::remove_dir_all(secondary_path);
            }
        }
    }
}

impl Clone for RocksDBProvider {
    fn clone(&self) -> Self {
        Self(self.0.clone())
    }
}

impl DatabaseMetrics for RocksDBProvider {
    fn gauge_metrics(&self) -> Vec<(&'static str, f64, Vec<Label>)> {
        let mut metrics = Vec::new();

        for stat in self.table_stats() {
            metrics.push((
                "rocksdb.table_size",
                stat.estimated_size_bytes as f64,
                vec![Label::new("table", stat.name.clone())],
            ));
            metrics.push((
                "rocksdb.table_entries",
                stat.estimated_num_keys as f64,
                vec![Label::new("table", stat.name.clone())],
            ));
            metrics.push((
                "rocksdb.pending_compaction_bytes",
                stat.pending_compaction_bytes as f64,
                vec![Label::new("table", stat.name.clone())],
            ));
            metrics.push((
                "rocksdb.sst_size",
                stat.sst_size_bytes as f64,
                vec![Label::new("table", stat.name.clone())],
            ));
            metrics.push((
                "rocksdb.memtable_size",
                stat.memtable_size_bytes as f64,
                vec![Label::new("table", stat.name)],
            ));
        }

        // WAL size (DB-level, shared across all tables)
        metrics.push(("rocksdb.wal_size", self.wal_size_bytes() as f64, vec![]));

        metrics
    }
}

impl RocksDBProvider {
    /// Creates a new `RocksDB` provider.
    pub fn new(path: impl AsRef<Path>) -> ProviderResult<Self> {
        RocksDBBuilder::new(path).build()
    }

    /// Creates a new `RocksDB` provider builder.
    pub fn builder(path: impl AsRef<Path>) -> RocksDBBuilder {
        RocksDBBuilder::new(path)
    }

    /// Returns `true` if a `RocksDB` database exists at the given path.
    ///
    /// Checks for the presence of the `CURRENT` file, which `RocksDB` creates
    /// when initializing a database.
    pub fn exists(path: impl AsRef<Path>) -> bool {
        path.as_ref().join("CURRENT").exists()
    }

    /// Returns `true` if this provider is in read-only mode.
    pub fn is_read_only(&self) -> bool {
        matches!(self.0.as_ref(), RocksDBProviderInner::Secondary { .. })
    }

    /// Tries to catch up with the primary instance by reading new WAL and MANIFEST entries.
    ///
    /// This is a no-op for read-write providers. For secondary (read-only) providers,
    /// this incrementally syncs with the primary's latest state.
    pub fn try_catch_up_with_primary(&self) -> ProviderResult<()> {
        match self.0.as_ref() {
            RocksDBProviderInner::Secondary { db, .. } => {
                db.try_catch_up_with_primary().map_err(|e| {
                    ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
                        message: e.to_string().into(),
                        code: -1,
                    }))
                })
            }
            _ => Ok(()),
        }
    }

    /// Returns a read-only, point-in-time snapshot of the database.
    ///
    /// Lighter weight than [`RocksTx`] — no write-conflict tracking, and `Send + Sync`.
    pub fn snapshot(&self) -> RocksReadSnapshot<'_> {
        RocksReadSnapshot { inner: self.0.snapshot(), provider: self }
    }

    /// Creates a new transaction with MDBX-like semantics (read-your-writes, rollback).
    ///
    /// Note: With `OptimisticTransactionDB`, commits may fail if there are conflicts.
    /// Conflict detection happens at commit time, not at write time.
    ///
    /// # Panics
    /// Panics if the provider is in read-only mode.
    pub fn tx(&self) -> RocksTx<'_> {
        let write_options = synced_write_options();
        let txn_options = OptimisticTransactionOptions::default();
        let inner = self.0.db_rw().transaction_opt(&write_options, &txn_options);
        RocksTx { inner, provider: self }
    }

    /// Creates a new batch for atomic writes.
    ///
    /// Use [`Self::write_batch`] for closure-based atomic writes.
    /// Use this method when the batch needs to be held by [`crate::EitherWriter`].
    ///
    /// # Panics
    /// Panics if the provider is in read-only mode when attempting to commit.
    pub fn batch(&self) -> RocksDBBatch<'_> {
        RocksDBBatch {
            provider: self,
            inner: WriteBatchWithTransaction::<true>::default(),
            buf: Vec::with_capacity(DEFAULT_COMPRESS_BUF_CAPACITY),
            auto_commit_threshold: None,
        }
    }

    /// Creates a new batch with auto-commit enabled.
    ///
    /// When the batch size exceeds the threshold (512 MiB), the batch is automatically
    /// committed and reset. This prevents OOM during large bulk writes while maintaining
    /// crash-safety via the consistency check on startup.
    pub fn batch_with_auto_commit(&self) -> RocksDBBatch<'_> {
        RocksDBBatch {
            provider: self,
            inner: WriteBatchWithTransaction::<true>::default(),
            buf: Vec::with_capacity(DEFAULT_COMPRESS_BUF_CAPACITY),
            auto_commit_threshold: Some(DEFAULT_AUTO_COMMIT_THRESHOLD),
        }
    }

    /// Gets the column family handle for a table.
    fn get_cf_handle<T: Table>(&self) -> Result<&rocksdb::ColumnFamily, DatabaseError> {
        self.0.cf_handle::<T>()
    }

    /// Executes a function and records metrics with the given operation and table name.
    fn execute_with_operation_metric<R>(
        &self,
        operation: RocksDBOperation,
        table: &'static str,
        f: impl FnOnce(&Self) -> R,
    ) -> R {
        let start = self.0.metrics().map(|_| Instant::now());
        let res = f(self);

        if let (Some(start), Some(metrics)) = (start, self.0.metrics()) {
            metrics.record_operation(operation, table, start.elapsed());
        }

        res
    }

    /// Gets a value from the specified table.
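    ///
    /// # Example
    ///
    /// A lookup sketch (`tx_hash` is illustrative):
    ///
    /// ```ignore
    /// let tx_num = provider.get::<tables::TransactionHashNumbers>(tx_hash)?;
    /// ```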
    pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
        self.get_encoded::<T>(&key.encode())
    }

    /// Gets a value from the specified table using pre-encoded key.
    pub fn get_encoded<T: Table>(
        &self,
        key: &<T::Key as Encode>::Encoded,
    ) -> ProviderResult<Option<T::Value>> {
        self.execute_with_operation_metric(RocksDBOperation::Get, T::NAME, |this| {
            let result = this.0.get_cf(this.get_cf_handle::<T>()?, key.as_ref()).map_err(|e| {
                ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
                    message: e.to_string().into(),
                    code: -1,
                }))
            })?;

            Ok(result.and_then(|value| T::Value::decompress(&value).ok()))
        })
    }

    /// Gets raw bytes from the specified table without decompressing.
    pub fn get_raw<T: Table>(&self, key: T::Key) -> ProviderResult<Option<Vec<u8>>> {
        let encoded = key.encode();
        self.execute_with_operation_metric(RocksDBOperation::Get, T::NAME, |this| {
            this.0.get_cf(this.get_cf_handle::<T>()?, encoded.as_ref()).map_err(|e| {
                ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
                    message: e.to_string().into(),
                    code: -1,
                }))
            })
        })
    }

    /// Upserts a value into the specified table with the given key.
    ///
    /// # Panics
    /// Panics if the provider is in read-only mode.
    pub fn put<T: Table>(&self, key: T::Key, value: &T::Value) -> ProviderResult<()> {
        let encoded_key = key.encode();
        self.put_encoded::<T>(&encoded_key, value)
    }

    /// Puts a value into the specified table using pre-encoded key.
    ///
    /// # Panics
    /// Panics if the provider is in read-only mode.
    pub fn put_encoded<T: Table>(
        &self,
        key: &<T::Key as Encode>::Encoded,
        value: &T::Value,
    ) -> ProviderResult<()> {
        self.execute_with_operation_metric(RocksDBOperation::Put, T::NAME, |this| {
            // To keep the code simple, we allocate a fresh buffer on each call because
            // `RocksDBProvider` is thread-safe. Callers that want to avoid the per-call
            // allocation can use the write_batch API instead.
            let mut buf = Vec::new();
            let value_bytes = compress_to_buf_or_ref!(buf, value).unwrap_or(&buf);

            this.0.put_cf(this.get_cf_handle::<T>()?, key, value_bytes).map_err(|e| {
                ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
                    info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
                    operation: DatabaseWriteOperation::PutUpsert,
                    table_name: T::NAME,
                    key: key.as_ref().to_vec(),
                })))
            })
        })
    }

    /// Deletes a value from the specified table.
    ///
    /// # Panics
    /// Panics if the provider is in read-only mode.
    pub fn delete<T: Table>(&self, key: T::Key) -> ProviderResult<()> {
        self.execute_with_operation_metric(RocksDBOperation::Delete, T::NAME, |this| {
            this.0.delete_cf(this.get_cf_handle::<T>()?, key.encode().as_ref()).map_err(|e| {
                ProviderError::Database(DatabaseError::Delete(DatabaseErrorInfo {
                    message: e.to_string().into(),
                    code: -1,
                }))
            })
        })
    }

    /// Clears all entries from the specified table.
    ///
    /// Uses `delete_range_cf` from the empty key to a max key (256 bytes of 0xFF).
    /// This end key must exceed the maximum encoded key size for any table.
    /// Current max is ~60 bytes (`StorageShardedKey` = 20 + 32 + 8).
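    ///
    /// # Example
    ///
    /// A sketch of wiping the tx-hash index (e.g. before a full rebuild):
    ///
    /// ```ignore
    /// provider.clear::<tables::TransactionHashNumbers>()?;
    /// ```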
    pub fn clear<T: Table>(&self) -> ProviderResult<()> {
        let cf = self.get_cf_handle::<T>()?;

        self.0.delete_range_cf(cf, &[] as &[u8], &[0xFF; 256]).map_err(|e| {
            ProviderError::Database(DatabaseError::Delete(DatabaseErrorInfo {
                message: e.to_string().into(),
                code: -1,
            }))
        })?;

        Ok(())
    }

    /// Retrieves the first or last entry from a table based on the iterator mode.
    fn get_boundary<T: Table>(
        &self,
        mode: IteratorMode<'_>,
    ) -> ProviderResult<Option<(T::Key, T::Value)>> {
        self.execute_with_operation_metric(RocksDBOperation::Get, T::NAME, |this| {
            let cf = this.get_cf_handle::<T>()?;
            let mut iter = this.0.iterator_cf(cf, mode);

            match iter.next() {
                Some(Ok((key_bytes, value_bytes))) => {
                    let key = <T::Key as reth_db_api::table::Decode>::decode(&key_bytes)
                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
                    let value = T::Value::decompress(&value_bytes)
                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
                    Ok(Some((key, value)))
                }
                Some(Err(e)) => {
                    Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
                        message: e.to_string().into(),
                        code: -1,
                    })))
                }
                None => Ok(None),
            }
        })
    }

    /// Gets the first (smallest key) entry from the specified table.
    #[inline]
    pub fn first<T: Table>(&self) -> ProviderResult<Option<(T::Key, T::Value)>> {
        self.get_boundary::<T>(IteratorMode::Start)
    }

    /// Gets the last (largest key) entry from the specified table.
    #[inline]
    pub fn last<T: Table>(&self) -> ProviderResult<Option<(T::Key, T::Value)>> {
        self.get_boundary::<T>(IteratorMode::End)
    }

    /// Creates an iterator over all entries in the specified table.
    ///
    /// Returns decoded `(Key, Value)` pairs in key order.
    pub fn iter<T: Table>(&self) -> ProviderResult<RocksDBIter<'_, T>> {
        let cf = self.get_cf_handle::<T>()?;
        let iter = self.0.iterator_cf(cf, IteratorMode::Start);
        Ok(RocksDBIter { inner: iter, _marker: std::marker::PhantomData })
    }

    /// Creates an iterator starting from the given key (inclusive, seek forward).
    ///
    /// Returns decoded `(Key, Value)` pairs starting from the first key >= `key`.
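    ///
    /// # Example
    ///
    /// A scan sketch, assuming [`RocksDBIter`] yields `Result` items of decoded pairs:
    ///
    /// ```ignore
    /// for entry in provider.iter_from::<tables::AccountsHistory>(ShardedKey::new(address, 0))? {
    ///     let (key, value) = entry?;
    ///     // Stop once the iterator leaves the target address.
    ///     if key.key != address {
    ///         break;
    ///     }
    /// }
    /// ```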
    pub fn iter_from<T: Table>(&self, key: T::Key) -> ProviderResult<RocksDBIter<'_, T>> {
        let cf = self.get_cf_handle::<T>()?;
        let encoded_key = key.encode();
        let iter = self
            .0
            .iterator_cf(cf, IteratorMode::From(encoded_key.as_ref(), rocksdb::Direction::Forward));
        Ok(RocksDBIter { inner: iter, _marker: std::marker::PhantomData })
    }

    /// Returns statistics for all column families in the database.
    ///
    /// Returns one [`RocksDBTableStats`] entry per column family.
    pub fn table_stats(&self) -> Vec<RocksDBTableStats> {
        self.0.table_stats()
    }

    /// Returns the total size of WAL (Write-Ahead Log) files in bytes.
    ///
    /// This scans the `RocksDB` directory for `.log` files and sums their sizes.
    /// WAL files can be significant (e.g., 2.7GB observed) and are not included
    /// in `table_size`, `sst_size`, or `memtable_size` metrics.
    pub fn wal_size_bytes(&self) -> u64 {
        self.0.wal_size_bytes()
    }

    /// Returns database-level statistics including per-table stats and WAL size.
    ///
    /// This combines [`Self::table_stats`] and [`Self::wal_size_bytes`] into a single struct.
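    ///
    /// # Example
    ///
    /// A sketch of computing the total on-disk footprint, including the shared WAL:
    ///
    /// ```ignore
    /// let stats = provider.db_stats();
    /// let sst_total: u64 = stats.tables.iter().map(|t| t.sst_size_bytes).sum();
    /// let on_disk = sst_total + stats.wal_size_bytes;
    /// ```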
    pub fn db_stats(&self) -> RocksDBStats {
        self.0.db_stats()
    }

    /// Flushes pending writes for the specified tables to disk.
    ///
    /// This performs a flush of:
    /// 1. The column family memtables for the specified table names to SST files
    /// 2. The Write-Ahead Log (WAL) with sync
    ///
    /// After this call completes, all data for the specified tables is durably persisted to disk.
    ///
    /// # Panics
    /// Panics if the provider is in read-only mode.
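    ///
    /// # Example
    ///
    /// A sketch of persisting the tx-hash index after a bulk import:
    ///
    /// ```ignore
    /// provider.flush(&[tables::TransactionHashNumbers::NAME])?;
    /// ```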
    #[instrument(level = "debug", target = "providers::rocksdb", skip_all, fields(tables = ?tables))]
    pub fn flush(&self, tables: &[&'static str]) -> ProviderResult<()> {
        let db = self.0.db_rw();

        for cf_name in tables {
            if let Some(cf) = db.cf_handle(cf_name) {
                db.flush_cf(&cf).map_err(|e| {
                    ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
                        info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
                        operation: DatabaseWriteOperation::Flush,
                        table_name: cf_name,
                        key: Vec::new(),
                    })))
                })?;
            }
        }

        db.flush_wal(true).map_err(|e| {
            ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
                info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
                operation: DatabaseWriteOperation::Flush,
                table_name: "WAL",
                key: Vec::new(),
            })))
        })?;

        Ok(())
    }

    /// Flushes and compacts all tables in `RocksDB`.
    ///
    /// This:
    /// 1. Flushes all column family memtables to SST files
    /// 2. Flushes the Write-Ahead Log (WAL) with sync
    /// 3. Triggers manual compaction on all column families to reclaim disk space
    ///
    /// Use this after large delete operations (like pruning) to reclaim disk space.
    ///
    /// # Panics
    /// Panics if the provider is in read-only mode.
    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
    pub fn flush_and_compact(&self) -> ProviderResult<()> {
        self.flush(ROCKSDB_TABLES)?;

        let db = self.0.db_rw();

        for cf_name in ROCKSDB_TABLES {
            if let Some(cf) = db.cf_handle(cf_name) {
                db.compact_range_cf(&cf, None::<&[u8]>, None::<&[u8]>);
            }
        }

        Ok(())
    }

    /// Creates a raw iterator over all entries in the specified table.
    ///
    /// Returns raw `(key_bytes, value_bytes)` pairs without decoding.
    pub fn raw_iter<T: Table>(&self) -> ProviderResult<RocksDBRawIter<'_>> {
        let cf = self.get_cf_handle::<T>()?;
        let iter = self.0.iterator_cf(cf, IteratorMode::Start);
        Ok(RocksDBRawIter { inner: iter })
    }

    /// Returns all account history shards for the given address in ascending key order.
    ///
    /// This is used for unwind operations where we need to scan all shards for an address
    /// and potentially delete or truncate them.
    pub fn account_history_shards(
        &self,
        address: Address,
    ) -> ProviderResult<Vec<(ShardedKey<Address>, BlockNumberList)>> {
        // Get the column family handle for the AccountsHistory table.
        let cf = self.get_cf_handle::<tables::AccountsHistory>()?;

        // Build a seek key starting at the first shard (highest_block_number = 0) for this address.
        // ShardedKey is (address, highest_block_number) so this positions us at the beginning.
        let start_key = ShardedKey::new(address, 0u64);
        let start_bytes = start_key.encode();

        // Create a forward iterator starting from our seek position.
        let iter = self
            .0
            .iterator_cf(cf, IteratorMode::From(start_bytes.as_ref(), rocksdb::Direction::Forward));

        let mut result = Vec::new();
        for item in iter {
            match item {
                Ok((key_bytes, value_bytes)) => {
                    // Decode the sharded key to check if we're still on the same address.
                    let key = ShardedKey::<Address>::decode(&key_bytes)
                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;

                    // Stop when we reach a different address (keys are sorted by address first).
                    if key.key != address {
                        break;
                    }

                    // Decompress the block number list stored in this shard.
                    let value = BlockNumberList::decompress(&value_bytes)
                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;

                    result.push((key, value));
                }
                Err(e) => {
                    return Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
                        message: e.to_string().into(),
                        code: -1,
                    })));
                }
            }
        }

        Ok(result)
    }

    /// Returns all storage history shards for the given `(address, storage_key)` pair.
    ///
    /// Iterates through all shards in ascending `highest_block_number` order until
    /// a different `(address, storage_key)` is encountered.
    pub fn storage_history_shards(
        &self,
        address: Address,
        storage_key: B256,
    ) -> ProviderResult<Vec<(StorageShardedKey, BlockNumberList)>> {
        let cf = self.get_cf_handle::<tables::StoragesHistory>()?;

        let start_key = StorageShardedKey::new(address, storage_key, 0u64);
        let start_bytes = start_key.encode();

        let iter = self
            .0
            .iterator_cf(cf, IteratorMode::From(start_bytes.as_ref(), rocksdb::Direction::Forward));

        let mut result = Vec::new();
        for item in iter {
            match item {
                Ok((key_bytes, value_bytes)) => {
                    let key = StorageShardedKey::decode(&key_bytes)
                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;

                    if key.address != address || key.sharded_key.key != storage_key {
                        break;
                    }

                    let value = BlockNumberList::decompress(&value_bytes)
                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;

                    result.push((key, value));
                }
                Err(e) => {
                    return Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
                        message: e.to_string().into(),
                        code: -1,
                    })));
                }
            }
        }

        Ok(result)
    }

    /// Unwinds account history indices for the given `(address, block_number)` pairs.
    ///
    /// Groups addresses by their minimum block number and calls the appropriate unwind
    /// operations. For each address, keeps only blocks less than the minimum block
    /// (i.e., removes the minimum block and all higher blocks).
    ///
    /// Returns a `WriteBatchWithTransaction` that can be committed later.
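    ///
    /// # Example
    ///
    /// A sketch of unwinding two addresses and committing the result (inputs illustrative):
    ///
    /// ```ignore
    /// let batch = provider.unwind_account_history_indices(&[(addr_a, 100), (addr_b, 120)])?;
    /// provider.commit_batch(batch)?;
    /// ```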
    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
    pub fn unwind_account_history_indices(
        &self,
        last_indices: &[(Address, BlockNumber)],
    ) -> ProviderResult<WriteBatchWithTransaction<true>> {
        let mut address_min_block: AddressMap<BlockNumber> =
            AddressMap::with_capacity_and_hasher(last_indices.len(), Default::default());
        for &(address, block_number) in last_indices {
            address_min_block
                .entry(address)
                .and_modify(|min| *min = (*min).min(block_number))
                .or_insert(block_number);
        }

        let mut batch = self.batch();
        for (address, min_block) in address_min_block {
            match min_block.checked_sub(1) {
                Some(keep_to) => batch.unwind_account_history_to(address, keep_to)?,
                None => batch.clear_account_history(address)?,
            }
        }

        Ok(batch.into_inner())
    }

    /// Unwinds storage history indices for the given `(address, storage_key, block_number)` tuples.
    ///
    /// Groups by `(address, storage_key)` and finds the minimum block number for each.
    /// For each key, keeps only blocks less than the minimum block
    /// (i.e., removes the minimum block and all higher blocks).
    ///
    /// Returns a `WriteBatchWithTransaction` that can be committed later.
    pub fn unwind_storage_history_indices(
        &self,
        storage_changesets: &[(Address, B256, BlockNumber)],
    ) -> ProviderResult<WriteBatchWithTransaction<true>> {
        let mut key_min_block: HashMap<(Address, B256), BlockNumber> =
            HashMap::with_capacity_and_hasher(storage_changesets.len(), Default::default());
        for &(address, storage_key, block_number) in storage_changesets {
            key_min_block
                .entry((address, storage_key))
                .and_modify(|min| *min = (*min).min(block_number))
                .or_insert(block_number);
        }

        let mut batch = self.batch();
        for ((address, storage_key), min_block) in key_min_block {
            match min_block.checked_sub(1) {
                Some(keep_to) => batch.unwind_storage_history_to(address, storage_key, keep_to)?,
                None => batch.clear_storage_history(address, storage_key)?,
            }
        }

        Ok(batch.into_inner())
    }

    /// Writes a batch of operations atomically.
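    ///
    /// # Example
    ///
    /// A sketch of a closure-based atomic write (key and value are illustrative):
    ///
    /// ```ignore
    /// provider.write_batch(|batch| {
    ///     batch.put::<tables::TransactionHashNumbers>(tx_hash, &tx_num)
    /// })?;
    /// ```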
1267    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
1268    pub fn write_batch<F>(&self, f: F) -> ProviderResult<()>
1269    where
1270        F: FnOnce(&mut RocksDBBatch<'_>) -> ProviderResult<()>,
1271    {
1272        self.execute_with_operation_metric(RocksDBOperation::BatchWrite, "Batch", |this| {
1273            let mut batch_handle = this.batch();
1274            f(&mut batch_handle)?;
1275            batch_handle.commit()
1276        })
1277    }
1278
1279    /// Commits a raw `WriteBatchWithTransaction` to `RocksDB`.
1280    ///
1281    /// This is used when the batch was extracted via [`RocksDBBatch::into_inner`]
1282    /// and needs to be committed at a later point (e.g., at provider commit time).
1283    ///
1284    /// # Panics
1285    /// Panics if the provider is in read-only mode.
1286    #[instrument(level = "debug", target = "providers::rocksdb", skip_all, fields(batch_len = batch.len(), batch_size = batch.size_in_bytes()))]
1287    pub fn commit_batch(&self, batch: WriteBatchWithTransaction<true>) -> ProviderResult<()> {
1288        self.0.db_rw().write_opt(batch, &synced_write_options()).map_err(|e| {
1289            ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
1290                message: e.to_string().into(),
1291                code: -1,
1292            }))
1293        })
1294    }
1295
1296    /// Writes all `RocksDB` data for multiple blocks in parallel.
1297    ///
1298    /// This handles transaction hash numbers, account history, and storage history based on
1299    /// the provided storage settings. Each operation runs in parallel with its own batch,
1300    /// pushing to `ctx.pending_batches` for later commit.
1301    #[instrument(level = "debug", target = "providers::rocksdb", skip_all, fields(num_blocks = blocks.len(), first_block = ctx.first_block_number))]
1302    pub(crate) fn write_blocks_data<N: reth_node_types::NodePrimitives>(
1303        &self,
1304        blocks: &[ExecutedBlock<N>],
1305        tx_nums: &[TxNumber],
1306        ctx: RocksDBWriteCtx,
1307        runtime: &reth_tasks::Runtime,
1308    ) -> ProviderResult<()> {
1309        if !ctx.storage_settings.storage_v2 {
1310            return Ok(());
1311        }
1312
1313        let mut r_tx_hash = None;
1314        let mut r_account_history = None;
1315        let mut r_storage_history = None;
1316
1317        let write_tx_hash =
1318            ctx.storage_settings.storage_v2 && ctx.prune_tx_lookup.is_none_or(|m| !m.is_full());
1319        let write_account_history = ctx.storage_settings.storage_v2;
1320        let write_storage_history = ctx.storage_settings.storage_v2;
1321
1322        // Propagate tracing context into rayon-spawned threads so that RocksDB
1323        // write spans appear as children of write_blocks_data in traces.
1324        let span = tracing::Span::current();
1325        runtime.storage_pool().in_place_scope(|s| {
1326            if write_tx_hash {
1327                s.spawn(|_| {
1328                    let _guard = span.enter();
1329                    r_tx_hash = Some(self.write_tx_hash_numbers(blocks, tx_nums, &ctx));
1330                });
1331            }
1332
1333            if write_account_history {
1334                s.spawn(|_| {
1335                    let _guard = span.enter();
1336                    r_account_history = Some(self.write_account_history(blocks, &ctx));
1337                });
1338            }
1339
1340            if write_storage_history {
1341                s.spawn(|_| {
1342                    let _guard = span.enter();
1343                    r_storage_history = Some(self.write_storage_history(blocks, &ctx));
1344                });
1345            }
1346        });
1347
1348        if write_tx_hash {
1349            r_tx_hash.ok_or_else(|| {
1350                ProviderError::Database(DatabaseError::Other(
1351                    "rocksdb tx-hash write thread panicked".into(),
1352                ))
1353            })??;
1354        }
1355        if write_account_history {
1356            r_account_history.ok_or_else(|| {
1357                ProviderError::Database(DatabaseError::Other(
1358                    "rocksdb account-history write thread panicked".into(),
1359                ))
1360            })??;
1361        }
1362        if write_storage_history {
1363            r_storage_history.ok_or_else(|| {
1364                ProviderError::Database(DatabaseError::Other(
1365                    "rocksdb storage-history write thread panicked".into(),
1366                ))
1367            })??;
1368        }
1369
1370        Ok(())
1371    }
1372
1373    /// Writes transaction-hash-to-number mappings for the given blocks.
1374    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
1375    fn write_tx_hash_numbers<N: reth_node_types::NodePrimitives>(
1376        &self,
1377        blocks: &[ExecutedBlock<N>],
1378        tx_nums: &[TxNumber],
1379        ctx: &RocksDBWriteCtx,
1380    ) -> ProviderResult<()> {
1381        let mut batch = self.batch();
1382        for (block, &first_tx_num) in blocks.iter().zip(tx_nums) {
1383            let body = block.recovered_block().body();
1384            for (tx_num, transaction) in (first_tx_num..).zip(body.transactions_iter()) {
1385                batch.put::<tables::TransactionHashNumbers>(*transaction.tx_hash(), &tx_num)?;
1386            }
1387        }
1388        ctx.pending_batches.lock().push(batch.into_inner());
1389        Ok(())
1390    }
1391
1392    /// Writes account history indices for the given blocks.
1393    ///
1394    /// Derives history indices from reverts (same source as changesets) to ensure consistency.
1395    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
1396    fn write_account_history<N: reth_node_types::NodePrimitives>(
1397        &self,
1398        blocks: &[ExecutedBlock<N>],
1399        ctx: &RocksDBWriteCtx,
1400    ) -> ProviderResult<()> {
1401        let mut batch = self.batch();
1402        let mut account_history: BTreeMap<Address, Vec<u64>> = BTreeMap::new();
1403
1404        for (block_idx, block) in blocks.iter().enumerate() {
1405            let block_number = ctx.first_block_number + block_idx as u64;
1406            let reverts = block.execution_outcome().state.reverts.to_plain_state_reverts();
1407
1408            // Iterate through account reverts - these are exactly the accounts that have
1409            // changesets written, ensuring history indices match changeset entries.
1410            for account_block_reverts in reverts.accounts {
1411                for (address, _) in account_block_reverts {
1412                    account_history.entry(address).or_default().push(block_number);
1413                }
1414            }
1415        }
1416
1417        // Write account history using proper shard append logic
1418        for (address, indices) in account_history {
1419            batch.append_account_history_shard(address, indices)?;
1420        }
1421        ctx.pending_batches.lock().push(batch.into_inner());
1422        Ok(())
1423    }
1424
1425    /// Writes storage history indices for the given blocks.
1426    ///
1427    /// Derives history indices from reverts (same source as changesets) to ensure consistency.
1428    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
1429    fn write_storage_history<N: reth_node_types::NodePrimitives>(
1430        &self,
1431        blocks: &[ExecutedBlock<N>],
1432        ctx: &RocksDBWriteCtx,
1433    ) -> ProviderResult<()> {
1434        let mut batch = self.batch();
1435        let mut storage_history: BTreeMap<(Address, B256), Vec<u64>> = BTreeMap::new();
1436
1437        for (block_idx, block) in blocks.iter().enumerate() {
1438            let block_number = ctx.first_block_number + block_idx as u64;
1439            let reverts = block.execution_outcome().state.reverts.to_plain_state_reverts();
1440
1441            // Iterate through storage reverts - these are exactly the slots that have
1442            // changesets written, ensuring history indices match changeset entries.
1443            for storage_block_reverts in reverts.storage {
1444                for revert in storage_block_reverts {
1445                    for (slot, _) in revert.storage_revert {
1446                        let plain_key = B256::new(slot.to_be_bytes());
1447                        storage_history
1448                            .entry((revert.address, plain_key))
1449                            .or_default()
1450                            .push(block_number);
1451                    }
1452                }
1453            }
1454        }
1455
1456        // Write storage history using proper shard append logic
1457        for ((address, slot), indices) in storage_history {
1458            batch.append_storage_history_shard(address, slot, indices)?;
1459        }
1460        ctx.pending_batches.lock().push(batch.into_inner());
1461        Ok(())
1462    }
1463}
1464
1465/// A point-in-time read snapshot of the `RocksDB` database.
1466///
1467/// All reads through this snapshot see a consistent view of the database at the point
1468/// the snapshot was created, regardless of concurrent writes. This is the primary reader
1469/// used by [`EitherReader::RocksDB`](crate::either_writer::EitherReader) for history lookups.
1470///
1471/// Lighter weight than [`RocksTx`] — no transaction overhead, no write support.
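///
/// # Example
///
/// A minimal sketch (`ignore`d: it needs a live database, and how the snapshot is
/// obtained from the provider is not shown in this excerpt):
///
/// ```ignore
/// // Every read below observes the same point-in-time view of the database.
/// let tx_num = snapshot.get::<tables::TransactionHashNumbers>(tx_hash)?;
/// ```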
1472pub struct RocksReadSnapshot<'db> {
1473    inner: RocksReadSnapshotInner<'db>,
1474    provider: &'db RocksDBProvider,
1475}
1476
1477/// Inner enum to hold the snapshot for either read-write or secondary mode.
1478enum RocksReadSnapshotInner<'db> {
1479    /// Snapshot from read-write `OptimisticTransactionDB`.
1480    ReadWrite(SnapshotWithThreadMode<'db, OptimisticTransactionDB>),
1481    /// Direct reads from a secondary `DB` instance (no snapshot).
1482    Secondary(&'db DB),
1483}
1484
1485impl<'db> RocksReadSnapshotInner<'db> {
1486    /// Returns a raw iterator over a column family.
1487    fn raw_iterator_cf(&self, cf: &rocksdb::ColumnFamily) -> RocksDBRawIterEnum<'_> {
1488        match self {
1489            Self::ReadWrite(snap) => RocksDBRawIterEnum::ReadWrite(snap.raw_iterator_cf(cf)),
1490            Self::Secondary(db) => RocksDBRawIterEnum::ReadOnly(db.raw_iterator_cf(cf)),
1491        }
1492    }
1493}
1494
1495impl fmt::Debug for RocksReadSnapshot<'_> {
1496    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1497        f.debug_struct("RocksReadSnapshot")
1498            .field("provider", &self.provider)
1499            .finish_non_exhaustive()
1500    }
1501}
1502
1503impl<'db> RocksReadSnapshot<'db> {
1504    /// Gets the column family handle for a table.
1505    fn cf_handle<T: Table>(&self) -> Result<&'db rocksdb::ColumnFamily, DatabaseError> {
1506        self.provider.get_cf_handle::<T>()
1507    }
1508
1509    /// Gets a value from the specified table.
1510    pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
1511        let encoded_key = key.encode();
1512        let cf = self.cf_handle::<T>()?;
1513        let result = match &self.inner {
1514            RocksReadSnapshotInner::ReadWrite(snap) => snap.get_cf(cf, encoded_key.as_ref()),
1515            RocksReadSnapshotInner::Secondary(db) => db.get_cf(cf, encoded_key.as_ref()),
1516        }
1517        .map_err(|e| {
1518            ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
1519                message: e.to_string().into(),
1520                code: -1,
1521            }))
1522        })?;
1523
1524        Ok(result.and_then(|value| T::Value::decompress(&value).ok()))
1525    }
1526
1527    /// Looks up account history and returns [`HistoryInfo`] directly.
1528    ///
1529    /// `visible_tip` is the highest block considered visible from the companion MDBX snapshot.
1530    /// History entries above it are ignored even if they already exist in `RocksDB`.
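    ///
    /// # Example
    ///
    /// A sketch of a lookup (`ignore`d; names are illustrative):
    ///
    /// ```ignore
    /// let info = snapshot.account_history_info(
    ///     address,
    ///     block_number, // the block we want historical state for
    ///     None,         // lowest_available_block_number: no pruning in effect
    ///     visible_tip,  // ignore Rocks history above the MDBX snapshot tip
    /// )?;
    /// ```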
1531    pub fn account_history_info(
1532        &self,
1533        address: Address,
1534        block_number: BlockNumber,
1535        lowest_available_block_number: Option<BlockNumber>,
1536        visible_tip: BlockNumber,
1537    ) -> ProviderResult<HistoryInfo> {
1538        let key = ShardedKey::new(address, block_number);
1539        self.history_info::<tables::AccountsHistory>(
1540            key.encode().as_ref(),
1541            block_number,
1542            lowest_available_block_number,
1543            visible_tip,
1544            |key_bytes| Ok(<ShardedKey<Address> as Decode>::decode(key_bytes)?.key == address),
1545            |prev_bytes| {
1546                <ShardedKey<Address> as Decode>::decode(prev_bytes)
1547                    .map(|k| k.key == address)
1548                    .unwrap_or(false)
1549            },
1550        )
1551    }
1552
1553    /// Looks up storage history and returns [`HistoryInfo`] directly.
1554    ///
1555    /// `visible_tip` is the highest block considered visible from the companion MDBX snapshot.
1556    /// History entries above it are ignored even if they already exist in `RocksDB`.
1557    pub fn storage_history_info(
1558        &self,
1559        address: Address,
1560        storage_key: B256,
1561        block_number: BlockNumber,
1562        lowest_available_block_number: Option<BlockNumber>,
1563        visible_tip: BlockNumber,
1564    ) -> ProviderResult<HistoryInfo> {
1565        let key = StorageShardedKey::new(address, storage_key, block_number);
1566        self.history_info::<tables::StoragesHistory>(
1567            key.encode().as_ref(),
1568            block_number,
1569            lowest_available_block_number,
1570            visible_tip,
1571            |key_bytes| {
1572                let k = <StorageShardedKey as Decode>::decode(key_bytes)?;
1573                Ok(k.address == address && k.sharded_key.key == storage_key)
1574            },
1575            |prev_bytes| {
1576                <StorageShardedKey as Decode>::decode(prev_bytes)
1577                    .map(|k| k.address == address && k.sharded_key.key == storage_key)
1578                    .unwrap_or(false)
1579            },
1580        )
1581    }
1582
1583    /// Generic history lookup using the snapshot's raw iterator.
1584    ///
1585    /// The result is derived from the history that is visible through `visible_tip`, not from the
1586    /// full contents of `RocksDB`. This lets a reader combine an older MDBX snapshot with a newer
1587    /// Rocks snapshot without routing through history entries that MDBX cannot see yet.
1588    fn history_info<T>(
1589        &self,
1590        encoded_key: &[u8],
1591        block_number: BlockNumber,
1592        lowest_available_block_number: Option<BlockNumber>,
1593        visible_tip: BlockNumber,
1594        key_matches: impl FnOnce(&[u8]) -> Result<bool, reth_db_api::DatabaseError>,
1595        prev_key_matches: impl Fn(&[u8]) -> bool,
1596    ) -> ProviderResult<HistoryInfo>
1597    where
1598        T: Table<Value = BlockNumberList>,
1599    {
1600        let is_maybe_pruned = lowest_available_block_number.is_some();
1601        let fallback = || {
1602            Ok(if is_maybe_pruned {
1603                HistoryInfo::MaybeInPlainState
1604            } else {
1605                HistoryInfo::NotYetWritten
1606            })
1607        };
1608
1609        let cf = self.cf_handle::<T>()?;
1610        let mut iter = self.inner.raw_iterator_cf(cf);
1611
1612        iter.seek(encoded_key);
1613        iter.status().map_err(|e| {
1614            ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
1615                message: e.to_string().into(),
1616                code: -1,
1617            }))
1618        })?;
1619
1620        if !iter.valid() {
1621            return fallback();
1622        }
1623
1624        let Some(key_bytes) = iter.key() else {
1625            return fallback();
1626        };
1627        if !key_matches(key_bytes)? {
1628            return fallback();
1629        }
1630
1631        let Some(value_bytes) = iter.value() else {
1632            return fallback();
1633        };
1634        let chunk = BlockNumberList::decompress(value_bytes)?;
1635
1636        let (rank, found_block) = compute_history_rank(&chunk, block_number);
1637        // Ignore later Rocks history that is ahead of the companion MDBX snapshot.
1638        let found_block = found_block.filter(|block| *block <= visible_tip);
1639
1640        let is_before_first_write = if needs_prev_shard_check(rank, found_block, block_number) {
1641            iter.prev();
1642            iter.status().map_err(|e| {
1643                ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
1644                    message: e.to_string().into(),
1645                    code: -1,
1646                }))
1647            })?;
1648            let has_prev = iter.valid() && iter.key().is_some_and(&prev_key_matches);
1649
1650            // If the current shard only contains history above `visible_tip`, there is no usable
1651            // later change. Without a previous shard for the same key, fall back to the existing
1652            // not-written / maybe-pruned result instead of routing into plain state.
1653            if found_block.is_none() && !has_prev {
1654                return fallback();
1655            }
1656
1657            !has_prev
1658        } else {
1659            false
1660        };
1661
1662        Ok(HistoryInfo::from_lookup(
1663            found_block,
1664            is_before_first_write,
1665            lowest_available_block_number,
1666        ))
1667    }
1668}
1669
1670/// Outcome of pruning a history shard in `RocksDB`.
1671#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1672pub enum PruneShardOutcome {
1673    /// Shard was deleted entirely.
1674    Deleted,
1675    /// Shard was updated with filtered block numbers.
1676    Updated,
1677    /// Shard was unchanged (no blocks <= `to_block`).
1678    Unchanged,
1679}
1680
1681/// Tracks pruning outcomes for batch operations.
1682#[derive(Debug, Default, Clone, Copy)]
1683pub struct PrunedIndices {
1684    /// Number of shards completely deleted.
1685    pub deleted: usize,
1686    /// Number of shards that were updated (filtered but still have entries).
1687    pub updated: usize,
1688    /// Number of shards that were unchanged.
1689    pub unchanged: usize,
1690}
1691
1692/// Handle for building a batch of operations that is applied atomically on commit.
1693///
1694/// Uses `WriteBatchWithTransaction` for atomic writes without full transaction overhead.
1695/// Unlike [`RocksTx`], this does NOT support read-your-writes. Use for write-only flows
1696/// where you don't need to read back uncommitted data within the same operation
1697/// (e.g., history index writes).
1698///
1699/// When `auto_commit_threshold` is set, the batch will automatically commit and reset
1700/// when the batch size exceeds the threshold. This prevents OOM during large bulk writes.
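///
/// # Example
///
/// A sketch of the no-read-your-writes constraint (`ignore`d; assumes a live
/// provider and an already-committed entry under `shard_key`):
///
/// ```ignore
/// let mut batch = provider.batch();
/// batch.put::<tables::AccountsHistory>(shard_key.clone(), &new_shard)?;
/// // `get` still sees the committed state, not the pending `put` above.
/// let committed = batch.get::<tables::AccountsHistory>(shard_key)?;
/// batch.commit()?; // only now does the new shard become visible to readers
/// ```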
1701#[must_use = "batch must be committed"]
1702pub struct RocksDBBatch<'a> {
1703    provider: &'a RocksDBProvider,
1704    inner: WriteBatchWithTransaction<true>,
1705    buf: Vec<u8>,
1706    /// If set, batch auto-commits when size exceeds this threshold (in bytes).
1707    auto_commit_threshold: Option<usize>,
1708}
1709
1710impl fmt::Debug for RocksDBBatch<'_> {
1711    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1712        f.debug_struct("RocksDBBatch")
1713            .field("provider", &self.provider)
1714            .field("batch", &"<WriteBatchWithTransaction>")
1715            // Number of operations in this batch
1716            .field("length", &self.inner.len())
1717            // Total serialized size (encoded key + compressed value + metadata) of this batch
1718            // in bytes
1719            .field("size_in_bytes", &self.inner.size_in_bytes())
1720            .finish()
1721    }
1722}
1723
1724impl<'a> RocksDBBatch<'a> {
1725    /// Puts a value into the batch.
1726    ///
1727    /// If auto-commit is enabled and the batch exceeds the threshold, commits and resets.
1728    pub fn put<T: Table>(&mut self, key: T::Key, value: &T::Value) -> ProviderResult<()> {
1729        let encoded_key = key.encode();
1730        self.put_encoded::<T>(&encoded_key, value)
1731    }
1732
1733    /// Puts a value into the batch using pre-encoded key.
1734    ///
1735    /// If auto-commit is enabled and the batch exceeds the threshold, commits and resets.
1736    pub fn put_encoded<T: Table>(
1737        &mut self,
1738        key: &<T::Key as Encode>::Encoded,
1739        value: &T::Value,
1740    ) -> ProviderResult<()> {
1741        let value_bytes = compress_to_buf_or_ref!(self.buf, value).unwrap_or(&self.buf);
1742        self.inner.put_cf(self.provider.get_cf_handle::<T>()?, key, value_bytes);
1743        self.maybe_auto_commit()?;
1744        Ok(())
1745    }
1746
1747    /// Deletes a value from the batch.
1748    ///
1749    /// If auto-commit is enabled and the batch exceeds the threshold, commits and resets.
1750    pub fn delete<T: Table>(&mut self, key: T::Key) -> ProviderResult<()> {
1751        self.inner.delete_cf(self.provider.get_cf_handle::<T>()?, key.encode().as_ref());
1752        self.maybe_auto_commit()?;
1753        Ok(())
1754    }
1755
1756    /// Commits and resets the batch if it exceeds the auto-commit threshold.
1757    ///
1758    /// This is called after each `put` or `delete` operation to prevent unbounded memory growth.
1759    /// Returns immediately if auto-commit is disabled or threshold not reached.
1760    fn maybe_auto_commit(&mut self) -> ProviderResult<()> {
1761        if let Some(threshold) = self.auto_commit_threshold &&
1762            self.inner.size_in_bytes() >= threshold
1763        {
1764            tracing::debug!(
1765                target: "providers::rocksdb",
1766                batch_size = self.inner.size_in_bytes(),
1767                threshold,
1768                "Auto-committing RocksDB batch"
1769            );
1770            let old_batch = std::mem::take(&mut self.inner);
1771            self.provider.0.db_rw().write_opt(old_batch, &synced_write_options()).map_err(|e| {
1772                ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
1773                    message: e.to_string().into(),
1774                    code: -1,
1775                }))
1776            })?;
1777        }
1778        Ok(())
1779    }
1780
1781    /// Commits the batch to the database.
1782    ///
1783    /// This consumes the batch and writes all operations atomically to `RocksDB`.
1784    ///
1785    /// # Panics
1786    /// Panics if the provider is in read-only mode.
1787    #[instrument(level = "debug", target = "providers::rocksdb", skip_all, fields(batch_len = self.inner.len(), batch_size = self.inner.size_in_bytes()))]
1788    pub fn commit(self) -> ProviderResult<()> {
1789        self.provider.0.db_rw().write_opt(self.inner, &synced_write_options()).map_err(|e| {
1790            ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
1791                message: e.to_string().into(),
1792                code: -1,
1793            }))
1794        })
1795    }
1796
1797    /// Returns the number of write operations (puts + deletes) queued in this batch.
1798    pub fn len(&self) -> usize {
1799        self.inner.len()
1800    }
1801
1802    /// Returns `true` if the batch contains no operations.
1803    pub fn is_empty(&self) -> bool {
1804        self.inner.is_empty()
1805    }
1806
1807    /// Returns the size of the batch in bytes.
1808    pub fn size_in_bytes(&self) -> usize {
1809        self.inner.size_in_bytes()
1810    }
1811
1812    /// Returns a reference to the underlying `RocksDB` provider.
1813    pub const fn provider(&self) -> &RocksDBProvider {
1814        self.provider
1815    }
1816
1817    /// Consumes the batch and returns the underlying `WriteBatchWithTransaction`.
1818    ///
1819    /// This is used to defer commits to the provider level.
1820    pub fn into_inner(self) -> WriteBatchWithTransaction<true> {
1821        self.inner
1822    }
1823
1824    /// Gets a value from the database.
1825    ///
1826    /// **Important constraint:** This reads only committed state, not pending writes in this
1827    /// batch or other pending batches in `pending_rocksdb_batches`.
1828    pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
1829        self.provider.get::<T>(key)
1830    }
1831
1832    /// Appends indices to an account history shard with proper shard management.
1833    ///
1834    /// Loads the existing shard (if any), appends new indices, and rechunks into
1835    /// multiple shards if needed (respecting `NUM_OF_INDICES_IN_SHARD` limit).
1836    ///
1837    /// # Requirements
1838    ///
1839    /// - The `indices` MUST be strictly increasing and contain no duplicates.
1840    /// - This method MUST only be called once per address per batch. The batch reads existing
1841    ///   shards from committed DB state, not from pending writes. Calling twice for the same
1842    ///   address will cause the second call to overwrite the first.
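    ///
    /// # Example
    ///
    /// A sketch of a valid call (`ignore`d): one call for this address, with
    /// strictly increasing block numbers.
    ///
    /// ```ignore
    /// let mut batch = provider.batch();
    /// batch.append_account_history_shard(address, [100u64, 101, 105])?;
    /// batch.commit()?;
    /// ```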
1843    pub fn append_account_history_shard(
1844        &mut self,
1845        address: Address,
1846        indices: impl IntoIterator<Item = u64>,
1847    ) -> ProviderResult<()> {
1848        let indices: Vec<u64> = indices.into_iter().collect();
1849
1850        if indices.is_empty() {
1851            return Ok(());
1852        }
1853
1854        debug_assert!(
1855            indices.windows(2).all(|w| w[0] < w[1]),
1856            "indices must be strictly increasing: {:?}",
1857            indices
1858        );
1859
1860        let last_key = ShardedKey::new(address, u64::MAX);
1861        let last_shard_opt = self.provider.get::<tables::AccountsHistory>(last_key.clone())?;
1862        let mut last_shard = last_shard_opt.unwrap_or_else(BlockNumberList::empty);
1863
1864        last_shard.append(indices).map_err(ProviderError::other)?;
1865
1866        // Fast path: all indices fit in one shard
1867        if last_shard.len() <= NUM_OF_INDICES_IN_SHARD as u64 {
1868            self.put::<tables::AccountsHistory>(last_key, &last_shard)?;
1869            return Ok(());
1870        }
1871
1872        // Slow path: rechunk into multiple shards
1873        let chunks = last_shard.iter().chunks(NUM_OF_INDICES_IN_SHARD);
1874        let mut chunks_peekable = chunks.into_iter().peekable();
1875
1876        while let Some(chunk) = chunks_peekable.next() {
1877            let shard = BlockNumberList::new_pre_sorted(chunk);
1878            let highest_block_number = if chunks_peekable.peek().is_some() {
1879                shard.iter().next_back().expect("`chunks` does not return empty list")
1880            } else {
1881                u64::MAX
1882            };
1883
1884            self.put::<tables::AccountsHistory>(
1885                ShardedKey::new(address, highest_block_number),
1886                &shard,
1887            )?;
1888        }
1889
1890        Ok(())
1891    }
1892
1893    /// Appends indices to a storage history shard with proper shard management.
1894    ///
1895    /// Loads the existing shard (if any), appends new indices, and rechunks into
1896    /// multiple shards if needed (respecting `NUM_OF_INDICES_IN_SHARD` limit).
1897    ///
1898    /// # Requirements
1899    ///
1900    /// - The `indices` MUST be strictly increasing and contain no duplicates.
1901    /// - This method MUST only be called once per (address, `storage_key`) pair per batch. The
1902    ///   batch reads existing shards from committed DB state, not from pending writes. Calling
1903    ///   twice for the same key will cause the second call to overwrite the first.
1904    pub fn append_storage_history_shard(
1905        &mut self,
1906        address: Address,
1907        storage_key: B256,
1908        indices: impl IntoIterator<Item = u64>,
1909    ) -> ProviderResult<()> {
1910        let indices: Vec<u64> = indices.into_iter().collect();
1911
1912        if indices.is_empty() {
1913            return Ok(());
1914        }
1915
1916        debug_assert!(
1917            indices.windows(2).all(|w| w[0] < w[1]),
1918            "indices must be strictly increasing: {:?}",
1919            indices
1920        );
1921
1922        let last_key = StorageShardedKey::last(address, storage_key);
1923        let last_shard_opt = self.provider.get::<tables::StoragesHistory>(last_key.clone())?;
1924        let mut last_shard = last_shard_opt.unwrap_or_else(BlockNumberList::empty);
1925
1926        last_shard.append(indices).map_err(ProviderError::other)?;
1927
1928        // Fast path: all indices fit in one shard
1929        if last_shard.len() <= NUM_OF_INDICES_IN_SHARD as u64 {
1930            self.put::<tables::StoragesHistory>(last_key, &last_shard)?;
1931            return Ok(());
1932        }
1933
1934        // Slow path: rechunk into multiple shards
1935        let chunks = last_shard.iter().chunks(NUM_OF_INDICES_IN_SHARD);
1936        let mut chunks_peekable = chunks.into_iter().peekable();
1937
1938        while let Some(chunk) = chunks_peekable.next() {
1939            let shard = BlockNumberList::new_pre_sorted(chunk);
1940            let highest_block_number = if chunks_peekable.peek().is_some() {
1941                shard.iter().next_back().expect("`chunks` does not return empty list")
1942            } else {
1943                u64::MAX
1944            };
1945
1946            self.put::<tables::StoragesHistory>(
1947                StorageShardedKey::new(address, storage_key, highest_block_number),
1948                &shard,
1949            )?;
1950        }
1951
1952        Ok(())
1953    }
1954
1955    /// Unwinds account history for the given address, keeping only blocks <= `keep_to`.
1956    ///
1957    /// Mirrors MDBX `unwind_history_shards` behavior:
1958    /// - Deletes shards entirely above `keep_to`
1959    /// - Truncates boundary shards and re-keys to `u64::MAX` sentinel
1960    /// - Preserves shards entirely below `keep_to`
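    ///
    /// For example, with shards keyed `100`, `200`, and `u64::MAX` and `keep_to = 150`:
    /// the `u64::MAX` shard is deleted, the `200` shard is truncated to blocks `<= 150`
    /// and re-keyed to `u64::MAX` (assuming it still holds such blocks), and the `100`
    /// shard is left untouched.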
1961    pub fn unwind_account_history_to(
1962        &mut self,
1963        address: Address,
1964        keep_to: BlockNumber,
1965    ) -> ProviderResult<()> {
1966        let shards = self.provider.account_history_shards(address)?;
1967        if shards.is_empty() {
1968            return Ok(());
1969        }
1970
1971        // Find the first shard that might contain blocks > keep_to.
1972        // A shard is affected if it's the sentinel (u64::MAX) or its highest_block_number > keep_to
1973        let boundary_idx = shards.iter().position(|(key, _)| {
1974            key.highest_block_number == u64::MAX || key.highest_block_number > keep_to
1975        });
1976
1977        // Repair path: no shards affected means all blocks <= keep_to, just ensure sentinel exists
1978        let Some(boundary_idx) = boundary_idx else {
1979            let (last_key, last_value) = shards.last().expect("shards is non-empty");
1980            if last_key.highest_block_number != u64::MAX {
1981                self.delete::<tables::AccountsHistory>(last_key.clone())?;
1982                self.put::<tables::AccountsHistory>(
1983                    ShardedKey::new(address, u64::MAX),
1984                    last_value,
1985                )?;
1986            }
1987            return Ok(());
1988        };
1989
1990        // Delete all shards strictly after the boundary (they are entirely > keep_to)
1991        for (key, _) in shards.iter().skip(boundary_idx + 1) {
1992            self.delete::<tables::AccountsHistory>(key.clone())?;
1993        }
1994
1995        // Process the boundary shard: filter out blocks > keep_to
1996        let (boundary_key, boundary_list) = &shards[boundary_idx];
1997
1998        // Delete the boundary shard (we'll either drop it or rewrite at u64::MAX)
1999        self.delete::<tables::AccountsHistory>(boundary_key.clone())?;
2000
2001        // Build truncated list once; check emptiness directly (avoids double iteration)
2002        let new_last =
2003            BlockNumberList::new_pre_sorted(boundary_list.iter().take_while(|&b| b <= keep_to));
2004
2005        if new_last.is_empty() {
2006            // Boundary shard is now empty. Previous shard becomes the last and must be keyed
2007            // u64::MAX.
2008            if boundary_idx == 0 {
2009                // Nothing left for this address
2010                return Ok(());
2011            }
2012
2013            let (prev_key, prev_value) = &shards[boundary_idx - 1];
2014            if prev_key.highest_block_number != u64::MAX {
2015                self.delete::<tables::AccountsHistory>(prev_key.clone())?;
2016                self.put::<tables::AccountsHistory>(
2017                    ShardedKey::new(address, u64::MAX),
2018                    prev_value,
2019                )?;
2020            }
2021            return Ok(());
2022        }
2023
2024        self.put::<tables::AccountsHistory>(ShardedKey::new(address, u64::MAX), &new_last)?;
2025
2026        Ok(())
2027    }
2028
2029    /// Prunes history shards, removing blocks <= `to_block`.
2030    ///
2031    /// Generic implementation for both account and storage history pruning.
2032    /// Mirrors MDBX `prune_shard` semantics. After pruning, the last remaining shard
2033    /// (if any) will have the sentinel key (`u64::MAX`).
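    ///
    /// For example, with shards keyed `100`, `200`, and `u64::MAX` and `to_block = 150`:
    /// the `100` shard is deleted outright, the `200` shard keeps only its blocks
    /// `> 150`, the sentinel shard is untouched, and the outcome is `Deleted` because
    /// at least one shard was removed.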
2034    #[expect(clippy::too_many_arguments)]
2035    fn prune_history_shards_inner<K>(
2036        &mut self,
2037        shards: Vec<(K, BlockNumberList)>,
2038        to_block: BlockNumber,
2039        get_highest: impl Fn(&K) -> u64,
2040        is_sentinel: impl Fn(&K) -> bool,
2041        delete_shard: impl Fn(&mut Self, K) -> ProviderResult<()>,
2042        put_shard: impl Fn(&mut Self, K, &BlockNumberList) -> ProviderResult<()>,
2043        create_sentinel: impl Fn() -> K,
2044    ) -> ProviderResult<PruneShardOutcome>
2045    where
2046        K: Clone,
2047    {
2048        if shards.is_empty() {
2049            return Ok(PruneShardOutcome::Unchanged);
2050        }
2051
2052        let mut deleted = false;
2053        let mut updated = false;
2054        let mut last_remaining: Option<(K, BlockNumberList)> = None;
2055
2056        for (key, block_list) in shards {
2057            if !is_sentinel(&key) && get_highest(&key) <= to_block {
2058                delete_shard(self, key)?;
2059                deleted = true;
2060            } else {
2061                let original_len = block_list.len();
2062                let filtered =
2063                    BlockNumberList::new_pre_sorted(block_list.iter().filter(|&b| b > to_block));
2064
2065                if filtered.is_empty() {
2066                    delete_shard(self, key)?;
2067                    deleted = true;
2068                } else if filtered.len() < original_len {
2069                    put_shard(self, key.clone(), &filtered)?;
2070                    last_remaining = Some((key, filtered));
2071                    updated = true;
2072                } else {
2073                    last_remaining = Some((key, block_list));
2074                }
2075            }
2076        }
2077
2078        if let Some((last_key, last_value)) = last_remaining &&
2079            !is_sentinel(&last_key)
2080        {
2081            delete_shard(self, last_key)?;
2082            put_shard(self, create_sentinel(), &last_value)?;
2083            updated = true;
2084        }
2085
2086        if deleted {
2087            Ok(PruneShardOutcome::Deleted)
2088        } else if updated {
2089            Ok(PruneShardOutcome::Updated)
2090        } else {
2091            Ok(PruneShardOutcome::Unchanged)
2092        }
2093    }
2094
2095    /// Prunes account history for the given address, removing blocks <= `to_block`.
2096    ///
2097    /// Mirrors MDBX `prune_shard` semantics. After pruning, the last remaining shard
2098    /// (if any) will have the sentinel key (`u64::MAX`).
2099    pub fn prune_account_history_to(
2100        &mut self,
2101        address: Address,
2102        to_block: BlockNumber,
2103    ) -> ProviderResult<PruneShardOutcome> {
2104        let shards = self.provider.account_history_shards(address)?;
2105        self.prune_history_shards_inner(
2106            shards,
2107            to_block,
2108            |key| key.highest_block_number,
2109            |key| key.highest_block_number == u64::MAX,
2110            |batch, key| batch.delete::<tables::AccountsHistory>(key),
2111            |batch, key, value| batch.put::<tables::AccountsHistory>(key, value),
2112            || ShardedKey::new(address, u64::MAX),
2113        )
2114    }
2115
2116    /// Prunes account history for multiple addresses in a single iterator pass.
2117    ///
2118    /// This is more efficient than calling [`Self::prune_account_history_to`] repeatedly
2119    /// because it reuses a single raw iterator and skips seeks when the iterator is already
2120    /// positioned correctly (which happens when targets are sorted and adjacent in key order).
2121    ///
2122    /// `targets` MUST be sorted by address for correctness and optimal performance
2123    /// (matches on-disk key order).
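    ///
    /// # Example
    ///
    /// A sketch with pre-sorted targets (`ignore`d; needs a live provider):
    ///
    /// ```ignore
    /// let mut targets = vec![(addr_b, 100u64), (addr_a, 50u64)];
    /// targets.sort_unstable_by_key(|(address, _)| *address);
    /// let mut batch = provider.batch();
    /// let stats = batch.prune_account_history_batch(&targets)?;
    /// batch.commit()?;
    /// ```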
2124    pub fn prune_account_history_batch(
2125        &mut self,
2126        targets: &[(Address, BlockNumber)],
2127    ) -> ProviderResult<PrunedIndices> {
2128        if targets.is_empty() {
2129            return Ok(PrunedIndices::default());
2130        }
2131
2132        debug_assert!(
2133            targets.windows(2).all(|w| w[0].0 <= w[1].0),
2134            "prune_account_history_batch: targets must be sorted by address"
2135        );
2136
2137        // ShardedKey<Address> layout: [address: 20][block: 8] = 28 bytes
2138        // The first 20 bytes are the "prefix" that identifies the address
2139        const PREFIX_LEN: usize = 20;
2140
2141        let cf = self.provider.get_cf_handle::<tables::AccountsHistory>()?;
2142        let mut iter = self.provider.0.raw_iterator_cf(cf);
2143        let mut outcomes = PrunedIndices::default();
2144
2145        for (address, to_block) in targets {
2146            // Build the target prefix (first 20 bytes = address)
2147            let start_key = ShardedKey::new(*address, 0u64).encode();
2148            let target_prefix = &start_key[..PREFIX_LEN];
2149
2150            // Check if we need to seek or if the iterator is already positioned correctly.
2151            // After processing the previous target, the iterator is either:
2152            // 1. Positioned at a key with a different prefix (we iterated past our shards)
2153            // 2. Invalid (no more keys)
2154            // If the current key's prefix >= our target prefix, we may be able to skip the seek.
2155            let needs_seek = if iter.valid() {
2156                if let Some(current_key) = iter.key() {
2157                    // If current key's prefix < target prefix, we need to seek forward
2158                    // If current key's prefix > target prefix, this target has no shards (skip)
2159                    // If current key's prefix == target prefix, we're already positioned
2160                    current_key.get(..PREFIX_LEN).is_none_or(|p| p < target_prefix)
2161                } else {
2162                    true
2163                }
2164            } else {
2165                true
2166            };
2167
2168            if needs_seek {
2169                iter.seek(start_key);
2170                iter.status().map_err(|e| {
2171                    ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
2172                        message: e.to_string().into(),
2173                        code: -1,
2174                    }))
2175                })?;
2176            }
2177
2178            // Collect all shards for this address using raw prefix comparison
2179            let mut shards = Vec::new();
2180            while iter.valid() {
2181                let Some(key_bytes) = iter.key() else { break };
2182
2183                // Use raw prefix comparison instead of full decode for the prefix check
2184                let current_prefix = key_bytes.get(..PREFIX_LEN);
2185                if current_prefix != Some(target_prefix) {
2186                    break;
2187                }
2188
2189                // Now decode the full key (we need the block number)
2190                let key = ShardedKey::<Address>::decode(key_bytes)
2191                    .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
2192
2193                let Some(value_bytes) = iter.value() else { break };
2194                let value = BlockNumberList::decompress(value_bytes)
2195                    .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
2196
2197                shards.push((key, value));
2198                iter.next();
2199            }
2200
2201            match self.prune_history_shards_inner(
2202                shards,
2203                *to_block,
2204                |key| key.highest_block_number,
2205                |key| key.highest_block_number == u64::MAX,
2206                |batch, key| batch.delete::<tables::AccountsHistory>(key),
2207                |batch, key, value| batch.put::<tables::AccountsHistory>(key, value),
2208                || ShardedKey::new(*address, u64::MAX),
2209            )? {
2210                PruneShardOutcome::Deleted => outcomes.deleted += 1,
2211                PruneShardOutcome::Updated => outcomes.updated += 1,
2212                PruneShardOutcome::Unchanged => outcomes.unchanged += 1,
2213            }
2214        }
2215
2216        Ok(outcomes)
2217    }
2218
2219    /// Prunes storage history for the given address and storage key, removing blocks <=
2220    /// `to_block`.
2221    ///
2222    /// Mirrors MDBX `prune_shard` semantics. After pruning, the last remaining shard
2223    /// (if any) will have the sentinel key (`u64::MAX`).
2224    pub fn prune_storage_history_to(
2225        &mut self,
2226        address: Address,
2227        storage_key: B256,
2228        to_block: BlockNumber,
2229    ) -> ProviderResult<PruneShardOutcome> {
2230        let shards = self.provider.storage_history_shards(address, storage_key)?;
2231        self.prune_history_shards_inner(
2232            shards,
2233            to_block,
2234            |key| key.sharded_key.highest_block_number,
2235            |key| key.sharded_key.highest_block_number == u64::MAX,
2236            |batch, key| batch.delete::<tables::StoragesHistory>(key),
2237            |batch, key, value| batch.put::<tables::StoragesHistory>(key, value),
2238            || StorageShardedKey::last(address, storage_key),
2239        )
2240    }
2241
2242    /// Prunes storage history for multiple (address, `storage_key`) pairs in a single iterator
2243    /// pass.
2244    ///
2245    /// This is more efficient than calling [`Self::prune_storage_history_to`] repeatedly
2246    /// because it reuses a single raw iterator and skips seeks when the iterator is already
2247    /// positioned correctly (which happens when targets are sorted and adjacent in key order).
2248    ///
2249    /// `targets` MUST be sorted by (address, `storage_key`) for correctness and optimal
2250    /// performance (matches on-disk key order).
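    ///
    /// # Example
    ///
    /// A sketch of the target shape (`ignore`d; sorted by `(address, storage_key)`):
    ///
    /// ```ignore
    /// let targets: Vec<((Address, B256), BlockNumber)> = vec![((address, slot_key), 100)];
    /// let stats = batch.prune_storage_history_batch(&targets)?;
    /// ```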
2251    pub fn prune_storage_history_batch(
2252        &mut self,
2253        targets: &[((Address, B256), BlockNumber)],
2254    ) -> ProviderResult<PrunedIndices> {
2255        if targets.is_empty() {
2256            return Ok(PrunedIndices::default());
2257        }
2258
2259        debug_assert!(
2260            targets.windows(2).all(|w| w[0].0 <= w[1].0),
2261            "prune_storage_history_batch: targets must be sorted by (address, storage_key)"
2262        );
2263
2264        // StorageShardedKey layout: [address: 20][storage_key: 32][block: 8] = 60 bytes
2265        // The first 52 bytes are the "prefix" that identifies (address, storage_key)
2266        const PREFIX_LEN: usize = 52;
2267
2268        let cf = self.provider.get_cf_handle::<tables::StoragesHistory>()?;
2269        let mut iter = self.provider.0.raw_iterator_cf(cf);
2270        let mut outcomes = PrunedIndices::default();
2271
2272        for ((address, storage_key), to_block) in targets {
2273            // Build the target prefix (first 52 bytes of encoded key)
2274            let start_key = StorageShardedKey::new(*address, *storage_key, 0u64).encode();
2275            let target_prefix = &start_key[..PREFIX_LEN];
2276
2277            // Check if we need to seek or if the iterator is already positioned correctly.
2278            // After processing the previous target, the iterator is either:
2279            // 1. Positioned at a key with a different prefix (we iterated past our shards)
2280            // 2. Invalid (no more keys)
2281            // If the current key's prefix >= our target prefix, we may be able to skip the seek.
2282            let needs_seek = if iter.valid() {
2283                if let Some(current_key) = iter.key() {
2284                    // If current key's prefix < target prefix, we need to seek forward
2285                    // If current key's prefix > target prefix, this target has no shards (skip)
2286                    // If current key's prefix == target prefix, we're already positioned
2287                    current_key.get(..PREFIX_LEN).is_none_or(|p| p < target_prefix)
2288                } else {
2289                    true
2290                }
2291            } else {
2292                true
2293            };
2294
2295            if needs_seek {
2296                iter.seek(start_key);
2297                iter.status().map_err(|e| {
2298                    ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
2299                        message: e.to_string().into(),
2300                        code: -1,
2301                    }))
2302                })?;
2303            }
2304
2305            // Collect all shards for this (address, storage_key) pair using prefix comparison
2306            let mut shards = Vec::new();
2307            while iter.valid() {
2308                let Some(key_bytes) = iter.key() else { break };
2309
2310                // Use raw prefix comparison instead of full decode for the prefix check
2311                let current_prefix = key_bytes.get(..PREFIX_LEN);
2312                if current_prefix != Some(target_prefix) {
2313                    break;
2314                }
2315
2316                // Now decode the full key (we need the block number)
2317                let key = StorageShardedKey::decode(key_bytes)
2318                    .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
2319
2320                let Some(value_bytes) = iter.value() else { break };
2321                let value = BlockNumberList::decompress(value_bytes)
2322                    .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
2323
2324                shards.push((key, value));
2325                iter.next();
2326            }
2327
2328            // Use existing prune_history_shards_inner logic
2329            match self.prune_history_shards_inner(
2330                shards,
2331                *to_block,
2332                |key| key.sharded_key.highest_block_number,
2333                |key| key.sharded_key.highest_block_number == u64::MAX,
2334                |batch, key| batch.delete::<tables::StoragesHistory>(key),
2335                |batch, key, value| batch.put::<tables::StoragesHistory>(key, value),
2336                || StorageShardedKey::last(*address, *storage_key),
2337            )? {
2338                PruneShardOutcome::Deleted => outcomes.deleted += 1,
2339                PruneShardOutcome::Updated => outcomes.updated += 1,
2340                PruneShardOutcome::Unchanged => outcomes.unchanged += 1,
2341            }
2342        }
2343
2344        Ok(outcomes)
2345    }
2346
2347    /// Unwinds storage history to keep only blocks `<= keep_to`.
2348    ///
2349    /// Handles multi-shard scenarios by:
2350    /// 1. Loading all shards for the `(address, storage_key)` pair
2351    /// 2. Finding the boundary shard containing `keep_to`
2352    /// 3. Deleting all shards after the boundary
2353    /// 4. Truncating the boundary shard to keep only indices `<= keep_to`
2354    /// 5. Ensuring the last shard is keyed with `u64::MAX`
2355    pub fn unwind_storage_history_to(
2356        &mut self,
2357        address: Address,
2358        storage_key: B256,
2359        keep_to: BlockNumber,
2360    ) -> ProviderResult<()> {
2361        let shards = self.provider.storage_history_shards(address, storage_key)?;
2362        if shards.is_empty() {
2363            return Ok(());
2364        }
2365
2366        // Find the first shard that might contain blocks > keep_to.
2367        // A shard is affected if it's the sentinel (u64::MAX) or its highest_block_number > keep_to
2368        let boundary_idx = shards.iter().position(|(key, _)| {
2369            key.sharded_key.highest_block_number == u64::MAX ||
2370                key.sharded_key.highest_block_number > keep_to
2371        });
2372
2373        // Repair path: no shards affected means all blocks <= keep_to, just ensure sentinel exists
2374        let Some(boundary_idx) = boundary_idx else {
2375            let (last_key, last_value) = shards.last().expect("shards is non-empty");
2376            if last_key.sharded_key.highest_block_number != u64::MAX {
2377                self.delete::<tables::StoragesHistory>(last_key.clone())?;
2378                self.put::<tables::StoragesHistory>(
2379                    StorageShardedKey::last(address, storage_key),
2380                    last_value,
2381                )?;
2382            }
2383            return Ok(());
2384        };
2385
2386        // Delete all shards strictly after the boundary (they are entirely > keep_to)
2387        for (key, _) in shards.iter().skip(boundary_idx + 1) {
2388            self.delete::<tables::StoragesHistory>(key.clone())?;
2389        }
2390
2391        // Process the boundary shard: filter out blocks > keep_to
2392        let (boundary_key, boundary_list) = &shards[boundary_idx];
2393
2394        // Delete the boundary shard (we'll either drop it or rewrite at u64::MAX)
2395        self.delete::<tables::StoragesHistory>(boundary_key.clone())?;
2396
2397        // Build truncated list once; check emptiness directly (avoids double iteration)
2398        let new_last =
2399            BlockNumberList::new_pre_sorted(boundary_list.iter().take_while(|&b| b <= keep_to));
2400
2401        if new_last.is_empty() {
2402            // Boundary shard is now empty. Previous shard becomes the last and must be keyed
2403            // u64::MAX.
2404            if boundary_idx == 0 {
2405                // Nothing left for this (address, storage_key) pair
2406                return Ok(());
2407            }
2408
2409            let (prev_key, prev_value) = &shards[boundary_idx - 1];
2410            if prev_key.sharded_key.highest_block_number != u64::MAX {
2411                self.delete::<tables::StoragesHistory>(prev_key.clone())?;
2412                self.put::<tables::StoragesHistory>(
2413                    StorageShardedKey::last(address, storage_key),
2414                    prev_value,
2415                )?;
2416            }
2417            return Ok(());
2418        }
2419
2420        self.put::<tables::StoragesHistory>(
2421            StorageShardedKey::last(address, storage_key),
2422            &new_last,
2423        )?;
2424
2425        Ok(())
2426    }
2427
2428    /// Clears all account history shards for the given address.
2429    ///
2430    /// Used when unwinding from block 0 (i.e., removing all history).
2431    pub fn clear_account_history(&mut self, address: Address) -> ProviderResult<()> {
2432        let shards = self.provider.account_history_shards(address)?;
2433        for (key, _) in shards {
2434            self.delete::<tables::AccountsHistory>(key)?;
2435        }
2436        Ok(())
2437    }
2438
2439    /// Clears all storage history shards for the given `(address, storage_key)` pair.
2440    ///
2441    /// Used when unwinding from block 0 (i.e., removing all history for this storage slot).
2442    pub fn clear_storage_history(
2443        &mut self,
2444        address: Address,
2445        storage_key: B256,
2446    ) -> ProviderResult<()> {
2447        let shards = self.provider.storage_history_shards(address, storage_key)?;
2448        for (key, _) in shards {
2449            self.delete::<tables::StoragesHistory>(key)?;
2450        }
2451        Ok(())
2452    }
2453}
2454
2455/// `RocksDB` transaction wrapper providing MDBX-like semantics.
2456///
2457/// Supports:
2458/// - Read-your-writes: reads see uncommitted writes within the same transaction
2459/// - Atomic commit/rollback
2460/// - Iteration over uncommitted data
2461///
2462/// Note: `Transaction` is `Send` but NOT `Sync`. This wrapper does not implement
2463/// `DbTx`/`DbTxMut` traits directly; use RocksDB-specific methods instead.
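///
/// # Example
///
/// A sketch of read-your-writes (`ignore`d: it needs a live database, and how the
/// transaction is opened is not shown in this excerpt):
///
/// ```ignore
/// tx.put::<tables::TransactionHashNumbers>(tx_hash, &tx_num)?;
/// // Unlike `RocksDBBatch`, the uncommitted write is visible within this transaction.
/// assert_eq!(tx.get::<tables::TransactionHashNumbers>(tx_hash)?, Some(tx_num));
/// tx.commit()?; // or `tx.rollback()?` to discard
/// ```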
2464pub struct RocksTx<'db> {
2465    inner: Transaction<'db, OptimisticTransactionDB>,
2466    provider: &'db RocksDBProvider,
2467}
2468
2469impl fmt::Debug for RocksTx<'_> {
2470    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2471        f.debug_struct("RocksTx").field("provider", &self.provider).finish_non_exhaustive()
2472    }
2473}
2474
2475impl<'db> RocksTx<'db> {
2476    /// Gets a value from the specified table. Sees uncommitted writes in this transaction.
2477    pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
2478        let encoded_key = key.encode();
2479        self.get_encoded::<T>(&encoded_key)
2480    }
2481
2482    /// Gets a value using pre-encoded key. Sees uncommitted writes in this transaction.
2483    pub fn get_encoded<T: Table>(
2484        &self,
2485        key: &<T::Key as Encode>::Encoded,
2486    ) -> ProviderResult<Option<T::Value>> {
2487        let cf = self.provider.get_cf_handle::<T>()?;
2488        let result = self.inner.get_cf(cf, key.as_ref()).map_err(|e| {
2489            ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
2490                message: e.to_string().into(),
2491                code: -1,
2492            }))
2493        })?;
2494
2495        Ok(result.and_then(|value| T::Value::decompress(&value).ok()))
2496    }
2497
2498    /// Puts a value into the specified table.
2499    pub fn put<T: Table>(&self, key: T::Key, value: &T::Value) -> ProviderResult<()> {
2500        let encoded_key = key.encode();
2501        self.put_encoded::<T>(&encoded_key, value)
2502    }
2503
2504    /// Puts a value using pre-encoded key.
2505    pub fn put_encoded<T: Table>(
2506        &self,
2507        key: &<T::Key as Encode>::Encoded,
2508        value: &T::Value,
2509    ) -> ProviderResult<()> {
2510        let cf = self.provider.get_cf_handle::<T>()?;
2511        let mut buf = Vec::new();
2512        let value_bytes = compress_to_buf_or_ref!(buf, value).unwrap_or(&buf);
2513
2514        self.inner.put_cf(cf, key.as_ref(), value_bytes).map_err(|e| {
2515            ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
2516                info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
2517                operation: DatabaseWriteOperation::PutUpsert,
2518                table_name: T::NAME,
2519                key: key.as_ref().to_vec(),
2520            })))
2521        })
2522    }
2523
2524    /// Deletes a value from the specified table.
2525    pub fn delete<T: Table>(&self, key: T::Key) -> ProviderResult<()> {
2526        let cf = self.provider.get_cf_handle::<T>()?;
2527        self.inner.delete_cf(cf, key.encode().as_ref()).map_err(|e| {
2528            ProviderError::Database(DatabaseError::Delete(DatabaseErrorInfo {
2529                message: e.to_string().into(),
2530                code: -1,
2531            }))
2532        })
2533    }
2534
2535    /// Creates an iterator for the specified table. Sees uncommitted writes in this transaction.
2536    ///
2537    /// Returns an iterator that yields decoded `(Key, Value)` pairs.
2538    pub fn iter<T: Table>(&self) -> ProviderResult<RocksTxIter<'_, T>> {
2539        let cf = self.provider.get_cf_handle::<T>()?;
2540        let iter = self.inner.iterator_cf(cf, IteratorMode::Start);
2541        Ok(RocksTxIter { inner: iter, _marker: std::marker::PhantomData })
2542    }
2543
2544    /// Creates an iterator starting from the given key (inclusive).
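    ///
    /// A sketch (`ignore`d), iterating decoded entries from a start key onwards:
    ///
    /// ```ignore
    /// for entry in tx.iter_from::<tables::AccountsHistory>(start_key)? {
    ///     let (key, block_list) = entry?;
    ///     // ...
    /// }
    /// ```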
2545    pub fn iter_from<T: Table>(&self, key: T::Key) -> ProviderResult<RocksTxIter<'_, T>> {
2546        let cf = self.provider.get_cf_handle::<T>()?;
2547        let encoded_key = key.encode();
2548        let iter = self
2549            .inner
2550            .iterator_cf(cf, IteratorMode::From(encoded_key.as_ref(), rocksdb::Direction::Forward));
2551        Ok(RocksTxIter { inner: iter, _marker: std::marker::PhantomData })
2552    }
2553
2554    /// Commits the transaction, persisting all changes.
2555    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
2556    pub fn commit(self) -> ProviderResult<()> {
2557        self.inner.commit().map_err(|e| {
2558            ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
2559                message: e.to_string().into(),
2560                code: -1,
2561            }))
2562        })
2563    }
2564
2565    /// Rolls back the transaction, discarding all changes.
2566    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
2567    pub fn rollback(self) -> ProviderResult<()> {
2568        self.inner.rollback().map_err(|e| {
2569            ProviderError::Database(DatabaseError::Other(format!("rollback failed: {e}")))
2570        })
2571    }
2572}
2573
2574/// Wrapper enum for `RocksDB` iterators that works in both read-write and read-only modes.
2575enum RocksDBIterEnum<'db> {
2576    /// Iterator from read-write `OptimisticTransactionDB`.
2577    ReadWrite(rocksdb::DBIteratorWithThreadMode<'db, OptimisticTransactionDB>),
2578    /// Iterator from read-only `DB`.
2579    ReadOnly(rocksdb::DBIteratorWithThreadMode<'db, DB>),
2580}
2581
2582impl Iterator for RocksDBIterEnum<'_> {
2583    type Item = RawKVResult;
2584
2585    fn next(&mut self) -> Option<Self::Item> {
2586        match self {
2587            Self::ReadWrite(iter) => iter.next(),
2588            Self::ReadOnly(iter) => iter.next(),
2589        }
2590    }
2591}
2592
2593/// Wrapper enum for raw `RocksDB` iterators that works in both read-write and read-only modes.
2594///
2595/// Unlike [`RocksDBIterEnum`], raw iterators expose `seek()` for efficient repositioning
2596/// without reinitializing the iterator.
2597enum RocksDBRawIterEnum<'db> {
2598    /// Raw iterator from read-write `OptimisticTransactionDB`.
2599    ReadWrite(DBRawIteratorWithThreadMode<'db, OptimisticTransactionDB>),
2600    /// Raw iterator from read-only `DB`.
2601    ReadOnly(DBRawIteratorWithThreadMode<'db, DB>),
2602}
2603
2604impl RocksDBRawIterEnum<'_> {
2605    /// Positions the iterator at the first key >= `key`.
2606    fn seek(&mut self, key: impl AsRef<[u8]>) {
2607        match self {
2608            Self::ReadWrite(iter) => iter.seek(key),
2609            Self::ReadOnly(iter) => iter.seek(key),
2610        }
2611    }
2612
2613    /// Returns true if the iterator is positioned at a valid key-value pair.
2614    fn valid(&self) -> bool {
2615        match self {
2616            Self::ReadWrite(iter) => iter.valid(),
2617            Self::ReadOnly(iter) => iter.valid(),
2618        }
2619    }
2620
2621    /// Returns the current key, if valid.
2622    fn key(&self) -> Option<&[u8]> {
2623        match self {
2624            Self::ReadWrite(iter) => iter.key(),
2625            Self::ReadOnly(iter) => iter.key(),
2626        }
2627    }
2628
2629    /// Returns the current value, if valid.
2630    fn value(&self) -> Option<&[u8]> {
2631        match self {
2632            Self::ReadWrite(iter) => iter.value(),
2633            Self::ReadOnly(iter) => iter.value(),
2634        }
2635    }
2636
2637    /// Advances the iterator to the next key.
2638    fn next(&mut self) {
2639        match self {
2640            Self::ReadWrite(iter) => iter.next(),
2641            Self::ReadOnly(iter) => iter.next(),
2642        }
2643    }
2644
2645    /// Moves the iterator to the previous key.
2646    fn prev(&mut self) {
2647        match self {
2648            Self::ReadWrite(iter) => iter.prev(),
2649            Self::ReadOnly(iter) => iter.prev(),
2650        }
2651    }
2652
2653    /// Returns the iterator status, surfacing any error encountered during iteration.
2654    fn status(&self) -> Result<(), rocksdb::Error> {
2655        match self {
2656            Self::ReadWrite(iter) => iter.status(),
2657            Self::ReadOnly(iter) => iter.status(),
2658        }
2659    }
2660}
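// A hedged sketch of the seek-loop pattern this wrapper enables (illustrative
// only; `start_key` and the processing body are placeholders):
//
//     iter.seek(start_key);
//     while iter.valid() {
//         if let (Some(key), Some(value)) = (iter.key(), iter.value()) {
//             // process (key, value)
//         }
//         iter.next();
//     }
//     iter.status()?; // surface any error hit during iteration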
2661
2662/// Iterator over a `RocksDB` table (non-transactional).
2663///
2664/// Yields decoded `(Key, Value)` pairs in key order.
2665pub struct RocksDBIter<'db, T: Table> {
2666    inner: RocksDBIterEnum<'db>,
2667    _marker: std::marker::PhantomData<T>,
2668}
2669
2670impl<T: Table> fmt::Debug for RocksDBIter<'_, T> {
2671    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2672        f.debug_struct("RocksDBIter").field("table", &T::NAME).finish_non_exhaustive()
2673    }
2674}
2675
2676impl<T: Table> Iterator for RocksDBIter<'_, T> {
2677    type Item = ProviderResult<(T::Key, T::Value)>;
2678
2679    fn next(&mut self) -> Option<Self::Item> {
2680        Some(decode_iter_item::<T>(self.inner.next()?))
2681    }
2682}
2683
2684/// Raw iterator over a `RocksDB` table (non-transactional).
2685///
2686/// Yields raw `(key_bytes, value_bytes)` pairs without decoding.
2687pub struct RocksDBRawIter<'db> {
2688    inner: RocksDBIterEnum<'db>,
2689}
2690
2691impl fmt::Debug for RocksDBRawIter<'_> {
2692    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2693        f.debug_struct("RocksDBRawIter").finish_non_exhaustive()
2694    }
2695}
2696
2697impl Iterator for RocksDBRawIter<'_> {
2698    type Item = ProviderResult<(Box<[u8]>, Box<[u8]>)>;
2699
2700    fn next(&mut self) -> Option<Self::Item> {
2701        match self.inner.next()? {
2702            Ok(kv) => Some(Ok(kv)),
2703            Err(e) => Some(Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
2704                message: e.to_string().into(),
2705                code: -1,
2706            })))),
2707        }
2708    }
2709}
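// Since `RocksDBRawIter` is an ordinary `Iterator` over `ProviderResult`
// items, it composes with standard combinators. A hedged sketch (the helper
// name is illustrative and not part of this module):
//
//     fn count_raw_entries(iter: RocksDBRawIter<'_>) -> ProviderResult<usize> {
//         let mut count = 0;
//         for kv in iter {
//             let (_key, _value) = kv?; // propagate the first read error
//             count += 1;
//         }
//         Ok(count)
//     }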
2710
2711/// Iterator over a `RocksDB` table within a transaction.
2712///
2713/// Yields decoded `(Key, Value)` pairs. Sees uncommitted writes.
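///
/// Hedged read-your-writes sketch (mirrors `test_transaction_iterator` in the
/// test module below; setup and the table type are placeholders):
///
/// ```ignore
/// let tx = provider.tx();
/// tx.put::<SomeTable>(42, &value)?;
/// // The uncommitted write is already visible to this iterator:
/// assert!(tx.iter::<SomeTable>()?.count() >= 1);
/// ```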
2714pub struct RocksTxIter<'tx, T: Table> {
2715    inner: rocksdb::DBIteratorWithThreadMode<'tx, Transaction<'tx, OptimisticTransactionDB>>,
2716    _marker: std::marker::PhantomData<T>,
2717}
2718
2719impl<T: Table> fmt::Debug for RocksTxIter<'_, T> {
2720    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2721        f.debug_struct("RocksTxIter").field("table", &T::NAME).finish_non_exhaustive()
2722    }
2723}
2724
2725impl<T: Table> Iterator for RocksTxIter<'_, T> {
2726    type Item = ProviderResult<(T::Key, T::Value)>;
2727
2728    fn next(&mut self) -> Option<Self::Item> {
2729        Some(decode_iter_item::<T>(self.inner.next()?))
2730    }
2731}
2732
2733/// Decodes a raw key-value pair from a `RocksDB` iterator into typed table entries.
2734///
2735/// Handles both error propagation from the underlying iterator and
2736/// decoding/decompression of the key and value bytes.
2737fn decode_iter_item<T: Table>(result: RawKVResult) -> ProviderResult<(T::Key, T::Value)> {
2738    let (key_bytes, value_bytes) = result.map_err(|e| {
2739        ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
2740            message: e.to_string().into(),
2741            code: -1,
2742        }))
2743    })?;
2744
2745    let key = <T::Key as Decode>::decode(&key_bytes)
2746        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
2747
2748    let value = T::Value::decompress(&value_bytes)
2749        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
2750
2751    Ok((key, value))
2752}
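// Worked example (hedged): for `tables::AccountsHistory`, this turns the raw
// key bytes back into a `ShardedKey<Address>` via `Decode` and decompresses
// the value bytes back into a `BlockNumberList` via `Decompress`.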
2753
2754/// Converts Reth's [`LogLevel`] to `RocksDB`'s [`rocksdb::LogLevel`].
2755const fn convert_log_level(level: LogLevel) -> rocksdb::LogLevel {
2756    match level {
2757        LogLevel::Fatal => rocksdb::LogLevel::Fatal,
2758        LogLevel::Error => rocksdb::LogLevel::Error,
2759        LogLevel::Warn => rocksdb::LogLevel::Warn,
2760        LogLevel::Notice | LogLevel::Verbose => rocksdb::LogLevel::Info,
2761        LogLevel::Debug | LogLevel::Trace | LogLevel::Extra => rocksdb::LogLevel::Debug,
2762    }
2763}
2764
2765#[cfg(test)]
2766mod tests {
2767    use super::*;
2768    use crate::providers::HistoryInfo;
2769    use alloy_primitives::{Address, TxHash, B256};
2770    use reth_db_api::{
2771        models::{
2772            sharded_key::{ShardedKey, NUM_OF_INDICES_IN_SHARD},
2773            storage_sharded_key::StorageShardedKey,
2774            IntegerList,
2775        },
2776        table::Table,
2777        tables,
2778    };
2779    use tempfile::TempDir;
2780
2781    #[test]
2782    fn test_with_default_tables_registers_required_column_families() {
2783        let temp_dir = TempDir::new().unwrap();
2784
2785        // Build with default tables
2786        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
2787
2788        // Should be able to write/read TransactionHashNumbers
2789        let tx_hash = TxHash::from(B256::from([1u8; 32]));
2790        provider.put::<tables::TransactionHashNumbers>(tx_hash, &100).unwrap();
2791        assert_eq!(provider.get::<tables::TransactionHashNumbers>(tx_hash).unwrap(), Some(100));
2792
2793        // Should be able to write/read AccountsHistory
2794        let key = ShardedKey::new(Address::ZERO, 100);
2795        let value = IntegerList::default();
2796        provider.put::<tables::AccountsHistory>(key.clone(), &value).unwrap();
2797        assert!(provider.get::<tables::AccountsHistory>(key).unwrap().is_some());
2798
2799        // Should be able to write/read StoragesHistory
2800        let key = StorageShardedKey::new(Address::ZERO, B256::ZERO, 100);
2801        provider.put::<tables::StoragesHistory>(key.clone(), &value).unwrap();
2802        assert!(provider.get::<tables::StoragesHistory>(key).unwrap().is_some());
2803    }
2804
2805    #[derive(Debug)]
2806    struct TestTable;
2807
2808    impl Table for TestTable {
2809        const NAME: &'static str = "TestTable";
2810        const DUPSORT: bool = false;
2811        type Key = u64;
2812        type Value = Vec<u8>;
2813    }
2814
2815    #[test]
2816    fn test_basic_operations() {
2817        let temp_dir = TempDir::new().unwrap();
2818
2819        let provider = RocksDBBuilder::new(temp_dir.path())
2820            .with_table::<TestTable>() // table registered by type, not by string name
2821            .build()
2822            .unwrap();
2823
2824        let key = 42u64;
2825        let value = b"test_value".to_vec();
2826
2827        // Test write
2828        provider.put::<TestTable>(key, &value).unwrap();
2829
2830        // Test read
2831        let result = provider.get::<TestTable>(key).unwrap();
2832        assert_eq!(result, Some(value));
2833
2834        // Test delete
2835        provider.delete::<TestTable>(key).unwrap();
2836
2837        // Verify deletion
2838        assert_eq!(provider.get::<TestTable>(key).unwrap(), None);
2839    }
2840
2841    #[test]
2842    fn test_batch_operations() {
2843        let temp_dir = TempDir::new().unwrap();
2844        let provider =
2845            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
2846
2847        // Write multiple entries in a batch
2848        provider
2849            .write_batch(|batch| {
2850                for i in 0..10u64 {
2851                    let value = format!("value_{i}").into_bytes();
2852                    batch.put::<TestTable>(i, &value)?;
2853                }
2854                Ok(())
2855            })
2856            .unwrap();
2857
2858        // Read all entries
2859        for i in 0..10u64 {
2860            let value = format!("value_{i}").into_bytes();
2861            assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
2862        }
2863
2864        // Delete all entries in a batch
2865        provider
2866            .write_batch(|batch| {
2867                for i in 0..10u64 {
2868                    batch.delete::<TestTable>(i)?;
2869                }
2870                Ok(())
2871            })
2872            .unwrap();
2873
2874        // Verify all deleted
2875        for i in 0..10u64 {
2876            assert_eq!(provider.get::<TestTable>(i).unwrap(), None);
2877        }
2878    }
2879
2880    #[test]
2881    fn test_with_real_table() {
2882        let temp_dir = TempDir::new().unwrap();
2883        let provider = RocksDBBuilder::new(temp_dir.path())
2884            .with_table::<tables::TransactionHashNumbers>()
2885            .with_metrics()
2886            .build()
2887            .unwrap();
2888
2889        let tx_hash = TxHash::from(B256::from([1u8; 32]));
2890
2891        // Insert and retrieve
2892        provider.put::<tables::TransactionHashNumbers>(tx_hash, &100).unwrap();
2893        assert_eq!(provider.get::<tables::TransactionHashNumbers>(tx_hash).unwrap(), Some(100));
2894
2895        // Batch insert multiple transactions
2896        provider
2897            .write_batch(|batch| {
2898                for i in 0..10u64 {
2899                    let hash = TxHash::from(B256::from([i as u8; 32]));
2900                    let value = i * 100;
2901                    batch.put::<tables::TransactionHashNumbers>(hash, &value)?;
2902                }
2903                Ok(())
2904            })
2905            .unwrap();
2906
2907        // Verify batch insertions
2908        for i in 0..10u64 {
2909            let hash = TxHash::from(B256::from([i as u8; 32]));
2910            assert_eq!(
2911                provider.get::<tables::TransactionHashNumbers>(hash).unwrap(),
2912                Some(i * 100)
2913            );
2914        }
2915    }

2916    #[test]
2917    fn test_statistics_enabled() {
2918        let temp_dir = TempDir::new().unwrap();
2919        // Verify that building with statistics enabled succeeds, then exercise basic ops
2920        let provider = RocksDBBuilder::new(temp_dir.path())
2921            .with_table::<TestTable>()
2922            .with_statistics()
2923            .build()
2924            .unwrap();
2925
2926        // Do operations - data should be immediately readable with OptimisticTransactionDB
2927        for i in 0..10 {
2928            let value = vec![i as u8];
2929            provider.put::<TestTable>(i, &value).unwrap();
2930            // Verify write is visible
2931            assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
2932        }
2933    }
2934
2935    #[test]
2936    fn test_data_persistence() {
2937        let temp_dir = TempDir::new().unwrap();
2938        let provider =
2939            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
2940
2941        // Insert data - OptimisticTransactionDB writes are immediately visible
2942        let value = vec![42u8; 1000];
2943        for i in 0..100 {
2944            provider.put::<TestTable>(i, &value).unwrap();
2945        }
2946
2947        // Verify data is readable
2948        for i in 0..100 {
2949            assert!(provider.get::<TestTable>(i).unwrap().is_some(), "Data should be readable");
2950        }
2951    }
2952
2953    #[test]
2954    fn test_transaction_read_your_writes() {
2955        let temp_dir = TempDir::new().unwrap();
2956        let provider =
2957            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
2958
2959        // Create a transaction
2960        let tx = provider.tx();
2961
2962        // Write data within the transaction
2963        let key = 42u64;
2964        let value = b"test_value".to_vec();
2965        tx.put::<TestTable>(key, &value).unwrap();
2966
2967        // Read-your-writes: should see uncommitted data in same transaction
2968        let result = tx.get::<TestTable>(key).unwrap();
2969        assert_eq!(
2970            result,
2971            Some(value.clone()),
2972            "Transaction should see its own uncommitted writes"
2973        );
2974
2975        // Data should NOT be visible via provider (outside transaction)
2976        let provider_result = provider.get::<TestTable>(key).unwrap();
2977        assert_eq!(provider_result, None, "Uncommitted data should not be visible outside tx");
2978
2979        // Commit the transaction
2980        tx.commit().unwrap();
2981
2982        // Now data should be visible via provider
2983        let committed_result = provider.get::<TestTable>(key).unwrap();
2984        assert_eq!(committed_result, Some(value), "Committed data should be visible");
2985    }
2986
2987    #[test]
2988    fn test_transaction_rollback() {
2989        let temp_dir = TempDir::new().unwrap();
2990        let provider =
2991            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
2992
2993        // First, put some initial data
2994        let key = 100u64;
2995        let initial_value = b"initial".to_vec();
2996        provider.put::<TestTable>(key, &initial_value).unwrap();
2997
2998        // Create a transaction and modify data
2999        let tx = provider.tx();
3000        let new_value = b"modified".to_vec();
3001        tx.put::<TestTable>(key, &new_value).unwrap();
3002
3003        // Verify modification is visible within transaction
3004        assert_eq!(tx.get::<TestTable>(key).unwrap(), Some(new_value));
3005
3006        // Rollback instead of commit
3007        tx.rollback().unwrap();
3008
3009        // Data should be unchanged (initial value)
3010        let result = provider.get::<TestTable>(key).unwrap();
3011        assert_eq!(result, Some(initial_value), "Rollback should preserve original data");
3012    }
3013
3014    #[test]
3015    fn test_transaction_iterator() {
3016        let temp_dir = TempDir::new().unwrap();
3017        let provider =
3018            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
3019
3020        // Create a transaction
3021        let tx = provider.tx();
3022
3023        // Write multiple entries
3024        for i in 0..5u64 {
3025            let value = format!("value_{i}").into_bytes();
3026            tx.put::<TestTable>(i, &value).unwrap();
3027        }
3028
3029        // Iterate - should see uncommitted writes
3030        let mut count = 0;
3031        for result in tx.iter::<TestTable>().unwrap() {
3032            let (key, value) = result.unwrap();
3033            assert_eq!(value, format!("value_{key}").into_bytes());
3034            count += 1;
3035        }
3036        assert_eq!(count, 5, "Iterator should see all uncommitted writes");
3037
3038        // Commit
3039        tx.commit().unwrap();
3040    }
3041
3042    #[test]
3043    fn test_batch_manual_commit() {
3044        let temp_dir = TempDir::new().unwrap();
3045        let provider =
3046            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
3047
3048        // Create a batch via provider.batch()
3049        let mut batch = provider.batch();
3050
3051        // Add entries
3052        for i in 0..10u64 {
3053            let value = format!("batch_value_{i}").into_bytes();
3054            batch.put::<TestTable>(i, &value).unwrap();
3055        }
3056
3057        // Verify len/is_empty
3058        assert_eq!(batch.len(), 10);
3059        assert!(!batch.is_empty());
3060
3061        // Data should NOT be visible before commit
3062        assert_eq!(provider.get::<TestTable>(0).unwrap(), None);
3063
3064        // Commit the batch
3065        batch.commit().unwrap();
3066
3067        // Now data should be visible
3068        for i in 0..10u64 {
3069            let value = format!("batch_value_{i}").into_bytes();
3070            assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
3071        }
3072    }
3073
3074    #[test]
3075    fn test_first_and_last_entry() {
3076        let temp_dir = TempDir::new().unwrap();
3077        let provider =
3078            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
3079
3080        // Empty table should return None for both
3081        assert_eq!(provider.first::<TestTable>().unwrap(), None);
3082        assert_eq!(provider.last::<TestTable>().unwrap(), None);
3083
3084        // Insert some entries
3085        provider.put::<TestTable>(10, &b"value_10".to_vec()).unwrap();
3086        provider.put::<TestTable>(20, &b"value_20".to_vec()).unwrap();
3087        provider.put::<TestTable>(5, &b"value_5".to_vec()).unwrap();
3088
3089        // First should return the smallest key
3090        let first = provider.first::<TestTable>().unwrap();
3091        assert_eq!(first, Some((5, b"value_5".to_vec())));
3092
3093        // Last should return the largest key
3094        let last = provider.last::<TestTable>().unwrap();
3095        assert_eq!(last, Some((20, b"value_20".to_vec())));
3096    }
3097
3098    /// Tests the edge case where block < `lowest_available_block_number`.
3099    /// This case cannot be tested via `HistoricalStateProviderRef` (which errors before lookup),
3100    /// so we keep this `RocksDB`-specific test to verify the low-level behavior.
3101    #[test]
3102    fn test_account_history_info_pruned_before_first_entry() {
3103        let temp_dir = TempDir::new().unwrap();
3104        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3105
3106        let address = Address::from([0x42; 20]);
3107
3108        // Create a single shard starting at block 100
3109        let chunk = IntegerList::new([100, 200, 300]).unwrap();
3110        let shard_key = ShardedKey::new(address, u64::MAX);
3111        provider.put::<tables::AccountsHistory>(shard_key, &chunk).unwrap();
3112
3113        // Query for block 50 with lowest_available_block_number = 100
3114        // This simulates a pruned state where data before block 100 is not available.
3115        // Since we're before the first write AND pruning boundary is set, we need to
3116        // check the changeset at the first write block.
3117        let result =
3118            provider.snapshot().account_history_info(address, 50, Some(100), u64::MAX).unwrap();
3119        assert_eq!(result, HistoryInfo::InChangeset(100));
3120    }
3121
3122    /// Verifies that a read-only (secondary) provider can catch up with primary writes.
3123    #[test]
3124    fn test_account_history_info_read_only_and_catch_up() {
3125        let temp_dir = TempDir::new().unwrap();
3126        let address = Address::from([0x42; 20]);
3127        let chunk = IntegerList::new([100, 200, 300]).unwrap();
3128        let shard_key = ShardedKey::new(address, u64::MAX);
3129
3130        // Write data with a read-write provider
3131        let rw_provider =
3132            RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3133        rw_provider.put::<tables::AccountsHistory>(shard_key, &chunk).unwrap();
3134
3135        // Open a read-only provider; it sees the initial data.
3136        let ro_provider = RocksDBBuilder::new(temp_dir.path())
3137            .with_default_tables()
3138            .with_read_only(true)
3139            .build()
3140            .unwrap();
3141
3142        let result =
3143            ro_provider.snapshot().account_history_info(address, 200, None, u64::MAX).unwrap();
3144        assert_eq!(result, HistoryInfo::InChangeset(200));
3145
3146        let result =
3147            ro_provider.snapshot().account_history_info(address, 50, None, u64::MAX).unwrap();
3148        assert_eq!(result, HistoryInfo::NotYetWritten);
3149
3150        let result =
3151            ro_provider.snapshot().account_history_info(address, 400, None, u64::MAX).unwrap();
3152        assert_eq!(result, HistoryInfo::InPlainState);
3153
3154        // Write new data via the primary.
3155        let address2 = Address::from([0x43; 20]);
3156        let chunk2 = IntegerList::new([500, 600]).unwrap();
3157        let shard_key2 = ShardedKey::new(address2, u64::MAX);
3158        rw_provider.put::<tables::AccountsHistory>(shard_key2, &chunk2).unwrap();
3159
3160        // Read-only doesn't see the new data yet.
3161        let result =
3162            ro_provider.snapshot().account_history_info(address2, 500, None, u64::MAX).unwrap();
3163        assert_eq!(result, HistoryInfo::NotYetWritten);
3164
3165        // Catch up; now it sees the new data.
3166        ro_provider.try_catch_up_with_primary().unwrap();
3167
3168        let result =
3169            ro_provider.snapshot().account_history_info(address2, 500, None, u64::MAX).unwrap();
3170        assert_eq!(result, HistoryInfo::InChangeset(500));
3171    }
3172
3173    #[test]
3174    fn test_account_history_info_ignores_blocks_above_visible_tip() {
3175        let temp_dir = TempDir::new().unwrap();
3176        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3177
3178        let address = Address::from([0x42; 20]);
3179
3180        provider
3181            .put::<tables::AccountsHistory>(
3182                ShardedKey::new(address, 110),
3183                &IntegerList::new([100, 110]).unwrap(),
3184            )
3185            .unwrap();
3186        provider
3187            .put::<tables::AccountsHistory>(
3188                ShardedKey::new(address, u64::MAX),
3189                &IntegerList::new([200, 210]).unwrap(),
3190            )
3191            .unwrap();
3192
3193        let result = provider.snapshot().account_history_info(address, 150, None, 150).unwrap();
3194        assert_eq!(result, HistoryInfo::InPlainState);
3195    }
3196
3197    #[test]
3198    fn test_account_history_info_mixed_shard_respects_visible_tip() {
3199        let temp_dir = TempDir::new().unwrap();
3200        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3201
3202        let address = Address::from([0x42; 20]);
3203        provider
3204            .put::<tables::AccountsHistory>(
3205                ShardedKey::new(address, u64::MAX),
3206                &IntegerList::new([100, 150, 300]).unwrap(),
3207            )
3208            .unwrap();
3209
3210        let result = provider.snapshot().account_history_info(address, 120, None, 200).unwrap();
3211        assert_eq!(result, HistoryInfo::InChangeset(150));
3212
3213        let result = provider.snapshot().account_history_info(address, 201, None, 200).unwrap();
3214        assert_eq!(result, HistoryInfo::InPlainState);
3215    }
3216
3217    #[test]
3218    fn test_account_history_info_only_stale_entries_use_fallback() {
3219        let temp_dir = TempDir::new().unwrap();
3220        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3221
3222        let address = Address::from([0x42; 20]);
3223        provider
3224            .put::<tables::AccountsHistory>(
3225                ShardedKey::new(address, u64::MAX),
3226                &IntegerList::new([200, 210]).unwrap(),
3227            )
3228            .unwrap();
3229
3230        let result = provider.snapshot().account_history_info(address, 150, None, 150).unwrap();
3231        assert_eq!(result, HistoryInfo::NotYetWritten);
3232
3233        let result =
3234            provider.snapshot().account_history_info(address, 150, Some(100), 150).unwrap();
3235        assert_eq!(result, HistoryInfo::MaybeInPlainState);
3236    }
3237
3238    #[test]
3239    fn test_account_history_shard_split_at_boundary() {
3240        let temp_dir = TempDir::new().unwrap();
3241        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3242
3243        let address = Address::from([0x42; 20]);
3244        let limit = NUM_OF_INDICES_IN_SHARD;
3245
3246        // Add exactly NUM_OF_INDICES_IN_SHARD + 1 indices to trigger a split
3247        let indices: Vec<u64> = (0..=(limit as u64)).collect();
3248        let mut batch = provider.batch();
3249        batch.append_account_history_shard(address, indices).unwrap();
3250        batch.commit().unwrap();
3251
3252        // Should have 2 shards: one completed shard and one sentinel shard
3253        let completed_key = ShardedKey::new(address, (limit - 1) as u64);
3254        let sentinel_key = ShardedKey::new(address, u64::MAX);
3255
3256        let completed_shard = provider.get::<tables::AccountsHistory>(completed_key).unwrap();
3257        let sentinel_shard = provider.get::<tables::AccountsHistory>(sentinel_key).unwrap();
3258
3259        assert!(completed_shard.is_some(), "completed shard should exist");
3260        assert!(sentinel_shard.is_some(), "sentinel shard should exist");
3261
3262        let completed_shard = completed_shard.unwrap();
3263        let sentinel_shard = sentinel_shard.unwrap();
3264
3265        assert_eq!(completed_shard.len(), limit as u64, "completed shard should be full");
3266        assert_eq!(sentinel_shard.len(), 1, "sentinel shard should have 1 element");
3267    }
3268
3269    #[test]
3270    fn test_account_history_multiple_shard_splits() {
3271        let temp_dir = TempDir::new().unwrap();
3272        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3273
3274        let address = Address::from([0x43; 20]);
3275        let limit = NUM_OF_INDICES_IN_SHARD;
3276
3277        // First batch: add NUM_OF_INDICES_IN_SHARD indices
3278        let first_batch_indices: Vec<u64> = (0..limit as u64).collect();
3279        let mut batch = provider.batch();
3280        batch.append_account_history_shard(address, first_batch_indices).unwrap();
3281        batch.commit().unwrap();
3282
3283        // Should have just a sentinel shard (exactly at limit, not over)
3284        let sentinel_key = ShardedKey::new(address, u64::MAX);
3285        let shard = provider.get::<tables::AccountsHistory>(sentinel_key.clone()).unwrap();
3286        assert!(shard.is_some());
3287        assert_eq!(shard.unwrap().len(), limit as u64);
3288
3289        // Second batch: add another NUM_OF_INDICES_IN_SHARD + 1 indices (causing 2 more shards)
3290        let second_batch_indices: Vec<u64> = (limit as u64..=(2 * limit) as u64).collect();
3291        let mut batch = provider.batch();
3292        batch.append_account_history_shard(address, second_batch_indices).unwrap();
3293        batch.commit().unwrap();
3294
3295        // Now we should have: 2 completed shards + 1 sentinel shard
3296        let first_completed = ShardedKey::new(address, (limit - 1) as u64);
3297        let second_completed = ShardedKey::new(address, (2 * limit - 1) as u64);
3298
3299        assert!(
3300            provider.get::<tables::AccountsHistory>(first_completed).unwrap().is_some(),
3301            "first completed shard should exist"
3302        );
3303        assert!(
3304            provider.get::<tables::AccountsHistory>(second_completed).unwrap().is_some(),
3305            "second completed shard should exist"
3306        );
3307        assert!(
3308            provider.get::<tables::AccountsHistory>(sentinel_key).unwrap().is_some(),
3309            "sentinel shard should exist"
3310        );
3311    }
3312
3313    #[test]
3314    fn test_storage_history_shard_split_at_boundary() {
3315        let temp_dir = TempDir::new().unwrap();
3316        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3317
3318        let address = Address::from([0x44; 20]);
3319        let slot = B256::from([0x55; 32]);
3320        let limit = NUM_OF_INDICES_IN_SHARD;
3321
3322        // Add exactly NUM_OF_INDICES_IN_SHARD + 1 indices to trigger a split
3323        let indices: Vec<u64> = (0..=(limit as u64)).collect();
3324        let mut batch = provider.batch();
3325        batch.append_storage_history_shard(address, slot, indices).unwrap();
3326        batch.commit().unwrap();
3327
3328        // Should have 2 shards: one completed shard and one sentinel shard
3329        let completed_key = StorageShardedKey::new(address, slot, (limit - 1) as u64);
3330        let sentinel_key = StorageShardedKey::new(address, slot, u64::MAX);
3331
3332        let completed_shard = provider.get::<tables::StoragesHistory>(completed_key).unwrap();
3333        let sentinel_shard = provider.get::<tables::StoragesHistory>(sentinel_key).unwrap();
3334
3335        assert!(completed_shard.is_some(), "completed shard should exist");
3336        assert!(sentinel_shard.is_some(), "sentinel shard should exist");
3337
3338        let completed_shard = completed_shard.unwrap();
3339        let sentinel_shard = sentinel_shard.unwrap();
3340
3341        assert_eq!(completed_shard.len(), limit as u64, "completed shard should be full");
3342        assert_eq!(sentinel_shard.len(), 1, "sentinel shard should have 1 element");
3343    }
3344
3345    #[test]
3346    fn test_storage_history_multiple_shard_splits() {
3347        let temp_dir = TempDir::new().unwrap();
3348        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3349
3350        let address = Address::from([0x46; 20]);
3351        let slot = B256::from([0x57; 32]);
3352        let limit = NUM_OF_INDICES_IN_SHARD;
3353
3354        // First batch: add NUM_OF_INDICES_IN_SHARD indices
3355        let first_batch_indices: Vec<u64> = (0..limit as u64).collect();
3356        let mut batch = provider.batch();
3357        batch.append_storage_history_shard(address, slot, first_batch_indices).unwrap();
3358        batch.commit().unwrap();
3359
3360        // Should have just a sentinel shard (exactly at limit, not over)
3361        let sentinel_key = StorageShardedKey::new(address, slot, u64::MAX);
3362        let shard = provider.get::<tables::StoragesHistory>(sentinel_key.clone()).unwrap();
3363        assert!(shard.is_some());
3364        assert_eq!(shard.unwrap().len(), limit as u64);
3365
3366        // Second batch: add another NUM_OF_INDICES_IN_SHARD + 1 indices (causing 2 more shards)
3367        let second_batch_indices: Vec<u64> = (limit as u64..=(2 * limit) as u64).collect();
3368        let mut batch = provider.batch();
3369        batch.append_storage_history_shard(address, slot, second_batch_indices).unwrap();
3370        batch.commit().unwrap();
3371
3372        // Now we should have: 2 completed shards + 1 sentinel shard
3373        let first_completed = StorageShardedKey::new(address, slot, (limit - 1) as u64);
3374        let second_completed = StorageShardedKey::new(address, slot, (2 * limit - 1) as u64);
3375
3376        assert!(
3377            provider.get::<tables::StoragesHistory>(first_completed).unwrap().is_some(),
3378            "first completed shard should exist"
3379        );
3380        assert!(
3381            provider.get::<tables::StoragesHistory>(second_completed).unwrap().is_some(),
3382            "second completed shard should exist"
3383        );
3384        assert!(
3385            provider.get::<tables::StoragesHistory>(sentinel_key).unwrap().is_some(),
3386            "sentinel shard should exist"
3387        );
3388    }
3389
3390    #[test]
3391    fn test_clear_table() {
3392        let temp_dir = TempDir::new().unwrap();
3393        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3394
3395        let address = Address::from([0x42; 20]);
3396        let key = ShardedKey::new(address, u64::MAX);
3397        let blocks = BlockNumberList::new_pre_sorted([1, 2, 3]);
3398
3399        provider.put::<tables::AccountsHistory>(key.clone(), &blocks).unwrap();
3400        assert!(provider.get::<tables::AccountsHistory>(key.clone()).unwrap().is_some());
3401
3402        provider.clear::<tables::AccountsHistory>().unwrap();
3403
3404        assert!(
3405            provider.get::<tables::AccountsHistory>(key).unwrap().is_none(),
3406            "table should be empty after clear"
3407        );
3408        assert!(
3409            provider.first::<tables::AccountsHistory>().unwrap().is_none(),
3410            "first() should return None after clear"
3411        );
3412    }
3413
3414    #[test]
3415    fn test_clear_empty_table() {
3416        let temp_dir = TempDir::new().unwrap();
3417        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3418
3419        assert!(provider.first::<tables::AccountsHistory>().unwrap().is_none());
3420
3421        provider.clear::<tables::AccountsHistory>().unwrap();
3422
3423        assert!(provider.first::<tables::AccountsHistory>().unwrap().is_none());
3424    }
3425
3426    #[test]
3427    fn test_unwind_account_history_to_basic() {
3428        let temp_dir = TempDir::new().unwrap();
3429        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3430
3431        let address = Address::from([0x42; 20]);
3432
3433        // Add blocks 0-10
3434        let mut batch = provider.batch();
3435        batch.append_account_history_shard(address, 0..=10).unwrap();
3436        batch.commit().unwrap();
3437
3438        // Verify we have blocks 0-10
3439        let key = ShardedKey::new(address, u64::MAX);
3440        let result = provider.get::<tables::AccountsHistory>(key.clone()).unwrap();
3441        assert!(result.is_some());
3442        let blocks: Vec<u64> = result.unwrap().iter().collect();
3443        assert_eq!(blocks, (0..=10).collect::<Vec<_>>());
3444
3445        // Unwind to block 5 (keep blocks 0-5, remove 6-10)
3446        let mut batch = provider.batch();
3447        batch.unwind_account_history_to(address, 5).unwrap();
3448        batch.commit().unwrap();
3449
3450        // Verify only blocks 0-5 remain
3451        let result = provider.get::<tables::AccountsHistory>(key).unwrap();
3452        assert!(result.is_some());
3453        let blocks: Vec<u64> = result.unwrap().iter().collect();
3454        assert_eq!(blocks, (0..=5).collect::<Vec<_>>());
3455    }
3456
3457    #[test]
3458    fn test_unwind_account_history_to_removes_all() {
3459        let temp_dir = TempDir::new().unwrap();
3460        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3461
3462        let address = Address::from([0x42; 20]);
3463
3464        // Add blocks 5-10
3465        let mut batch = provider.batch();
3466        batch.append_account_history_shard(address, 5..=10).unwrap();
3467        batch.commit().unwrap();
3468
3469        // Unwind to block 4 (removes all blocks since they're all > 4)
3470        let mut batch = provider.batch();
3471        batch.unwind_account_history_to(address, 4).unwrap();
3472        batch.commit().unwrap();
3473
3474        // Verify no data remains for this address
3475        let key = ShardedKey::new(address, u64::MAX);
3476        let result = provider.get::<tables::AccountsHistory>(key).unwrap();
3477        assert!(result.is_none(), "Should have no data after full unwind");
3478    }
3479
3480    #[test]
3481    fn test_unwind_account_history_to_no_op() {
3482        let temp_dir = TempDir::new().unwrap();
3483        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3484
3485        let address = Address::from([0x42; 20]);
3486
3487        // Add blocks 0-5
3488        let mut batch = provider.batch();
3489        batch.append_account_history_shard(address, 0..=5).unwrap();
3490        batch.commit().unwrap();
3491
3492        // Unwind to block 10 (no-op since all blocks are <= 10)
3493        let mut batch = provider.batch();
3494        batch.unwind_account_history_to(address, 10).unwrap();
3495        batch.commit().unwrap();
3496
3497        // Verify blocks 0-5 still remain
3498        let key = ShardedKey::new(address, u64::MAX);
3499        let result = provider.get::<tables::AccountsHistory>(key).unwrap();
3500        assert!(result.is_some());
3501        let blocks: Vec<u64> = result.unwrap().iter().collect();
3502        assert_eq!(blocks, (0..=5).collect::<Vec<_>>());
3503    }
3504
3505    #[test]
3506    fn test_unwind_account_history_to_block_zero() {
3507        let temp_dir = TempDir::new().unwrap();
3508        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3509
3510        let address = Address::from([0x42; 20]);
3511
3512        // Add blocks 0-5 (including block 0)
3513        let mut batch = provider.batch();
3514        batch.append_account_history_shard(address, 0..=5).unwrap();
3515        batch.commit().unwrap();
3516
3517        // Unwind to block 0 (keep only block 0, remove 1-5)
3518        // This simulates the caller doing: unwind_to = min_block.checked_sub(1) where min_block = 1
3519        let mut batch = provider.batch();
3520        batch.unwind_account_history_to(address, 0).unwrap();
3521        batch.commit().unwrap();
3522
3523        // Verify only block 0 remains
3524        let key = ShardedKey::new(address, u64::MAX);
3525        let result = provider.get::<tables::AccountsHistory>(key).unwrap();
3526        assert!(result.is_some());
3527        let blocks: Vec<u64> = result.unwrap().iter().collect();
3528        assert_eq!(blocks, vec![0]);
3529    }
3530
3531    #[test]
3532    fn test_unwind_account_history_to_multi_shard() {
3533        let temp_dir = TempDir::new().unwrap();
3534        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3535
3536        let address = Address::from([0x42; 20]);
3537
3538        // Create multiple shards by adding more than NUM_OF_INDICES_IN_SHARD entries
3539        // For testing, we'll manually create shards with specific keys
3540        let mut batch = provider.batch();
3541
3542        // First shard: blocks 1-50, keyed by 50
3543        let shard1 = BlockNumberList::new_pre_sorted(1..=50);
3544        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, 50), &shard1).unwrap();
3545
3546        // Second shard: blocks 51-100, keyed by MAX (sentinel)
3547        let shard2 = BlockNumberList::new_pre_sorted(51..=100);
3548        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, u64::MAX), &shard2).unwrap();
3549
3550        batch.commit().unwrap();
3551
3552        // Verify we have 2 shards
3553        let shards = provider.account_history_shards(address).unwrap();
3554        assert_eq!(shards.len(), 2);
3555
3556        // Unwind to block 75 (keep 1-75, remove 76-100)
3557        let mut batch = provider.batch();
3558        batch.unwind_account_history_to(address, 75).unwrap();
3559        batch.commit().unwrap();
3560
3561        // Verify: shard1 should be untouched, shard2 should be truncated
3562        let shards = provider.account_history_shards(address).unwrap();
3563        assert_eq!(shards.len(), 2);
3564
3565        // First shard unchanged
3566        assert_eq!(shards[0].0.highest_block_number, 50);
3567        assert_eq!(shards[0].1.iter().collect::<Vec<_>>(), (1..=50).collect::<Vec<_>>());
3568
3569        // Second shard truncated and re-keyed to MAX
3570        assert_eq!(shards[1].0.highest_block_number, u64::MAX);
3571        assert_eq!(shards[1].1.iter().collect::<Vec<_>>(), (51..=75).collect::<Vec<_>>());
3572    }
3573
3574    #[test]
3575    fn test_unwind_account_history_to_multi_shard_boundary_empty() {
3576        let temp_dir = TempDir::new().unwrap();
3577        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3578
3579        let address = Address::from([0x42; 20]);
3580
3581        // Create two shards
3582        let mut batch = provider.batch();
3583
3584        // First shard: blocks 1-50, keyed by 50
3585        let shard1 = BlockNumberList::new_pre_sorted(1..=50);
3586        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, 50), &shard1).unwrap();
3587
3588        // Second shard: blocks 75-100, keyed by MAX
3589        let shard2 = BlockNumberList::new_pre_sorted(75..=100);
3590        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, u64::MAX), &shard2).unwrap();
3591
3592        batch.commit().unwrap();
3593
3594        // Unwind to block 60 (removes all of shard2 since 75 > 60, promotes shard1 to MAX)
3595        let mut batch = provider.batch();
3596        batch.unwind_account_history_to(address, 60).unwrap();
3597        batch.commit().unwrap();
3598
3599        // Verify: only shard1 remains, now keyed as MAX
3600        let shards = provider.account_history_shards(address).unwrap();
3601        assert_eq!(shards.len(), 1);
3602        assert_eq!(shards[0].0.highest_block_number, u64::MAX);
3603        assert_eq!(shards[0].1.iter().collect::<Vec<_>>(), (1..=50).collect::<Vec<_>>());
3604    }
3605
3606    #[test]
3607    fn test_account_history_shards_iterator() {
3608        let temp_dir = TempDir::new().unwrap();
3609        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3610
3611        let address = Address::from([0x42; 20]);
3612        let other_address = Address::from([0x43; 20]);
3613
3614        // Add data for two addresses
3615        let mut batch = provider.batch();
3616        batch.append_account_history_shard(address, 0..=5).unwrap();
3617        batch.append_account_history_shard(other_address, 10..=15).unwrap();
3618        batch.commit().unwrap();
3619
3620        // Query shards for first address only
3621        let shards = provider.account_history_shards(address).unwrap();
3622        assert_eq!(shards.len(), 1);
3623        assert_eq!(shards[0].0.key, address);
3624
3625        // Query shards for second address only
3626        let shards = provider.account_history_shards(other_address).unwrap();
3627        assert_eq!(shards.len(), 1);
3628        assert_eq!(shards[0].0.key, other_address);
3629
3630        // Query shards for non-existent address
3631        let non_existent = Address::from([0x99; 20]);
3632        let shards = provider.account_history_shards(non_existent).unwrap();
3633        assert!(shards.is_empty());
3634    }
3635
3636    #[test]
3637    fn test_clear_account_history() {
3638        let temp_dir = TempDir::new().unwrap();
3639        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3640
3641        let address = Address::from([0x42; 20]);
3642
3643        // Add blocks 0-10
3644        let mut batch = provider.batch();
3645        batch.append_account_history_shard(address, 0..=10).unwrap();
3646        batch.commit().unwrap();
3647
3648        // Clear all history (simulates unwind from block 0)
3649        let mut batch = provider.batch();
3650        batch.clear_account_history(address).unwrap();
3651        batch.commit().unwrap();
3652
3653        // Verify no data remains
3654        let shards = provider.account_history_shards(address).unwrap();
3655        assert!(shards.is_empty(), "All shards should be deleted");
3656    }
3657
3658    #[test]
3659    fn test_unwind_non_sentinel_boundary() {
3660        let temp_dir = TempDir::new().unwrap();
3661        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3662
3663        let address = Address::from([0x42; 20]);
3664
3665        // Create three shards with non-sentinel boundary
3666        let mut batch = provider.batch();
3667
3668        // Shard 1: blocks 1-50, keyed by 50
3669        let shard1 = BlockNumberList::new_pre_sorted(1..=50);
3670        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, 50), &shard1).unwrap();
3671
3672        // Shard 2: blocks 51-100, keyed by 100 (non-sentinel, will be boundary)
3673        let shard2 = BlockNumberList::new_pre_sorted(51..=100);
3674        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, 100), &shard2).unwrap();
3675
3676        // Shard 3: blocks 101-150, keyed by MAX (will be deleted)
3677        let shard3 = BlockNumberList::new_pre_sorted(101..=150);
3678        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, u64::MAX), &shard3).unwrap();
3679
3680        batch.commit().unwrap();
3681
3682        // Verify 3 shards
3683        let shards = provider.account_history_shards(address).unwrap();
3684        assert_eq!(shards.len(), 3);
3685
3686        // Unwind to block 75 (truncates shard2, deletes shard3)
3687        let mut batch = provider.batch();
3688        batch.unwind_account_history_to(address, 75).unwrap();
3689        batch.commit().unwrap();
3690
3691        // Verify: shard1 unchanged, shard2 truncated and re-keyed to MAX, shard3 deleted
3692        let shards = provider.account_history_shards(address).unwrap();
3693        assert_eq!(shards.len(), 2);
3694
3695        // First shard unchanged
3696        assert_eq!(shards[0].0.highest_block_number, 50);
3697        assert_eq!(shards[0].1.iter().collect::<Vec<_>>(), (1..=50).collect::<Vec<_>>());
3698
3699        // Second shard truncated and re-keyed to MAX
3700        assert_eq!(shards[1].0.highest_block_number, u64::MAX);
3701        assert_eq!(shards[1].1.iter().collect::<Vec<_>>(), (51..=75).collect::<Vec<_>>());
3702    }
3703
3704    #[test]
3705    fn test_batch_auto_commit_on_threshold() {
3706        let temp_dir = TempDir::new().unwrap();
3707        let provider =
3708            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
3709
3710        // Create batch with tiny threshold (1KB) to force auto-commits
3711        let mut batch = RocksDBBatch {
3712            provider: &provider,
3713            inner: WriteBatchWithTransaction::<true>::default(),
3714            buf: Vec::new(),
3715            auto_commit_threshold: Some(1024), // 1KB
3716        };
3717
3718        // Write entries until we exceed threshold multiple times
3719        // Each entry is ~20 bytes, so 100 entries = ~2KB = 2 auto-commits
3720        for i in 0..100u64 {
3721            let value = format!("value_{i:04}").into_bytes();
3722            batch.put::<TestTable>(i, &value).unwrap();
3723        }
3724
3725        // Data should already be visible (auto-committed) even before final commit
3726        // At least some entries should be readable
3727        let first_visible = provider.get::<TestTable>(0).unwrap();
3728        assert!(first_visible.is_some(), "Auto-committed data should be visible");
3729
3730        // Final commit for remaining batch
3731        batch.commit().unwrap();
3732
3733        // All entries should now be visible
3734        for i in 0..100u64 {
3735            let value = format!("value_{i:04}").into_bytes();
3736            assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
3737        }
3738    }
3739
3740    // ==================== PARAMETERIZED PRUNE TESTS ====================
3741
3742    /// Test case for account history pruning
3743    struct AccountPruneCase {
3744        name: &'static str,
3745        initial_shards: &'static [(u64, &'static [u64])],
3746        prune_to: u64,
3747        expected_outcome: PruneShardOutcome,
3748        expected_shards: &'static [(u64, &'static [u64])],
3749    }
3750
3751    /// Test case for storage history pruning
3752    struct StoragePruneCase {
3753        name: &'static str,
3754        initial_shards: &'static [(u64, &'static [u64])],
3755        prune_to: u64,
3756        expected_outcome: PruneShardOutcome,
3757        expected_shards: &'static [(u64, &'static [u64])],
3758    }
3759
3760    #[test]
3761    fn test_prune_account_history_cases() {
3762        const MAX: u64 = u64::MAX;
3763        const CASES: &[AccountPruneCase] = &[
3764            AccountPruneCase {
3765                name: "single_shard_truncate",
3766                initial_shards: &[(MAX, &[10, 20, 30, 40])],
3767                prune_to: 25,
3768                expected_outcome: PruneShardOutcome::Updated,
3769                expected_shards: &[(MAX, &[30, 40])],
3770            },
3771            AccountPruneCase {
3772                name: "single_shard_delete_all",
3773                initial_shards: &[(MAX, &[10, 20])],
3774                prune_to: 20,
3775                expected_outcome: PruneShardOutcome::Deleted,
3776                expected_shards: &[],
3777            },
3778            AccountPruneCase {
3779                name: "single_shard_noop",
3780                initial_shards: &[(MAX, &[10, 20])],
3781                prune_to: 5,
3782                expected_outcome: PruneShardOutcome::Unchanged,
3783                expected_shards: &[(MAX, &[10, 20])],
3784            },
3785            AccountPruneCase {
3786                name: "no_shards",
3787                initial_shards: &[],
3788                prune_to: 100,
3789                expected_outcome: PruneShardOutcome::Unchanged,
3790                expected_shards: &[],
3791            },
3792            AccountPruneCase {
3793                name: "multi_shard_truncate_first",
3794                initial_shards: &[(30, &[10, 20, 30]), (MAX, &[40, 50, 60])],
3795                prune_to: 25,
3796                expected_outcome: PruneShardOutcome::Updated,
3797                expected_shards: &[(30, &[30]), (MAX, &[40, 50, 60])],
3798            },
3799            AccountPruneCase {
3800                name: "delete_first_shard_sentinel_unchanged",
3801                initial_shards: &[(20, &[10, 20]), (MAX, &[30, 40])],
3802                prune_to: 20,
3803                expected_outcome: PruneShardOutcome::Deleted,
3804                expected_shards: &[(MAX, &[30, 40])],
3805            },
3806            AccountPruneCase {
3807                name: "multi_shard_delete_all_but_last",
3808                initial_shards: &[(10, &[5, 10]), (20, &[15, 20]), (MAX, &[25, 30])],
3809                prune_to: 22,
3810                expected_outcome: PruneShardOutcome::Deleted,
3811                expected_shards: &[(MAX, &[25, 30])],
3812            },
3813            AccountPruneCase {
3814                name: "mid_shard_preserves_key",
3815                initial_shards: &[(50, &[10, 20, 30, 40, 50]), (MAX, &[60, 70])],
3816                prune_to: 25,
3817                expected_outcome: PruneShardOutcome::Updated,
3818                expected_shards: &[(50, &[30, 40, 50]), (MAX, &[60, 70])],
3819            },
3820            // Equivalence tests
3821            AccountPruneCase {
3822                name: "equiv_delete_early_shards_keep_sentinel",
3823                initial_shards: &[(20, &[10, 15, 20]), (50, &[30, 40, 50]), (MAX, &[60, 70])],
3824                prune_to: 55,
3825                expected_outcome: PruneShardOutcome::Deleted,
3826                expected_shards: &[(MAX, &[60, 70])],
3827            },
3828            AccountPruneCase {
3829                name: "equiv_sentinel_becomes_empty_with_prev",
3830                initial_shards: &[(50, &[30, 40, 50]), (MAX, &[35])],
3831                prune_to: 40,
3832                expected_outcome: PruneShardOutcome::Deleted,
3833                expected_shards: &[(MAX, &[50])],
3834            },
3835            AccountPruneCase {
3836                name: "equiv_all_shards_become_empty",
3837                initial_shards: &[(50, &[30, 40, 50]), (MAX, &[51])],
3838                prune_to: 51,
3839                expected_outcome: PruneShardOutcome::Deleted,
3840                expected_shards: &[],
3841            },
3842            AccountPruneCase {
3843                name: "equiv_non_sentinel_last_shard_promoted",
3844                initial_shards: &[(100, &[50, 75, 100])],
3845                prune_to: 60,
3846                expected_outcome: PruneShardOutcome::Updated,
3847                expected_shards: &[(MAX, &[75, 100])],
3848            },
3849            AccountPruneCase {
3850                name: "equiv_filter_within_shard",
3851                initial_shards: &[(MAX, &[10, 20, 30, 40])],
3852                prune_to: 25,
3853                expected_outcome: PruneShardOutcome::Updated,
3854                expected_shards: &[(MAX, &[30, 40])],
3855            },
3856            AccountPruneCase {
3857                name: "equiv_multi_shard_partial_delete",
3858                initial_shards: &[(20, &[10, 20]), (50, &[30, 40, 50]), (MAX, &[60, 70])],
3859                prune_to: 35,
3860                expected_outcome: PruneShardOutcome::Deleted,
3861                expected_shards: &[(50, &[40, 50]), (MAX, &[60, 70])],
3862            },
3863        ];
3864
3865        let address = Address::from([0x42; 20]);
3866
3867        for case in CASES {
3868            let temp_dir = TempDir::new().unwrap();
3869            let provider =
3870                RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3871
3872            // Setup initial shards
3873            let mut batch = provider.batch();
3874            for (highest, blocks) in case.initial_shards {
3875                let shard = BlockNumberList::new_pre_sorted(blocks.iter().copied());
3876                batch
3877                    .put::<tables::AccountsHistory>(ShardedKey::new(address, *highest), &shard)
3878                    .unwrap();
3879            }
3880            batch.commit().unwrap();
3881
3882            // Prune
3883            let mut batch = provider.batch();
3884            let outcome = batch.prune_account_history_to(address, case.prune_to).unwrap();
3885            batch.commit().unwrap();
3886
3887            // Assert outcome
3888            assert_eq!(outcome, case.expected_outcome, "case '{}': wrong outcome", case.name);
3889
3890            // Assert final shards
3891            let shards = provider.account_history_shards(address).unwrap();
3892            assert_eq!(
3893                shards.len(),
3894                case.expected_shards.len(),
3895                "case '{}': wrong shard count",
3896                case.name
3897            );
3898            for (i, ((key, blocks), (exp_key, exp_blocks))) in
3899                shards.iter().zip(case.expected_shards.iter()).enumerate()
3900            {
3901                assert_eq!(
3902                    key.highest_block_number, *exp_key,
3903                    "case '{}': shard {} wrong key",
3904                    case.name, i
3905                );
3906                assert_eq!(
3907                    blocks.iter().collect::<Vec<_>>(),
3908                    *exp_blocks,
3909                    "case '{}': shard {} wrong blocks",
3910                    case.name,
3911                    i
3912                );
3913            }
3914        }
3915    }
3916
    #[test]
    fn test_prune_storage_history_cases() {
        const MAX: u64 = u64::MAX;
        const CASES: &[StoragePruneCase] = &[
            StoragePruneCase {
                name: "single_shard_truncate",
                initial_shards: &[(MAX, &[10, 20, 30, 40])],
                prune_to: 25,
                expected_outcome: PruneShardOutcome::Updated,
                expected_shards: &[(MAX, &[30, 40])],
            },
            StoragePruneCase {
                name: "single_shard_delete_all",
                initial_shards: &[(MAX, &[10, 20])],
                prune_to: 20,
                expected_outcome: PruneShardOutcome::Deleted,
                expected_shards: &[],
            },
            StoragePruneCase {
                name: "noop",
                initial_shards: &[(MAX, &[10, 20])],
                prune_to: 5,
                expected_outcome: PruneShardOutcome::Unchanged,
                expected_shards: &[(MAX, &[10, 20])],
            },
            StoragePruneCase {
                name: "no_shards",
                initial_shards: &[],
                prune_to: 100,
                expected_outcome: PruneShardOutcome::Unchanged,
                expected_shards: &[],
            },
            StoragePruneCase {
                name: "mid_shard_preserves_key",
                initial_shards: &[(50, &[10, 20, 30, 40, 50]), (MAX, &[60, 70])],
                prune_to: 25,
                expected_outcome: PruneShardOutcome::Updated,
                expected_shards: &[(50, &[30, 40, 50]), (MAX, &[60, 70])],
            },
            // Equivalence tests
            StoragePruneCase {
                name: "equiv_sentinel_promotion",
                initial_shards: &[(100, &[50, 75, 100])],
                prune_to: 60,
                expected_outcome: PruneShardOutcome::Updated,
                expected_shards: &[(MAX, &[75, 100])],
            },
            StoragePruneCase {
                name: "equiv_delete_early_shards_keep_sentinel",
                initial_shards: &[(20, &[10, 15, 20]), (50, &[30, 40, 50]), (MAX, &[60, 70])],
                prune_to: 55,
                expected_outcome: PruneShardOutcome::Deleted,
                expected_shards: &[(MAX, &[60, 70])],
            },
            StoragePruneCase {
                name: "equiv_sentinel_becomes_empty_with_prev",
                initial_shards: &[(50, &[30, 40, 50]), (MAX, &[35])],
                prune_to: 40,
                expected_outcome: PruneShardOutcome::Deleted,
                expected_shards: &[(MAX, &[50])],
            },
            StoragePruneCase {
                name: "equiv_all_shards_become_empty",
                initial_shards: &[(50, &[30, 40, 50]), (MAX, &[51])],
                prune_to: 51,
                expected_outcome: PruneShardOutcome::Deleted,
                expected_shards: &[],
            },
            StoragePruneCase {
                name: "equiv_filter_within_shard",
                initial_shards: &[(MAX, &[10, 20, 30, 40])],
                prune_to: 25,
                expected_outcome: PruneShardOutcome::Updated,
                expected_shards: &[(MAX, &[30, 40])],
            },
            StoragePruneCase {
                name: "equiv_multi_shard_partial_delete",
                initial_shards: &[(20, &[10, 20]), (50, &[30, 40, 50]), (MAX, &[60, 70])],
                prune_to: 35,
                expected_outcome: PruneShardOutcome::Deleted,
                expected_shards: &[(50, &[40, 50]), (MAX, &[60, 70])],
            },
        ];

        let address = Address::from([0x42; 20]);
        let storage_key = B256::from([0x01; 32]);

        for case in CASES {
            let temp_dir = TempDir::new().unwrap();
            let provider =
                RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

            // Set up initial shards
            let mut batch = provider.batch();
            for (highest, blocks) in case.initial_shards {
                let shard = BlockNumberList::new_pre_sorted(blocks.iter().copied());
                let key = if *highest == MAX {
                    StorageShardedKey::last(address, storage_key)
                } else {
                    StorageShardedKey::new(address, storage_key, *highest)
                };
                batch.put::<tables::StoragesHistory>(key, &shard).unwrap();
            }
            batch.commit().unwrap();

            // Prune
            let mut batch = provider.batch();
            let outcome =
                batch.prune_storage_history_to(address, storage_key, case.prune_to).unwrap();
            batch.commit().unwrap();

            // Assert outcome
            assert_eq!(outcome, case.expected_outcome, "case '{}': wrong outcome", case.name);

            // Assert final shards
            let shards = provider.storage_history_shards(address, storage_key).unwrap();
            assert_eq!(
                shards.len(),
                case.expected_shards.len(),
                "case '{}': wrong shard count",
                case.name
            );
            for (i, ((key, blocks), (exp_key, exp_blocks))) in
                shards.iter().zip(case.expected_shards.iter()).enumerate()
            {
                assert_eq!(
                    key.sharded_key.highest_block_number, *exp_key,
                    "case '{}': shard {} wrong key",
                    case.name, i
                );
                assert_eq!(
                    blocks.iter().collect::<Vec<_>>(),
                    *exp_blocks,
                    "case '{}': shard {} wrong blocks",
                    case.name,
                    i
                );
            }
        }
    }

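    // Pruning one storage slot must leave sibling slots of the same address untouched.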
    #[test]
    fn test_prune_storage_history_does_not_affect_other_slots() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);
        let slot1 = B256::from([0x01; 32]);
        let slot2 = B256::from([0x02; 32]);

        // Two different storage slots
        let mut batch = provider.batch();
        batch
            .put::<tables::StoragesHistory>(
                StorageShardedKey::last(address, slot1),
                &BlockNumberList::new_pre_sorted([10u64, 20]),
            )
            .unwrap();
        batch
            .put::<tables::StoragesHistory>(
                StorageShardedKey::last(address, slot2),
                &BlockNumberList::new_pre_sorted([30u64, 40]),
            )
            .unwrap();
        batch.commit().unwrap();

        // Prune slot1 to block 20 (deletes all)
        let mut batch = provider.batch();
        let outcome = batch.prune_storage_history_to(address, slot1, 20).unwrap();
        batch.commit().unwrap();

        assert_eq!(outcome, PruneShardOutcome::Deleted);

        // slot1 should be empty
        let shards1 = provider.storage_history_shards(address, slot1).unwrap();
        assert!(shards1.is_empty());

        // slot2 should be unchanged
        let shards2 = provider.storage_history_shards(address, slot2).unwrap();
        assert_eq!(shards2.len(), 1);
        assert_eq!(shards2[0].1.iter().collect::<Vec<_>>(), vec![30, 40]);
    }

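    // Structural invariants that must hold after any prune, for both account and storage
    // history: shards are never empty, and whenever shards remain the last one is keyed
    // at `u64::MAX` (the sentinel).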
    #[test]
    fn test_prune_invariants() {
        // Test invariants: no empty shards, sentinel is always last
        let address = Address::from([0x42; 20]);
        let storage_key = B256::from([0x01; 32]);

        // Test cases that exercise invariants
        #[expect(clippy::type_complexity)]
        let invariant_cases: &[(&[(u64, &[u64])], u64)] = &[
            // Account: shards where middle becomes empty
            (&[(10, &[5, 10]), (20, &[15, 20]), (u64::MAX, &[25, 30])], 20),
            // Account: non-sentinel shard only, partial prune -> must become sentinel
            (&[(100, &[50, 100])], 60),
        ];

        for (initial_shards, prune_to) in invariant_cases {
            // Test account history invariants
            {
                let temp_dir = TempDir::new().unwrap();
                let provider =
                    RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

                let mut batch = provider.batch();
                for (highest, blocks) in *initial_shards {
                    let shard = BlockNumberList::new_pre_sorted(blocks.iter().copied());
                    batch
                        .put::<tables::AccountsHistory>(ShardedKey::new(address, *highest), &shard)
                        .unwrap();
                }
                batch.commit().unwrap();

                let mut batch = provider.batch();
                batch.prune_account_history_to(address, *prune_to).unwrap();
                batch.commit().unwrap();

                let shards = provider.account_history_shards(address).unwrap();

                // Invariant 1: no empty shards
                for (key, blocks) in &shards {
                    assert!(
                        !blocks.is_empty(),
                        "Account: empty shard at key {}",
                        key.highest_block_number
                    );
                }

                // Invariant 2: last shard is sentinel
                if !shards.is_empty() {
                    let last = shards.last().unwrap();
                    assert_eq!(
                        last.0.highest_block_number,
                        u64::MAX,
                        "Account: last shard must be sentinel"
                    );
                }
            }

            // Test storage history invariants
            {
                let temp_dir = TempDir::new().unwrap();
                let provider =
                    RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

                let mut batch = provider.batch();
                for (highest, blocks) in *initial_shards {
                    let shard = BlockNumberList::new_pre_sorted(blocks.iter().copied());
                    let key = if *highest == u64::MAX {
                        StorageShardedKey::last(address, storage_key)
                    } else {
                        StorageShardedKey::new(address, storage_key, *highest)
                    };
                    batch.put::<tables::StoragesHistory>(key, &shard).unwrap();
                }
                batch.commit().unwrap();

                let mut batch = provider.batch();
                batch.prune_storage_history_to(address, storage_key, *prune_to).unwrap();
                batch.commit().unwrap();

                let shards = provider.storage_history_shards(address, storage_key).unwrap();

                // Invariant 1: no empty shards
                for (key, blocks) in &shards {
                    assert!(
                        !blocks.is_empty(),
                        "Storage: empty shard at key {}",
                        key.sharded_key.highest_block_number
                    );
                }

                // Invariant 2: last shard is sentinel
                if !shards.is_empty() {
                    let last = shards.last().unwrap();
                    assert_eq!(
                        last.0.sharded_key.highest_block_number,
                        u64::MAX,
                        "Storage: last shard must be sentinel"
                    );
                }
            }
        }
    }

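    // Batch account-history pruning across several addresses; targets are sorted by
    // address before the call, and the returned outcome counters are checked alongside
    // the surviving shards.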
    #[test]
    fn test_prune_account_history_batch_multiple_sorted_targets() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let addr1 = Address::from([0x01; 20]);
        let addr2 = Address::from([0x02; 20]);
        let addr3 = Address::from([0x03; 20]);

        // Set up shards for each address
        let mut batch = provider.batch();
        batch
            .put::<tables::AccountsHistory>(
                ShardedKey::new(addr1, u64::MAX),
                &BlockNumberList::new_pre_sorted([10, 20, 30]),
            )
            .unwrap();
        batch
            .put::<tables::AccountsHistory>(
                ShardedKey::new(addr2, u64::MAX),
                &BlockNumberList::new_pre_sorted([5, 10, 15]),
            )
            .unwrap();
        batch
            .put::<tables::AccountsHistory>(
                ShardedKey::new(addr3, u64::MAX),
                &BlockNumberList::new_pre_sorted([100, 200]),
            )
            .unwrap();
        batch.commit().unwrap();

        // Prune all three (sorted by address)
        let mut targets = vec![(addr1, 15), (addr2, 10), (addr3, 50)];
        targets.sort_by_key(|(addr, _)| *addr);

        let mut batch = provider.batch();
        let outcomes = batch.prune_account_history_batch(&targets).unwrap();
        batch.commit().unwrap();

        // addr1: prune <=15, keep [20, 30] -> updated
        // addr2: prune <=10, keep [15] -> updated
        // addr3: prune <=50, keep [100, 200] -> unchanged
        assert_eq!(outcomes.updated, 2);
        assert_eq!(outcomes.unchanged, 1);

        let shards1 = provider.account_history_shards(addr1).unwrap();
        assert_eq!(shards1[0].1.iter().collect::<Vec<_>>(), vec![20, 30]);

        let shards2 = provider.account_history_shards(addr2).unwrap();
        assert_eq!(shards2[0].1.iter().collect::<Vec<_>>(), vec![15]);

        let shards3 = provider.account_history_shards(addr3).unwrap();
        assert_eq!(shards3[0].1.iter().collect::<Vec<_>>(), vec![100, 200]);
    }

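    // A batch target with no existing shards must be counted as unchanged and must not
    // disturb the neighboring targets that do have shards.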
    #[test]
    fn test_prune_account_history_batch_target_with_no_shards() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let addr1 = Address::from([0x01; 20]);
        let addr2 = Address::from([0x02; 20]); // No shards for this one
        let addr3 = Address::from([0x03; 20]);

        // Only set up shards for addr1 and addr3
        let mut batch = provider.batch();
        batch
            .put::<tables::AccountsHistory>(
                ShardedKey::new(addr1, u64::MAX),
                &BlockNumberList::new_pre_sorted([10, 20]),
            )
            .unwrap();
        batch
            .put::<tables::AccountsHistory>(
                ShardedKey::new(addr3, u64::MAX),
                &BlockNumberList::new_pre_sorted([30, 40]),
            )
            .unwrap();
        batch.commit().unwrap();

        // Prune all three (addr2 has no shards, exercising the p > target_prefix case)
        let mut targets = vec![(addr1, 15), (addr2, 100), (addr3, 35)];
        targets.sort_by_key(|(addr, _)| *addr);

        let mut batch = provider.batch();
        let outcomes = batch.prune_account_history_batch(&targets).unwrap();
        batch.commit().unwrap();

        // addr1: updated (keep [20])
        // addr2: unchanged (no shards)
        // addr3: updated (keep [40])
        assert_eq!(outcomes.updated, 2);
        assert_eq!(outcomes.unchanged, 1);

        let shards1 = provider.account_history_shards(addr1).unwrap();
        assert_eq!(shards1[0].1.iter().collect::<Vec<_>>(), vec![20]);

        let shards3 = provider.account_history_shards(addr3).unwrap();
        assert_eq!(shards3[0].1.iter().collect::<Vec<_>>(), vec![40]);
    }

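    // Batch storage-history pruning across multiple (address, slot) targets, sorted by
    // (address, slot) before the call.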
    #[test]
    fn test_prune_storage_history_batch_multiple_sorted_targets() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let addr = Address::from([0x42; 20]);
        let slot1 = B256::from([0x01; 32]);
        let slot2 = B256::from([0x02; 32]);

        // Set up sentinel shards for both slots
        let mut batch = provider.batch();
        batch
            .put::<tables::StoragesHistory>(
                StorageShardedKey::last(addr, slot1),
                &BlockNumberList::new_pre_sorted([10, 20, 30]),
            )
            .unwrap();
        batch
            .put::<tables::StoragesHistory>(
                StorageShardedKey::last(addr, slot2),
                &BlockNumberList::new_pre_sorted([5, 15, 25]),
            )
            .unwrap();
        batch.commit().unwrap();

        // Prune both (sorted)
        let mut targets = vec![((addr, slot1), 15), ((addr, slot2), 10)];
        targets.sort_by_key(|((a, s), _)| (*a, *s));

        let mut batch = provider.batch();
        let outcomes = batch.prune_storage_history_batch(&targets).unwrap();
        batch.commit().unwrap();

        assert_eq!(outcomes.updated, 2);

        let shards1 = provider.storage_history_shards(addr, slot1).unwrap();
        assert_eq!(shards1[0].1.iter().collect::<Vec<_>>(), vec![20, 30]);

        let shards2 = provider.storage_history_shards(addr, slot2).unwrap();
        assert_eq!(shards2[0].1.iter().collect::<Vec<_>>(), vec![15, 25]);
    }
}