Skip to main content

reth_provider/providers/rocksdb/
provider.rs

1use super::metrics::{RocksDBMetrics, RocksDBOperation, ROCKSDB_TABLES};
2use crate::providers::{compute_history_rank, needs_prev_shard_check, HistoryInfo};
3use alloy_consensus::transaction::TxHashRef;
4use alloy_primitives::{
5    map::{AddressMap, HashMap},
6    Address, BlockNumber, TxNumber, B256,
7};
8use itertools::Itertools;
9use metrics::Label;
10use parking_lot::Mutex;
11use rayon::prelude::*;
12use reth_chain_state::ExecutedBlock;
13use reth_db_api::{
14    database_metrics::DatabaseMetrics,
15    models::{
16        sharded_key::NUM_OF_INDICES_IN_SHARD, storage_sharded_key::StorageShardedKey, ShardedKey,
17        StorageSettings,
18    },
19    table::{Compress, Decode, Decompress, Encode, Table},
20    tables, BlockNumberList, DatabaseError,
21};
22use reth_primitives_traits::{BlockBody as _, FastInstant as Instant};
23use reth_prune_types::PruneMode;
24use reth_storage_errors::{
25    db::{DatabaseErrorInfo, DatabaseWriteError, DatabaseWriteOperation, LogLevel},
26    provider::{ProviderError, ProviderResult},
27};
28use rocksdb::{
29    BlockBasedOptions, Cache, ColumnFamilyDescriptor, CompactionPri, DBCompressionType,
30    DBRawIteratorWithThreadMode, IteratorMode, OptimisticTransactionDB,
31    OptimisticTransactionOptions, Options, SnapshotWithThreadMode, Transaction,
32    WriteBatchWithTransaction, WriteBufferManager, WriteOptions, DB,
33};
34use std::{
35    collections::BTreeMap,
36    fmt,
37    path::{Path, PathBuf},
38    sync::Arc,
39};
40use tracing::instrument;
41
42/// Returns [`WriteOptions`] with WAL sync enabled for crash durability.
43fn synced_write_options() -> WriteOptions {
44    let mut opts = WriteOptions::default();
45    opts.set_sync(true);
46    opts
47}
48
49/// Pending `RocksDB` batches type alias.
50pub(crate) type PendingRocksDBBatches = Arc<Mutex<Vec<WriteBatchWithTransaction<true>>>>;
51
52/// Raw key-value result from a `RocksDB` iterator.
53type RawKVResult = Result<(Box<[u8]>, Box<[u8]>), rocksdb::Error>;
54
/// Size and key-count estimates for one `RocksDB` table (column family).
#[derive(Debug, Clone)]
pub struct RocksDBTableStats {
    /// Bytes occupied on disk by the table's live SST files.
    pub sst_size_bytes: u64,
    /// Bytes currently held in the table's memtables.
    pub memtable_size_bytes: u64,
    /// Table (column family) name.
    pub name: String,
    /// `RocksDB`'s estimate of how many keys the table holds.
    pub estimated_num_keys: u64,
    /// Estimated live data size in bytes: SST files plus memtables.
    pub estimated_size_bytes: u64,
    /// Estimated bytes still waiting to be compacted (space that may be reclaimed).
    pub pending_compaction_bytes: u64,
}

/// Database-wide `RocksDB` statistics.
///
/// Bundles the per-table numbers with metrics that only exist at the database level, such as
/// the size of the write-ahead log.
#[derive(Debug, Clone)]
pub struct RocksDBStats {
    /// One entry per table (column family).
    pub tables: Vec<RocksDBTableStats>,
    /// Combined size in bytes of all WAL (write-ahead log) files.
    ///
    /// The WAL is shared by every table, so it is not attributed to any per-table entry.
    pub wal_size_bytes: u64,
}
84
85/// Context for `RocksDB` block writes.
86#[derive(Clone)]
87pub(crate) struct RocksDBWriteCtx {
88    /// The first block number being written.
89    pub first_block_number: BlockNumber,
90    /// The prune mode for transaction lookup, if any.
91    pub prune_tx_lookup: Option<PruneMode>,
92    /// Storage settings determining what goes to `RocksDB`.
93    pub storage_settings: StorageSettings,
94    /// Pending batches to push to after writing.
95    pub pending_batches: PendingRocksDBBatches,
96}
97
98impl fmt::Debug for RocksDBWriteCtx {
99    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
100        f.debug_struct("RocksDBWriteCtx")
101            .field("first_block_number", &self.first_block_number)
102            .field("prune_tx_lookup", &self.prune_tx_lookup)
103            .field("storage_settings", &self.storage_settings)
104            .field("pending_batches", &"<pending batches>")
105            .finish()
106    }
107}
108
/// Default cache size for `RocksDB` block cache (128 MB).
const DEFAULT_CACHE_SIZE: usize = 128 << 20;

/// Default block size for `RocksDB` tables (16 KB).
const DEFAULT_BLOCK_SIZE: usize = 16 * 1024;

/// Default max background jobs for `RocksDB` compaction and flushing.
const DEFAULT_MAX_BACKGROUND_JOBS: i32 = 6;

/// Default max open file descriptors for `RocksDB`.
///
/// Caps the number of SST file handles `RocksDB` keeps open simultaneously.
/// Set to 512 to stay within the common default OS `ulimit -n` of 1024,
/// leaving headroom for MDBX, static files, and other I/O.
/// `RocksDB` uses an internal table cache and re-opens files on demand,
/// so this has negligible performance impact on read-heavy workloads.
///
/// Only applies to the read-write (primary) instance: read-only secondary instances override
/// this with `-1` (no limit), which `RocksDB` requires for secondaries.
const DEFAULT_MAX_OPEN_FILES: i32 = 512;

/// Default `bytes_per_sync` for `RocksDB` (1 MiB).
///
/// While SST (non-WAL) files are being written, `RocksDB` asks the OS to sync them
/// incrementally in the background after every this many bytes. This smooths out write I/O and
/// is not a durability guarantee. It does not apply to the WAL, which is governed by the
/// separate `wal_bytes_per_sync` option.
const DEFAULT_BYTES_PER_SYNC: u64 = 1_048_576;

/// Default write buffer size for `RocksDB` memtables (128 MB).
///
/// Larger memtables reduce flush frequency during burst writes, providing more consistent
/// tail latency. Benchmarks showed 128 MB reduces p99 latency variance by ~80% compared
/// to 64 MB default, with negligible impact on mean throughput.
const DEFAULT_WRITE_BUFFER_SIZE: usize = 128 << 20;

/// Default total `RocksDB` memtable memory budget across column families (4 GiB).
///
/// This is a soft limit; with write stalls enabled, `RocksDB` waits for flushes once
/// memtable arena usage exceeds the budget.
const DEFAULT_WRITE_BUFFER_MANAGER_SIZE: usize = 4 * 1024 * 1024 * 1024;

/// Default buffer capacity for compression in batches.
/// 4 KiB matches common block/page sizes and comfortably holds typical history values,
/// reducing the first few reallocations without over-allocating.
const DEFAULT_COMPRESS_BUF_CAPACITY: usize = 4096;

/// Default auto-commit threshold for batch writes (512 MiB).
///
/// When a batch exceeds this size, it is automatically committed to prevent OOM
/// during large bulk writes. Keep this below the `RocksDB` write buffer manager
/// budget so stalls can recover without waiting on a single large flush.
/// The consistency check on startup heals any crash that occurs between auto-commits.
const DEFAULT_AUTO_COMMIT_THRESHOLD: usize = 512 * 1024 * 1024;

// Enforce the invariant documented above: a single auto-committed batch must fit inside the
// memtable budget, otherwise write stalls could not recover.
const _: () = assert!(DEFAULT_AUTO_COMMIT_THRESHOLD < DEFAULT_WRITE_BUFFER_MANAGER_SIZE);
155
156/// Builder for [`RocksDBProvider`].
157pub struct RocksDBBuilder {
158    path: PathBuf,
159    column_families: Vec<String>,
160    enable_metrics: bool,
161    enable_statistics: bool,
162    log_level: rocksdb::LogLevel,
163    block_cache: Cache,
164    read_only: bool,
165}
166
167impl fmt::Debug for RocksDBBuilder {
168    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
169        f.debug_struct("RocksDBBuilder")
170            .field("path", &self.path)
171            .field("column_families", &self.column_families)
172            .field("enable_metrics", &self.enable_metrics)
173            .finish()
174    }
175}
176
177impl RocksDBBuilder {
178    /// Creates a new builder with optimized default options.
179    pub fn new(path: impl AsRef<Path>) -> Self {
180        let cache = Cache::new_lru_cache(DEFAULT_CACHE_SIZE);
181        Self {
182            path: path.as_ref().to_path_buf(),
183            column_families: Vec::new(),
184            enable_metrics: false,
185            enable_statistics: false,
186            log_level: rocksdb::LogLevel::Info,
187            block_cache: cache,
188            read_only: false,
189        }
190    }
191
192    /// Creates default table options with shared block cache.
193    fn default_table_options(cache: &Cache) -> BlockBasedOptions {
194        let mut table_options = BlockBasedOptions::default();
195        table_options.set_block_size(DEFAULT_BLOCK_SIZE);
196        table_options.set_cache_index_and_filter_blocks(true);
197        table_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
198        // Shared block cache for all column families.
199        table_options.set_block_cache(cache);
200        table_options
201    }
202
203    /// Creates optimized `RocksDB` options per `RocksDB` wiki recommendations.
204    fn default_options(
205        log_level: rocksdb::LogLevel,
206        cache: &Cache,
207        enable_statistics: bool,
208    ) -> Options {
209        // Follow recommend tuning guide from RocksDB wiki, see https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning
210        let table_options = Self::default_table_options(cache);
211
212        let mut options = Options::default();
213        options.set_block_based_table_factory(&table_options);
214        options.create_if_missing(true);
215        options.create_missing_column_families(true);
216        options.set_max_background_jobs(DEFAULT_MAX_BACKGROUND_JOBS);
217        options.set_bytes_per_sync(DEFAULT_BYTES_PER_SYNC);
218        let write_buffer_manager =
219            WriteBufferManager::new_write_buffer_manager(DEFAULT_WRITE_BUFFER_MANAGER_SIZE, true);
220        options.set_write_buffer_manager(&write_buffer_manager);
221
222        options.set_bottommost_compression_type(DBCompressionType::Zstd);
223        options.set_bottommost_zstd_max_train_bytes(0, true);
224        options.set_compression_type(DBCompressionType::Lz4);
225        options.set_compaction_pri(CompactionPri::MinOverlappingRatio);
226
227        options.set_log_level(log_level);
228
229        options.set_max_open_files(DEFAULT_MAX_OPEN_FILES);
230
231        // Delete obsolete WAL files immediately after all column families have flushed.
232        // Both set to 0 means "delete ASAP, no archival".
233        options.set_wal_ttl_seconds(0);
234        options.set_wal_size_limit_mb(0);
235
236        // Statistics can view from RocksDB log file
237        if enable_statistics {
238            options.enable_statistics();
239        }
240
241        options
242    }
243
244    /// Creates optimized column family options.
245    fn default_column_family_options(cache: &Cache) -> Options {
246        // Follow recommend tuning guide from RocksDB wiki, see https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning
247        let table_options = Self::default_table_options(cache);
248
249        let mut cf_options = Options::default();
250        cf_options.set_block_based_table_factory(&table_options);
251        cf_options.set_level_compaction_dynamic_level_bytes(true);
252        // Recommend to use Zstd for bottommost compression and Lz4 for other levels, see https://github.com/facebook/rocksdb/wiki/Compression#configuration
253        cf_options.set_compression_type(DBCompressionType::Lz4);
254        cf_options.set_bottommost_compression_type(DBCompressionType::Zstd);
255        // Only use Zstd compression, disable dictionary training
256        cf_options.set_bottommost_zstd_max_train_bytes(0, true);
257        cf_options.set_write_buffer_size(DEFAULT_WRITE_BUFFER_SIZE);
258
259        cf_options
260    }
261
262    /// Creates optimized column family options for `TransactionHashNumbers`.
263    ///
264    /// This table stores `B256 -> TxNumber` mappings where:
265    /// - Keys are incompressible 32-byte hashes (compression wastes CPU for zero benefit)
266    /// - Values are varint-encoded `u64` (a few bytes - too small to benefit from compression)
267    /// - Every lookup expects a hit (bloom filters only help when checking non-existent keys)
268    fn tx_hash_numbers_column_family_options(cache: &Cache) -> Options {
269        let mut table_options = BlockBasedOptions::default();
270        table_options.set_block_size(DEFAULT_BLOCK_SIZE);
271        table_options.set_cache_index_and_filter_blocks(true);
272        table_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
273        table_options.set_block_cache(cache);
274        // Disable bloom filter: every lookup expects a hit, so bloom filters provide no benefit
275        // and waste memory
276
277        let mut cf_options = Options::default();
278        cf_options.set_block_based_table_factory(&table_options);
279        cf_options.set_level_compaction_dynamic_level_bytes(true);
280        // Disable compression: B256 keys are incompressible hashes, TxNumber values are
281        // varint-encoded u64 (a few bytes). Compression wastes CPU cycles for zero space savings.
282        cf_options.set_compression_type(DBCompressionType::None);
283        cf_options.set_bottommost_compression_type(DBCompressionType::None);
284
285        cf_options
286    }
287
288    /// Adds a column family for a specific table type.
289    pub fn with_table<T: Table>(mut self) -> Self {
290        self.column_families.push(T::NAME.to_string());
291        self
292    }
293
294    /// Registers the default tables used by reth for `RocksDB` storage.
295    ///
296    /// This registers:
297    /// - [`tables::TransactionHashNumbers`] - Transaction hash to number mapping
298    /// - [`tables::AccountsHistory`] - Account history index
299    /// - [`tables::StoragesHistory`] - Storage history index
300    pub fn with_default_tables(self) -> Self {
301        self.with_table::<tables::TransactionHashNumbers>()
302            .with_table::<tables::AccountsHistory>()
303            .with_table::<tables::StoragesHistory>()
304    }
305
306    /// Enables metrics.
307    pub const fn with_metrics(mut self) -> Self {
308        self.enable_metrics = true;
309        self
310    }
311
312    /// Enables `RocksDB` internal statistics collection.
313    pub const fn with_statistics(mut self) -> Self {
314        self.enable_statistics = true;
315        self
316    }
317
318    /// Sets the log level from `DatabaseArgs` configuration.
319    pub const fn with_database_log_level(mut self, log_level: Option<LogLevel>) -> Self {
320        if let Some(level) = log_level {
321            self.log_level = convert_log_level(level);
322        }
323        self
324    }
325
326    /// Sets a custom block cache size.
327    pub fn with_block_cache_size(mut self, capacity_bytes: usize) -> Self {
328        self.block_cache = Cache::new_lru_cache(capacity_bytes);
329        self
330    }
331
332    /// Sets read-only mode.
333    ///
334    /// Opens the database as a secondary instance, which supports catching up
335    /// with the primary via [`RocksDBProvider::try_catch_up_with_primary`].
336    /// A temporary directory is created automatically for the secondary's LOG files.
337    ///
338    /// Note: Write operations on a read-only provider will panic at runtime.
339    pub const fn with_read_only(mut self, read_only: bool) -> Self {
340        self.read_only = read_only;
341        self
342    }
343
344    /// Builds the [`RocksDBProvider`].
345    pub fn build(self) -> ProviderResult<RocksDBProvider> {
346        let options =
347            Self::default_options(self.log_level, &self.block_cache, self.enable_statistics);
348
349        let cf_descriptors: Vec<ColumnFamilyDescriptor> = self
350            .column_families
351            .iter()
352            .map(|name| {
353                let cf_options = if name == tables::TransactionHashNumbers::NAME {
354                    Self::tx_hash_numbers_column_family_options(&self.block_cache)
355                } else {
356                    Self::default_column_family_options(&self.block_cache)
357                };
358                ColumnFamilyDescriptor::new(name.clone(), cf_options)
359            })
360            .collect();
361
362        let metrics = self.enable_metrics.then(RocksDBMetrics::default);
363
364        if self.read_only {
365            // Open as secondary instance for catch-up capability.
366            // Secondary needs max_open_files = -1 to keep all FDs open.
367            let mut options = options;
368            options.set_max_open_files(-1);
369
370            let secondary_path = self
371                .path
372                .parent()
373                .unwrap_or(&self.path)
374                .join(format!("rocksdb-secondary-tmp-{}", std::process::id()));
375            reth_fs_util::create_dir_all(&secondary_path).map_err(ProviderError::other)?;
376
377            let db = DB::open_cf_descriptors_as_secondary(
378                &options,
379                &self.path,
380                &secondary_path,
381                cf_descriptors,
382            )
383            .map_err(|e| {
384                ProviderError::Database(DatabaseError::Open(DatabaseErrorInfo {
385                    message: e.to_string().into(),
386                    code: -1,
387                }))
388            })?;
389            Ok(RocksDBProvider(Arc::new(RocksDBProviderInner::Secondary {
390                db,
391                metrics,
392                secondary_path,
393            })))
394        } else {
395            // Use OptimisticTransactionDB for MDBX-like transaction semantics (read-your-writes,
396            // rollback) OptimisticTransactionDB uses optimistic concurrency control (conflict
397            // detection at commit) and is backed by DBCommon, giving us access to
398            // cancel_all_background_work for clean shutdown.
399            let db =
400                OptimisticTransactionDB::open_cf_descriptors(&options, &self.path, cf_descriptors)
401                    .map_err(|e| {
402                        ProviderError::Database(DatabaseError::Open(DatabaseErrorInfo {
403                            message: e.to_string().into(),
404                            code: -1,
405                        }))
406                    })?;
407            Ok(RocksDBProvider(Arc::new(RocksDBProviderInner::ReadWrite { db, metrics })))
408        }
409    }
410}
411
/// Encodes `$value` for storage without copying when possible.
///
/// Values that opt out of compression (e.g. `B256`) expose their bytes through
/// `uncompressable_ref`; those bytes are borrowed and returned as `Some`. Any other value is
/// compressed into the reusable buffer `$buf` (cleared first), and `None` is returned, meaning
/// the encoded bytes must be read from `$buf`.
macro_rules! compress_to_buf_or_ref {
    ($buf:expr, $value:expr) => {
        match $value.uncompressable_ref() {
            Some(raw) => Some(raw),
            None => {
                $buf.clear();
                $value.compress_to_buf(&mut $buf);
                None
            }
        }
    };
}
425
426/// `RocksDB` provider for auxiliary storage layer beside main database MDBX.
427#[derive(Debug)]
428pub struct RocksDBProvider(Arc<RocksDBProviderInner>);
429
430/// Inner state for `RocksDB` provider.
431enum RocksDBProviderInner {
432    /// Read-write mode using `OptimisticTransactionDB`.
433    ReadWrite {
434        /// `RocksDB` database instance with optimistic transaction support.
435        db: OptimisticTransactionDB,
436        /// Metrics latency & operations.
437        metrics: Option<RocksDBMetrics>,
438    },
439    /// Secondary mode using `DB` opened with `open_cf_descriptors_as_secondary`.
440    /// Supports catching up with the primary via `try_catch_up_with_primary`.
441    /// Does not support snapshots; consistency is guaranteed externally.
442    Secondary {
443        /// Secondary `RocksDB` database instance.
444        db: DB,
445        /// Metrics latency & operations.
446        metrics: Option<RocksDBMetrics>,
447        /// Temporary directory for secondary LOG files, removed on drop.
448        secondary_path: PathBuf,
449    },
450}
451
452impl RocksDBProviderInner {
453    /// Returns the metrics for this provider.
454    const fn metrics(&self) -> Option<&RocksDBMetrics> {
455        match self {
456            Self::ReadWrite { metrics, .. } | Self::Secondary { metrics, .. } => metrics.as_ref(),
457        }
458    }
459
460    /// Returns the read-write database, panicking if in read-only mode.
461    fn db_rw(&self) -> &OptimisticTransactionDB {
462        match self {
463            Self::ReadWrite { db, .. } => db,
464            Self::Secondary { .. } => {
465                panic!("Cannot perform write operation on secondary RocksDB provider")
466            }
467        }
468    }
469
470    /// Gets the column family handle for a table.
471    fn cf_handle<T: Table>(&self) -> Result<&rocksdb::ColumnFamily, DatabaseError> {
472        let cf = match self {
473            Self::ReadWrite { db, .. } => db.cf_handle(T::NAME),
474            Self::Secondary { db, .. } => db.cf_handle(T::NAME),
475        };
476        cf.ok_or_else(|| DatabaseError::Other(format!("Column family '{}' not found", T::NAME)))
477    }
478
479    /// Gets a value from a column family.
480    fn get_cf(
481        &self,
482        cf: &rocksdb::ColumnFamily,
483        key: impl AsRef<[u8]>,
484    ) -> Result<Option<Vec<u8>>, rocksdb::Error> {
485        match self {
486            Self::ReadWrite { db, .. } => db.get_cf(cf, key),
487            Self::Secondary { db, .. } => db.get_cf(cf, key),
488        }
489    }
490
491    /// Puts a value into a column family.
492    fn put_cf(
493        &self,
494        cf: &rocksdb::ColumnFamily,
495        key: impl AsRef<[u8]>,
496        value: impl AsRef<[u8]>,
497    ) -> Result<(), rocksdb::Error> {
498        self.db_rw().put_cf(cf, key, value)
499    }
500
501    /// Deletes a value from a column family.
502    fn delete_cf(
503        &self,
504        cf: &rocksdb::ColumnFamily,
505        key: impl AsRef<[u8]>,
506    ) -> Result<(), rocksdb::Error> {
507        self.db_rw().delete_cf(cf, key)
508    }
509
510    /// Deletes a range of values from a column family.
511    fn delete_range_cf<K: AsRef<[u8]>>(
512        &self,
513        cf: &rocksdb::ColumnFamily,
514        from: K,
515        to: K,
516    ) -> Result<(), rocksdb::Error> {
517        self.db_rw().delete_range_cf(cf, from, to)
518    }
519
520    /// Returns an iterator over a column family.
521    fn iterator_cf(
522        &self,
523        cf: &rocksdb::ColumnFamily,
524        mode: IteratorMode<'_>,
525    ) -> RocksDBIterEnum<'_> {
526        match self {
527            Self::ReadWrite { db, .. } => RocksDBIterEnum::ReadWrite(db.iterator_cf(cf, mode)),
528            Self::Secondary { db, .. } => RocksDBIterEnum::ReadOnly(db.iterator_cf(cf, mode)),
529        }
530    }
531
532    /// Returns a raw iterator over a column family.
533    ///
534    /// Unlike [`Self::iterator_cf`], raw iterators support `seek()` for efficient
535    /// repositioning without creating a new iterator.
536    fn raw_iterator_cf(&self, cf: &rocksdb::ColumnFamily) -> RocksDBRawIterEnum<'_> {
537        match self {
538            Self::ReadWrite { db, .. } => RocksDBRawIterEnum::ReadWrite(db.raw_iterator_cf(cf)),
539            Self::Secondary { db, .. } => RocksDBRawIterEnum::ReadOnly(db.raw_iterator_cf(cf)),
540        }
541    }
542
543    /// Returns a read-only, point-in-time snapshot of the database.
544    fn snapshot(&self) -> RocksReadSnapshotInner<'_> {
545        match self {
546            Self::ReadWrite { db, .. } => RocksReadSnapshotInner::ReadWrite(db.snapshot()),
547            Self::Secondary { db, .. } => RocksReadSnapshotInner::Secondary(db),
548        }
549    }
550
551    /// Returns the path to the database directory.
552    fn path(&self) -> &Path {
553        match self {
554            Self::ReadWrite { db, .. } => db.path(),
555            Self::Secondary { db, .. } => db.path(),
556        }
557    }
558
559    /// Returns the total size of WAL (Write-Ahead Log) files in bytes.
560    ///
561    /// WAL files have a `.log` extension in the `RocksDB` directory.
562    fn wal_size_bytes(&self) -> u64 {
563        let path = self.path();
564
565        match std::fs::read_dir(path) {
566            Ok(entries) => entries
567                .filter_map(|e| e.ok())
568                .filter(|e| e.path().extension().is_some_and(|ext| ext == "log"))
569                .filter_map(|e| e.metadata().ok())
570                .map(|m| m.len())
571                .sum(),
572            Err(_) => 0,
573        }
574    }
575
576    /// Returns statistics for all column families in the database.
577    fn table_stats(&self) -> Vec<RocksDBTableStats> {
578        let mut stats = Vec::new();
579
580        macro_rules! collect_stats {
581            ($db:expr) => {
582                for cf_name in ROCKSDB_TABLES {
583                    if let Some(cf) = $db.cf_handle(cf_name) {
584                        let estimated_num_keys = $db
585                            .property_int_value_cf(cf, rocksdb::properties::ESTIMATE_NUM_KEYS)
586                            .ok()
587                            .flatten()
588                            .unwrap_or(0);
589
590                        // SST files size (on-disk) + memtable size (in-memory)
591                        let sst_size = $db
592                            .property_int_value_cf(cf, rocksdb::properties::LIVE_SST_FILES_SIZE)
593                            .ok()
594                            .flatten()
595                            .unwrap_or(0);
596
597                        let memtable_size = $db
598                            .property_int_value_cf(cf, rocksdb::properties::SIZE_ALL_MEM_TABLES)
599                            .ok()
600                            .flatten()
601                            .unwrap_or(0);
602
603                        let estimated_size_bytes = sst_size + memtable_size;
604
605                        let pending_compaction_bytes = $db
606                            .property_int_value_cf(
607                                cf,
608                                rocksdb::properties::ESTIMATE_PENDING_COMPACTION_BYTES,
609                            )
610                            .ok()
611                            .flatten()
612                            .unwrap_or(0);
613
614                        stats.push(RocksDBTableStats {
615                            sst_size_bytes: sst_size,
616                            memtable_size_bytes: memtable_size,
617                            name: cf_name.to_string(),
618                            estimated_num_keys,
619                            estimated_size_bytes,
620                            pending_compaction_bytes,
621                        });
622                    }
623                }
624            };
625        }
626
627        match self {
628            Self::ReadWrite { db, .. } => collect_stats!(db),
629            Self::Secondary { db, .. } => collect_stats!(db),
630        }
631
632        stats
633    }
634
635    /// Returns database-level statistics including per-table stats and WAL size.
636    fn db_stats(&self) -> RocksDBStats {
637        RocksDBStats { tables: self.table_stats(), wal_size_bytes: self.wal_size_bytes() }
638    }
639}
640
641impl fmt::Debug for RocksDBProviderInner {
642    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
643        match self {
644            Self::ReadWrite { metrics, .. } => f
645                .debug_struct("RocksDBProviderInner::ReadWrite")
646                .field("db", &"<OptimisticTransactionDB>")
647                .field("metrics", metrics)
648                .finish(),
649            Self::Secondary { metrics, .. } => f
650                .debug_struct("RocksDBProviderInner::Secondary")
651                .field("db", &"<DB (secondary)>")
652                .field("metrics", metrics)
653                .finish(),
654        }
655    }
656}
657
658impl Drop for RocksDBProviderInner {
659    fn drop(&mut self) {
660        match self {
661            Self::ReadWrite { db, .. } => {
662                // Flush all memtables if possible. If not, they will be rebuilt from the WAL on
663                // restart
664                if let Err(e) = db.flush_wal(true) {
665                    tracing::warn!(target: "providers::rocksdb", ?e, "Failed to flush WAL on drop");
666                }
667                for cf_name in ROCKSDB_TABLES {
668                    if let Some(cf) = db.cf_handle(cf_name) &&
669                        let Err(e) = db.flush_cf(&cf)
670                    {
671                        tracing::warn!(target: "providers::rocksdb", cf = cf_name, ?e, "Failed to flush CF on drop");
672                    }
673                }
674                db.cancel_all_background_work(true);
675            }
676            Self::Secondary { db, secondary_path, .. } => {
677                db.cancel_all_background_work(true);
678                let _ = std::fs::remove_dir_all(secondary_path);
679            }
680        }
681    }
682}
683
684impl Clone for RocksDBProvider {
685    fn clone(&self) -> Self {
686        Self(self.0.clone())
687    }
688}
689
690impl DatabaseMetrics for RocksDBProvider {
691    fn gauge_metrics(&self) -> Vec<(&'static str, f64, Vec<Label>)> {
692        let mut metrics = Vec::new();
693
694        for stat in self.table_stats() {
695            metrics.push((
696                "rocksdb.table_size",
697                stat.estimated_size_bytes as f64,
698                vec![Label::new("table", stat.name.clone())],
699            ));
700            metrics.push((
701                "rocksdb.table_entries",
702                stat.estimated_num_keys as f64,
703                vec![Label::new("table", stat.name.clone())],
704            ));
705            metrics.push((
706                "rocksdb.pending_compaction_bytes",
707                stat.pending_compaction_bytes as f64,
708                vec![Label::new("table", stat.name.clone())],
709            ));
710            metrics.push((
711                "rocksdb.sst_size",
712                stat.sst_size_bytes as f64,
713                vec![Label::new("table", stat.name.clone())],
714            ));
715            metrics.push((
716                "rocksdb.memtable_size",
717                stat.memtable_size_bytes as f64,
718                vec![Label::new("table", stat.name)],
719            ));
720        }
721
722        // WAL size (DB-level, shared across all tables)
723        metrics.push(("rocksdb.wal_size", self.wal_size_bytes() as f64, vec![]));
724
725        metrics
726    }
727}
728
729impl RocksDBProvider {
730    /// Creates a new `RocksDB` provider.
731    pub fn new(path: impl AsRef<Path>) -> ProviderResult<Self> {
732        RocksDBBuilder::new(path).build()
733    }
734
735    /// Creates a new `RocksDB` provider builder.
736    pub fn builder(path: impl AsRef<Path>) -> RocksDBBuilder {
737        RocksDBBuilder::new(path)
738    }
739
740    /// Returns `true` if a `RocksDB` database exists at the given path.
741    ///
742    /// Checks for the presence of the `CURRENT` file, which `RocksDB` creates
743    /// when initializing a database.
744    pub fn exists(path: impl AsRef<Path>) -> bool {
745        path.as_ref().join("CURRENT").exists()
746    }
747
748    /// Returns `true` if this provider is in read-only mode.
749    pub fn is_read_only(&self) -> bool {
750        matches!(self.0.as_ref(), RocksDBProviderInner::Secondary { .. })
751    }
752
753    /// Tries to catch up with the primary instance by reading new WAL and MANIFEST entries.
754    ///
755    /// This is a no-op for read-write and read-only providers.
756    /// For secondary providers, this incrementally syncs with the primary's latest state.
757    pub fn try_catch_up_with_primary(&self) -> ProviderResult<()> {
758        match self.0.as_ref() {
759            RocksDBProviderInner::Secondary { db, .. } => {
760                db.try_catch_up_with_primary().map_err(|e| {
761                    ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
762                        message: e.to_string().into(),
763                        code: -1,
764                    }))
765                })
766            }
767            _ => Ok(()),
768        }
769    }
770
771    /// Returns a read-only, point-in-time snapshot of the database.
772    ///
773    /// Lighter weight than [`RocksTx`] — no write-conflict tracking, and `Send + Sync`.
774    pub fn snapshot(&self) -> RocksReadSnapshot<'_> {
775        RocksReadSnapshot { inner: self.0.snapshot(), provider: self }
776    }
777
778    /// Creates a new transaction with MDBX-like semantics (read-your-writes, rollback).
779    ///
780    /// Note: With `OptimisticTransactionDB`, commits may fail if there are conflicts.
781    /// Conflict detection happens at commit time, not at write time.
782    ///
783    /// # Panics
784    /// Panics if the provider is in read-only mode.
785    pub fn tx(&self) -> RocksTx<'_> {
786        let write_options = synced_write_options();
787        let txn_options = OptimisticTransactionOptions::default();
788        let inner = self.0.db_rw().transaction_opt(&write_options, &txn_options);
789        RocksTx { inner, provider: self }
790    }
791
792    /// Creates a new batch for atomic writes.
793    ///
794    /// Use [`Self::write_batch`] for closure-based atomic writes.
795    /// Use this method when the batch needs to be held by [`crate::EitherWriter`].
796    ///
797    /// # Panics
798    /// Panics if the provider is in read-only mode when attempting to commit.
799    pub fn batch(&self) -> RocksDBBatch<'_> {
800        RocksDBBatch {
801            provider: self,
802            inner: WriteBatchWithTransaction::<true>::default(),
803            buf: Vec::with_capacity(DEFAULT_COMPRESS_BUF_CAPACITY),
804            auto_commit_threshold: None,
805        }
806    }
807
808    /// Creates a new batch with auto-commit enabled.
809    ///
810    /// When the batch size exceeds the threshold (4 GiB), the batch is automatically
811    /// committed and reset. This prevents OOM during large bulk writes while maintaining
812    /// crash-safety via the consistency check on startup.
813    pub fn batch_with_auto_commit(&self) -> RocksDBBatch<'_> {
814        RocksDBBatch {
815            provider: self,
816            inner: WriteBatchWithTransaction::<true>::default(),
817            buf: Vec::with_capacity(DEFAULT_COMPRESS_BUF_CAPACITY),
818            auto_commit_threshold: Some(DEFAULT_AUTO_COMMIT_THRESHOLD),
819        }
820    }
821
822    /// Gets the column family handle for a table.
823    fn get_cf_handle<T: Table>(&self) -> Result<&rocksdb::ColumnFamily, DatabaseError> {
824        self.0.cf_handle::<T>()
825    }
826
827    /// Executes a function and records metrics with the given operation and table name.
828    fn execute_with_operation_metric<R>(
829        &self,
830        operation: RocksDBOperation,
831        table: &'static str,
832        f: impl FnOnce(&Self) -> R,
833    ) -> R {
834        let start = self.0.metrics().map(|_| Instant::now());
835        let res = f(self);
836
837        if let (Some(start), Some(metrics)) = (start, self.0.metrics()) {
838            metrics.record_operation(operation, table, start.elapsed());
839        }
840
841        res
842    }
843
844    /// Gets a value from the specified table.
845    pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
846        self.get_encoded::<T>(&key.encode())
847    }
848
849    /// Gets a value from the specified table using pre-encoded key.
850    pub fn get_encoded<T: Table>(
851        &self,
852        key: &<T::Key as Encode>::Encoded,
853    ) -> ProviderResult<Option<T::Value>> {
854        self.execute_with_operation_metric(RocksDBOperation::Get, T::NAME, |this| {
855            let result = this.0.get_cf(this.get_cf_handle::<T>()?, key.as_ref()).map_err(|e| {
856                ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
857                    message: e.to_string().into(),
858                    code: -1,
859                }))
860            })?;
861
862            Ok(result.and_then(|value| T::Value::decompress(&value).ok()))
863        })
864    }
865
866    /// Gets raw bytes from the specified table without decompressing.
867    pub fn get_raw<T: Table>(&self, key: T::Key) -> ProviderResult<Option<Vec<u8>>> {
868        let encoded = key.encode();
869        self.execute_with_operation_metric(RocksDBOperation::Get, T::NAME, |this| {
870            this.0.get_cf(this.get_cf_handle::<T>()?, encoded.as_ref()).map_err(|e| {
871                ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
872                    message: e.to_string().into(),
873                    code: -1,
874                }))
875            })
876        })
877    }
878
879    /// Puts upsert a value into the specified table with the given key.
880    ///
881    /// # Panics
882    /// Panics if the provider is in read-only mode.
883    pub fn put<T: Table>(&self, key: T::Key, value: &T::Value) -> ProviderResult<()> {
884        let encoded_key = key.encode();
885        self.put_encoded::<T>(&encoded_key, value)
886    }
887
888    /// Puts a value into the specified table using pre-encoded key.
889    ///
890    /// # Panics
891    /// Panics if the provider is in read-only mode.
892    pub fn put_encoded<T: Table>(
893        &self,
894        key: &<T::Key as Encode>::Encoded,
895        value: &T::Value,
896    ) -> ProviderResult<()> {
897        self.execute_with_operation_metric(RocksDBOperation::Put, T::NAME, |this| {
898            // for simplify the code, we need allocate buf here each time because `RocksDBProvider`
899            // is thread safe if user want to avoid allocate buf each time, they can use
900            // write_batch api
901            let mut buf = Vec::new();
902            let value_bytes = compress_to_buf_or_ref!(buf, value).unwrap_or(&buf);
903
904            this.0.put_cf(this.get_cf_handle::<T>()?, key, value_bytes).map_err(|e| {
905                ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
906                    info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
907                    operation: DatabaseWriteOperation::PutUpsert,
908                    table_name: T::NAME,
909                    key: key.as_ref().to_vec(),
910                })))
911            })
912        })
913    }
914
915    /// Deletes a value from the specified table.
916    ///
917    /// # Panics
918    /// Panics if the provider is in read-only mode.
919    pub fn delete<T: Table>(&self, key: T::Key) -> ProviderResult<()> {
920        self.execute_with_operation_metric(RocksDBOperation::Delete, T::NAME, |this| {
921            this.0.delete_cf(this.get_cf_handle::<T>()?, key.encode().as_ref()).map_err(|e| {
922                ProviderError::Database(DatabaseError::Delete(DatabaseErrorInfo {
923                    message: e.to_string().into(),
924                    code: -1,
925                }))
926            })
927        })
928    }
929
930    /// Clears all entries from the specified table.
931    ///
932    /// Uses `delete_range_cf` from empty key to a max key (256 bytes of 0xFF).
933    /// This end key must exceed the maximum encoded key size for any table.
934    /// Current max is ~60 bytes (`StorageShardedKey` = 20 + 32 + 8).
935    pub fn clear<T: Table>(&self) -> ProviderResult<()> {
936        let cf = self.get_cf_handle::<T>()?;
937
938        self.0.delete_range_cf(cf, &[] as &[u8], &[0xFF; 256]).map_err(|e| {
939            ProviderError::Database(DatabaseError::Delete(DatabaseErrorInfo {
940                message: e.to_string().into(),
941                code: -1,
942            }))
943        })?;
944
945        Ok(())
946    }
947
948    /// Retrieves the first or last entry from a table based on the iterator mode.
949    fn get_boundary<T: Table>(
950        &self,
951        mode: IteratorMode<'_>,
952    ) -> ProviderResult<Option<(T::Key, T::Value)>> {
953        self.execute_with_operation_metric(RocksDBOperation::Get, T::NAME, |this| {
954            let cf = this.get_cf_handle::<T>()?;
955            let mut iter = this.0.iterator_cf(cf, mode);
956
957            match iter.next() {
958                Some(Ok((key_bytes, value_bytes))) => {
959                    let key = <T::Key as reth_db_api::table::Decode>::decode(&key_bytes)
960                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
961                    let value = T::Value::decompress(&value_bytes)
962                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
963                    Ok(Some((key, value)))
964                }
965                Some(Err(e)) => {
966                    Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
967                        message: e.to_string().into(),
968                        code: -1,
969                    })))
970                }
971                None => Ok(None),
972            }
973        })
974    }
975
976    /// Gets the first (smallest key) entry from the specified table.
977    #[inline]
978    pub fn first<T: Table>(&self) -> ProviderResult<Option<(T::Key, T::Value)>> {
979        self.get_boundary::<T>(IteratorMode::Start)
980    }
981
982    /// Gets the last (largest key) entry from the specified table.
983    #[inline]
984    pub fn last<T: Table>(&self) -> ProviderResult<Option<(T::Key, T::Value)>> {
985        self.get_boundary::<T>(IteratorMode::End)
986    }
987
988    /// Creates an iterator over all entries in the specified table.
989    ///
990    /// Returns decoded `(Key, Value)` pairs in key order.
991    pub fn iter<T: Table>(&self) -> ProviderResult<RocksDBIter<'_, T>> {
992        let cf = self.get_cf_handle::<T>()?;
993        let iter = self.0.iterator_cf(cf, IteratorMode::Start);
994        Ok(RocksDBIter { inner: iter, _marker: std::marker::PhantomData })
995    }
996
997    /// Creates an iterator starting from the given key (inclusive, seek forward).
998    ///
999    /// Returns decoded `(Key, Value)` pairs starting from the first key >= `key`.
1000    pub fn iter_from<T: Table>(&self, key: T::Key) -> ProviderResult<RocksDBIter<'_, T>> {
1001        let cf = self.get_cf_handle::<T>()?;
1002        let encoded_key = key.encode();
1003        let iter = self
1004            .0
1005            .iterator_cf(cf, IteratorMode::From(encoded_key.as_ref(), rocksdb::Direction::Forward));
1006        Ok(RocksDBIter { inner: iter, _marker: std::marker::PhantomData })
1007    }
1008
1009    /// Returns statistics for all column families in the database.
1010    ///
1011    /// Returns a vector of (`table_name`, `estimated_keys`, `estimated_size_bytes`) tuples.
1012    pub fn table_stats(&self) -> Vec<RocksDBTableStats> {
1013        self.0.table_stats()
1014    }
1015
1016    /// Returns the total size of WAL (Write-Ahead Log) files in bytes.
1017    ///
1018    /// This scans the `RocksDB` directory for `.log` files and sums their sizes.
1019    /// WAL files can be significant (e.g., 2.7GB observed) and are not included
1020    /// in `table_size`, `sst_size`, or `memtable_size` metrics.
1021    pub fn wal_size_bytes(&self) -> u64 {
1022        self.0.wal_size_bytes()
1023    }
1024
1025    /// Returns database-level statistics including per-table stats and WAL size.
1026    ///
1027    /// This combines [`Self::table_stats`] and [`Self::wal_size_bytes`] into a single struct.
1028    pub fn db_stats(&self) -> RocksDBStats {
1029        self.0.db_stats()
1030    }
1031
1032    /// Flushes pending writes for the specified tables to disk.
1033    ///
1034    /// This performs a flush of:
1035    /// 1. The column family memtables for the specified table names to SST files
1036    /// 2. The Write-Ahead Log (WAL) with sync
1037    ///
1038    /// After this call completes, all data for the specified tables is durably persisted to disk.
1039    ///
1040    /// # Panics
1041    /// Panics if the provider is in read-only mode.
1042    #[instrument(level = "debug", target = "providers::rocksdb", skip_all, fields(tables = ?tables))]
1043    pub fn flush(&self, tables: &[&'static str]) -> ProviderResult<()> {
1044        let db = self.0.db_rw();
1045
1046        for cf_name in tables {
1047            if let Some(cf) = db.cf_handle(cf_name) {
1048                db.flush_cf(&cf).map_err(|e| {
1049                    ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
1050                        info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
1051                        operation: DatabaseWriteOperation::Flush,
1052                        table_name: cf_name,
1053                        key: Vec::new(),
1054                    })))
1055                })?;
1056            }
1057        }
1058
1059        db.flush_wal(true).map_err(|e| {
1060            ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
1061                info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
1062                operation: DatabaseWriteOperation::Flush,
1063                table_name: "WAL",
1064                key: Vec::new(),
1065            })))
1066        })?;
1067
1068        Ok(())
1069    }
1070
1071    /// Flushes and compacts all tables in `RocksDB`.
1072    ///
1073    /// This:
1074    /// 1. Flushes all column family memtables to SST files
1075    /// 2. Flushes the Write-Ahead Log (WAL) with sync
1076    /// 3. Triggers manual compaction on all column families to reclaim disk space
1077    ///
1078    /// Use this after large delete operations (like pruning) to reclaim disk space.
1079    ///
1080    /// # Panics
1081    /// Panics if the provider is in read-only mode.
1082    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
1083    pub fn flush_and_compact(&self) -> ProviderResult<()> {
1084        self.flush(ROCKSDB_TABLES)?;
1085
1086        let db = self.0.db_rw();
1087
1088        for cf_name in ROCKSDB_TABLES {
1089            if let Some(cf) = db.cf_handle(cf_name) {
1090                db.compact_range_cf(&cf, None::<&[u8]>, None::<&[u8]>);
1091            }
1092        }
1093
1094        Ok(())
1095    }
1096
1097    /// Creates a raw iterator over all entries in the specified table.
1098    ///
1099    /// Returns raw `(key_bytes, value_bytes)` pairs without decoding.
1100    pub fn raw_iter<T: Table>(&self) -> ProviderResult<RocksDBRawIter<'_>> {
1101        let cf = self.get_cf_handle::<T>()?;
1102        let iter = self.0.iterator_cf(cf, IteratorMode::Start);
1103        Ok(RocksDBRawIter { inner: iter })
1104    }
1105
1106    /// Returns all account history shards for the given address in ascending key order.
1107    ///
1108    /// This is used for unwind operations where we need to scan all shards for an address
1109    /// and potentially delete or truncate them.
1110    pub fn account_history_shards(
1111        &self,
1112        address: Address,
1113    ) -> ProviderResult<Vec<(ShardedKey<Address>, BlockNumberList)>> {
1114        // Get the column family handle for the AccountsHistory table.
1115        let cf = self.get_cf_handle::<tables::AccountsHistory>()?;
1116
1117        // Build a seek key starting at the first shard (highest_block_number = 0) for this address.
1118        // ShardedKey is (address, highest_block_number) so this positions us at the beginning.
1119        let start_key = ShardedKey::new(address, 0u64);
1120        let start_bytes = start_key.encode();
1121
1122        // Create a forward iterator starting from our seek position.
1123        let iter = self
1124            .0
1125            .iterator_cf(cf, IteratorMode::From(start_bytes.as_ref(), rocksdb::Direction::Forward));
1126
1127        let mut result = Vec::new();
1128        for item in iter {
1129            match item {
1130                Ok((key_bytes, value_bytes)) => {
1131                    // Decode the sharded key to check if we're still on the same address.
1132                    let key = ShardedKey::<Address>::decode(&key_bytes)
1133                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
1134
1135                    // Stop when we reach a different address (keys are sorted by address first).
1136                    if key.key != address {
1137                        break;
1138                    }
1139
1140                    // Decompress the block number list stored in this shard.
1141                    let value = BlockNumberList::decompress(&value_bytes)
1142                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
1143
1144                    result.push((key, value));
1145                }
1146                Err(e) => {
1147                    return Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
1148                        message: e.to_string().into(),
1149                        code: -1,
1150                    })));
1151                }
1152            }
1153        }
1154
1155        Ok(result)
1156    }
1157
1158    /// Returns all storage history shards for the given `(address, storage_key)` pair.
1159    ///
1160    /// Iterates through all shards in ascending `highest_block_number` order until
1161    /// a different `(address, storage_key)` is encountered.
1162    pub fn storage_history_shards(
1163        &self,
1164        address: Address,
1165        storage_key: B256,
1166    ) -> ProviderResult<Vec<(StorageShardedKey, BlockNumberList)>> {
1167        let cf = self.get_cf_handle::<tables::StoragesHistory>()?;
1168
1169        let start_key = StorageShardedKey::new(address, storage_key, 0u64);
1170        let start_bytes = start_key.encode();
1171
1172        let iter = self
1173            .0
1174            .iterator_cf(cf, IteratorMode::From(start_bytes.as_ref(), rocksdb::Direction::Forward));
1175
1176        let mut result = Vec::new();
1177        for item in iter {
1178            match item {
1179                Ok((key_bytes, value_bytes)) => {
1180                    let key = StorageShardedKey::decode(&key_bytes)
1181                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
1182
1183                    if key.address != address || key.sharded_key.key != storage_key {
1184                        break;
1185                    }
1186
1187                    let value = BlockNumberList::decompress(&value_bytes)
1188                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
1189
1190                    result.push((key, value));
1191                }
1192                Err(e) => {
1193                    return Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
1194                        message: e.to_string().into(),
1195                        code: -1,
1196                    })));
1197                }
1198            }
1199        }
1200
1201        Ok(result)
1202    }
1203
1204    /// Unwinds account history indices for the given `(address, block_number)` pairs.
1205    ///
1206    /// Groups addresses by their minimum block number and calls the appropriate unwind
1207    /// operations. For each address, keeps only blocks less than the minimum block
1208    /// (i.e., removes the minimum block and all higher blocks).
1209    ///
1210    /// Returns a `WriteBatchWithTransaction` that can be committed later.
1211    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
1212    pub fn unwind_account_history_indices(
1213        &self,
1214        last_indices: &[(Address, BlockNumber)],
1215    ) -> ProviderResult<WriteBatchWithTransaction<true>> {
1216        let mut address_min_block: AddressMap<BlockNumber> =
1217            AddressMap::with_capacity_and_hasher(last_indices.len(), Default::default());
1218        for &(address, block_number) in last_indices {
1219            address_min_block
1220                .entry(address)
1221                .and_modify(|min| *min = (*min).min(block_number))
1222                .or_insert(block_number);
1223        }
1224
1225        let mut batch = self.batch();
1226        for (address, min_block) in address_min_block {
1227            match min_block.checked_sub(1) {
1228                Some(keep_to) => batch.unwind_account_history_to(address, keep_to)?,
1229                None => batch.clear_account_history(address)?,
1230            }
1231        }
1232
1233        Ok(batch.into_inner())
1234    }
1235
1236    /// Unwinds storage history indices for the given `(address, storage_key, block_number)` tuples.
1237    ///
1238    /// Groups by `(address, storage_key)` and finds the minimum block number for each.
1239    /// For each key, keeps only blocks less than the minimum block
1240    /// (i.e., removes the minimum block and all higher blocks).
1241    ///
1242    /// Returns a `WriteBatchWithTransaction` that can be committed later.
1243    pub fn unwind_storage_history_indices(
1244        &self,
1245        storage_changesets: &[(Address, B256, BlockNumber)],
1246    ) -> ProviderResult<WriteBatchWithTransaction<true>> {
1247        let mut key_min_block: HashMap<(Address, B256), BlockNumber> =
1248            HashMap::with_capacity_and_hasher(storage_changesets.len(), Default::default());
1249        for &(address, storage_key, block_number) in storage_changesets {
1250            key_min_block
1251                .entry((address, storage_key))
1252                .and_modify(|min| *min = (*min).min(block_number))
1253                .or_insert(block_number);
1254        }
1255
1256        let mut batch = self.batch();
1257        for ((address, storage_key), min_block) in key_min_block {
1258            match min_block.checked_sub(1) {
1259                Some(keep_to) => batch.unwind_storage_history_to(address, storage_key, keep_to)?,
1260                None => batch.clear_storage_history(address, storage_key)?,
1261            }
1262        }
1263
1264        Ok(batch.into_inner())
1265    }
1266
1267    /// Writes a batch of operations atomically.
1268    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
1269    pub fn write_batch<F>(&self, f: F) -> ProviderResult<()>
1270    where
1271        F: FnOnce(&mut RocksDBBatch<'_>) -> ProviderResult<()>,
1272    {
1273        self.execute_with_operation_metric(RocksDBOperation::BatchWrite, "Batch", |this| {
1274            let mut batch_handle = this.batch();
1275            f(&mut batch_handle)?;
1276            batch_handle.commit()
1277        })
1278    }
1279
1280    /// Commits a raw `WriteBatchWithTransaction` to `RocksDB`.
1281    ///
1282    /// This is used when the batch was extracted via [`RocksDBBatch::into_inner`]
1283    /// and needs to be committed at a later point (e.g., at provider commit time).
1284    ///
1285    /// # Panics
1286    /// Panics if the provider is in read-only mode.
1287    #[instrument(level = "debug", target = "providers::rocksdb", skip_all, fields(batch_len = batch.len(), batch_size = batch.size_in_bytes()))]
1288    pub fn commit_batch(&self, batch: WriteBatchWithTransaction<true>) -> ProviderResult<()> {
1289        self.0.db_rw().write_opt(batch, &synced_write_options()).map_err(|e| {
1290            ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
1291                message: e.to_string().into(),
1292                code: -1,
1293            }))
1294        })
1295    }
1296
1297    /// Writes all `RocksDB` data for multiple blocks in parallel.
1298    ///
1299    /// This handles transaction hash numbers, account history, and storage history based on
1300    /// the provided storage settings. Each operation runs in parallel with its own batch,
1301    /// pushing to `ctx.pending_batches` for later commit.
1302    #[instrument(level = "debug", target = "providers::rocksdb", skip_all, fields(num_blocks = blocks.len(), first_block = ctx.first_block_number))]
1303    pub(crate) fn write_blocks_data<N: reth_node_types::NodePrimitives>(
1304        &self,
1305        blocks: &[ExecutedBlock<N>],
1306        tx_nums: &[TxNumber],
1307        ctx: RocksDBWriteCtx,
1308        runtime: &reth_tasks::Runtime,
1309    ) -> ProviderResult<()> {
1310        if !ctx.storage_settings.storage_v2 {
1311            return Ok(());
1312        }
1313
1314        let mut r_tx_hash = None;
1315        let mut r_account_history = None;
1316        let mut r_storage_history = None;
1317
1318        let write_tx_hash =
1319            ctx.storage_settings.storage_v2 && ctx.prune_tx_lookup.is_none_or(|m| !m.is_full());
1320        let write_account_history = ctx.storage_settings.storage_v2;
1321        let write_storage_history = ctx.storage_settings.storage_v2;
1322
1323        // Propagate tracing context into rayon-spawned threads so that RocksDB
1324        // write spans appear as children of write_blocks_data in traces.
1325        let span = tracing::Span::current();
1326        runtime.storage_pool().in_place_scope(|s| {
1327            if write_tx_hash {
1328                s.spawn(|_| {
1329                    let _guard = span.enter();
1330                    r_tx_hash = Some(self.write_tx_hash_numbers(blocks, tx_nums, &ctx));
1331                });
1332            }
1333
1334            if write_account_history {
1335                s.spawn(|_| {
1336                    let _guard = span.enter();
1337                    r_account_history = Some(self.write_account_history(blocks, &ctx));
1338                });
1339            }
1340
1341            if write_storage_history {
1342                s.spawn(|_| {
1343                    let _guard = span.enter();
1344                    r_storage_history = Some(self.write_storage_history(blocks, &ctx));
1345                });
1346            }
1347        });
1348
1349        if write_tx_hash {
1350            r_tx_hash.ok_or_else(|| {
1351                ProviderError::Database(DatabaseError::Other(
1352                    "rocksdb tx-hash write thread panicked".into(),
1353                ))
1354            })??;
1355        }
1356        if write_account_history {
1357            r_account_history.ok_or_else(|| {
1358                ProviderError::Database(DatabaseError::Other(
1359                    "rocksdb account-history write thread panicked".into(),
1360                ))
1361            })??;
1362        }
1363        if write_storage_history {
1364            r_storage_history.ok_or_else(|| {
1365                ProviderError::Database(DatabaseError::Other(
1366                    "rocksdb storage-history write thread panicked".into(),
1367                ))
1368            })??;
1369        }
1370
1371        Ok(())
1372    }
1373
1374    /// Writes transaction hash to number mappings for the given blocks.
1375    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
1376    fn write_tx_hash_numbers<N: reth_node_types::NodePrimitives>(
1377        &self,
1378        blocks: &[ExecutedBlock<N>],
1379        tx_nums: &[TxNumber],
1380        ctx: &RocksDBWriteCtx,
1381    ) -> ProviderResult<()> {
1382        let mut batch = self.batch();
1383        for (block, &first_tx_num) in blocks.iter().zip(tx_nums) {
1384            let body = block.recovered_block().body();
1385            for (tx_num, transaction) in (first_tx_num..).zip(body.transactions_iter()) {
1386                batch.put::<tables::TransactionHashNumbers>(*transaction.tx_hash(), &tx_num)?;
1387            }
1388        }
1389        ctx.pending_batches.lock().push(batch.into_inner());
1390        Ok(())
1391    }
1392
1393    /// Writes account history indices for the given blocks.
1394    ///
1395    /// Derives history indices from reverts (same source as changesets) to ensure consistency.
1396    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
1397    fn write_account_history<N: reth_node_types::NodePrimitives>(
1398        &self,
1399        blocks: &[ExecutedBlock<N>],
1400        ctx: &RocksDBWriteCtx,
1401    ) -> ProviderResult<()> {
1402        let mut batch = self.batch();
1403        let mut account_history: BTreeMap<Address, Vec<u64>> = BTreeMap::new();
1404
1405        for (block_idx, block) in blocks.iter().enumerate() {
1406            let block_number = ctx.first_block_number + block_idx as u64;
1407            let reverts = block.execution_outcome().state.reverts.to_plain_state_reverts();
1408
1409            // Iterate through account reverts - these are exactly the accounts that have
1410            // changesets written, ensuring history indices match changeset entries.
1411            for account_block_reverts in reverts.accounts {
1412                for (address, _) in account_block_reverts {
1413                    account_history.entry(address).or_default().push(block_number);
1414                }
1415            }
1416        }
1417
1418        // Write account history using proper shard append logic
1419        for (address, indices) in account_history {
1420            batch.append_account_history_shard(address, indices)?;
1421        }
1422        ctx.pending_batches.lock().push(batch.into_inner());
1423        Ok(())
1424    }
1425
1426    /// Writes storage history indices for the given blocks.
1427    ///
1428    /// Derives history indices from reverts (same source as changesets) to ensure consistency.
1429    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
1430    fn write_storage_history<N: reth_node_types::NodePrimitives>(
1431        &self,
1432        blocks: &[ExecutedBlock<N>],
1433        ctx: &RocksDBWriteCtx,
1434    ) -> ProviderResult<()> {
1435        let mut storage_history: BTreeMap<(Address, B256), Vec<u64>> = BTreeMap::new();
1436
1437        for (block_idx, block) in blocks.iter().enumerate() {
1438            let block_number = ctx.first_block_number + block_idx as u64;
1439            let reverts = block.execution_outcome().state.reverts.to_plain_state_reverts();
1440
1441            // Iterate through storage reverts - these are exactly the slots that have
1442            // changesets written, ensuring history indices match changeset entries.
1443            for storage_block_reverts in reverts.storage {
1444                for revert in storage_block_reverts {
1445                    for (slot, _) in revert.storage_revert {
1446                        let plain_key = B256::new(slot.to_be_bytes());
1447                        storage_history
1448                            .entry((revert.address, plain_key))
1449                            .or_default()
1450                            .push(block_number);
1451                    }
1452                }
1453            }
1454        }
1455
1456        let shard_puts = storage_history
1457            .into_par_iter()
1458            .map(|((address, slot), indices)| {
1459                self.storage_history_shards_to_put(address, slot, indices)
1460            })
1461            .collect::<ProviderResult<Vec<_>>>()?;
1462
1463        let mut batch = self.batch();
1464        for shards in shard_puts {
1465            for (key, shard) in shards {
1466                batch.put::<tables::StoragesHistory>(key, &shard)?;
1467            }
1468        }
1469        ctx.pending_batches.lock().push(batch.into_inner());
1470        Ok(())
1471    }
1472
1473    /// Prepares storage history shard writes by reading the current last shard and appending
1474    /// indices.
1475    fn storage_history_shards_to_put(
1476        &self,
1477        address: Address,
1478        storage_key: B256,
1479        indices: Vec<u64>,
1480    ) -> ProviderResult<Vec<(StorageShardedKey, BlockNumberList)>> {
1481        if indices.is_empty() {
1482            return Ok(Vec::new());
1483        }
1484
1485        debug_assert!(
1486            indices.windows(2).all(|w| w[0] < w[1]),
1487            "indices must be strictly increasing: {:?}",
1488            indices
1489        );
1490
1491        let last_key = StorageShardedKey::last(address, storage_key);
1492        let last_shard_opt = self.get::<tables::StoragesHistory>(last_key.clone())?;
1493        let mut last_shard = last_shard_opt.unwrap_or_else(BlockNumberList::empty);
1494
1495        last_shard.append(indices).map_err(ProviderError::other)?;
1496
1497        if last_shard.len() <= NUM_OF_INDICES_IN_SHARD as u64 {
1498            return Ok(vec![(last_key, last_shard)]);
1499        }
1500
1501        let chunks = last_shard.iter().chunks(NUM_OF_INDICES_IN_SHARD);
1502        let mut chunks_peekable = chunks.into_iter().peekable();
1503        let mut shards = Vec::new();
1504
1505        while let Some(chunk) = chunks_peekable.next() {
1506            let shard = BlockNumberList::new_pre_sorted(chunk);
1507            let highest_block_number = if chunks_peekable.peek().is_some() {
1508                shard.iter().next_back().expect("`chunks` does not return empty list")
1509            } else {
1510                u64::MAX
1511            };
1512
1513            shards
1514                .push((StorageShardedKey::new(address, storage_key, highest_block_number), shard));
1515        }
1516
1517        Ok(shards)
1518    }
1519}
1520
1521/// A point-in-time read snapshot of the `RocksDB` database.
1522///
1523/// All reads through this snapshot see a consistent view of the database at the point
1524/// the snapshot was created, regardless of concurrent writes. This is the primary reader
1525/// used by [`EitherReader::RocksDB`](crate::either_writer::EitherReader) for history lookups.
1526///
1527/// Lighter weight than [`RocksTx`] — no transaction overhead, no write support.
1528pub struct RocksReadSnapshot<'db> {
1529    inner: RocksReadSnapshotInner<'db>,
1530    provider: &'db RocksDBProvider,
1531}
1532
1533/// Inner enum to hold the snapshot for either read-write or secondary mode.
1534enum RocksReadSnapshotInner<'db> {
1535    /// Snapshot from read-write `OptimisticTransactionDB`.
1536    ReadWrite(SnapshotWithThreadMode<'db, OptimisticTransactionDB>),
1537    /// Direct reads from a secondary `DB` instance (no snapshot).
1538    Secondary(&'db DB),
1539}
1540
1541impl<'db> RocksReadSnapshotInner<'db> {
1542    /// Returns a raw iterator over a column family.
1543    fn raw_iterator_cf(&self, cf: &rocksdb::ColumnFamily) -> RocksDBRawIterEnum<'_> {
1544        match self {
1545            Self::ReadWrite(snap) => RocksDBRawIterEnum::ReadWrite(snap.raw_iterator_cf(cf)),
1546            Self::Secondary(db) => RocksDBRawIterEnum::ReadOnly(db.raw_iterator_cf(cf)),
1547        }
1548    }
1549}
1550
1551impl fmt::Debug for RocksReadSnapshot<'_> {
1552    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1553        f.debug_struct("RocksReadSnapshot")
1554            .field("provider", &self.provider)
1555            .finish_non_exhaustive()
1556    }
1557}
1558
1559impl<'db> RocksReadSnapshot<'db> {
1560    /// Gets the column family handle for a table.
1561    fn cf_handle<T: Table>(&self) -> Result<&'db rocksdb::ColumnFamily, DatabaseError> {
1562        self.provider.get_cf_handle::<T>()
1563    }
1564
1565    /// Gets a value from the specified table.
1566    pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
1567        let encoded_key = key.encode();
1568        let cf = self.cf_handle::<T>()?;
1569        let result = match &self.inner {
1570            RocksReadSnapshotInner::ReadWrite(snap) => snap.get_cf(cf, encoded_key.as_ref()),
1571            RocksReadSnapshotInner::Secondary(db) => db.get_cf(cf, encoded_key.as_ref()),
1572        }
1573        .map_err(|e| {
1574            ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
1575                message: e.to_string().into(),
1576                code: -1,
1577            }))
1578        })?;
1579
1580        Ok(result.and_then(|value| T::Value::decompress(&value).ok()))
1581    }
1582
1583    /// Lookup account history and return [`HistoryInfo`] directly.
1584    ///
1585    /// `visible_tip` is the highest block considered visible from the companion MDBX snapshot.
1586    /// History entries above it are ignored even if they already exist in `RocksDB`.
1587    pub fn account_history_info(
1588        &self,
1589        address: Address,
1590        block_number: BlockNumber,
1591        lowest_available_block_number: Option<BlockNumber>,
1592        visible_tip: BlockNumber,
1593    ) -> ProviderResult<HistoryInfo> {
1594        let key = ShardedKey::new(address, block_number);
1595        self.history_info::<tables::AccountsHistory>(
1596            key.encode().as_ref(),
1597            block_number,
1598            lowest_available_block_number,
1599            visible_tip,
1600            |key_bytes| Ok(<ShardedKey<Address> as Decode>::decode(key_bytes)?.key == address),
1601            |prev_bytes| {
1602                <ShardedKey<Address> as Decode>::decode(prev_bytes)
1603                    .map(|k| k.key == address)
1604                    .unwrap_or(false)
1605            },
1606        )
1607    }
1608
1609    /// Lookup storage history and return [`HistoryInfo`] directly.
1610    ///
1611    /// `visible_tip` is the highest block considered visible from the companion MDBX snapshot.
1612    /// History entries above it are ignored even if they already exist in `RocksDB`.
1613    pub fn storage_history_info(
1614        &self,
1615        address: Address,
1616        storage_key: B256,
1617        block_number: BlockNumber,
1618        lowest_available_block_number: Option<BlockNumber>,
1619        visible_tip: BlockNumber,
1620    ) -> ProviderResult<HistoryInfo> {
1621        let key = StorageShardedKey::new(address, storage_key, block_number);
1622        self.history_info::<tables::StoragesHistory>(
1623            key.encode().as_ref(),
1624            block_number,
1625            lowest_available_block_number,
1626            visible_tip,
1627            |key_bytes| {
1628                let k = <StorageShardedKey as Decode>::decode(key_bytes)?;
1629                Ok(k.address == address && k.sharded_key.key == storage_key)
1630            },
1631            |prev_bytes| {
1632                <StorageShardedKey as Decode>::decode(prev_bytes)
1633                    .map(|k| k.address == address && k.sharded_key.key == storage_key)
1634                    .unwrap_or(false)
1635            },
1636        )
1637    }
1638
1639    /// Generic history lookup using the snapshot's raw iterator.
1640    ///
1641    /// The result is derived from the history that is visible through `visible_tip`, not from the
1642    /// full contents of `RocksDB`. This lets a reader combine an older MDBX snapshot with a newer
1643    /// Rocks snapshot without routing through history entries that MDBX cannot see yet.
1644    fn history_info<T>(
1645        &self,
1646        encoded_key: &[u8],
1647        block_number: BlockNumber,
1648        lowest_available_block_number: Option<BlockNumber>,
1649        visible_tip: BlockNumber,
1650        key_matches: impl FnOnce(&[u8]) -> Result<bool, reth_db_api::DatabaseError>,
1651        prev_key_matches: impl Fn(&[u8]) -> bool,
1652    ) -> ProviderResult<HistoryInfo>
1653    where
1654        T: Table<Value = BlockNumberList>,
1655    {
1656        let is_maybe_pruned = lowest_available_block_number.is_some();
1657        let fallback = || {
1658            Ok(if is_maybe_pruned {
1659                HistoryInfo::MaybeInPlainState
1660            } else {
1661                HistoryInfo::NotYetWritten
1662            })
1663        };
1664
1665        let cf = self.cf_handle::<T>()?;
1666        let mut iter = self.inner.raw_iterator_cf(cf);
1667
1668        iter.seek(encoded_key);
1669        iter.status().map_err(|e| {
1670            ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
1671                message: e.to_string().into(),
1672                code: -1,
1673            }))
1674        })?;
1675
1676        if !iter.valid() {
1677            return fallback();
1678        }
1679
1680        let Some(key_bytes) = iter.key() else {
1681            return fallback();
1682        };
1683        if !key_matches(key_bytes)? {
1684            return fallback();
1685        }
1686
1687        let Some(value_bytes) = iter.value() else {
1688            return fallback();
1689        };
1690        let chunk = BlockNumberList::decompress(value_bytes)?;
1691
1692        let (rank, found_block) = compute_history_rank(&chunk, block_number);
1693        // Ignore later Rocks history that is ahead of the companion MDBX snapshot.
1694        let found_block = found_block.filter(|block| *block <= visible_tip);
1695
1696        let is_before_first_write = if needs_prev_shard_check(rank, found_block, block_number) {
1697            iter.prev();
1698            iter.status().map_err(|e| {
1699                ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
1700                    message: e.to_string().into(),
1701                    code: -1,
1702                }))
1703            })?;
1704            let has_prev = iter.valid() && iter.key().is_some_and(&prev_key_matches);
1705
1706            // If the current shard only contains history above `visible_tip`, there is no usable
1707            // later change. Without a previous shard for the same key, fall back to the existing
1708            // not-written / maybe-pruned result instead of routing into plain state.
1709            if found_block.is_none() && !has_prev {
1710                return fallback()
1711            }
1712
1713            !has_prev
1714        } else {
1715            false
1716        };
1717
1718        Ok(HistoryInfo::from_lookup(
1719            found_block,
1720            is_before_first_write,
1721            lowest_available_block_number,
1722        ))
1723    }
1724}
1725
/// Result of pruning the history shards of a single key in `RocksDB`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum PruneShardOutcome {
    /// The shard was removed completely.
    Deleted,
    /// The shard was rewritten with its surviving block numbers.
    Updated,
    /// The shard was left as-is (it held no blocks <= `to_block`).
    Unchanged,
}
1736
/// Counters summarising pruning outcomes across a batch of keys.
#[derive(Clone, Copy, Debug, Default)]
pub struct PrunedIndices {
    /// Keys whose pruning outcome was [`PruneShardOutcome::Deleted`].
    pub deleted: usize,
    /// Keys whose pruning outcome was [`PruneShardOutcome::Updated`].
    pub updated: usize,
    /// Keys whose pruning outcome was [`PruneShardOutcome::Unchanged`].
    pub unchanged: usize,
}
1747
1748/// Handle for building a batch of operations atomically.
1749///
1750/// Uses `WriteBatchWithTransaction` for atomic writes without full transaction overhead.
1751/// Unlike [`RocksTx`], this does NOT support read-your-writes. Use for write-only flows
1752/// where you don't need to read back uncommitted data within the same operation
1753/// (e.g., history index writes).
1754///
1755/// When `auto_commit_threshold` is set, the batch will automatically commit and reset
1756/// when the batch size exceeds the threshold. This prevents OOM during large bulk writes.
1757#[must_use = "batch must be committed"]
1758pub struct RocksDBBatch<'a> {
1759    provider: &'a RocksDBProvider,
1760    inner: WriteBatchWithTransaction<true>,
1761    buf: Vec<u8>,
1762    /// If set, batch auto-commits when size exceeds this threshold (in bytes).
1763    auto_commit_threshold: Option<usize>,
1764}
1765
1766impl fmt::Debug for RocksDBBatch<'_> {
1767    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1768        f.debug_struct("RocksDBBatch")
1769            .field("provider", &self.provider)
1770            .field("batch", &"<WriteBatchWithTransaction>")
1771            // Number of operations in this batch
1772            .field("length", &self.inner.len())
1773            // Total serialized size (encoded key + compressed value + metadata) of this batch
1774            // in bytes
1775            .field("size_in_bytes", &self.inner.size_in_bytes())
1776            .finish()
1777    }
1778}
1779
1780impl<'a> RocksDBBatch<'a> {
1781    /// Puts a value into the batch.
1782    ///
1783    /// If auto-commit is enabled and the batch exceeds the threshold, commits and resets.
1784    pub fn put<T: Table>(&mut self, key: T::Key, value: &T::Value) -> ProviderResult<()> {
1785        let encoded_key = key.encode();
1786        self.put_encoded::<T>(&encoded_key, value)
1787    }
1788
1789    /// Puts a value into the batch using pre-encoded key.
1790    ///
1791    /// If auto-commit is enabled and the batch exceeds the threshold, commits and resets.
1792    pub fn put_encoded<T: Table>(
1793        &mut self,
1794        key: &<T::Key as Encode>::Encoded,
1795        value: &T::Value,
1796    ) -> ProviderResult<()> {
1797        let value_bytes = compress_to_buf_or_ref!(self.buf, value).unwrap_or(&self.buf);
1798        self.inner.put_cf(self.provider.get_cf_handle::<T>()?, key, value_bytes);
1799        self.maybe_auto_commit()?;
1800        Ok(())
1801    }
1802
1803    /// Deletes a value from the batch.
1804    ///
1805    /// If auto-commit is enabled and the batch exceeds the threshold, commits and resets.
1806    pub fn delete<T: Table>(&mut self, key: T::Key) -> ProviderResult<()> {
1807        self.inner.delete_cf(self.provider.get_cf_handle::<T>()?, key.encode().as_ref());
1808        self.maybe_auto_commit()?;
1809        Ok(())
1810    }
1811
1812    /// Commits and resets the batch if it exceeds the auto-commit threshold.
1813    ///
1814    /// This is called after each `put` or `delete` operation to prevent unbounded memory growth.
1815    /// Returns immediately if auto-commit is disabled or threshold not reached.
1816    fn maybe_auto_commit(&mut self) -> ProviderResult<()> {
1817        if let Some(threshold) = self.auto_commit_threshold &&
1818            self.inner.size_in_bytes() >= threshold
1819        {
1820            tracing::debug!(
1821                target: "providers::rocksdb",
1822                batch_size = self.inner.size_in_bytes(),
1823                threshold,
1824                "Auto-committing RocksDB batch"
1825            );
1826            let old_batch = std::mem::take(&mut self.inner);
1827            self.provider.0.db_rw().write_opt(old_batch, &synced_write_options()).map_err(|e| {
1828                ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
1829                    message: e.to_string().into(),
1830                    code: -1,
1831                }))
1832            })?;
1833        }
1834        Ok(())
1835    }
1836
1837    /// Commits the batch to the database.
1838    ///
1839    /// This consumes the batch and writes all operations atomically to `RocksDB`.
1840    ///
1841    /// # Panics
1842    /// Panics if the provider is in read-only mode.
1843    #[instrument(level = "debug", target = "providers::rocksdb", skip_all, fields(batch_len = self.inner.len(), batch_size = self.inner.size_in_bytes()))]
1844    pub fn commit(self) -> ProviderResult<()> {
1845        self.provider.0.db_rw().write_opt(self.inner, &synced_write_options()).map_err(|e| {
1846            ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
1847                message: e.to_string().into(),
1848                code: -1,
1849            }))
1850        })
1851    }
1852
1853    /// Returns the number of write operations (puts + deletes) queued in this batch.
1854    pub fn len(&self) -> usize {
1855        self.inner.len()
1856    }
1857
1858    /// Returns `true` if the batch contains no operations.
1859    pub fn is_empty(&self) -> bool {
1860        self.inner.is_empty()
1861    }
1862
1863    /// Returns the size of the batch in bytes.
1864    pub fn size_in_bytes(&self) -> usize {
1865        self.inner.size_in_bytes()
1866    }
1867
1868    /// Returns a reference to the underlying `RocksDB` provider.
1869    pub const fn provider(&self) -> &RocksDBProvider {
1870        self.provider
1871    }
1872
1873    /// Consumes the batch and returns the underlying `WriteBatchWithTransaction`.
1874    ///
1875    /// This is used to defer commits to the provider level.
1876    pub fn into_inner(self) -> WriteBatchWithTransaction<true> {
1877        self.inner
1878    }
1879
1880    /// Gets a value from the database.
1881    ///
1882    /// **Important constraint:** This reads only committed state, not pending writes in this
1883    /// batch or other pending batches in `pending_rocksdb_batches`.
1884    pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
1885        self.provider.get::<T>(key)
1886    }
1887
1888    /// Appends indices to an account history shard with proper shard management.
1889    ///
1890    /// Loads the existing shard (if any), appends new indices, and rechunks into
1891    /// multiple shards if needed (respecting `NUM_OF_INDICES_IN_SHARD` limit).
1892    ///
1893    /// # Requirements
1894    ///
1895    /// - The `indices` MUST be strictly increasing and contain no duplicates.
1896    /// - This method MUST only be called once per address per batch. The batch reads existing
1897    ///   shards from committed DB state, not from pending writes. Calling twice for the same
1898    ///   address will cause the second call to overwrite the first.
1899    pub fn append_account_history_shard(
1900        &mut self,
1901        address: Address,
1902        indices: impl IntoIterator<Item = u64>,
1903    ) -> ProviderResult<()> {
1904        let indices: Vec<u64> = indices.into_iter().collect();
1905
1906        if indices.is_empty() {
1907            return Ok(());
1908        }
1909
1910        debug_assert!(
1911            indices.windows(2).all(|w| w[0] < w[1]),
1912            "indices must be strictly increasing: {:?}",
1913            indices
1914        );
1915
1916        let last_key = ShardedKey::new(address, u64::MAX);
1917        let last_shard_opt = self.provider.get::<tables::AccountsHistory>(last_key.clone())?;
1918        let mut last_shard = last_shard_opt.unwrap_or_else(BlockNumberList::empty);
1919
1920        last_shard.append(indices).map_err(ProviderError::other)?;
1921
1922        // Fast path: all indices fit in one shard
1923        if last_shard.len() <= NUM_OF_INDICES_IN_SHARD as u64 {
1924            self.put::<tables::AccountsHistory>(last_key, &last_shard)?;
1925            return Ok(());
1926        }
1927
1928        // Slow path: rechunk into multiple shards
1929        let chunks = last_shard.iter().chunks(NUM_OF_INDICES_IN_SHARD);
1930        let mut chunks_peekable = chunks.into_iter().peekable();
1931
1932        while let Some(chunk) = chunks_peekable.next() {
1933            let shard = BlockNumberList::new_pre_sorted(chunk);
1934            let highest_block_number = if chunks_peekable.peek().is_some() {
1935                shard.iter().next_back().expect("`chunks` does not return empty list")
1936            } else {
1937                u64::MAX
1938            };
1939
1940            self.put::<tables::AccountsHistory>(
1941                ShardedKey::new(address, highest_block_number),
1942                &shard,
1943            )?;
1944        }
1945
1946        Ok(())
1947    }
1948
1949    /// Appends indices to a storage history shard with proper shard management.
1950    ///
1951    /// Loads the existing shard (if any), appends new indices, and rechunks into
1952    /// multiple shards if needed (respecting `NUM_OF_INDICES_IN_SHARD` limit).
1953    ///
1954    /// # Requirements
1955    ///
1956    /// - The `indices` MUST be strictly increasing and contain no duplicates.
1957    /// - This method MUST only be called once per (address, `storage_key`) pair per batch. The
1958    ///   batch reads existing shards from committed DB state, not from pending writes. Calling
1959    ///   twice for the same key will cause the second call to overwrite the first.
1960    pub fn append_storage_history_shard(
1961        &mut self,
1962        address: Address,
1963        storage_key: B256,
1964        indices: impl IntoIterator<Item = u64>,
1965    ) -> ProviderResult<()> {
1966        let indices: Vec<u64> = indices.into_iter().collect();
1967
1968        for (key, shard) in
1969            self.provider.storage_history_shards_to_put(address, storage_key, indices)?
1970        {
1971            self.put::<tables::StoragesHistory>(key, &shard)?;
1972        }
1973
1974        Ok(())
1975    }
1976
    /// Unwinds account history for the given address, keeping only blocks <= `keep_to`.
    ///
    /// Mirrors MDBX `unwind_history_shards` behavior:
    /// - Deletes shards entirely above `keep_to`
    /// - Truncates boundary shards and re-keys to `u64::MAX` sentinel
    /// - Preserves shards entirely below `keep_to`
    ///
    /// Shards are read from committed state via `account_history_shards`. Within this batch,
    /// some keys are deleted and then re-put under the sentinel key. Staged operations are
    /// applied in order, so the later put wins.
    ///
    /// NOTE(review): every `delete`/`put` may trigger an auto-commit. With a threshold set, a
    /// delete and its matching sentinel put can end up in different commits.
    pub fn unwind_account_history_to(
        &mut self,
        address: Address,
        keep_to: BlockNumber,
    ) -> ProviderResult<()> {
        // presumably returned in ascending key order (sentinel last) — the `position` search
        // and "delete everything after the boundary" logic below rely on it; TODO confirm.
        let shards = self.provider.account_history_shards(address)?;
        if shards.is_empty() {
            return Ok(());
        }

        // Find the first shard that might contain blocks > keep_to.
        // A shard is affected if it's the sentinel (u64::MAX) or its highest_block_number > keep_to
        let boundary_idx = shards.iter().position(|(key, _)| {
            key.highest_block_number == u64::MAX || key.highest_block_number > keep_to
        });

        // Repair path: no shards affected means all blocks <= keep_to, just ensure sentinel exists
        let Some(boundary_idx) = boundary_idx else {
            let (last_key, last_value) = shards.last().expect("shards is non-empty");
            if last_key.highest_block_number != u64::MAX {
                // Re-key the highest shard onto the sentinel: drop the old key, write the same
                // contents under `u64::MAX`.
                self.delete::<tables::AccountsHistory>(last_key.clone())?;
                self.put::<tables::AccountsHistory>(
                    ShardedKey::new(address, u64::MAX),
                    last_value,
                )?;
            }
            return Ok(());
        };

        // Delete all shards strictly after the boundary (they are entirely > keep_to)
        for (key, _) in shards.iter().skip(boundary_idx + 1) {
            self.delete::<tables::AccountsHistory>(key.clone())?;
        }

        // Process the boundary shard: filter out blocks > keep_to
        let (boundary_key, boundary_list) = &shards[boundary_idx];

        // Delete the boundary shard (we'll either drop it or rewrite at u64::MAX)
        self.delete::<tables::AccountsHistory>(boundary_key.clone())?;

        // Build truncated list once; check emptiness directly (avoids double iteration)
        // `take_while` stops at the first block > keep_to, which relies on the list iterating in
        // ascending order.
        let new_last =
            BlockNumberList::new_pre_sorted(boundary_list.iter().take_while(|&b| b <= keep_to));

        if new_last.is_empty() {
            // Boundary shard is now empty. Previous shard becomes the last and must be keyed
            // u64::MAX.
            if boundary_idx == 0 {
                // Nothing left for this address
                return Ok(());
            }

            let (prev_key, prev_value) = &shards[boundary_idx - 1];
            if prev_key.highest_block_number != u64::MAX {
                // If the boundary was the sentinel, its delete above is superseded by this put.
                self.delete::<tables::AccountsHistory>(prev_key.clone())?;
                self.put::<tables::AccountsHistory>(
                    ShardedKey::new(address, u64::MAX),
                    prev_value,
                )?;
            }
            return Ok(());
        }

        // The truncated boundary shard becomes the new last shard under the sentinel key; if the
        // boundary itself was the sentinel, this put supersedes its earlier delete.
        self.put::<tables::AccountsHistory>(ShardedKey::new(address, u64::MAX), &new_last)?;

        Ok(())
    }
2050
    /// Prunes history shards, removing blocks <= `to_block`.
    ///
    /// Generic implementation for both account and storage history pruning.
    /// Mirrors MDBX `prune_shard` semantics. After pruning, the last remaining shard
    /// (if any) will have the sentinel key (`u64::MAX`).
    ///
    /// # Parameters
    ///
    /// - `shards`: all shards of one key. Assumed to be in ascending key order with the sentinel
    ///   (if present) last, since the last non-deleted shard seen is treated as the new tail.
    ///   NOTE(review): confirm callers always pass on-disk order.
    /// - `get_highest` / `is_sentinel`: read a shard key's highest block number and whether it
    ///   is the `u64::MAX` sentinel.
    /// - `delete_shard` / `put_shard`: stage a delete / put of one shard in this batch.
    /// - `create_sentinel`: builds the sentinel key for this history key.
    ///
    /// # Returns
    ///
    /// `Deleted` if any shard was deleted, otherwise `Updated` if any shard was rewritten or
    /// re-keyed, otherwise `Unchanged`. An empty `shards` yields `Unchanged`.
    #[expect(clippy::too_many_arguments)]
    fn prune_history_shards_inner<K>(
        &mut self,
        shards: Vec<(K, BlockNumberList)>,
        to_block: BlockNumber,
        get_highest: impl Fn(&K) -> u64,
        is_sentinel: impl Fn(&K) -> bool,
        delete_shard: impl Fn(&mut Self, K) -> ProviderResult<()>,
        put_shard: impl Fn(&mut Self, K, &BlockNumberList) -> ProviderResult<()>,
        create_sentinel: impl Fn() -> K,
    ) -> ProviderResult<PruneShardOutcome>
    where
        K: Clone,
    {
        if shards.is_empty() {
            return Ok(PruneShardOutcome::Unchanged);
        }

        let mut deleted = false;
        let mut updated = false;
        // Most recent shard that survives pruning; becomes the tail that must carry the
        // sentinel key.
        let mut last_remaining: Option<(K, BlockNumberList)> = None;

        for (key, block_list) in shards {
            // A non-sentinel shard whose highest block is <= to_block only holds prunable blocks.
            if !is_sentinel(&key) && get_highest(&key) <= to_block {
                delete_shard(self, key)?;
                deleted = true;
            } else {
                let original_len = block_list.len();
                let filtered =
                    BlockNumberList::new_pre_sorted(block_list.iter().filter(|&b| b > to_block));

                if filtered.is_empty() {
                    delete_shard(self, key)?;
                    deleted = true;
                } else if filtered.len() < original_len {
                    put_shard(self, key.clone(), &filtered)?;
                    last_remaining = Some((key, filtered));
                    updated = true;
                } else {
                    // Nothing pruned from this shard; it stays as-is unless re-keyed below.
                    last_remaining = Some((key, block_list));
                }
            }
        }

        // Ensure the surviving tail sits under the sentinel key. If that shard was just rewritten
        // above, the staged put is superseded by this delete, since batch operations apply in
        // order.
        if let Some((last_key, last_value)) = last_remaining &&
            !is_sentinel(&last_key)
        {
            delete_shard(self, last_key)?;
            put_shard(self, create_sentinel(), &last_value)?;
            updated = true;
        }

        if deleted {
            Ok(PruneShardOutcome::Deleted)
        } else if updated {
            Ok(PruneShardOutcome::Updated)
        } else {
            Ok(PruneShardOutcome::Unchanged)
        }
    }
2116
2117    /// Prunes account history for the given address, removing blocks <= `to_block`.
2118    ///
2119    /// Mirrors MDBX `prune_shard` semantics. After pruning, the last remaining shard
2120    /// (if any) will have the sentinel key (`u64::MAX`).
2121    pub fn prune_account_history_to(
2122        &mut self,
2123        address: Address,
2124        to_block: BlockNumber,
2125    ) -> ProviderResult<PruneShardOutcome> {
2126        let shards = self.provider.account_history_shards(address)?;
2127        self.prune_history_shards_inner(
2128            shards,
2129            to_block,
2130            |key| key.highest_block_number,
2131            |key| key.highest_block_number == u64::MAX,
2132            |batch, key| batch.delete::<tables::AccountsHistory>(key),
2133            |batch, key, value| batch.put::<tables::AccountsHistory>(key, value),
2134            || ShardedKey::new(address, u64::MAX),
2135        )
2136    }
2137
2138    /// Prunes account history for multiple addresses in a single iterator pass.
2139    ///
2140    /// This is more efficient than calling [`Self::prune_account_history_to`] repeatedly
2141    /// because it reuses a single raw iterator and skips seeks when the iterator is already
2142    /// positioned correctly (which happens when targets are sorted and adjacent in key order).
2143    ///
2144    /// `targets` MUST be sorted by address for correctness and optimal performance
2145    /// (matches on-disk key order).
2146    pub fn prune_account_history_batch(
2147        &mut self,
2148        targets: &[(Address, BlockNumber)],
2149    ) -> ProviderResult<PrunedIndices> {
2150        if targets.is_empty() {
2151            return Ok(PrunedIndices::default());
2152        }
2153
2154        debug_assert!(
2155            targets.windows(2).all(|w| w[0].0 <= w[1].0),
2156            "prune_account_history_batch: targets must be sorted by address"
2157        );
2158
2159        // ShardedKey<Address> layout: [address: 20][block: 8] = 28 bytes
2160        // The first 20 bytes are the "prefix" that identifies the address
2161        const PREFIX_LEN: usize = 20;
2162
2163        let cf = self.provider.get_cf_handle::<tables::AccountsHistory>()?;
2164        let mut iter = self.provider.0.raw_iterator_cf(cf);
2165        let mut outcomes = PrunedIndices::default();
2166
2167        for (address, to_block) in targets {
2168            // Build the target prefix (first 20 bytes = address)
2169            let start_key = ShardedKey::new(*address, 0u64).encode();
2170            let target_prefix = &start_key[..PREFIX_LEN];
2171
2172            // Check if we need to seek or if the iterator is already positioned correctly.
2173            // After processing the previous target, the iterator is either:
2174            // 1. Positioned at a key with a different prefix (we iterated past our shards)
2175            // 2. Invalid (no more keys)
2176            // If the current key's prefix >= our target prefix, we may be able to skip the seek.
2177            let needs_seek = if iter.valid() {
2178                if let Some(current_key) = iter.key() {
2179                    // If current key's prefix < target prefix, we need to seek forward
2180                    // If current key's prefix > target prefix, this target has no shards (skip)
2181                    // If current key's prefix == target prefix, we're already positioned
2182                    current_key.get(..PREFIX_LEN).is_none_or(|p| p < target_prefix)
2183                } else {
2184                    true
2185                }
2186            } else {
2187                true
2188            };
2189
2190            if needs_seek {
2191                iter.seek(start_key);
2192                iter.status().map_err(|e| {
2193                    ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
2194                        message: e.to_string().into(),
2195                        code: -1,
2196                    }))
2197                })?;
2198            }
2199
2200            // Collect all shards for this address using raw prefix comparison
2201            let mut shards = Vec::new();
2202            while iter.valid() {
2203                let Some(key_bytes) = iter.key() else { break };
2204
2205                // Use raw prefix comparison instead of full decode for the prefix check
2206                let current_prefix = key_bytes.get(..PREFIX_LEN);
2207                if current_prefix != Some(target_prefix) {
2208                    break;
2209                }
2210
2211                // Now decode the full key (we need the block number)
2212                let key = ShardedKey::<Address>::decode(key_bytes)
2213                    .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
2214
2215                let Some(value_bytes) = iter.value() else { break };
2216                let value = BlockNumberList::decompress(value_bytes)
2217                    .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
2218
2219                shards.push((key, value));
2220                iter.next();
2221            }
2222
2223            match self.prune_history_shards_inner(
2224                shards,
2225                *to_block,
2226                |key| key.highest_block_number,
2227                |key| key.highest_block_number == u64::MAX,
2228                |batch, key| batch.delete::<tables::AccountsHistory>(key),
2229                |batch, key, value| batch.put::<tables::AccountsHistory>(key, value),
2230                || ShardedKey::new(*address, u64::MAX),
2231            )? {
2232                PruneShardOutcome::Deleted => outcomes.deleted += 1,
2233                PruneShardOutcome::Updated => outcomes.updated += 1,
2234                PruneShardOutcome::Unchanged => outcomes.unchanged += 1,
2235            }
2236        }
2237
2238        Ok(outcomes)
2239    }
2240
2241    /// Prunes storage history for the given address and storage key, removing blocks <=
2242    /// `to_block`.
2243    ///
2244    /// Mirrors MDBX `prune_shard` semantics. After pruning, the last remaining shard
2245    /// (if any) will have the sentinel key (`u64::MAX`).
2246    pub fn prune_storage_history_to(
2247        &mut self,
2248        address: Address,
2249        storage_key: B256,
2250        to_block: BlockNumber,
2251    ) -> ProviderResult<PruneShardOutcome> {
2252        let shards = self.provider.storage_history_shards(address, storage_key)?;
2253        self.prune_history_shards_inner(
2254            shards,
2255            to_block,
2256            |key| key.sharded_key.highest_block_number,
2257            |key| key.sharded_key.highest_block_number == u64::MAX,
2258            |batch, key| batch.delete::<tables::StoragesHistory>(key),
2259            |batch, key, value| batch.put::<tables::StoragesHistory>(key, value),
2260            || StorageShardedKey::last(address, storage_key),
2261        )
2262    }
2263
2264    /// Prunes storage history for multiple (address, `storage_key`) pairs in a single iterator
2265    /// pass.
2266    ///
2267    /// This is more efficient than calling [`Self::prune_storage_history_to`] repeatedly
2268    /// because it reuses a single raw iterator and skips seeks when the iterator is already
2269    /// positioned correctly (which happens when targets are sorted and adjacent in key order).
2270    ///
2271    /// `targets` MUST be sorted by (address, `storage_key`) for correctness and optimal
2272    /// performance (matches on-disk key order).
2273    pub fn prune_storage_history_batch(
2274        &mut self,
2275        targets: &[((Address, B256), BlockNumber)],
2276    ) -> ProviderResult<PrunedIndices> {
2277        if targets.is_empty() {
2278            return Ok(PrunedIndices::default());
2279        }
2280
2281        debug_assert!(
2282            targets.windows(2).all(|w| w[0].0 <= w[1].0),
2283            "prune_storage_history_batch: targets must be sorted by (address, storage_key)"
2284        );
2285
2286        // StorageShardedKey layout: [address: 20][storage_key: 32][block: 8] = 60 bytes
2287        // The first 52 bytes are the "prefix" that identifies (address, storage_key)
2288        const PREFIX_LEN: usize = 52;
2289
2290        let cf = self.provider.get_cf_handle::<tables::StoragesHistory>()?;
2291        let mut iter = self.provider.0.raw_iterator_cf(cf);
2292        let mut outcomes = PrunedIndices::default();
2293
2294        for ((address, storage_key), to_block) in targets {
2295            // Build the target prefix (first 52 bytes of encoded key)
2296            let start_key = StorageShardedKey::new(*address, *storage_key, 0u64).encode();
2297            let target_prefix = &start_key[..PREFIX_LEN];
2298
2299            // Check if we need to seek or if the iterator is already positioned correctly.
2300            // After processing the previous target, the iterator is either:
2301            // 1. Positioned at a key with a different prefix (we iterated past our shards)
2302            // 2. Invalid (no more keys)
2303            // If the current key's prefix >= our target prefix, we may be able to skip the seek.
2304            let needs_seek = if iter.valid() {
2305                if let Some(current_key) = iter.key() {
2306                    // If current key's prefix < target prefix, we need to seek forward
2307                    // If current key's prefix > target prefix, this target has no shards (skip)
2308                    // If current key's prefix == target prefix, we're already positioned
2309                    current_key.get(..PREFIX_LEN).is_none_or(|p| p < target_prefix)
2310                } else {
2311                    true
2312                }
2313            } else {
2314                true
2315            };
2316
2317            if needs_seek {
2318                iter.seek(start_key);
2319                iter.status().map_err(|e| {
2320                    ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
2321                        message: e.to_string().into(),
2322                        code: -1,
2323                    }))
2324                })?;
2325            }
2326
2327            // Collect all shards for this (address, storage_key) pair using prefix comparison
2328            let mut shards = Vec::new();
2329            while iter.valid() {
2330                let Some(key_bytes) = iter.key() else { break };
2331
2332                // Use raw prefix comparison instead of full decode for the prefix check
2333                let current_prefix = key_bytes.get(..PREFIX_LEN);
2334                if current_prefix != Some(target_prefix) {
2335                    break;
2336                }
2337
2338                // Now decode the full key (we need the block number)
2339                let key = StorageShardedKey::decode(key_bytes)
2340                    .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
2341
2342                let Some(value_bytes) = iter.value() else { break };
2343                let value = BlockNumberList::decompress(value_bytes)
2344                    .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
2345
2346                shards.push((key, value));
2347                iter.next();
2348            }
2349
2350            // Use existing prune_history_shards_inner logic
2351            match self.prune_history_shards_inner(
2352                shards,
2353                *to_block,
2354                |key| key.sharded_key.highest_block_number,
2355                |key| key.sharded_key.highest_block_number == u64::MAX,
2356                |batch, key| batch.delete::<tables::StoragesHistory>(key),
2357                |batch, key, value| batch.put::<tables::StoragesHistory>(key, value),
2358                || StorageShardedKey::last(*address, *storage_key),
2359            )? {
2360                PruneShardOutcome::Deleted => outcomes.deleted += 1,
2361                PruneShardOutcome::Updated => outcomes.updated += 1,
2362                PruneShardOutcome::Unchanged => outcomes.unchanged += 1,
2363            }
2364        }
2365
2366        Ok(outcomes)
2367    }
2368
2369    /// Unwinds storage history to keep only blocks `<= keep_to`.
2370    ///
2371    /// Handles multi-shard scenarios by:
2372    /// 1. Loading all shards for the `(address, storage_key)` pair
2373    /// 2. Finding the boundary shard containing `keep_to`
2374    /// 3. Deleting all shards after the boundary
2375    /// 4. Truncating the boundary shard to keep only indices `<= keep_to`
2376    /// 5. Ensuring the last shard is keyed with `u64::MAX`
2377    pub fn unwind_storage_history_to(
2378        &mut self,
2379        address: Address,
2380        storage_key: B256,
2381        keep_to: BlockNumber,
2382    ) -> ProviderResult<()> {
2383        let shards = self.provider.storage_history_shards(address, storage_key)?;
2384        if shards.is_empty() {
2385            return Ok(());
2386        }
2387
2388        // Find the first shard that might contain blocks > keep_to.
2389        // A shard is affected if it's the sentinel (u64::MAX) or its highest_block_number > keep_to
2390        let boundary_idx = shards.iter().position(|(key, _)| {
2391            key.sharded_key.highest_block_number == u64::MAX ||
2392                key.sharded_key.highest_block_number > keep_to
2393        });
2394
2395        // Repair path: no shards affected means all blocks <= keep_to, just ensure sentinel exists
2396        let Some(boundary_idx) = boundary_idx else {
2397            let (last_key, last_value) = shards.last().expect("shards is non-empty");
2398            if last_key.sharded_key.highest_block_number != u64::MAX {
2399                self.delete::<tables::StoragesHistory>(last_key.clone())?;
2400                self.put::<tables::StoragesHistory>(
2401                    StorageShardedKey::last(address, storage_key),
2402                    last_value,
2403                )?;
2404            }
2405            return Ok(());
2406        };
2407
2408        // Delete all shards strictly after the boundary (they are entirely > keep_to)
2409        for (key, _) in shards.iter().skip(boundary_idx + 1) {
2410            self.delete::<tables::StoragesHistory>(key.clone())?;
2411        }
2412
2413        // Process the boundary shard: filter out blocks > keep_to
2414        let (boundary_key, boundary_list) = &shards[boundary_idx];
2415
2416        // Delete the boundary shard (we'll either drop it or rewrite at u64::MAX)
2417        self.delete::<tables::StoragesHistory>(boundary_key.clone())?;
2418
2419        // Build truncated list once; check emptiness directly (avoids double iteration)
2420        let new_last =
2421            BlockNumberList::new_pre_sorted(boundary_list.iter().take_while(|&b| b <= keep_to));
2422
2423        if new_last.is_empty() {
2424            // Boundary shard is now empty. Previous shard becomes the last and must be keyed
2425            // u64::MAX.
2426            if boundary_idx == 0 {
2427                // Nothing left for this (address, storage_key) pair
2428                return Ok(());
2429            }
2430
2431            let (prev_key, prev_value) = &shards[boundary_idx - 1];
2432            if prev_key.sharded_key.highest_block_number != u64::MAX {
2433                self.delete::<tables::StoragesHistory>(prev_key.clone())?;
2434                self.put::<tables::StoragesHistory>(
2435                    StorageShardedKey::last(address, storage_key),
2436                    prev_value,
2437                )?;
2438            }
2439            return Ok(());
2440        }
2441
2442        self.put::<tables::StoragesHistory>(
2443            StorageShardedKey::last(address, storage_key),
2444            &new_last,
2445        )?;
2446
2447        Ok(())
2448    }
2449
2450    /// Clears all account history shards for the given address.
2451    ///
2452    /// Used when unwinding from block 0 (i.e., removing all history).
2453    pub fn clear_account_history(&mut self, address: Address) -> ProviderResult<()> {
2454        let shards = self.provider.account_history_shards(address)?;
2455        for (key, _) in shards {
2456            self.delete::<tables::AccountsHistory>(key)?;
2457        }
2458        Ok(())
2459    }
2460
2461    /// Clears all storage history shards for the given `(address, storage_key)` pair.
2462    ///
2463    /// Used when unwinding from block 0 (i.e., removing all history for this storage slot).
2464    pub fn clear_storage_history(
2465        &mut self,
2466        address: Address,
2467        storage_key: B256,
2468    ) -> ProviderResult<()> {
2469        let shards = self.provider.storage_history_shards(address, storage_key)?;
2470        for (key, _) in shards {
2471            self.delete::<tables::StoragesHistory>(key)?;
2472        }
2473        Ok(())
2474    }
2475}
2476
2477/// `RocksDB` transaction wrapper providing MDBX-like semantics.
2478///
2479/// Supports:
2480/// - Read-your-writes: reads see uncommitted writes within the same transaction
2481/// - Atomic commit/rollback
2482/// - Iteration over uncommitted data
2483///
2484/// Note: `Transaction` is `Send` but NOT `Sync`. This wrapper does not implement
2485/// `DbTx`/`DbTxMut` traits directly; use RocksDB-specific methods instead.
2486pub struct RocksTx<'db> {
2487    inner: Transaction<'db, OptimisticTransactionDB>,
2488    provider: &'db RocksDBProvider,
2489}
2490
2491impl fmt::Debug for RocksTx<'_> {
2492    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2493        f.debug_struct("RocksTx").field("provider", &self.provider).finish_non_exhaustive()
2494    }
2495}
2496
2497impl<'db> RocksTx<'db> {
2498    /// Gets a value from the specified table. Sees uncommitted writes in this transaction.
2499    pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
2500        let encoded_key = key.encode();
2501        self.get_encoded::<T>(&encoded_key)
2502    }
2503
2504    /// Gets a value using pre-encoded key. Sees uncommitted writes in this transaction.
2505    pub fn get_encoded<T: Table>(
2506        &self,
2507        key: &<T::Key as Encode>::Encoded,
2508    ) -> ProviderResult<Option<T::Value>> {
2509        let cf = self.provider.get_cf_handle::<T>()?;
2510        let result = self.inner.get_cf(cf, key.as_ref()).map_err(|e| {
2511            ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
2512                message: e.to_string().into(),
2513                code: -1,
2514            }))
2515        })?;
2516
2517        Ok(result.and_then(|value| T::Value::decompress(&value).ok()))
2518    }
2519
2520    /// Puts a value into the specified table.
2521    pub fn put<T: Table>(&self, key: T::Key, value: &T::Value) -> ProviderResult<()> {
2522        let encoded_key = key.encode();
2523        self.put_encoded::<T>(&encoded_key, value)
2524    }
2525
2526    /// Puts a value using pre-encoded key.
2527    pub fn put_encoded<T: Table>(
2528        &self,
2529        key: &<T::Key as Encode>::Encoded,
2530        value: &T::Value,
2531    ) -> ProviderResult<()> {
2532        let cf = self.provider.get_cf_handle::<T>()?;
2533        let mut buf = Vec::new();
2534        let value_bytes = compress_to_buf_or_ref!(buf, value).unwrap_or(&buf);
2535
2536        self.inner.put_cf(cf, key.as_ref(), value_bytes).map_err(|e| {
2537            ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
2538                info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
2539                operation: DatabaseWriteOperation::PutUpsert,
2540                table_name: T::NAME,
2541                key: key.as_ref().to_vec(),
2542            })))
2543        })
2544    }
2545
2546    /// Deletes a value from the specified table.
2547    pub fn delete<T: Table>(&self, key: T::Key) -> ProviderResult<()> {
2548        let cf = self.provider.get_cf_handle::<T>()?;
2549        self.inner.delete_cf(cf, key.encode().as_ref()).map_err(|e| {
2550            ProviderError::Database(DatabaseError::Delete(DatabaseErrorInfo {
2551                message: e.to_string().into(),
2552                code: -1,
2553            }))
2554        })
2555    }
2556
2557    /// Creates an iterator for the specified table. Sees uncommitted writes in this transaction.
2558    ///
2559    /// Returns an iterator that yields `(encoded_key, compressed_value)` pairs.
2560    pub fn iter<T: Table>(&self) -> ProviderResult<RocksTxIter<'_, T>> {
2561        let cf = self.provider.get_cf_handle::<T>()?;
2562        let iter = self.inner.iterator_cf(cf, IteratorMode::Start);
2563        Ok(RocksTxIter { inner: iter, _marker: std::marker::PhantomData })
2564    }
2565
2566    /// Creates an iterator starting from the given key (inclusive).
2567    pub fn iter_from<T: Table>(&self, key: T::Key) -> ProviderResult<RocksTxIter<'_, T>> {
2568        let cf = self.provider.get_cf_handle::<T>()?;
2569        let encoded_key = key.encode();
2570        let iter = self
2571            .inner
2572            .iterator_cf(cf, IteratorMode::From(encoded_key.as_ref(), rocksdb::Direction::Forward));
2573        Ok(RocksTxIter { inner: iter, _marker: std::marker::PhantomData })
2574    }
2575
2576    /// Commits the transaction, persisting all changes.
2577    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
2578    pub fn commit(self) -> ProviderResult<()> {
2579        self.inner.commit().map_err(|e| {
2580            ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
2581                message: e.to_string().into(),
2582                code: -1,
2583            }))
2584        })
2585    }
2586
2587    /// Rolls back the transaction, discarding all changes.
2588    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
2589    pub fn rollback(self) -> ProviderResult<()> {
2590        self.inner.rollback().map_err(|e| {
2591            ProviderError::Database(DatabaseError::Other(format!("rollback failed: {e}")))
2592        })
2593    }
2594}
2595
2596/// Wrapper enum for `RocksDB` iterators that works in both read-write and read-only modes.
2597enum RocksDBIterEnum<'db> {
2598    /// Iterator from read-write `OptimisticTransactionDB`.
2599    ReadWrite(rocksdb::DBIteratorWithThreadMode<'db, OptimisticTransactionDB>),
2600    /// Iterator from read-only `DB`.
2601    ReadOnly(rocksdb::DBIteratorWithThreadMode<'db, DB>),
2602}
2603
2604impl Iterator for RocksDBIterEnum<'_> {
2605    type Item = Result<(Box<[u8]>, Box<[u8]>), rocksdb::Error>;
2606
2607    fn next(&mut self) -> Option<Self::Item> {
2608        match self {
2609            Self::ReadWrite(iter) => iter.next(),
2610            Self::ReadOnly(iter) => iter.next(),
2611        }
2612    }
2613}
2614
2615/// Wrapper enum for raw `RocksDB` iterators that works in both read-write and read-only modes.
2616///
2617/// Unlike [`RocksDBIterEnum`], raw iterators expose `seek()` for efficient repositioning
2618/// without reinitializing the iterator.
2619enum RocksDBRawIterEnum<'db> {
2620    /// Raw iterator from read-write `OptimisticTransactionDB`.
2621    ReadWrite(DBRawIteratorWithThreadMode<'db, OptimisticTransactionDB>),
2622    /// Raw iterator from read-only `DB`.
2623    ReadOnly(DBRawIteratorWithThreadMode<'db, DB>),
2624}
2625
2626impl RocksDBRawIterEnum<'_> {
2627    /// Positions the iterator at the first key >= `key`.
2628    fn seek(&mut self, key: impl AsRef<[u8]>) {
2629        match self {
2630            Self::ReadWrite(iter) => iter.seek(key),
2631            Self::ReadOnly(iter) => iter.seek(key),
2632        }
2633    }
2634
2635    /// Returns true if the iterator is positioned at a valid key-value pair.
2636    fn valid(&self) -> bool {
2637        match self {
2638            Self::ReadWrite(iter) => iter.valid(),
2639            Self::ReadOnly(iter) => iter.valid(),
2640        }
2641    }
2642
2643    /// Returns the current key, if valid.
2644    fn key(&self) -> Option<&[u8]> {
2645        match self {
2646            Self::ReadWrite(iter) => iter.key(),
2647            Self::ReadOnly(iter) => iter.key(),
2648        }
2649    }
2650
2651    /// Returns the current value, if valid.
2652    fn value(&self) -> Option<&[u8]> {
2653        match self {
2654            Self::ReadWrite(iter) => iter.value(),
2655            Self::ReadOnly(iter) => iter.value(),
2656        }
2657    }
2658
2659    /// Advances the iterator to the next key.
2660    fn next(&mut self) {
2661        match self {
2662            Self::ReadWrite(iter) => iter.next(),
2663            Self::ReadOnly(iter) => iter.next(),
2664        }
2665    }
2666
2667    /// Moves the iterator to the previous key.
2668    fn prev(&mut self) {
2669        match self {
2670            Self::ReadWrite(iter) => iter.prev(),
2671            Self::ReadOnly(iter) => iter.prev(),
2672        }
2673    }
2674
2675    /// Returns the status of the iterator.
2676    fn status(&self) -> Result<(), rocksdb::Error> {
2677        match self {
2678            Self::ReadWrite(iter) => iter.status(),
2679            Self::ReadOnly(iter) => iter.status(),
2680        }
2681    }
2682}
2683
2684/// Iterator over a `RocksDB` table (non-transactional).
2685///
2686/// Yields decoded `(Key, Value)` pairs in key order.
2687pub struct RocksDBIter<'db, T: Table> {
2688    inner: RocksDBIterEnum<'db>,
2689    _marker: std::marker::PhantomData<T>,
2690}
2691
2692impl<T: Table> fmt::Debug for RocksDBIter<'_, T> {
2693    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2694        f.debug_struct("RocksDBIter").field("table", &T::NAME).finish_non_exhaustive()
2695    }
2696}
2697
2698impl<T: Table> Iterator for RocksDBIter<'_, T> {
2699    type Item = ProviderResult<(T::Key, T::Value)>;
2700
2701    fn next(&mut self) -> Option<Self::Item> {
2702        Some(decode_iter_item::<T>(self.inner.next()?))
2703    }
2704}
2705
2706/// Raw iterator over a `RocksDB` table (non-transactional).
2707///
2708/// Yields raw `(key_bytes, value_bytes)` pairs without decoding.
2709pub struct RocksDBRawIter<'db> {
2710    inner: RocksDBIterEnum<'db>,
2711}
2712
2713impl fmt::Debug for RocksDBRawIter<'_> {
2714    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2715        f.debug_struct("RocksDBRawIter").finish_non_exhaustive()
2716    }
2717}
2718
2719impl Iterator for RocksDBRawIter<'_> {
2720    type Item = ProviderResult<(Box<[u8]>, Box<[u8]>)>;
2721
2722    fn next(&mut self) -> Option<Self::Item> {
2723        match self.inner.next()? {
2724            Ok(kv) => Some(Ok(kv)),
2725            Err(e) => Some(Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
2726                message: e.to_string().into(),
2727                code: -1,
2728            })))),
2729        }
2730    }
2731}
2732
2733/// Iterator over a `RocksDB` table within a transaction.
2734///
2735/// Yields decoded `(Key, Value)` pairs. Sees uncommitted writes.
2736pub struct RocksTxIter<'tx, T: Table> {
2737    inner: rocksdb::DBIteratorWithThreadMode<'tx, Transaction<'tx, OptimisticTransactionDB>>,
2738    _marker: std::marker::PhantomData<T>,
2739}
2740
2741impl<T: Table> fmt::Debug for RocksTxIter<'_, T> {
2742    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2743        f.debug_struct("RocksTxIter").field("table", &T::NAME).finish_non_exhaustive()
2744    }
2745}
2746
2747impl<T: Table> Iterator for RocksTxIter<'_, T> {
2748    type Item = ProviderResult<(T::Key, T::Value)>;
2749
2750    fn next(&mut self) -> Option<Self::Item> {
2751        Some(decode_iter_item::<T>(self.inner.next()?))
2752    }
2753}
2754
2755/// Decodes a raw key-value pair from a `RocksDB` iterator into typed table entries.
2756///
2757/// Handles both error propagation from the underlying iterator and
2758/// decoding/decompression of the key and value bytes.
2759fn decode_iter_item<T: Table>(result: RawKVResult) -> ProviderResult<(T::Key, T::Value)> {
2760    let (key_bytes, value_bytes) = result.map_err(|e| {
2761        ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
2762            message: e.to_string().into(),
2763            code: -1,
2764        }))
2765    })?;
2766
2767    let key = <T::Key as reth_db_api::table::Decode>::decode(&key_bytes)
2768        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
2769
2770    let value = T::Value::decompress(&value_bytes)
2771        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
2772
2773    Ok((key, value))
2774}
2775
2776/// Converts Reth's [`LogLevel`] to `RocksDB`'s [`rocksdb::LogLevel`].
2777const fn convert_log_level(level: LogLevel) -> rocksdb::LogLevel {
2778    match level {
2779        LogLevel::Fatal => rocksdb::LogLevel::Fatal,
2780        LogLevel::Error => rocksdb::LogLevel::Error,
2781        LogLevel::Warn => rocksdb::LogLevel::Warn,
2782        LogLevel::Notice | LogLevel::Verbose => rocksdb::LogLevel::Info,
2783        LogLevel::Debug | LogLevel::Trace | LogLevel::Extra => rocksdb::LogLevel::Debug,
2784    }
2785}
2786
2787#[cfg(test)]
2788mod tests {
2789    use super::*;
2790    use crate::providers::HistoryInfo;
2791    use alloy_primitives::{Address, TxHash, B256};
2792    use reth_db_api::{
2793        models::{
2794            sharded_key::{ShardedKey, NUM_OF_INDICES_IN_SHARD},
2795            storage_sharded_key::StorageShardedKey,
2796            IntegerList,
2797        },
2798        table::Table,
2799        tables,
2800    };
2801    use tempfile::TempDir;
2802
2803    #[test]
2804    fn test_with_default_tables_registers_required_column_families() {
2805        let temp_dir = TempDir::new().unwrap();
2806
2807        // Build with default tables
2808        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
2809
2810        // Should be able to write/read TransactionHashNumbers
2811        let tx_hash = TxHash::from(B256::from([1u8; 32]));
2812        provider.put::<tables::TransactionHashNumbers>(tx_hash, &100).unwrap();
2813        assert_eq!(provider.get::<tables::TransactionHashNumbers>(tx_hash).unwrap(), Some(100));
2814
2815        // Should be able to write/read AccountsHistory
2816        let key = ShardedKey::new(Address::ZERO, 100);
2817        let value = IntegerList::default();
2818        provider.put::<tables::AccountsHistory>(key.clone(), &value).unwrap();
2819        assert!(provider.get::<tables::AccountsHistory>(key).unwrap().is_some());
2820
2821        // Should be able to write/read StoragesHistory
2822        let key = StorageShardedKey::new(Address::ZERO, B256::ZERO, 100);
2823        provider.put::<tables::StoragesHistory>(key.clone(), &value).unwrap();
2824        assert!(provider.get::<tables::StoragesHistory>(key).unwrap().is_some());
2825    }
2826
2827    #[derive(Debug)]
2828    struct TestTable;
2829
2830    impl Table for TestTable {
2831        const NAME: &'static str = "TestTable";
2832        const DUPSORT: bool = false;
2833        type Key = u64;
2834        type Value = Vec<u8>;
2835    }
2836
2837    #[test]
2838    fn test_basic_operations() {
2839        let temp_dir = TempDir::new().unwrap();
2840
2841        let provider = RocksDBBuilder::new(temp_dir.path())
2842            .with_table::<TestTable>() // Type-safe!
2843            .build()
2844            .unwrap();
2845
2846        let key = 42u64;
2847        let value = b"test_value".to_vec();
2848
2849        // Test write
2850        provider.put::<TestTable>(key, &value).unwrap();
2851
2852        // Test read
2853        let result = provider.get::<TestTable>(key).unwrap();
2854        assert_eq!(result, Some(value));
2855
2856        // Test delete
2857        provider.delete::<TestTable>(key).unwrap();
2858
2859        // Verify deletion
2860        assert_eq!(provider.get::<TestTable>(key).unwrap(), None);
2861    }
2862
2863    #[test]
2864    fn test_batch_operations() {
2865        let temp_dir = TempDir::new().unwrap();
2866        let provider =
2867            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
2868
2869        // Write multiple entries in a batch
2870        provider
2871            .write_batch(|batch| {
2872                for i in 0..10u64 {
2873                    let value = format!("value_{i}").into_bytes();
2874                    batch.put::<TestTable>(i, &value)?;
2875                }
2876                Ok(())
2877            })
2878            .unwrap();
2879
2880        // Read all entries
2881        for i in 0..10u64 {
2882            let value = format!("value_{i}").into_bytes();
2883            assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
2884        }
2885
2886        // Delete all entries in a batch
2887        provider
2888            .write_batch(|batch| {
2889                for i in 0..10u64 {
2890                    batch.delete::<TestTable>(i)?;
2891                }
2892                Ok(())
2893            })
2894            .unwrap();
2895
2896        // Verify all deleted
2897        for i in 0..10u64 {
2898            assert_eq!(provider.get::<TestTable>(i).unwrap(), None);
2899        }
2900    }
2901
2902    #[test]
2903    fn test_with_real_table() {
2904        let temp_dir = TempDir::new().unwrap();
2905        let provider = RocksDBBuilder::new(temp_dir.path())
2906            .with_table::<tables::TransactionHashNumbers>()
2907            .with_metrics()
2908            .build()
2909            .unwrap();
2910
2911        let tx_hash = TxHash::from(B256::from([1u8; 32]));
2912
2913        // Insert and retrieve
2914        provider.put::<tables::TransactionHashNumbers>(tx_hash, &100).unwrap();
2915        assert_eq!(provider.get::<tables::TransactionHashNumbers>(tx_hash).unwrap(), Some(100));
2916
2917        // Batch insert multiple transactions
2918        provider
2919            .write_batch(|batch| {
2920                for i in 0..10u64 {
2921                    let hash = TxHash::from(B256::from([i as u8; 32]));
2922                    let value = i * 100;
2923                    batch.put::<tables::TransactionHashNumbers>(hash, &value)?;
2924                }
2925                Ok(())
2926            })
2927            .unwrap();
2928
2929        // Verify batch insertions
2930        for i in 0..10u64 {
2931            let hash = TxHash::from(B256::from([i as u8; 32]));
2932            assert_eq!(
2933                provider.get::<tables::TransactionHashNumbers>(hash).unwrap(),
2934                Some(i * 100)
2935            );
2936        }
2937    }
2938    #[test]
2939    fn test_statistics_enabled() {
2940        let temp_dir = TempDir::new().unwrap();
2941        // Just verify that building with statistics doesn't panic
2942        let provider = RocksDBBuilder::new(temp_dir.path())
2943            .with_table::<TestTable>()
2944            .with_statistics()
2945            .build()
2946            .unwrap();
2947
2948        // Do operations - data should be immediately readable with OptimisticTransactionDB
2949        for i in 0..10 {
2950            let value = vec![i as u8];
2951            provider.put::<TestTable>(i, &value).unwrap();
2952            // Verify write is visible
2953            assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
2954        }
2955    }
2956
2957    #[test]
2958    fn test_data_persistence() {
2959        let temp_dir = TempDir::new().unwrap();
2960        let provider =
2961            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
2962
2963        // Insert data - OptimisticTransactionDB writes are immediately visible
2964        let value = vec![42u8; 1000];
2965        for i in 0..100 {
2966            provider.put::<TestTable>(i, &value).unwrap();
2967        }
2968
2969        // Verify data is readable
2970        for i in 0..100 {
2971            assert!(provider.get::<TestTable>(i).unwrap().is_some(), "Data should be readable");
2972        }
2973    }
2974
2975    #[test]
2976    fn test_transaction_read_your_writes() {
2977        let temp_dir = TempDir::new().unwrap();
2978        let provider =
2979            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
2980
2981        // Create a transaction
2982        let tx = provider.tx();
2983
2984        // Write data within the transaction
2985        let key = 42u64;
2986        let value = b"test_value".to_vec();
2987        tx.put::<TestTable>(key, &value).unwrap();
2988
2989        // Read-your-writes: should see uncommitted data in same transaction
2990        let result = tx.get::<TestTable>(key).unwrap();
2991        assert_eq!(
2992            result,
2993            Some(value.clone()),
2994            "Transaction should see its own uncommitted writes"
2995        );
2996
2997        // Data should NOT be visible via provider (outside transaction)
2998        let provider_result = provider.get::<TestTable>(key).unwrap();
2999        assert_eq!(provider_result, None, "Uncommitted data should not be visible outside tx");
3000
3001        // Commit the transaction
3002        tx.commit().unwrap();
3003
3004        // Now data should be visible via provider
3005        let committed_result = provider.get::<TestTable>(key).unwrap();
3006        assert_eq!(committed_result, Some(value), "Committed data should be visible");
3007    }
3008
3009    #[test]
3010    fn test_transaction_rollback() {
3011        let temp_dir = TempDir::new().unwrap();
3012        let provider =
3013            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
3014
3015        // First, put some initial data
3016        let key = 100u64;
3017        let initial_value = b"initial".to_vec();
3018        provider.put::<TestTable>(key, &initial_value).unwrap();
3019
3020        // Create a transaction and modify data
3021        let tx = provider.tx();
3022        let new_value = b"modified".to_vec();
3023        tx.put::<TestTable>(key, &new_value).unwrap();
3024
3025        // Verify modification is visible within transaction
3026        assert_eq!(tx.get::<TestTable>(key).unwrap(), Some(new_value));
3027
3028        // Rollback instead of commit
3029        tx.rollback().unwrap();
3030
3031        // Data should be unchanged (initial value)
3032        let result = provider.get::<TestTable>(key).unwrap();
3033        assert_eq!(result, Some(initial_value), "Rollback should preserve original data");
3034    }
3035
3036    #[test]
3037    fn test_transaction_iterator() {
3038        let temp_dir = TempDir::new().unwrap();
3039        let provider =
3040            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
3041
3042        // Create a transaction
3043        let tx = provider.tx();
3044
3045        // Write multiple entries
3046        for i in 0..5u64 {
3047            let value = format!("value_{i}").into_bytes();
3048            tx.put::<TestTable>(i, &value).unwrap();
3049        }
3050
3051        // Iterate - should see uncommitted writes
3052        let mut count = 0;
3053        for result in tx.iter::<TestTable>().unwrap() {
3054            let (key, value) = result.unwrap();
3055            assert_eq!(value, format!("value_{key}").into_bytes());
3056            count += 1;
3057        }
3058        assert_eq!(count, 5, "Iterator should see all uncommitted writes");
3059
3060        // Commit
3061        tx.commit().unwrap();
3062    }
3063
3064    #[test]
3065    fn test_batch_manual_commit() {
3066        let temp_dir = TempDir::new().unwrap();
3067        let provider =
3068            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
3069
3070        // Create a batch via provider.batch()
3071        let mut batch = provider.batch();
3072
3073        // Add entries
3074        for i in 0..10u64 {
3075            let value = format!("batch_value_{i}").into_bytes();
3076            batch.put::<TestTable>(i, &value).unwrap();
3077        }
3078
3079        // Verify len/is_empty
3080        assert_eq!(batch.len(), 10);
3081        assert!(!batch.is_empty());
3082
3083        // Data should NOT be visible before commit
3084        assert_eq!(provider.get::<TestTable>(0).unwrap(), None);
3085
3086        // Commit the batch
3087        batch.commit().unwrap();
3088
3089        // Now data should be visible
3090        for i in 0..10u64 {
3091            let value = format!("batch_value_{i}").into_bytes();
3092            assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
3093        }
3094    }
3095
3096    #[test]
3097    fn test_first_and_last_entry() {
3098        let temp_dir = TempDir::new().unwrap();
3099        let provider =
3100            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
3101
3102        // Empty table should return None for both
3103        assert_eq!(provider.first::<TestTable>().unwrap(), None);
3104        assert_eq!(provider.last::<TestTable>().unwrap(), None);
3105
3106        // Insert some entries
3107        provider.put::<TestTable>(10, &b"value_10".to_vec()).unwrap();
3108        provider.put::<TestTable>(20, &b"value_20".to_vec()).unwrap();
3109        provider.put::<TestTable>(5, &b"value_5".to_vec()).unwrap();
3110
3111        // First should return the smallest key
3112        let first = provider.first::<TestTable>().unwrap();
3113        assert_eq!(first, Some((5, b"value_5".to_vec())));
3114
3115        // Last should return the largest key
3116        let last = provider.last::<TestTable>().unwrap();
3117        assert_eq!(last, Some((20, b"value_20".to_vec())));
3118    }
3119
3120    /// Tests the edge case where block < `lowest_available_block_number`.
3121    /// This case cannot be tested via `HistoricalStateProviderRef` (which errors before lookup),
3122    /// so we keep this RocksDB-specific test to verify the low-level behavior.
3123    #[test]
3124    fn test_account_history_info_pruned_before_first_entry() {
3125        let temp_dir = TempDir::new().unwrap();
3126        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3127
3128        let address = Address::from([0x42; 20]);
3129
3130        // Create a single shard starting at block 100
3131        let chunk = IntegerList::new([100, 200, 300]).unwrap();
3132        let shard_key = ShardedKey::new(address, u64::MAX);
3133        provider.put::<tables::AccountsHistory>(shard_key, &chunk).unwrap();
3134
3135        // Query for block 50 with lowest_available_block_number = 100
3136        // This simulates a pruned state where data before block 100 is not available.
3137        // Since we're before the first write AND pruning boundary is set, we need to
3138        // check the changeset at the first write block.
3139        let result =
3140            provider.snapshot().account_history_info(address, 50, Some(100), u64::MAX).unwrap();
3141        assert_eq!(result, HistoryInfo::InChangeset(100));
3142    }
3143
3144    /// Verifies that a read-only (secondary) provider can catch up with primary writes.
3145    #[test]
3146    fn test_account_history_info_read_only_and_catch_up() {
3147        let temp_dir = TempDir::new().unwrap();
3148        let address = Address::from([0x42; 20]);
3149        let chunk = IntegerList::new([100, 200, 300]).unwrap();
3150        let shard_key = ShardedKey::new(address, u64::MAX);
3151
3152        // Write data with a read-write provider
3153        let rw_provider =
3154            RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3155        rw_provider.put::<tables::AccountsHistory>(shard_key, &chunk).unwrap();
3156
3157        // Open read-only provider — it sees the initial data.
3158        let ro_provider = RocksDBBuilder::new(temp_dir.path())
3159            .with_default_tables()
3160            .with_read_only(true)
3161            .build()
3162            .unwrap();
3163
3164        let result =
3165            ro_provider.snapshot().account_history_info(address, 200, None, u64::MAX).unwrap();
3166        assert_eq!(result, HistoryInfo::InChangeset(200));
3167
3168        let result =
3169            ro_provider.snapshot().account_history_info(address, 50, None, u64::MAX).unwrap();
3170        assert_eq!(result, HistoryInfo::NotYetWritten);
3171
3172        let result =
3173            ro_provider.snapshot().account_history_info(address, 400, None, u64::MAX).unwrap();
3174        assert_eq!(result, HistoryInfo::InPlainState);
3175
3176        // Write new data via the primary.
3177        let address2 = Address::from([0x43; 20]);
3178        let chunk2 = IntegerList::new([500, 600]).unwrap();
3179        let shard_key2 = ShardedKey::new(address2, u64::MAX);
3180        rw_provider.put::<tables::AccountsHistory>(shard_key2, &chunk2).unwrap();
3181
3182        // Read-only doesn't see the new data yet.
3183        let result =
3184            ro_provider.snapshot().account_history_info(address2, 500, None, u64::MAX).unwrap();
3185        assert_eq!(result, HistoryInfo::NotYetWritten);
3186
3187        // Catch up — now it sees the new data.
3188        ro_provider.try_catch_up_with_primary().unwrap();
3189
3190        let result =
3191            ro_provider.snapshot().account_history_info(address2, 500, None, u64::MAX).unwrap();
3192        assert_eq!(result, HistoryInfo::InChangeset(500));
3193    }
3194
3195    #[test]
3196    fn test_account_history_info_ignores_blocks_above_visible_tip() {
3197        let temp_dir = TempDir::new().unwrap();
3198        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3199
3200        let address = Address::from([0x42; 20]);
3201
3202        provider
3203            .put::<tables::AccountsHistory>(
3204                ShardedKey::new(address, 110),
3205                &IntegerList::new([100, 110]).unwrap(),
3206            )
3207            .unwrap();
3208        provider
3209            .put::<tables::AccountsHistory>(
3210                ShardedKey::new(address, u64::MAX),
3211                &IntegerList::new([200, 210]).unwrap(),
3212            )
3213            .unwrap();
3214
3215        let result = provider.snapshot().account_history_info(address, 150, None, 150).unwrap();
3216        assert_eq!(result, HistoryInfo::InPlainState);
3217    }
3218
3219    #[test]
3220    fn test_account_history_info_mixed_shard_respects_visible_tip() {
3221        let temp_dir = TempDir::new().unwrap();
3222        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3223
3224        let address = Address::from([0x42; 20]);
3225        provider
3226            .put::<tables::AccountsHistory>(
3227                ShardedKey::new(address, u64::MAX),
3228                &IntegerList::new([100, 150, 300]).unwrap(),
3229            )
3230            .unwrap();
3231
3232        let result = provider.snapshot().account_history_info(address, 120, None, 200).unwrap();
3233        assert_eq!(result, HistoryInfo::InChangeset(150));
3234
3235        let result = provider.snapshot().account_history_info(address, 201, None, 200).unwrap();
3236        assert_eq!(result, HistoryInfo::InPlainState);
3237    }
3238
3239    #[test]
3240    fn test_account_history_info_only_stale_entries_use_fallback() {
3241        let temp_dir = TempDir::new().unwrap();
3242        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3243
3244        let address = Address::from([0x42; 20]);
3245        provider
3246            .put::<tables::AccountsHistory>(
3247                ShardedKey::new(address, u64::MAX),
3248                &IntegerList::new([200, 210]).unwrap(),
3249            )
3250            .unwrap();
3251
3252        let result = provider.snapshot().account_history_info(address, 150, None, 150).unwrap();
3253        assert_eq!(result, HistoryInfo::NotYetWritten);
3254
3255        let result =
3256            provider.snapshot().account_history_info(address, 150, Some(100), 150).unwrap();
3257        assert_eq!(result, HistoryInfo::MaybeInPlainState);
3258    }
3259
3260    #[test]
3261    fn test_account_history_shard_split_at_boundary() {
3262        let temp_dir = TempDir::new().unwrap();
3263        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3264
3265        let address = Address::from([0x42; 20]);
3266        let limit = NUM_OF_INDICES_IN_SHARD;
3267
3268        // Add exactly NUM_OF_INDICES_IN_SHARD + 1 indices to trigger a split
3269        let indices: Vec<u64> = (0..=(limit as u64)).collect();
3270        let mut batch = provider.batch();
3271        batch.append_account_history_shard(address, indices).unwrap();
3272        batch.commit().unwrap();
3273
3274        // Should have 2 shards: one completed shard and one sentinel shard
3275        let completed_key = ShardedKey::new(address, (limit - 1) as u64);
3276        let sentinel_key = ShardedKey::new(address, u64::MAX);
3277
3278        let completed_shard = provider.get::<tables::AccountsHistory>(completed_key).unwrap();
3279        let sentinel_shard = provider.get::<tables::AccountsHistory>(sentinel_key).unwrap();
3280
3281        assert!(completed_shard.is_some(), "completed shard should exist");
3282        assert!(sentinel_shard.is_some(), "sentinel shard should exist");
3283
3284        let completed_shard = completed_shard.unwrap();
3285        let sentinel_shard = sentinel_shard.unwrap();
3286
3287        assert_eq!(completed_shard.len(), limit as u64, "completed shard should be full");
3288        assert_eq!(sentinel_shard.len(), 1, "sentinel shard should have 1 element");
3289    }
3290
3291    #[test]
3292    fn test_account_history_multiple_shard_splits() {
3293        let temp_dir = TempDir::new().unwrap();
3294        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3295
3296        let address = Address::from([0x43; 20]);
3297        let limit = NUM_OF_INDICES_IN_SHARD;
3298
3299        // First batch: add NUM_OF_INDICES_IN_SHARD indices
3300        let first_batch_indices: Vec<u64> = (0..limit as u64).collect();
3301        let mut batch = provider.batch();
3302        batch.append_account_history_shard(address, first_batch_indices).unwrap();
3303        batch.commit().unwrap();
3304
3305        // Should have just a sentinel shard (exactly at limit, not over)
3306        let sentinel_key = ShardedKey::new(address, u64::MAX);
3307        let shard = provider.get::<tables::AccountsHistory>(sentinel_key.clone()).unwrap();
3308        assert!(shard.is_some());
3309        assert_eq!(shard.unwrap().len(), limit as u64);
3310
3311        // Second batch: add another NUM_OF_INDICES_IN_SHARD + 1 indices (causing 2 more shards)
3312        let second_batch_indices: Vec<u64> = (limit as u64..=(2 * limit) as u64).collect();
3313        let mut batch = provider.batch();
3314        batch.append_account_history_shard(address, second_batch_indices).unwrap();
3315        batch.commit().unwrap();
3316
3317        // Now we should have: 2 completed shards + 1 sentinel shard
3318        let first_completed = ShardedKey::new(address, (limit - 1) as u64);
3319        let second_completed = ShardedKey::new(address, (2 * limit - 1) as u64);
3320
3321        assert!(
3322            provider.get::<tables::AccountsHistory>(first_completed).unwrap().is_some(),
3323            "first completed shard should exist"
3324        );
3325        assert!(
3326            provider.get::<tables::AccountsHistory>(second_completed).unwrap().is_some(),
3327            "second completed shard should exist"
3328        );
3329        assert!(
3330            provider.get::<tables::AccountsHistory>(sentinel_key).unwrap().is_some(),
3331            "sentinel shard should exist"
3332        );
3333    }
3334
3335    #[test]
3336    fn test_storage_history_shard_split_at_boundary() {
3337        let temp_dir = TempDir::new().unwrap();
3338        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3339
3340        let address = Address::from([0x44; 20]);
3341        let slot = B256::from([0x55; 32]);
3342        let limit = NUM_OF_INDICES_IN_SHARD;
3343
3344        // Add exactly NUM_OF_INDICES_IN_SHARD + 1 indices to trigger a split
3345        let indices: Vec<u64> = (0..=(limit as u64)).collect();
3346        let mut batch = provider.batch();
3347        batch.append_storage_history_shard(address, slot, indices).unwrap();
3348        batch.commit().unwrap();
3349
3350        // Should have 2 shards: one completed shard and one sentinel shard
3351        let completed_key = StorageShardedKey::new(address, slot, (limit - 1) as u64);
3352        let sentinel_key = StorageShardedKey::new(address, slot, u64::MAX);
3353
3354        let completed_shard = provider.get::<tables::StoragesHistory>(completed_key).unwrap();
3355        let sentinel_shard = provider.get::<tables::StoragesHistory>(sentinel_key).unwrap();
3356
3357        assert!(completed_shard.is_some(), "completed shard should exist");
3358        assert!(sentinel_shard.is_some(), "sentinel shard should exist");
3359
3360        let completed_shard = completed_shard.unwrap();
3361        let sentinel_shard = sentinel_shard.unwrap();
3362
3363        assert_eq!(completed_shard.len(), limit as u64, "completed shard should be full");
3364        assert_eq!(sentinel_shard.len(), 1, "sentinel shard should have 1 element");
3365    }
3366
3367    #[test]
3368    fn test_storage_history_multiple_shard_splits() {
3369        let temp_dir = TempDir::new().unwrap();
3370        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3371
3372        let address = Address::from([0x46; 20]);
3373        let slot = B256::from([0x57; 32]);
3374        let limit = NUM_OF_INDICES_IN_SHARD;
3375
3376        // First batch: add NUM_OF_INDICES_IN_SHARD indices
3377        let first_batch_indices: Vec<u64> = (0..limit as u64).collect();
3378        let mut batch = provider.batch();
3379        batch.append_storage_history_shard(address, slot, first_batch_indices).unwrap();
3380        batch.commit().unwrap();
3381
3382        // Should have just a sentinel shard (exactly at limit, not over)
3383        let sentinel_key = StorageShardedKey::new(address, slot, u64::MAX);
3384        let shard = provider.get::<tables::StoragesHistory>(sentinel_key.clone()).unwrap();
3385        assert!(shard.is_some());
3386        assert_eq!(shard.unwrap().len(), limit as u64);
3387
3388        // Second batch: add another NUM_OF_INDICES_IN_SHARD + 1 indices (causing 2 more shards)
3389        let second_batch_indices: Vec<u64> = (limit as u64..=(2 * limit) as u64).collect();
3390        let mut batch = provider.batch();
3391        batch.append_storage_history_shard(address, slot, second_batch_indices).unwrap();
3392        batch.commit().unwrap();
3393
3394        // Now we should have: 2 completed shards + 1 sentinel shard
3395        let first_completed = StorageShardedKey::new(address, slot, (limit - 1) as u64);
3396        let second_completed = StorageShardedKey::new(address, slot, (2 * limit - 1) as u64);
3397
3398        assert!(
3399            provider.get::<tables::StoragesHistory>(first_completed).unwrap().is_some(),
3400            "first completed shard should exist"
3401        );
3402        assert!(
3403            provider.get::<tables::StoragesHistory>(second_completed).unwrap().is_some(),
3404            "second completed shard should exist"
3405        );
3406        assert!(
3407            provider.get::<tables::StoragesHistory>(sentinel_key).unwrap().is_some(),
3408            "sentinel shard should exist"
3409        );
3410    }
3411
3412    #[test]
3413    fn test_clear_table() {
3414        let temp_dir = TempDir::new().unwrap();
3415        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3416
3417        let address = Address::from([0x42; 20]);
3418        let key = ShardedKey::new(address, u64::MAX);
3419        let blocks = BlockNumberList::new_pre_sorted([1, 2, 3]);
3420
3421        provider.put::<tables::AccountsHistory>(key.clone(), &blocks).unwrap();
3422        assert!(provider.get::<tables::AccountsHistory>(key.clone()).unwrap().is_some());
3423
3424        provider.clear::<tables::AccountsHistory>().unwrap();
3425
3426        assert!(
3427            provider.get::<tables::AccountsHistory>(key).unwrap().is_none(),
3428            "table should be empty after clear"
3429        );
3430        assert!(
3431            provider.first::<tables::AccountsHistory>().unwrap().is_none(),
3432            "first() should return None after clear"
3433        );
3434    }
3435
3436    #[test]
3437    fn test_clear_empty_table() {
3438        let temp_dir = TempDir::new().unwrap();
3439        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3440
3441        assert!(provider.first::<tables::AccountsHistory>().unwrap().is_none());
3442
3443        provider.clear::<tables::AccountsHistory>().unwrap();
3444
3445        assert!(provider.first::<tables::AccountsHistory>().unwrap().is_none());
3446    }
3447
3448    #[test]
3449    fn test_unwind_account_history_to_basic() {
3450        let temp_dir = TempDir::new().unwrap();
3451        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3452
3453        let address = Address::from([0x42; 20]);
3454
3455        // Add blocks 0-10
3456        let mut batch = provider.batch();
3457        batch.append_account_history_shard(address, 0..=10).unwrap();
3458        batch.commit().unwrap();
3459
3460        // Verify we have blocks 0-10
3461        let key = ShardedKey::new(address, u64::MAX);
3462        let result = provider.get::<tables::AccountsHistory>(key.clone()).unwrap();
3463        assert!(result.is_some());
3464        let blocks: Vec<u64> = result.unwrap().iter().collect();
3465        assert_eq!(blocks, (0..=10).collect::<Vec<_>>());
3466
3467        // Unwind to block 5 (keep blocks 0-5, remove 6-10)
3468        let mut batch = provider.batch();
3469        batch.unwind_account_history_to(address, 5).unwrap();
3470        batch.commit().unwrap();
3471
3472        // Verify only blocks 0-5 remain
3473        let result = provider.get::<tables::AccountsHistory>(key).unwrap();
3474        assert!(result.is_some());
3475        let blocks: Vec<u64> = result.unwrap().iter().collect();
3476        assert_eq!(blocks, (0..=5).collect::<Vec<_>>());
3477    }
3478
3479    #[test]
3480    fn test_unwind_account_history_to_removes_all() {
3481        let temp_dir = TempDir::new().unwrap();
3482        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3483
3484        let address = Address::from([0x42; 20]);
3485
3486        // Add blocks 5-10
3487        let mut batch = provider.batch();
3488        batch.append_account_history_shard(address, 5..=10).unwrap();
3489        batch.commit().unwrap();
3490
3491        // Unwind to block 4 (removes all blocks since they're all > 4)
3492        let mut batch = provider.batch();
3493        batch.unwind_account_history_to(address, 4).unwrap();
3494        batch.commit().unwrap();
3495
3496        // Verify no data remains for this address
3497        let key = ShardedKey::new(address, u64::MAX);
3498        let result = provider.get::<tables::AccountsHistory>(key).unwrap();
3499        assert!(result.is_none(), "Should have no data after full unwind");
3500    }
3501
3502    #[test]
3503    fn test_unwind_account_history_to_no_op() {
3504        let temp_dir = TempDir::new().unwrap();
3505        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3506
3507        let address = Address::from([0x42; 20]);
3508
3509        // Add blocks 0-5
3510        let mut batch = provider.batch();
3511        batch.append_account_history_shard(address, 0..=5).unwrap();
3512        batch.commit().unwrap();
3513
3514        // Unwind to block 10 (no-op since all blocks are <= 10)
3515        let mut batch = provider.batch();
3516        batch.unwind_account_history_to(address, 10).unwrap();
3517        batch.commit().unwrap();
3518
3519        // Verify blocks 0-5 still remain
3520        let key = ShardedKey::new(address, u64::MAX);
3521        let result = provider.get::<tables::AccountsHistory>(key).unwrap();
3522        assert!(result.is_some());
3523        let blocks: Vec<u64> = result.unwrap().iter().collect();
3524        assert_eq!(blocks, (0..=5).collect::<Vec<_>>());
3525    }
3526
3527    #[test]
3528    fn test_unwind_account_history_to_block_zero() {
3529        let temp_dir = TempDir::new().unwrap();
3530        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3531
3532        let address = Address::from([0x42; 20]);
3533
3534        // Add blocks 0-5 (including block 0)
3535        let mut batch = provider.batch();
3536        batch.append_account_history_shard(address, 0..=5).unwrap();
3537        batch.commit().unwrap();
3538
3539        // Unwind to block 0 (keep only block 0, remove 1-5)
3540        // This simulates the caller doing: unwind_to = min_block.checked_sub(1) where min_block = 1
3541        let mut batch = provider.batch();
3542        batch.unwind_account_history_to(address, 0).unwrap();
3543        batch.commit().unwrap();
3544
3545        // Verify only block 0 remains
3546        let key = ShardedKey::new(address, u64::MAX);
3547        let result = provider.get::<tables::AccountsHistory>(key).unwrap();
3548        assert!(result.is_some());
3549        let blocks: Vec<u64> = result.unwrap().iter().collect();
3550        assert_eq!(blocks, vec![0]);
3551    }
3552
3553    #[test]
3554    fn test_unwind_account_history_to_multi_shard() {
3555        let temp_dir = TempDir::new().unwrap();
3556        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3557
3558        let address = Address::from([0x42; 20]);
3559
3560        // Create multiple shards by adding more than NUM_OF_INDICES_IN_SHARD entries
3561        // For testing, we'll manually create shards with specific keys
3562        let mut batch = provider.batch();
3563
3564        // First shard: blocks 1-50, keyed by 50
3565        let shard1 = BlockNumberList::new_pre_sorted(1..=50);
3566        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, 50), &shard1).unwrap();
3567
3568        // Second shard: blocks 51-100, keyed by MAX (sentinel)
3569        let shard2 = BlockNumberList::new_pre_sorted(51..=100);
3570        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, u64::MAX), &shard2).unwrap();
3571
3572        batch.commit().unwrap();
3573
3574        // Verify we have 2 shards
3575        let shards = provider.account_history_shards(address).unwrap();
3576        assert_eq!(shards.len(), 2);
3577
3578        // Unwind to block 75 (keep 1-75, remove 76-100)
3579        let mut batch = provider.batch();
3580        batch.unwind_account_history_to(address, 75).unwrap();
3581        batch.commit().unwrap();
3582
3583        // Verify: shard1 should be untouched, shard2 should be truncated
3584        let shards = provider.account_history_shards(address).unwrap();
3585        assert_eq!(shards.len(), 2);
3586
3587        // First shard unchanged
3588        assert_eq!(shards[0].0.highest_block_number, 50);
3589        assert_eq!(shards[0].1.iter().collect::<Vec<_>>(), (1..=50).collect::<Vec<_>>());
3590
3591        // Second shard truncated and re-keyed to MAX
3592        assert_eq!(shards[1].0.highest_block_number, u64::MAX);
3593        assert_eq!(shards[1].1.iter().collect::<Vec<_>>(), (51..=75).collect::<Vec<_>>());
3594    }
3595
3596    #[test]
3597    fn test_unwind_account_history_to_multi_shard_boundary_empty() {
3598        let temp_dir = TempDir::new().unwrap();
3599        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3600
3601        let address = Address::from([0x42; 20]);
3602
3603        // Create two shards
3604        let mut batch = provider.batch();
3605
3606        // First shard: blocks 1-50, keyed by 50
3607        let shard1 = BlockNumberList::new_pre_sorted(1..=50);
3608        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, 50), &shard1).unwrap();
3609
3610        // Second shard: blocks 75-100, keyed by MAX
3611        let shard2 = BlockNumberList::new_pre_sorted(75..=100);
3612        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, u64::MAX), &shard2).unwrap();
3613
3614        batch.commit().unwrap();
3615
3616        // Unwind to block 60 (removes all of shard2 since 75 > 60, promotes shard1 to MAX)
3617        let mut batch = provider.batch();
3618        batch.unwind_account_history_to(address, 60).unwrap();
3619        batch.commit().unwrap();
3620
3621        // Verify: only shard1 remains, now keyed as MAX
3622        let shards = provider.account_history_shards(address).unwrap();
3623        assert_eq!(shards.len(), 1);
3624        assert_eq!(shards[0].0.highest_block_number, u64::MAX);
3625        assert_eq!(shards[0].1.iter().collect::<Vec<_>>(), (1..=50).collect::<Vec<_>>());
3626    }
3627
3628    #[test]
3629    fn test_account_history_shards_iterator() {
3630        let temp_dir = TempDir::new().unwrap();
3631        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3632
3633        let address = Address::from([0x42; 20]);
3634        let other_address = Address::from([0x43; 20]);
3635
3636        // Add data for two addresses
3637        let mut batch = provider.batch();
3638        batch.append_account_history_shard(address, 0..=5).unwrap();
3639        batch.append_account_history_shard(other_address, 10..=15).unwrap();
3640        batch.commit().unwrap();
3641
3642        // Query shards for first address only
3643        let shards = provider.account_history_shards(address).unwrap();
3644        assert_eq!(shards.len(), 1);
3645        assert_eq!(shards[0].0.key, address);
3646
3647        // Query shards for second address only
3648        let shards = provider.account_history_shards(other_address).unwrap();
3649        assert_eq!(shards.len(), 1);
3650        assert_eq!(shards[0].0.key, other_address);
3651
3652        // Query shards for non-existent address
3653        let non_existent = Address::from([0x99; 20]);
3654        let shards = provider.account_history_shards(non_existent).unwrap();
3655        assert!(shards.is_empty());
3656    }
3657
3658    #[test]
3659    fn test_clear_account_history() {
3660        let temp_dir = TempDir::new().unwrap();
3661        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3662
3663        let address = Address::from([0x42; 20]);
3664
3665        // Add blocks 0-10
3666        let mut batch = provider.batch();
3667        batch.append_account_history_shard(address, 0..=10).unwrap();
3668        batch.commit().unwrap();
3669
3670        // Clear all history (simulates unwind from block 0)
3671        let mut batch = provider.batch();
3672        batch.clear_account_history(address).unwrap();
3673        batch.commit().unwrap();
3674
3675        // Verify no data remains
3676        let shards = provider.account_history_shards(address).unwrap();
3677        assert!(shards.is_empty(), "All shards should be deleted");
3678    }
3679
3680    #[test]
3681    fn test_unwind_non_sentinel_boundary() {
3682        let temp_dir = TempDir::new().unwrap();
3683        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3684
3685        let address = Address::from([0x42; 20]);
3686
3687        // Create three shards with non-sentinel boundary
3688        let mut batch = provider.batch();
3689
3690        // Shard 1: blocks 1-50, keyed by 50
3691        let shard1 = BlockNumberList::new_pre_sorted(1..=50);
3692        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, 50), &shard1).unwrap();
3693
3694        // Shard 2: blocks 51-100, keyed by 100 (non-sentinel, will be boundary)
3695        let shard2 = BlockNumberList::new_pre_sorted(51..=100);
3696        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, 100), &shard2).unwrap();
3697
3698        // Shard 3: blocks 101-150, keyed by MAX (will be deleted)
3699        let shard3 = BlockNumberList::new_pre_sorted(101..=150);
3700        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, u64::MAX), &shard3).unwrap();
3701
3702        batch.commit().unwrap();
3703
3704        // Verify 3 shards
3705        let shards = provider.account_history_shards(address).unwrap();
3706        assert_eq!(shards.len(), 3);
3707
3708        // Unwind to block 75 (truncates shard2, deletes shard3)
3709        let mut batch = provider.batch();
3710        batch.unwind_account_history_to(address, 75).unwrap();
3711        batch.commit().unwrap();
3712
3713        // Verify: shard1 unchanged, shard2 truncated and re-keyed to MAX, shard3 deleted
3714        let shards = provider.account_history_shards(address).unwrap();
3715        assert_eq!(shards.len(), 2);
3716
3717        // First shard unchanged
3718        assert_eq!(shards[0].0.highest_block_number, 50);
3719        assert_eq!(shards[0].1.iter().collect::<Vec<_>>(), (1..=50).collect::<Vec<_>>());
3720
3721        // Second shard truncated and re-keyed to MAX
3722        assert_eq!(shards[1].0.highest_block_number, u64::MAX);
3723        assert_eq!(shards[1].1.iter().collect::<Vec<_>>(), (51..=75).collect::<Vec<_>>());
3724    }
3725
3726    #[test]
3727    fn test_batch_auto_commit_on_threshold() {
3728        let temp_dir = TempDir::new().unwrap();
3729        let provider =
3730            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
3731
3732        // Create batch with tiny threshold (1KB) to force auto-commits
3733        let mut batch = RocksDBBatch {
3734            provider: &provider,
3735            inner: WriteBatchWithTransaction::<true>::default(),
3736            buf: Vec::new(),
3737            auto_commit_threshold: Some(1024), // 1KB
3738        };
3739
3740        // Write entries until we exceed threshold multiple times
3741        // Each entry is ~20 bytes, so 100 entries = ~2KB = 2 auto-commits
3742        for i in 0..100u64 {
3743            let value = format!("value_{i:04}").into_bytes();
3744            batch.put::<TestTable>(i, &value).unwrap();
3745        }
3746
3747        // Data should already be visible (auto-committed) even before final commit
3748        // At least some entries should be readable
3749        let first_visible = provider.get::<TestTable>(0).unwrap();
3750        assert!(first_visible.is_some(), "Auto-committed data should be visible");
3751
3752        // Final commit for remaining batch
3753        batch.commit().unwrap();
3754
3755        // All entries should now be visible
3756        for i in 0..100u64 {
3757            let value = format!("value_{i:04}").into_bytes();
3758            assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
3759        }
3760    }
3761
3762    // ==================== PARAMETERIZED PRUNE TESTS ====================
3763
3764    /// Test case for account history pruning
3765    struct AccountPruneCase {
3766        name: &'static str,
3767        initial_shards: &'static [(u64, &'static [u64])],
3768        prune_to: u64,
3769        expected_outcome: PruneShardOutcome,
3770        expected_shards: &'static [(u64, &'static [u64])],
3771    }
3772
3773    /// Test case for storage history pruning
3774    struct StoragePruneCase {
3775        name: &'static str,
3776        initial_shards: &'static [(u64, &'static [u64])],
3777        prune_to: u64,
3778        expected_outcome: PruneShardOutcome,
3779        expected_shards: &'static [(u64, &'static [u64])],
3780    }
3781
3782    #[test]
3783    fn test_prune_account_history_cases() {
3784        const MAX: u64 = u64::MAX;
3785        const CASES: &[AccountPruneCase] = &[
3786            AccountPruneCase {
3787                name: "single_shard_truncate",
3788                initial_shards: &[(MAX, &[10, 20, 30, 40])],
3789                prune_to: 25,
3790                expected_outcome: PruneShardOutcome::Updated,
3791                expected_shards: &[(MAX, &[30, 40])],
3792            },
3793            AccountPruneCase {
3794                name: "single_shard_delete_all",
3795                initial_shards: &[(MAX, &[10, 20])],
3796                prune_to: 20,
3797                expected_outcome: PruneShardOutcome::Deleted,
3798                expected_shards: &[],
3799            },
3800            AccountPruneCase {
3801                name: "single_shard_noop",
3802                initial_shards: &[(MAX, &[10, 20])],
3803                prune_to: 5,
3804                expected_outcome: PruneShardOutcome::Unchanged,
3805                expected_shards: &[(MAX, &[10, 20])],
3806            },
3807            AccountPruneCase {
3808                name: "no_shards",
3809                initial_shards: &[],
3810                prune_to: 100,
3811                expected_outcome: PruneShardOutcome::Unchanged,
3812                expected_shards: &[],
3813            },
3814            AccountPruneCase {
3815                name: "multi_shard_truncate_first",
3816                initial_shards: &[(30, &[10, 20, 30]), (MAX, &[40, 50, 60])],
3817                prune_to: 25,
3818                expected_outcome: PruneShardOutcome::Updated,
3819                expected_shards: &[(30, &[30]), (MAX, &[40, 50, 60])],
3820            },
3821            AccountPruneCase {
3822                name: "delete_first_shard_sentinel_unchanged",
3823                initial_shards: &[(20, &[10, 20]), (MAX, &[30, 40])],
3824                prune_to: 20,
3825                expected_outcome: PruneShardOutcome::Deleted,
3826                expected_shards: &[(MAX, &[30, 40])],
3827            },
3828            AccountPruneCase {
3829                name: "multi_shard_delete_all_but_last",
3830                initial_shards: &[(10, &[5, 10]), (20, &[15, 20]), (MAX, &[25, 30])],
3831                prune_to: 22,
3832                expected_outcome: PruneShardOutcome::Deleted,
3833                expected_shards: &[(MAX, &[25, 30])],
3834            },
3835            AccountPruneCase {
3836                name: "mid_shard_preserves_key",
3837                initial_shards: &[(50, &[10, 20, 30, 40, 50]), (MAX, &[60, 70])],
3838                prune_to: 25,
3839                expected_outcome: PruneShardOutcome::Updated,
3840                expected_shards: &[(50, &[30, 40, 50]), (MAX, &[60, 70])],
3841            },
3842            // Equivalence tests
3843            AccountPruneCase {
3844                name: "equiv_delete_early_shards_keep_sentinel",
3845                initial_shards: &[(20, &[10, 15, 20]), (50, &[30, 40, 50]), (MAX, &[60, 70])],
3846                prune_to: 55,
3847                expected_outcome: PruneShardOutcome::Deleted,
3848                expected_shards: &[(MAX, &[60, 70])],
3849            },
3850            AccountPruneCase {
3851                name: "equiv_sentinel_becomes_empty_with_prev",
3852                initial_shards: &[(50, &[30, 40, 50]), (MAX, &[35])],
3853                prune_to: 40,
3854                expected_outcome: PruneShardOutcome::Deleted,
3855                expected_shards: &[(MAX, &[50])],
3856            },
3857            AccountPruneCase {
3858                name: "equiv_all_shards_become_empty",
3859                initial_shards: &[(50, &[30, 40, 50]), (MAX, &[51])],
3860                prune_to: 51,
3861                expected_outcome: PruneShardOutcome::Deleted,
3862                expected_shards: &[],
3863            },
3864            AccountPruneCase {
3865                name: "equiv_non_sentinel_last_shard_promoted",
3866                initial_shards: &[(100, &[50, 75, 100])],
3867                prune_to: 60,
3868                expected_outcome: PruneShardOutcome::Updated,
3869                expected_shards: &[(MAX, &[75, 100])],
3870            },
3871            AccountPruneCase {
3872                name: "equiv_filter_within_shard",
3873                initial_shards: &[(MAX, &[10, 20, 30, 40])],
3874                prune_to: 25,
3875                expected_outcome: PruneShardOutcome::Updated,
3876                expected_shards: &[(MAX, &[30, 40])],
3877            },
3878            AccountPruneCase {
3879                name: "equiv_multi_shard_partial_delete",
3880                initial_shards: &[(20, &[10, 20]), (50, &[30, 40, 50]), (MAX, &[60, 70])],
3881                prune_to: 35,
3882                expected_outcome: PruneShardOutcome::Deleted,
3883                expected_shards: &[(50, &[40, 50]), (MAX, &[60, 70])],
3884            },
3885        ];
3886
3887        let address = Address::from([0x42; 20]);
3888
3889        for case in CASES {
3890            let temp_dir = TempDir::new().unwrap();
3891            let provider =
3892                RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3893
3894            // Setup initial shards
3895            let mut batch = provider.batch();
3896            for (highest, blocks) in case.initial_shards {
3897                let shard = BlockNumberList::new_pre_sorted(blocks.iter().copied());
3898                batch
3899                    .put::<tables::AccountsHistory>(ShardedKey::new(address, *highest), &shard)
3900                    .unwrap();
3901            }
3902            batch.commit().unwrap();
3903
3904            // Prune
3905            let mut batch = provider.batch();
3906            let outcome = batch.prune_account_history_to(address, case.prune_to).unwrap();
3907            batch.commit().unwrap();
3908
3909            // Assert outcome
3910            assert_eq!(outcome, case.expected_outcome, "case '{}': wrong outcome", case.name);
3911
3912            // Assert final shards
3913            let shards = provider.account_history_shards(address).unwrap();
3914            assert_eq!(
3915                shards.len(),
3916                case.expected_shards.len(),
3917                "case '{}': wrong shard count",
3918                case.name
3919            );
3920            for (i, ((key, blocks), (exp_key, exp_blocks))) in
3921                shards.iter().zip(case.expected_shards.iter()).enumerate()
3922            {
3923                assert_eq!(
3924                    key.highest_block_number, *exp_key,
3925                    "case '{}': shard {} wrong key",
3926                    case.name, i
3927                );
3928                assert_eq!(
3929                    blocks.iter().collect::<Vec<_>>(),
3930                    *exp_blocks,
3931                    "case '{}': shard {} wrong blocks",
3932                    case.name,
3933                    i
3934                );
3935            }
3936        }
3937    }
3938
3939    #[test]
3940    fn test_prune_storage_history_cases() {
3941        const MAX: u64 = u64::MAX;
3942        const CASES: &[StoragePruneCase] = &[
3943            StoragePruneCase {
3944                name: "single_shard_truncate",
3945                initial_shards: &[(MAX, &[10, 20, 30, 40])],
3946                prune_to: 25,
3947                expected_outcome: PruneShardOutcome::Updated,
3948                expected_shards: &[(MAX, &[30, 40])],
3949            },
3950            StoragePruneCase {
3951                name: "single_shard_delete_all",
3952                initial_shards: &[(MAX, &[10, 20])],
3953                prune_to: 20,
3954                expected_outcome: PruneShardOutcome::Deleted,
3955                expected_shards: &[],
3956            },
3957            StoragePruneCase {
3958                name: "noop",
3959                initial_shards: &[(MAX, &[10, 20])],
3960                prune_to: 5,
3961                expected_outcome: PruneShardOutcome::Unchanged,
3962                expected_shards: &[(MAX, &[10, 20])],
3963            },
3964            StoragePruneCase {
3965                name: "no_shards",
3966                initial_shards: &[],
3967                prune_to: 100,
3968                expected_outcome: PruneShardOutcome::Unchanged,
3969                expected_shards: &[],
3970            },
3971            StoragePruneCase {
3972                name: "mid_shard_preserves_key",
3973                initial_shards: &[(50, &[10, 20, 30, 40, 50]), (MAX, &[60, 70])],
3974                prune_to: 25,
3975                expected_outcome: PruneShardOutcome::Updated,
3976                expected_shards: &[(50, &[30, 40, 50]), (MAX, &[60, 70])],
3977            },
3978            // Equivalence tests
3979            StoragePruneCase {
3980                name: "equiv_sentinel_promotion",
3981                initial_shards: &[(100, &[50, 75, 100])],
3982                prune_to: 60,
3983                expected_outcome: PruneShardOutcome::Updated,
3984                expected_shards: &[(MAX, &[75, 100])],
3985            },
3986            StoragePruneCase {
3987                name: "equiv_delete_early_shards_keep_sentinel",
3988                initial_shards: &[(20, &[10, 15, 20]), (50, &[30, 40, 50]), (MAX, &[60, 70])],
3989                prune_to: 55,
3990                expected_outcome: PruneShardOutcome::Deleted,
3991                expected_shards: &[(MAX, &[60, 70])],
3992            },
3993            StoragePruneCase {
3994                name: "equiv_sentinel_becomes_empty_with_prev",
3995                initial_shards: &[(50, &[30, 40, 50]), (MAX, &[35])],
3996                prune_to: 40,
3997                expected_outcome: PruneShardOutcome::Deleted,
3998                expected_shards: &[(MAX, &[50])],
3999            },
4000            StoragePruneCase {
4001                name: "equiv_all_shards_become_empty",
4002                initial_shards: &[(50, &[30, 40, 50]), (MAX, &[51])],
4003                prune_to: 51,
4004                expected_outcome: PruneShardOutcome::Deleted,
4005                expected_shards: &[],
4006            },
4007            StoragePruneCase {
4008                name: "equiv_filter_within_shard",
4009                initial_shards: &[(MAX, &[10, 20, 30, 40])],
4010                prune_to: 25,
4011                expected_outcome: PruneShardOutcome::Updated,
4012                expected_shards: &[(MAX, &[30, 40])],
4013            },
4014            StoragePruneCase {
4015                name: "equiv_multi_shard_partial_delete",
4016                initial_shards: &[(20, &[10, 20]), (50, &[30, 40, 50]), (MAX, &[60, 70])],
4017                prune_to: 35,
4018                expected_outcome: PruneShardOutcome::Deleted,
4019                expected_shards: &[(50, &[40, 50]), (MAX, &[60, 70])],
4020            },
4021        ];
4022
4023        let address = Address::from([0x42; 20]);
4024        let storage_key = B256::from([0x01; 32]);
4025
4026        for case in CASES {
4027            let temp_dir = TempDir::new().unwrap();
4028            let provider =
4029                RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
4030
4031            // Setup initial shards
4032            let mut batch = provider.batch();
4033            for (highest, blocks) in case.initial_shards {
4034                let shard = BlockNumberList::new_pre_sorted(blocks.iter().copied());
4035                let key = if *highest == MAX {
4036                    StorageShardedKey::last(address, storage_key)
4037                } else {
4038                    StorageShardedKey::new(address, storage_key, *highest)
4039                };
4040                batch.put::<tables::StoragesHistory>(key, &shard).unwrap();
4041            }
4042            batch.commit().unwrap();
4043
4044            // Prune
4045            let mut batch = provider.batch();
4046            let outcome =
4047                batch.prune_storage_history_to(address, storage_key, case.prune_to).unwrap();
4048            batch.commit().unwrap();
4049
4050            // Assert outcome
4051            assert_eq!(outcome, case.expected_outcome, "case '{}': wrong outcome", case.name);
4052
4053            // Assert final shards
4054            let shards = provider.storage_history_shards(address, storage_key).unwrap();
4055            assert_eq!(
4056                shards.len(),
4057                case.expected_shards.len(),
4058                "case '{}': wrong shard count",
4059                case.name
4060            );
4061            for (i, ((key, blocks), (exp_key, exp_blocks))) in
4062                shards.iter().zip(case.expected_shards.iter()).enumerate()
4063            {
4064                assert_eq!(
4065                    key.sharded_key.highest_block_number, *exp_key,
4066                    "case '{}': shard {} wrong key",
4067                    case.name, i
4068                );
4069                assert_eq!(
4070                    blocks.iter().collect::<Vec<_>>(),
4071                    *exp_blocks,
4072                    "case '{}': shard {} wrong blocks",
4073                    case.name,
4074                    i
4075                );
4076            }
4077        }
4078    }
4079
4080    #[test]
4081    fn test_prune_storage_history_does_not_affect_other_slots() {
4082        let temp_dir = TempDir::new().unwrap();
4083        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
4084
4085        let address = Address::from([0x42; 20]);
4086        let slot1 = B256::from([0x01; 32]);
4087        let slot2 = B256::from([0x02; 32]);
4088
4089        // Two different storage slots
4090        let mut batch = provider.batch();
4091        batch
4092            .put::<tables::StoragesHistory>(
4093                StorageShardedKey::last(address, slot1),
4094                &BlockNumberList::new_pre_sorted([10u64, 20]),
4095            )
4096            .unwrap();
4097        batch
4098            .put::<tables::StoragesHistory>(
4099                StorageShardedKey::last(address, slot2),
4100                &BlockNumberList::new_pre_sorted([30u64, 40]),
4101            )
4102            .unwrap();
4103        batch.commit().unwrap();
4104
4105        // Prune slot1 to block 20 (deletes all)
4106        let mut batch = provider.batch();
4107        let outcome = batch.prune_storage_history_to(address, slot1, 20).unwrap();
4108        batch.commit().unwrap();
4109
4110        assert_eq!(outcome, PruneShardOutcome::Deleted);
4111
4112        // slot1 should be empty
4113        let shards1 = provider.storage_history_shards(address, slot1).unwrap();
4114        assert!(shards1.is_empty());
4115
4116        // slot2 should be unchanged
4117        let shards2 = provider.storage_history_shards(address, slot2).unwrap();
4118        assert_eq!(shards2.len(), 1);
4119        assert_eq!(shards2[0].1.iter().collect::<Vec<_>>(), vec![30, 40]);
4120    }
4121
4122    #[test]
4123    fn test_prune_invariants() {
4124        // Test invariants: no empty shards, sentinel is always last
4125        let address = Address::from([0x42; 20]);
4126        let storage_key = B256::from([0x01; 32]);
4127
4128        // Test cases that exercise invariants
4129        #[expect(clippy::type_complexity)]
4130        let invariant_cases: &[(&[(u64, &[u64])], u64)] = &[
4131            // Account: shards where middle becomes empty
4132            (&[(10, &[5, 10]), (20, &[15, 20]), (u64::MAX, &[25, 30])], 20),
4133            // Account: non-sentinel shard only, partial prune -> must become sentinel
4134            (&[(100, &[50, 100])], 60),
4135        ];
4136
4137        for (initial_shards, prune_to) in invariant_cases {
4138            // Test account history invariants
4139            {
4140                let temp_dir = TempDir::new().unwrap();
4141                let provider =
4142                    RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
4143
4144                let mut batch = provider.batch();
4145                for (highest, blocks) in *initial_shards {
4146                    let shard = BlockNumberList::new_pre_sorted(blocks.iter().copied());
4147                    batch
4148                        .put::<tables::AccountsHistory>(ShardedKey::new(address, *highest), &shard)
4149                        .unwrap();
4150                }
4151                batch.commit().unwrap();
4152
4153                let mut batch = provider.batch();
4154                batch.prune_account_history_to(address, *prune_to).unwrap();
4155                batch.commit().unwrap();
4156
4157                let shards = provider.account_history_shards(address).unwrap();
4158
4159                // Invariant 1: no empty shards
4160                for (key, blocks) in &shards {
4161                    assert!(
4162                        !blocks.is_empty(),
4163                        "Account: empty shard at key {}",
4164                        key.highest_block_number
4165                    );
4166                }
4167
4168                // Invariant 2: last shard is sentinel
4169                if !shards.is_empty() {
4170                    let last = shards.last().unwrap();
4171                    assert_eq!(
4172                        last.0.highest_block_number,
4173                        u64::MAX,
4174                        "Account: last shard must be sentinel"
4175                    );
4176                }
4177            }
4178
4179            // Test storage history invariants
4180            {
4181                let temp_dir = TempDir::new().unwrap();
4182                let provider =
4183                    RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
4184
4185                let mut batch = provider.batch();
4186                for (highest, blocks) in *initial_shards {
4187                    let shard = BlockNumberList::new_pre_sorted(blocks.iter().copied());
4188                    let key = if *highest == u64::MAX {
4189                        StorageShardedKey::last(address, storage_key)
4190                    } else {
4191                        StorageShardedKey::new(address, storage_key, *highest)
4192                    };
4193                    batch.put::<tables::StoragesHistory>(key, &shard).unwrap();
4194                }
4195                batch.commit().unwrap();
4196
4197                let mut batch = provider.batch();
4198                batch.prune_storage_history_to(address, storage_key, *prune_to).unwrap();
4199                batch.commit().unwrap();
4200
4201                let shards = provider.storage_history_shards(address, storage_key).unwrap();
4202
4203                // Invariant 1: no empty shards
4204                for (key, blocks) in &shards {
4205                    assert!(
4206                        !blocks.is_empty(),
4207                        "Storage: empty shard at key {}",
4208                        key.sharded_key.highest_block_number
4209                    );
4210                }
4211
4212                // Invariant 2: last shard is sentinel
4213                if !shards.is_empty() {
4214                    let last = shards.last().unwrap();
4215                    assert_eq!(
4216                        last.0.sharded_key.highest_block_number,
4217                        u64::MAX,
4218                        "Storage: last shard must be sentinel"
4219                    );
4220                }
4221            }
4222        }
4223    }
4224
4225    #[test]
4226    fn test_prune_account_history_batch_multiple_sorted_targets() {
4227        let temp_dir = TempDir::new().unwrap();
4228        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
4229
4230        let addr1 = Address::from([0x01; 20]);
4231        let addr2 = Address::from([0x02; 20]);
4232        let addr3 = Address::from([0x03; 20]);
4233
4234        // Setup shards for each address
4235        let mut batch = provider.batch();
4236        batch
4237            .put::<tables::AccountsHistory>(
4238                ShardedKey::new(addr1, u64::MAX),
4239                &BlockNumberList::new_pre_sorted([10, 20, 30]),
4240            )
4241            .unwrap();
4242        batch
4243            .put::<tables::AccountsHistory>(
4244                ShardedKey::new(addr2, u64::MAX),
4245                &BlockNumberList::new_pre_sorted([5, 10, 15]),
4246            )
4247            .unwrap();
4248        batch
4249            .put::<tables::AccountsHistory>(
4250                ShardedKey::new(addr3, u64::MAX),
4251                &BlockNumberList::new_pre_sorted([100, 200]),
4252            )
4253            .unwrap();
4254        batch.commit().unwrap();
4255
4256        // Prune all three (sorted by address)
4257        let mut targets = vec![(addr1, 15), (addr2, 10), (addr3, 50)];
4258        targets.sort_by_key(|(addr, _)| *addr);
4259
4260        let mut batch = provider.batch();
4261        let outcomes = batch.prune_account_history_batch(&targets).unwrap();
4262        batch.commit().unwrap();
4263
4264        // addr1: prune <=15, keep [20, 30] -> updated
4265        // addr2: prune <=10, keep [15] -> updated
4266        // addr3: prune <=50, keep [100, 200] -> unchanged
4267        assert_eq!(outcomes.updated, 2);
4268        assert_eq!(outcomes.unchanged, 1);
4269
4270        let shards1 = provider.account_history_shards(addr1).unwrap();
4271        assert_eq!(shards1[0].1.iter().collect::<Vec<_>>(), vec![20, 30]);
4272
4273        let shards2 = provider.account_history_shards(addr2).unwrap();
4274        assert_eq!(shards2[0].1.iter().collect::<Vec<_>>(), vec![15]);
4275
4276        let shards3 = provider.account_history_shards(addr3).unwrap();
4277        assert_eq!(shards3[0].1.iter().collect::<Vec<_>>(), vec![100, 200]);
4278    }
4279
4280    #[test]
4281    fn test_prune_account_history_batch_target_with_no_shards() {
4282        let temp_dir = TempDir::new().unwrap();
4283        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
4284
4285        let addr1 = Address::from([0x01; 20]);
4286        let addr2 = Address::from([0x02; 20]); // No shards for this one
4287        let addr3 = Address::from([0x03; 20]);
4288
4289        // Only setup shards for addr1 and addr3
4290        let mut batch = provider.batch();
4291        batch
4292            .put::<tables::AccountsHistory>(
4293                ShardedKey::new(addr1, u64::MAX),
4294                &BlockNumberList::new_pre_sorted([10, 20]),
4295            )
4296            .unwrap();
4297        batch
4298            .put::<tables::AccountsHistory>(
4299                ShardedKey::new(addr3, u64::MAX),
4300                &BlockNumberList::new_pre_sorted([30, 40]),
4301            )
4302            .unwrap();
4303        batch.commit().unwrap();
4304
4305        // Prune all three (addr2 has no shards - tests p > target_prefix case)
4306        let mut targets = vec![(addr1, 15), (addr2, 100), (addr3, 35)];
4307        targets.sort_by_key(|(addr, _)| *addr);
4308
4309        let mut batch = provider.batch();
4310        let outcomes = batch.prune_account_history_batch(&targets).unwrap();
4311        batch.commit().unwrap();
4312
4313        // addr1: updated (keep [20])
4314        // addr2: unchanged (no shards)
4315        // addr3: updated (keep [40])
4316        assert_eq!(outcomes.updated, 2);
4317        assert_eq!(outcomes.unchanged, 1);
4318
4319        let shards1 = provider.account_history_shards(addr1).unwrap();
4320        assert_eq!(shards1[0].1.iter().collect::<Vec<_>>(), vec![20]);
4321
4322        let shards3 = provider.account_history_shards(addr3).unwrap();
4323        assert_eq!(shards3[0].1.iter().collect::<Vec<_>>(), vec![40]);
4324    }
4325
4326    #[test]
4327    fn test_prune_storage_history_batch_multiple_sorted_targets() {
4328        let temp_dir = TempDir::new().unwrap();
4329        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
4330
4331        let addr = Address::from([0x42; 20]);
4332        let slot1 = B256::from([0x01; 32]);
4333        let slot2 = B256::from([0x02; 32]);
4334
4335        // Setup shards
4336        let mut batch = provider.batch();
4337        batch
4338            .put::<tables::StoragesHistory>(
4339                StorageShardedKey::new(addr, slot1, u64::MAX),
4340                &BlockNumberList::new_pre_sorted([10, 20, 30]),
4341            )
4342            .unwrap();
4343        batch
4344            .put::<tables::StoragesHistory>(
4345                StorageShardedKey::new(addr, slot2, u64::MAX),
4346                &BlockNumberList::new_pre_sorted([5, 15, 25]),
4347            )
4348            .unwrap();
4349        batch.commit().unwrap();
4350
4351        // Prune both (sorted)
4352        let mut targets = vec![((addr, slot1), 15), ((addr, slot2), 10)];
4353        targets.sort_by_key(|((a, s), _)| (*a, *s));
4354
4355        let mut batch = provider.batch();
4356        let outcomes = batch.prune_storage_history_batch(&targets).unwrap();
4357        batch.commit().unwrap();
4358
4359        assert_eq!(outcomes.updated, 2);
4360
4361        let shards1 = provider.storage_history_shards(addr, slot1).unwrap();
4362        assert_eq!(shards1[0].1.iter().collect::<Vec<_>>(), vec![20, 30]);
4363
4364        let shards2 = provider.storage_history_shards(addr, slot2).unwrap();
4365        assert_eq!(shards2[0].1.iter().collect::<Vec<_>>(), vec![15, 25]);
4366    }
4367}