//! `RocksDB` provider (`reth_provider/providers/rocksdb/provider.rs`).
1use super::metrics::{RocksDBMetrics, RocksDBOperation, ROCKSDB_TABLES};
2use crate::providers::{compute_history_rank, needs_prev_shard_check, HistoryInfo};
3use alloy_consensus::transaction::TxHashRef;
4use alloy_primitives::{
5    map::{AddressMap, HashMap},
6    Address, BlockNumber, TxNumber, B256,
7};
8use itertools::Itertools;
9use metrics::Label;
10use parking_lot::Mutex;
11use reth_chain_state::ExecutedBlock;
12use reth_db_api::{
13    database_metrics::DatabaseMetrics,
14    models::{
15        sharded_key::NUM_OF_INDICES_IN_SHARD, storage_sharded_key::StorageShardedKey, ShardedKey,
16        StorageSettings,
17    },
18    table::{Compress, Decode, Decompress, Encode, Table},
19    tables, BlockNumberList, DatabaseError,
20};
21use reth_primitives_traits::{BlockBody as _, FastInstant as Instant};
22use reth_prune_types::PruneMode;
23use reth_storage_errors::{
24    db::{DatabaseErrorInfo, DatabaseWriteError, DatabaseWriteOperation, LogLevel},
25    provider::{ProviderError, ProviderResult},
26};
27use rocksdb::{
28    BlockBasedOptions, Cache, ColumnFamilyDescriptor, CompactionPri, DBCompressionType,
29    DBRawIteratorWithThreadMode, IteratorMode, OptimisticTransactionDB,
30    OptimisticTransactionOptions, Options, SnapshotWithThreadMode, Transaction,
31    WriteBatchWithTransaction, WriteOptions, DB,
32};
33use std::{
34    collections::BTreeMap,
35    fmt,
36    path::{Path, PathBuf},
37    sync::Arc,
38};
39use tracing::instrument;
40
/// Pending `RocksDB` batches type alias.
///
/// Shared, mutex-guarded list of write batches that have been produced but not
/// yet committed to the database.
pub(crate) type PendingRocksDBBatches = Arc<Mutex<Vec<WriteBatchWithTransaction<true>>>>;

/// Raw key-value result from a `RocksDB` iterator.
///
/// Both key and value are owned byte buffers as yielded by the underlying iterator.
type RawKVResult = Result<(Box<[u8]>, Box<[u8]>), rocksdb::Error>;
46
/// Statistics for a single `RocksDB` table (column family).
///
/// Values are sourced from `RocksDB` property queries and are therefore estimates.
#[derive(Debug, Clone)]
pub struct RocksDBTableStats {
    /// Size of SST files on disk in bytes.
    pub sst_size_bytes: u64,
    /// Size of memtables in memory in bytes.
    pub memtable_size_bytes: u64,
    /// Name of the table/column family.
    pub name: String,
    /// Estimated number of keys in the table.
    pub estimated_num_keys: u64,
    /// Estimated size of live data in bytes (SST files + memtables).
    pub estimated_size_bytes: u64,
    /// Estimated bytes pending compaction (reclaimable space).
    pub pending_compaction_bytes: u64,
}
63
/// Database-level statistics for `RocksDB`.
///
/// Contains both per-table statistics and DB-level metrics like WAL size.
#[derive(Debug, Clone)]
pub struct RocksDBStats {
    /// Statistics for each table (column family).
    pub tables: Vec<RocksDBTableStats>,
    /// Total size of WAL (Write-Ahead Log) files in bytes.
    ///
    /// WAL is shared across all tables and not included in per-table metrics.
    pub wal_size_bytes: u64,
}
76
/// Context for `RocksDB` block writes.
///
/// Bundles the parameters a block write needs along with the shared list of
/// pending batches that receives the resulting writes.
#[derive(Clone)]
pub(crate) struct RocksDBWriteCtx {
    /// The first block number being written.
    pub first_block_number: BlockNumber,
    /// The prune mode for transaction lookup, if any.
    pub prune_tx_lookup: Option<PruneMode>,
    /// Storage settings determining what goes to `RocksDB`.
    pub storage_settings: StorageSettings,
    /// Pending batches to push to after writing.
    pub pending_batches: PendingRocksDBBatches,
}
89
90impl fmt::Debug for RocksDBWriteCtx {
91    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
92        f.debug_struct("RocksDBWriteCtx")
93            .field("first_block_number", &self.first_block_number)
94            .field("prune_tx_lookup", &self.prune_tx_lookup)
95            .field("storage_settings", &self.storage_settings)
96            .field("pending_batches", &"<pending batches>")
97            .finish()
98    }
99}
100
/// Default cache size for `RocksDB` block cache (128 MB).
const DEFAULT_CACHE_SIZE: usize = 128 << 20;

/// Default block size for `RocksDB` tables (16 KB).
const DEFAULT_BLOCK_SIZE: usize = 16 * 1024;

/// Default max background jobs for `RocksDB` compaction and flushing.
const DEFAULT_MAX_BACKGROUND_JOBS: i32 = 6;

/// Default max open file descriptors for `RocksDB`.
///
/// Caps the number of SST file handles `RocksDB` keeps open simultaneously.
/// Set to 512 to stay within the common default OS `ulimit -n` of 1024,
/// leaving headroom for MDBX, static files, and other I/O.
/// `RocksDB` uses an internal table cache and re-opens files on demand,
/// so this has negligible performance impact on read-heavy workloads.
const DEFAULT_MAX_OPEN_FILES: i32 = 512;

/// Default bytes per sync for `RocksDB` WAL writes (1 MB).
const DEFAULT_BYTES_PER_SYNC: u64 = 1_048_576;

/// Default write buffer size for `RocksDB` memtables (128 MB).
///
/// Larger memtables reduce flush frequency during burst writes, providing more consistent
/// tail latency. Benchmarks showed 128 MB reduces p99 latency variance by ~80% compared
/// to 64 MB default, with negligible impact on mean throughput.
const DEFAULT_WRITE_BUFFER_SIZE: usize = 128 << 20;

/// Default buffer capacity (in bytes) for compression in batches.
///
/// 4 KiB matches common block/page sizes and comfortably holds typical history values,
/// reducing the first few reallocations without over-allocating.
const DEFAULT_COMPRESS_BUF_CAPACITY: usize = 4096;

/// Default auto-commit threshold for batch writes (4 GiB).
///
/// When a batch exceeds this size, it is automatically committed to prevent OOM
/// during large bulk writes. The consistency check on startup heals any crash
/// that occurs between auto-commits.
const DEFAULT_AUTO_COMMIT_THRESHOLD: usize = 4 * 1024 * 1024 * 1024;
140
/// Builder for [`RocksDBProvider`].
pub struct RocksDBBuilder {
    /// Filesystem path of the database directory.
    path: PathBuf,
    /// Names of the column families to open/create.
    column_families: Vec<String>,
    /// Whether to record latency/operation metrics.
    enable_metrics: bool,
    /// Whether to enable `RocksDB` internal statistics collection.
    enable_statistics: bool,
    /// Log level passed through to `RocksDB`.
    log_level: rocksdb::LogLevel,
    /// Block cache shared across all column families.
    block_cache: Cache,
    /// Whether to open the database in read-only (secondary) mode.
    read_only: bool,
}
151
152impl fmt::Debug for RocksDBBuilder {
153    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
154        f.debug_struct("RocksDBBuilder")
155            .field("path", &self.path)
156            .field("column_families", &self.column_families)
157            .field("enable_metrics", &self.enable_metrics)
158            .finish()
159    }
160}
161
162impl RocksDBBuilder {
163    /// Creates a new builder with optimized default options.
164    pub fn new(path: impl AsRef<Path>) -> Self {
165        let cache = Cache::new_lru_cache(DEFAULT_CACHE_SIZE);
166        Self {
167            path: path.as_ref().to_path_buf(),
168            column_families: Vec::new(),
169            enable_metrics: false,
170            enable_statistics: false,
171            log_level: rocksdb::LogLevel::Info,
172            block_cache: cache,
173            read_only: false,
174        }
175    }
176
177    /// Creates default table options with shared block cache.
178    fn default_table_options(cache: &Cache) -> BlockBasedOptions {
179        let mut table_options = BlockBasedOptions::default();
180        table_options.set_block_size(DEFAULT_BLOCK_SIZE);
181        table_options.set_cache_index_and_filter_blocks(true);
182        table_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
183        // Shared block cache for all column families.
184        table_options.set_block_cache(cache);
185        table_options
186    }
187
188    /// Creates optimized `RocksDB` options per `RocksDB` wiki recommendations.
189    fn default_options(
190        log_level: rocksdb::LogLevel,
191        cache: &Cache,
192        enable_statistics: bool,
193    ) -> Options {
194        // Follow recommend tuning guide from RocksDB wiki, see https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning
195        let table_options = Self::default_table_options(cache);
196
197        let mut options = Options::default();
198        options.set_block_based_table_factory(&table_options);
199        options.create_if_missing(true);
200        options.create_missing_column_families(true);
201        options.set_max_background_jobs(DEFAULT_MAX_BACKGROUND_JOBS);
202        options.set_bytes_per_sync(DEFAULT_BYTES_PER_SYNC);
203
204        options.set_bottommost_compression_type(DBCompressionType::Zstd);
205        options.set_bottommost_zstd_max_train_bytes(0, true);
206        options.set_compression_type(DBCompressionType::Lz4);
207        options.set_compaction_pri(CompactionPri::MinOverlappingRatio);
208
209        options.set_log_level(log_level);
210
211        options.set_max_open_files(DEFAULT_MAX_OPEN_FILES);
212
213        // Delete obsolete WAL files immediately after all column families have flushed.
214        // Both set to 0 means "delete ASAP, no archival".
215        options.set_wal_ttl_seconds(0);
216        options.set_wal_size_limit_mb(0);
217
218        // Statistics can view from RocksDB log file
219        if enable_statistics {
220            options.enable_statistics();
221        }
222
223        options
224    }
225
226    /// Creates optimized column family options.
227    fn default_column_family_options(cache: &Cache) -> Options {
228        // Follow recommend tuning guide from RocksDB wiki, see https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning
229        let table_options = Self::default_table_options(cache);
230
231        let mut cf_options = Options::default();
232        cf_options.set_block_based_table_factory(&table_options);
233        cf_options.set_level_compaction_dynamic_level_bytes(true);
234        // Recommend to use Zstd for bottommost compression and Lz4 for other levels, see https://github.com/facebook/rocksdb/wiki/Compression#configuration
235        cf_options.set_compression_type(DBCompressionType::Lz4);
236        cf_options.set_bottommost_compression_type(DBCompressionType::Zstd);
237        // Only use Zstd compression, disable dictionary training
238        cf_options.set_bottommost_zstd_max_train_bytes(0, true);
239        cf_options.set_write_buffer_size(DEFAULT_WRITE_BUFFER_SIZE);
240
241        cf_options
242    }
243
244    /// Creates optimized column family options for `TransactionHashNumbers`.
245    ///
246    /// This table stores `B256 -> TxNumber` mappings where:
247    /// - Keys are incompressible 32-byte hashes (compression wastes CPU for zero benefit)
248    /// - Values are varint-encoded `u64` (a few bytes - too small to benefit from compression)
249    /// - Every lookup expects a hit (bloom filters only help when checking non-existent keys)
250    fn tx_hash_numbers_column_family_options(cache: &Cache) -> Options {
251        let mut table_options = BlockBasedOptions::default();
252        table_options.set_block_size(DEFAULT_BLOCK_SIZE);
253        table_options.set_cache_index_and_filter_blocks(true);
254        table_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
255        table_options.set_block_cache(cache);
256        // Disable bloom filter: every lookup expects a hit, so bloom filters provide no benefit
257        // and waste memory
258
259        let mut cf_options = Options::default();
260        cf_options.set_block_based_table_factory(&table_options);
261        cf_options.set_level_compaction_dynamic_level_bytes(true);
262        // Disable compression: B256 keys are incompressible hashes, TxNumber values are
263        // varint-encoded u64 (a few bytes). Compression wastes CPU cycles for zero space savings.
264        cf_options.set_compression_type(DBCompressionType::None);
265        cf_options.set_bottommost_compression_type(DBCompressionType::None);
266
267        cf_options
268    }
269
270    /// Adds a column family for a specific table type.
271    pub fn with_table<T: Table>(mut self) -> Self {
272        self.column_families.push(T::NAME.to_string());
273        self
274    }
275
276    /// Registers the default tables used by reth for `RocksDB` storage.
277    ///
278    /// This registers:
279    /// - [`tables::TransactionHashNumbers`] - Transaction hash to number mapping
280    /// - [`tables::AccountsHistory`] - Account history index
281    /// - [`tables::StoragesHistory`] - Storage history index
282    pub fn with_default_tables(self) -> Self {
283        self.with_table::<tables::TransactionHashNumbers>()
284            .with_table::<tables::AccountsHistory>()
285            .with_table::<tables::StoragesHistory>()
286    }
287
288    /// Enables metrics.
289    pub const fn with_metrics(mut self) -> Self {
290        self.enable_metrics = true;
291        self
292    }
293
294    /// Enables `RocksDB` internal statistics collection.
295    pub const fn with_statistics(mut self) -> Self {
296        self.enable_statistics = true;
297        self
298    }
299
300    /// Sets the log level from `DatabaseArgs` configuration.
301    pub const fn with_database_log_level(mut self, log_level: Option<LogLevel>) -> Self {
302        if let Some(level) = log_level {
303            self.log_level = convert_log_level(level);
304        }
305        self
306    }
307
308    /// Sets a custom block cache size.
309    pub fn with_block_cache_size(mut self, capacity_bytes: usize) -> Self {
310        self.block_cache = Cache::new_lru_cache(capacity_bytes);
311        self
312    }
313
314    /// Sets read-only mode.
315    ///
316    /// Opens the database as a secondary instance, which supports catching up
317    /// with the primary via [`RocksDBProvider::try_catch_up_with_primary`].
318    /// A temporary directory is created automatically for the secondary's LOG files.
319    ///
320    /// Note: Write operations on a read-only provider will panic at runtime.
321    pub const fn with_read_only(mut self, read_only: bool) -> Self {
322        self.read_only = read_only;
323        self
324    }
325
326    /// Builds the [`RocksDBProvider`].
327    pub fn build(self) -> ProviderResult<RocksDBProvider> {
328        let options =
329            Self::default_options(self.log_level, &self.block_cache, self.enable_statistics);
330
331        let cf_descriptors: Vec<ColumnFamilyDescriptor> = self
332            .column_families
333            .iter()
334            .map(|name| {
335                let cf_options = if name == tables::TransactionHashNumbers::NAME {
336                    Self::tx_hash_numbers_column_family_options(&self.block_cache)
337                } else {
338                    Self::default_column_family_options(&self.block_cache)
339                };
340                ColumnFamilyDescriptor::new(name.clone(), cf_options)
341            })
342            .collect();
343
344        let metrics = self.enable_metrics.then(RocksDBMetrics::default);
345
346        if self.read_only {
347            // Open as secondary instance for catch-up capability.
348            // Secondary needs max_open_files = -1 to keep all FDs open.
349            let mut options = options;
350            options.set_max_open_files(-1);
351
352            let secondary_path = self
353                .path
354                .parent()
355                .unwrap_or(&self.path)
356                .join(format!("rocksdb-secondary-tmp-{}", std::process::id()));
357            reth_fs_util::create_dir_all(&secondary_path).map_err(ProviderError::other)?;
358
359            let db = DB::open_cf_descriptors_as_secondary(
360                &options,
361                &self.path,
362                &secondary_path,
363                cf_descriptors,
364            )
365            .map_err(|e| {
366                ProviderError::Database(DatabaseError::Open(DatabaseErrorInfo {
367                    message: e.to_string().into(),
368                    code: -1,
369                }))
370            })?;
371            Ok(RocksDBProvider(Arc::new(RocksDBProviderInner::Secondary {
372                db,
373                metrics,
374                secondary_path,
375            })))
376        } else {
377            // Use OptimisticTransactionDB for MDBX-like transaction semantics (read-your-writes,
378            // rollback) OptimisticTransactionDB uses optimistic concurrency control (conflict
379            // detection at commit) and is backed by DBCommon, giving us access to
380            // cancel_all_background_work for clean shutdown.
381            let db =
382                OptimisticTransactionDB::open_cf_descriptors(&options, &self.path, cf_descriptors)
383                    .map_err(|e| {
384                        ProviderError::Database(DatabaseError::Open(DatabaseErrorInfo {
385                            message: e.to_string().into(),
386                            code: -1,
387                        }))
388                    })?;
389            Ok(RocksDBProvider(Arc::new(RocksDBProviderInner::ReadWrite { db, metrics })))
390        }
391    }
392}
393
/// Some types don't support compression (eg. B256), and we don't want to be copying them to the
/// allocated buffer when we can just use their reference.
///
/// Evaluates to `Some(&[u8])` when the value exposes an uncompressable reference
/// (use it directly), or `None` after compressing the value into `$buf`
/// (the caller then reads the compressed bytes from `$buf`).
macro_rules! compress_to_buf_or_ref {
    ($buf:expr, $value:expr) => {
        if let Some(value) = $value.uncompressable_ref() {
            Some(value)
        } else {
            // Reuse the caller-provided buffer to avoid a fresh allocation per value.
            $buf.clear();
            $value.compress_to_buf(&mut $buf);
            None
        }
    };
}
407
/// `RocksDB` provider for auxiliary storage layer beside main database MDBX.
///
/// Wraps the inner state in an [`Arc`], so cloning is cheap and all clones share
/// the same underlying database handle.
#[derive(Debug)]
pub struct RocksDBProvider(Arc<RocksDBProviderInner>);
411
/// Inner state for `RocksDB` provider.
///
/// The two variants correspond to the builder's read-write (default) and
/// read-only (`with_read_only`) modes.
enum RocksDBProviderInner {
    /// Read-write mode using `OptimisticTransactionDB`.
    ReadWrite {
        /// `RocksDB` database instance with optimistic transaction support.
        db: OptimisticTransactionDB,
        /// Metrics latency & operations.
        metrics: Option<RocksDBMetrics>,
    },
    /// Secondary mode using `DB` opened with `open_cf_descriptors_as_secondary`.
    /// Supports catching up with the primary via `try_catch_up_with_primary`.
    /// Does not support snapshots; consistency is guaranteed externally.
    Secondary {
        /// Secondary `RocksDB` database instance.
        db: DB,
        /// Metrics latency & operations.
        metrics: Option<RocksDBMetrics>,
        /// Temporary directory for secondary LOG files, removed on drop.
        secondary_path: PathBuf,
    },
}
433
impl RocksDBProviderInner {
    /// Returns the metrics for this provider.
    ///
    /// `None` when metrics were not enabled at build time.
    const fn metrics(&self) -> Option<&RocksDBMetrics> {
        match self {
            Self::ReadWrite { metrics, .. } | Self::Secondary { metrics, .. } => metrics.as_ref(),
        }
    }

    /// Returns the read-write database, panicking if in read-only mode.
    ///
    /// # Panics
    /// Panics if this provider is a `Secondary` (read-only) instance.
    fn db_rw(&self) -> &OptimisticTransactionDB {
        match self {
            Self::ReadWrite { db, .. } => db,
            Self::Secondary { .. } => {
                panic!("Cannot perform write operation on secondary RocksDB provider")
            }
        }
    }

    /// Gets the column family handle for a table.
    ///
    /// Errors if the column family was not registered when the database was opened.
    fn cf_handle<T: Table>(&self) -> Result<&rocksdb::ColumnFamily, DatabaseError> {
        let cf = match self {
            Self::ReadWrite { db, .. } => db.cf_handle(T::NAME),
            Self::Secondary { db, .. } => db.cf_handle(T::NAME),
        };
        cf.ok_or_else(|| DatabaseError::Other(format!("Column family '{}' not found", T::NAME)))
    }

    /// Gets a value from a column family.
    fn get_cf(
        &self,
        cf: &rocksdb::ColumnFamily,
        key: impl AsRef<[u8]>,
    ) -> Result<Option<Vec<u8>>, rocksdb::Error> {
        match self {
            Self::ReadWrite { db, .. } => db.get_cf(cf, key),
            Self::Secondary { db, .. } => db.get_cf(cf, key),
        }
    }

    /// Puts a value into a column family.
    ///
    /// Write path only: delegates to [`Self::db_rw`] and therefore panics on a
    /// secondary (read-only) instance.
    fn put_cf(
        &self,
        cf: &rocksdb::ColumnFamily,
        key: impl AsRef<[u8]>,
        value: impl AsRef<[u8]>,
    ) -> Result<(), rocksdb::Error> {
        self.db_rw().put_cf(cf, key, value)
    }

    /// Deletes a value from a column family.
    ///
    /// Write path only: panics on a secondary (read-only) instance.
    fn delete_cf(
        &self,
        cf: &rocksdb::ColumnFamily,
        key: impl AsRef<[u8]>,
    ) -> Result<(), rocksdb::Error> {
        self.db_rw().delete_cf(cf, key)
    }

    /// Deletes a range of values from a column family.
    ///
    /// Write path only: panics on a secondary (read-only) instance.
    fn delete_range_cf<K: AsRef<[u8]>>(
        &self,
        cf: &rocksdb::ColumnFamily,
        from: K,
        to: K,
    ) -> Result<(), rocksdb::Error> {
        self.db_rw().delete_range_cf(cf, from, to)
    }

    /// Returns an iterator over a column family.
    ///
    /// The enum wrapper bridges the two distinct iterator types exposed by the
    /// read-write and secondary database handles.
    fn iterator_cf(
        &self,
        cf: &rocksdb::ColumnFamily,
        mode: IteratorMode<'_>,
    ) -> RocksDBIterEnum<'_> {
        match self {
            Self::ReadWrite { db, .. } => RocksDBIterEnum::ReadWrite(db.iterator_cf(cf, mode)),
            Self::Secondary { db, .. } => RocksDBIterEnum::ReadOnly(db.iterator_cf(cf, mode)),
        }
    }

    /// Returns a raw iterator over a column family.
    ///
    /// Unlike [`Self::iterator_cf`], raw iterators support `seek()` for efficient
    /// repositioning without creating a new iterator.
    fn raw_iterator_cf(&self, cf: &rocksdb::ColumnFamily) -> RocksDBRawIterEnum<'_> {
        match self {
            Self::ReadWrite { db, .. } => RocksDBRawIterEnum::ReadWrite(db.raw_iterator_cf(cf)),
            Self::Secondary { db, .. } => RocksDBRawIterEnum::ReadOnly(db.raw_iterator_cf(cf)),
        }
    }

    /// Returns a read-only, point-in-time snapshot of the database.
    ///
    /// Secondary instances do not support snapshots, so the raw database handle is
    /// wrapped instead; consistency is guaranteed externally in that mode.
    fn snapshot(&self) -> RocksReadSnapshotInner<'_> {
        match self {
            Self::ReadWrite { db, .. } => RocksReadSnapshotInner::ReadWrite(db.snapshot()),
            Self::Secondary { db, .. } => RocksReadSnapshotInner::Secondary(db),
        }
    }

    /// Returns the path to the database directory.
    fn path(&self) -> &Path {
        match self {
            Self::ReadWrite { db, .. } => db.path(),
            Self::Secondary { db, .. } => db.path(),
        }
    }

    /// Returns the total size of WAL (Write-Ahead Log) files in bytes.
    ///
    /// WAL files have a `.log` extension in the `RocksDB` directory.
    /// Best-effort: returns 0 if the directory cannot be read, and skips
    /// entries whose metadata is unavailable.
    fn wal_size_bytes(&self) -> u64 {
        let path = self.path();

        match std::fs::read_dir(path) {
            Ok(entries) => entries
                .filter_map(|e| e.ok())
                .filter(|e| e.path().extension().is_some_and(|ext| ext == "log"))
                .filter_map(|e| e.metadata().ok())
                .map(|m| m.len())
                .sum(),
            Err(_) => 0,
        }
    }

    /// Returns statistics for all column families in the database.
    ///
    /// Column families missing from this instance are silently skipped, and any
    /// property that cannot be read is reported as 0.
    fn table_stats(&self) -> Vec<RocksDBTableStats> {
        let mut stats = Vec::new();

        // Macro rather than a function: the read-write (`OptimisticTransactionDB`) and
        // secondary (`DB`) handles expose the same property API but share no common
        // trait, so the body is expanded for both match arms below.
        macro_rules! collect_stats {
            ($db:expr) => {
                for cf_name in ROCKSDB_TABLES {
                    if let Some(cf) = $db.cf_handle(cf_name) {
                        let estimated_num_keys = $db
                            .property_int_value_cf(cf, rocksdb::properties::ESTIMATE_NUM_KEYS)
                            .ok()
                            .flatten()
                            .unwrap_or(0);

                        // SST files size (on-disk) + memtable size (in-memory)
                        let sst_size = $db
                            .property_int_value_cf(cf, rocksdb::properties::LIVE_SST_FILES_SIZE)
                            .ok()
                            .flatten()
                            .unwrap_or(0);

                        let memtable_size = $db
                            .property_int_value_cf(cf, rocksdb::properties::SIZE_ALL_MEM_TABLES)
                            .ok()
                            .flatten()
                            .unwrap_or(0);

                        let estimated_size_bytes = sst_size + memtable_size;

                        let pending_compaction_bytes = $db
                            .property_int_value_cf(
                                cf,
                                rocksdb::properties::ESTIMATE_PENDING_COMPACTION_BYTES,
                            )
                            .ok()
                            .flatten()
                            .unwrap_or(0);

                        stats.push(RocksDBTableStats {
                            sst_size_bytes: sst_size,
                            memtable_size_bytes: memtable_size,
                            name: cf_name.to_string(),
                            estimated_num_keys,
                            estimated_size_bytes,
                            pending_compaction_bytes,
                        });
                    }
                }
            };
        }

        match self {
            Self::ReadWrite { db, .. } => collect_stats!(db),
            Self::Secondary { db, .. } => collect_stats!(db),
        }

        stats
    }

    /// Returns database-level statistics including per-table stats and WAL size.
    fn db_stats(&self) -> RocksDBStats {
        RocksDBStats { tables: self.table_stats(), wal_size_bytes: self.wal_size_bytes() }
    }
}
622
623impl fmt::Debug for RocksDBProviderInner {
624    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
625        match self {
626            Self::ReadWrite { metrics, .. } => f
627                .debug_struct("RocksDBProviderInner::ReadWrite")
628                .field("db", &"<OptimisticTransactionDB>")
629                .field("metrics", metrics)
630                .finish(),
631            Self::Secondary { metrics, .. } => f
632                .debug_struct("RocksDBProviderInner::Secondary")
633                .field("db", &"<DB (secondary)>")
634                .field("metrics", metrics)
635                .finish(),
636        }
637    }
638}
639
impl Drop for RocksDBProviderInner {
    fn drop(&mut self) {
        match self {
            Self::ReadWrite { db, .. } => {
                // Flush all memtables if possible. If not, they will be rebuilt from the WAL on
                // restart
                if let Err(e) = db.flush_wal(true) {
                    tracing::warn!(target: "providers::rocksdb", ?e, "Failed to flush WAL on drop");
                }
                // Best-effort flush of every known column family; failures are only
                // logged since the WAL flushed above can restore the data on restart.
                for cf_name in ROCKSDB_TABLES {
                    if let Some(cf) = db.cf_handle(cf_name) &&
                        let Err(e) = db.flush_cf(&cf)
                    {
                        tracing::warn!(target: "providers::rocksdb", cf = cf_name, ?e, "Failed to flush CF on drop");
                    }
                }
                // Stop compactions/flushes so shutdown doesn't race background threads.
                db.cancel_all_background_work(true);
            }
            Self::Secondary { db, secondary_path, .. } => {
                db.cancel_all_background_work(true);
                // Best-effort cleanup of the temp directory holding secondary LOG files.
                let _ = std::fs::remove_dir_all(secondary_path);
            }
        }
    }
}
665
666impl Clone for RocksDBProvider {
667    fn clone(&self) -> Self {
668        Self(self.0.clone())
669    }
670}
671
672impl DatabaseMetrics for RocksDBProvider {
673    fn gauge_metrics(&self) -> Vec<(&'static str, f64, Vec<Label>)> {
674        let mut metrics = Vec::new();
675
676        for stat in self.table_stats() {
677            metrics.push((
678                "rocksdb.table_size",
679                stat.estimated_size_bytes as f64,
680                vec![Label::new("table", stat.name.clone())],
681            ));
682            metrics.push((
683                "rocksdb.table_entries",
684                stat.estimated_num_keys as f64,
685                vec![Label::new("table", stat.name.clone())],
686            ));
687            metrics.push((
688                "rocksdb.pending_compaction_bytes",
689                stat.pending_compaction_bytes as f64,
690                vec![Label::new("table", stat.name.clone())],
691            ));
692            metrics.push((
693                "rocksdb.sst_size",
694                stat.sst_size_bytes as f64,
695                vec![Label::new("table", stat.name.clone())],
696            ));
697            metrics.push((
698                "rocksdb.memtable_size",
699                stat.memtable_size_bytes as f64,
700                vec![Label::new("table", stat.name)],
701            ));
702        }
703
704        // WAL size (DB-level, shared across all tables)
705        metrics.push(("rocksdb.wal_size", self.wal_size_bytes() as f64, vec![]));
706
707        metrics
708    }
709}
710
711impl RocksDBProvider {
    /// Creates a new `RocksDB` provider.
    ///
    /// Opens (or creates) the database at `path` with default builder settings.
    ///
    /// # Errors
    /// Returns an error if the underlying database cannot be opened.
    pub fn new(path: impl AsRef<Path>) -> ProviderResult<Self> {
        RocksDBBuilder::new(path).build()
    }
716
    /// Creates a new `RocksDB` provider builder.
    ///
    /// Use the builder to customize tables, caching, metrics, and read-only mode
    /// before building the provider.
    pub fn builder(path: impl AsRef<Path>) -> RocksDBBuilder {
        RocksDBBuilder::new(path)
    }
721
722    /// Returns `true` if a `RocksDB` database exists at the given path.
723    ///
724    /// Checks for the presence of the `CURRENT` file, which `RocksDB` creates
725    /// when initializing a database.
726    pub fn exists(path: impl AsRef<Path>) -> bool {
727        path.as_ref().join("CURRENT").exists()
728    }
729
730    /// Returns `true` if this provider is in read-only mode.
731    pub fn is_read_only(&self) -> bool {
732        matches!(self.0.as_ref(), RocksDBProviderInner::Secondary { .. })
733    }
734
735    /// Tries to catch up with the primary instance by reading new WAL and MANIFEST entries.
736    ///
737    /// This is a no-op for read-write and read-only providers.
738    /// For secondary providers, this incrementally syncs with the primary's latest state.
739    pub fn try_catch_up_with_primary(&self) -> ProviderResult<()> {
740        match self.0.as_ref() {
741            RocksDBProviderInner::Secondary { db, .. } => {
742                db.try_catch_up_with_primary().map_err(|e| {
743                    ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
744                        message: e.to_string().into(),
745                        code: -1,
746                    }))
747                })
748            }
749            _ => Ok(()),
750        }
751    }
752
753    /// Returns a read-only, point-in-time snapshot of the database.
754    ///
755    /// Lighter weight than [`RocksTx`] — no write-conflict tracking, and `Send + Sync`.
756    pub fn snapshot(&self) -> RocksReadSnapshot<'_> {
757        RocksReadSnapshot { inner: self.0.snapshot(), provider: self }
758    }
759
760    /// Creates a new transaction with MDBX-like semantics (read-your-writes, rollback).
761    ///
762    /// Note: With `OptimisticTransactionDB`, commits may fail if there are conflicts.
763    /// Conflict detection happens at commit time, not at write time.
764    ///
765    /// # Panics
766    /// Panics if the provider is in read-only mode.
767    pub fn tx(&self) -> RocksTx<'_> {
768        let write_options = WriteOptions::default();
769        let txn_options = OptimisticTransactionOptions::default();
770        let inner = self.0.db_rw().transaction_opt(&write_options, &txn_options);
771        RocksTx { inner, provider: self }
772    }
773
774    /// Creates a new batch for atomic writes.
775    ///
776    /// Use [`Self::write_batch`] for closure-based atomic writes.
777    /// Use this method when the batch needs to be held by [`crate::EitherWriter`].
778    ///
779    /// # Panics
780    /// Panics if the provider is in read-only mode when attempting to commit.
781    pub fn batch(&self) -> RocksDBBatch<'_> {
782        RocksDBBatch {
783            provider: self,
784            inner: WriteBatchWithTransaction::<true>::default(),
785            buf: Vec::with_capacity(DEFAULT_COMPRESS_BUF_CAPACITY),
786            auto_commit_threshold: None,
787        }
788    }
789
790    /// Creates a new batch with auto-commit enabled.
791    ///
792    /// When the batch size exceeds the threshold (4 GiB), the batch is automatically
793    /// committed and reset. This prevents OOM during large bulk writes while maintaining
794    /// crash-safety via the consistency check on startup.
795    pub fn batch_with_auto_commit(&self) -> RocksDBBatch<'_> {
796        RocksDBBatch {
797            provider: self,
798            inner: WriteBatchWithTransaction::<true>::default(),
799            buf: Vec::with_capacity(DEFAULT_COMPRESS_BUF_CAPACITY),
800            auto_commit_threshold: Some(DEFAULT_AUTO_COMMIT_THRESHOLD),
801        }
802    }
803
    /// Gets the column family handle backing table `T`.
    ///
    /// Errors if the column family was not registered when the database was opened.
    fn get_cf_handle<T: Table>(&self) -> Result<&rocksdb::ColumnFamily, DatabaseError> {
        self.0.cf_handle::<T>()
    }
808
809    /// Executes a function and records metrics with the given operation and table name.
810    fn execute_with_operation_metric<R>(
811        &self,
812        operation: RocksDBOperation,
813        table: &'static str,
814        f: impl FnOnce(&Self) -> R,
815    ) -> R {
816        let start = self.0.metrics().map(|_| Instant::now());
817        let res = f(self);
818
819        if let (Some(start), Some(metrics)) = (start, self.0.metrics()) {
820            metrics.record_operation(operation, table, start.elapsed());
821        }
822
823        res
824    }
825
826    /// Gets a value from the specified table.
827    pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
828        self.get_encoded::<T>(&key.encode())
829    }
830
831    /// Gets a value from the specified table using pre-encoded key.
832    pub fn get_encoded<T: Table>(
833        &self,
834        key: &<T::Key as Encode>::Encoded,
835    ) -> ProviderResult<Option<T::Value>> {
836        self.execute_with_operation_metric(RocksDBOperation::Get, T::NAME, |this| {
837            let result = this.0.get_cf(this.get_cf_handle::<T>()?, key.as_ref()).map_err(|e| {
838                ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
839                    message: e.to_string().into(),
840                    code: -1,
841                }))
842            })?;
843
844            Ok(result.and_then(|value| T::Value::decompress(&value).ok()))
845        })
846    }
847
848    /// Puts upsert a value into the specified table with the given key.
849    ///
850    /// # Panics
851    /// Panics if the provider is in read-only mode.
852    pub fn put<T: Table>(&self, key: T::Key, value: &T::Value) -> ProviderResult<()> {
853        let encoded_key = key.encode();
854        self.put_encoded::<T>(&encoded_key, value)
855    }
856
    /// Inserts or updates a value in table `T` using a pre-encoded key.
    ///
    /// # Panics
    /// Panics if the provider is in read-only mode.
    pub fn put_encoded<T: Table>(
        &self,
        key: &<T::Key as Encode>::Encoded,
        value: &T::Value,
    ) -> ProviderResult<()> {
        self.execute_with_operation_metric(RocksDBOperation::Put, T::NAME, |this| {
            // A fresh scratch buffer is allocated per call to keep this method simple
            // and thread-safe (`RocksDBProvider` is shared). Callers that want to
            // amortize the allocation should use the `write_batch` API, which reuses
            // a single buffer across puts.
            let mut buf = Vec::new();
            let value_bytes = compress_to_buf_or_ref!(buf, value).unwrap_or(&buf);

            this.0.put_cf(this.get_cf_handle::<T>()?, key, value_bytes).map_err(|e| {
                ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
                    info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
                    operation: DatabaseWriteOperation::PutUpsert,
                    table_name: T::NAME,
                    key: key.as_ref().to_vec(),
                })))
            })
        })
    }
883
884    /// Deletes a value from the specified table.
885    ///
886    /// # Panics
887    /// Panics if the provider is in read-only mode.
888    pub fn delete<T: Table>(&self, key: T::Key) -> ProviderResult<()> {
889        self.execute_with_operation_metric(RocksDBOperation::Delete, T::NAME, |this| {
890            this.0.delete_cf(this.get_cf_handle::<T>()?, key.encode().as_ref()).map_err(|e| {
891                ProviderError::Database(DatabaseError::Delete(DatabaseErrorInfo {
892                    message: e.to_string().into(),
893                    code: -1,
894                }))
895            })
896        })
897    }
898
899    /// Clears all entries from the specified table.
900    ///
901    /// Uses `delete_range_cf` from empty key to a max key (256 bytes of 0xFF).
902    /// This end key must exceed the maximum encoded key size for any table.
903    /// Current max is ~60 bytes (`StorageShardedKey` = 20 + 32 + 8).
904    pub fn clear<T: Table>(&self) -> ProviderResult<()> {
905        let cf = self.get_cf_handle::<T>()?;
906
907        self.0.delete_range_cf(cf, &[] as &[u8], &[0xFF; 256]).map_err(|e| {
908            ProviderError::Database(DatabaseError::Delete(DatabaseErrorInfo {
909                message: e.to_string().into(),
910                code: -1,
911            }))
912        })?;
913
914        Ok(())
915    }
916
917    /// Retrieves the first or last entry from a table based on the iterator mode.
918    fn get_boundary<T: Table>(
919        &self,
920        mode: IteratorMode<'_>,
921    ) -> ProviderResult<Option<(T::Key, T::Value)>> {
922        self.execute_with_operation_metric(RocksDBOperation::Get, T::NAME, |this| {
923            let cf = this.get_cf_handle::<T>()?;
924            let mut iter = this.0.iterator_cf(cf, mode);
925
926            match iter.next() {
927                Some(Ok((key_bytes, value_bytes))) => {
928                    let key = <T::Key as reth_db_api::table::Decode>::decode(&key_bytes)
929                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
930                    let value = T::Value::decompress(&value_bytes)
931                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
932                    Ok(Some((key, value)))
933                }
934                Some(Err(e)) => {
935                    Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
936                        message: e.to_string().into(),
937                        code: -1,
938                    })))
939                }
940                None => Ok(None),
941            }
942        })
943    }
944
945    /// Gets the first (smallest key) entry from the specified table.
946    #[inline]
947    pub fn first<T: Table>(&self) -> ProviderResult<Option<(T::Key, T::Value)>> {
948        self.get_boundary::<T>(IteratorMode::Start)
949    }
950
951    /// Gets the last (largest key) entry from the specified table.
952    #[inline]
953    pub fn last<T: Table>(&self) -> ProviderResult<Option<(T::Key, T::Value)>> {
954        self.get_boundary::<T>(IteratorMode::End)
955    }
956
957    /// Creates an iterator over all entries in the specified table.
958    ///
959    /// Returns decoded `(Key, Value)` pairs in key order.
960    pub fn iter<T: Table>(&self) -> ProviderResult<RocksDBIter<'_, T>> {
961        let cf = self.get_cf_handle::<T>()?;
962        let iter = self.0.iterator_cf(cf, IteratorMode::Start);
963        Ok(RocksDBIter { inner: iter, _marker: std::marker::PhantomData })
964    }
965
    /// Returns statistics for all column families in the database.
    ///
    /// Yields one [`RocksDBTableStats`] entry per table (column family).
    pub fn table_stats(&self) -> Vec<RocksDBTableStats> {
        self.0.table_stats()
    }
972
    /// Returns the total size of WAL (Write-Ahead Log) files in bytes.
    ///
    /// This scans the `RocksDB` directory for `.log` files and sums their sizes.
    /// WAL files can be significant (e.g., 2.7GB observed) and are not included
    /// in `table_size`, `sst_size`, or `memtable_size` metrics.
    pub fn wal_size_bytes(&self) -> u64 {
        self.0.wal_size_bytes()
    }
981
    /// Returns database-level statistics including per-table stats and WAL size.
    ///
    /// This combines [`Self::table_stats`] and [`Self::wal_size_bytes`] into a single
    /// [`RocksDBStats`] value.
    pub fn db_stats(&self) -> RocksDBStats {
        self.0.db_stats()
    }
988
989    /// Flushes pending writes for the specified tables to disk.
990    ///
991    /// This performs a flush of:
992    /// 1. The column family memtables for the specified table names to SST files
993    /// 2. The Write-Ahead Log (WAL) with sync
994    ///
995    /// After this call completes, all data for the specified tables is durably persisted to disk.
996    ///
997    /// # Panics
998    /// Panics if the provider is in read-only mode.
999    #[instrument(level = "debug", target = "providers::rocksdb", skip_all, fields(tables = ?tables))]
1000    pub fn flush(&self, tables: &[&'static str]) -> ProviderResult<()> {
1001        let db = self.0.db_rw();
1002
1003        for cf_name in tables {
1004            if let Some(cf) = db.cf_handle(cf_name) {
1005                db.flush_cf(&cf).map_err(|e| {
1006                    ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
1007                        info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
1008                        operation: DatabaseWriteOperation::Flush,
1009                        table_name: cf_name,
1010                        key: Vec::new(),
1011                    })))
1012                })?;
1013            }
1014        }
1015
1016        db.flush_wal(true).map_err(|e| {
1017            ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
1018                info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
1019                operation: DatabaseWriteOperation::Flush,
1020                table_name: "WAL",
1021                key: Vec::new(),
1022            })))
1023        })?;
1024
1025        Ok(())
1026    }
1027
1028    /// Flushes and compacts all tables in `RocksDB`.
1029    ///
1030    /// This:
1031    /// 1. Flushes all column family memtables to SST files
1032    /// 2. Flushes the Write-Ahead Log (WAL) with sync
1033    /// 3. Triggers manual compaction on all column families to reclaim disk space
1034    ///
1035    /// Use this after large delete operations (like pruning) to reclaim disk space.
1036    ///
1037    /// # Panics
1038    /// Panics if the provider is in read-only mode.
1039    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
1040    pub fn flush_and_compact(&self) -> ProviderResult<()> {
1041        self.flush(ROCKSDB_TABLES)?;
1042
1043        let db = self.0.db_rw();
1044
1045        for cf_name in ROCKSDB_TABLES {
1046            if let Some(cf) = db.cf_handle(cf_name) {
1047                db.compact_range_cf(&cf, None::<&[u8]>, None::<&[u8]>);
1048            }
1049        }
1050
1051        Ok(())
1052    }
1053
1054    /// Creates a raw iterator over all entries in the specified table.
1055    ///
1056    /// Returns raw `(key_bytes, value_bytes)` pairs without decoding.
1057    pub fn raw_iter<T: Table>(&self) -> ProviderResult<RocksDBRawIter<'_>> {
1058        let cf = self.get_cf_handle::<T>()?;
1059        let iter = self.0.iterator_cf(cf, IteratorMode::Start);
1060        Ok(RocksDBRawIter { inner: iter })
1061    }
1062
1063    /// Returns all account history shards for the given address in ascending key order.
1064    ///
1065    /// This is used for unwind operations where we need to scan all shards for an address
1066    /// and potentially delete or truncate them.
1067    pub fn account_history_shards(
1068        &self,
1069        address: Address,
1070    ) -> ProviderResult<Vec<(ShardedKey<Address>, BlockNumberList)>> {
1071        // Get the column family handle for the AccountsHistory table.
1072        let cf = self.get_cf_handle::<tables::AccountsHistory>()?;
1073
1074        // Build a seek key starting at the first shard (highest_block_number = 0) for this address.
1075        // ShardedKey is (address, highest_block_number) so this positions us at the beginning.
1076        let start_key = ShardedKey::new(address, 0u64);
1077        let start_bytes = start_key.encode();
1078
1079        // Create a forward iterator starting from our seek position.
1080        let iter = self
1081            .0
1082            .iterator_cf(cf, IteratorMode::From(start_bytes.as_ref(), rocksdb::Direction::Forward));
1083
1084        let mut result = Vec::new();
1085        for item in iter {
1086            match item {
1087                Ok((key_bytes, value_bytes)) => {
1088                    // Decode the sharded key to check if we're still on the same address.
1089                    let key = ShardedKey::<Address>::decode(&key_bytes)
1090                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
1091
1092                    // Stop when we reach a different address (keys are sorted by address first).
1093                    if key.key != address {
1094                        break;
1095                    }
1096
1097                    // Decompress the block number list stored in this shard.
1098                    let value = BlockNumberList::decompress(&value_bytes)
1099                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
1100
1101                    result.push((key, value));
1102                }
1103                Err(e) => {
1104                    return Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
1105                        message: e.to_string().into(),
1106                        code: -1,
1107                    })));
1108                }
1109            }
1110        }
1111
1112        Ok(result)
1113    }
1114
1115    /// Returns all storage history shards for the given `(address, storage_key)` pair.
1116    ///
1117    /// Iterates through all shards in ascending `highest_block_number` order until
1118    /// a different `(address, storage_key)` is encountered.
1119    pub fn storage_history_shards(
1120        &self,
1121        address: Address,
1122        storage_key: B256,
1123    ) -> ProviderResult<Vec<(StorageShardedKey, BlockNumberList)>> {
1124        let cf = self.get_cf_handle::<tables::StoragesHistory>()?;
1125
1126        let start_key = StorageShardedKey::new(address, storage_key, 0u64);
1127        let start_bytes = start_key.encode();
1128
1129        let iter = self
1130            .0
1131            .iterator_cf(cf, IteratorMode::From(start_bytes.as_ref(), rocksdb::Direction::Forward));
1132
1133        let mut result = Vec::new();
1134        for item in iter {
1135            match item {
1136                Ok((key_bytes, value_bytes)) => {
1137                    let key = StorageShardedKey::decode(&key_bytes)
1138                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
1139
1140                    if key.address != address || key.sharded_key.key != storage_key {
1141                        break;
1142                    }
1143
1144                    let value = BlockNumberList::decompress(&value_bytes)
1145                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
1146
1147                    result.push((key, value));
1148                }
1149                Err(e) => {
1150                    return Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
1151                        message: e.to_string().into(),
1152                        code: -1,
1153                    })));
1154                }
1155            }
1156        }
1157
1158        Ok(result)
1159    }
1160
1161    /// Unwinds account history indices for the given `(address, block_number)` pairs.
1162    ///
1163    /// Groups addresses by their minimum block number and calls the appropriate unwind
1164    /// operations. For each address, keeps only blocks less than the minimum block
1165    /// (i.e., removes the minimum block and all higher blocks).
1166    ///
1167    /// Returns a `WriteBatchWithTransaction` that can be committed later.
1168    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
1169    pub fn unwind_account_history_indices(
1170        &self,
1171        last_indices: &[(Address, BlockNumber)],
1172    ) -> ProviderResult<WriteBatchWithTransaction<true>> {
1173        let mut address_min_block: AddressMap<BlockNumber> =
1174            AddressMap::with_capacity_and_hasher(last_indices.len(), Default::default());
1175        for &(address, block_number) in last_indices {
1176            address_min_block
1177                .entry(address)
1178                .and_modify(|min| *min = (*min).min(block_number))
1179                .or_insert(block_number);
1180        }
1181
1182        let mut batch = self.batch();
1183        for (address, min_block) in address_min_block {
1184            match min_block.checked_sub(1) {
1185                Some(keep_to) => batch.unwind_account_history_to(address, keep_to)?,
1186                None => batch.clear_account_history(address)?,
1187            }
1188        }
1189
1190        Ok(batch.into_inner())
1191    }
1192
1193    /// Unwinds storage history indices for the given `(address, storage_key, block_number)` tuples.
1194    ///
1195    /// Groups by `(address, storage_key)` and finds the minimum block number for each.
1196    /// For each key, keeps only blocks less than the minimum block
1197    /// (i.e., removes the minimum block and all higher blocks).
1198    ///
1199    /// Returns a `WriteBatchWithTransaction` that can be committed later.
1200    pub fn unwind_storage_history_indices(
1201        &self,
1202        storage_changesets: &[(Address, B256, BlockNumber)],
1203    ) -> ProviderResult<WriteBatchWithTransaction<true>> {
1204        let mut key_min_block: HashMap<(Address, B256), BlockNumber> =
1205            HashMap::with_capacity_and_hasher(storage_changesets.len(), Default::default());
1206        for &(address, storage_key, block_number) in storage_changesets {
1207            key_min_block
1208                .entry((address, storage_key))
1209                .and_modify(|min| *min = (*min).min(block_number))
1210                .or_insert(block_number);
1211        }
1212
1213        let mut batch = self.batch();
1214        for ((address, storage_key), min_block) in key_min_block {
1215            match min_block.checked_sub(1) {
1216                Some(keep_to) => batch.unwind_storage_history_to(address, storage_key, keep_to)?,
1217                None => batch.clear_storage_history(address, storage_key)?,
1218            }
1219        }
1220
1221        Ok(batch.into_inner())
1222    }
1223
1224    /// Writes a batch of operations atomically.
1225    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
1226    pub fn write_batch<F>(&self, f: F) -> ProviderResult<()>
1227    where
1228        F: FnOnce(&mut RocksDBBatch<'_>) -> ProviderResult<()>,
1229    {
1230        self.execute_with_operation_metric(RocksDBOperation::BatchWrite, "Batch", |this| {
1231            let mut batch_handle = this.batch();
1232            f(&mut batch_handle)?;
1233            batch_handle.commit()
1234        })
1235    }
1236
1237    /// Commits a raw `WriteBatchWithTransaction` to `RocksDB`.
1238    ///
1239    /// This is used when the batch was extracted via [`RocksDBBatch::into_inner`]
1240    /// and needs to be committed at a later point (e.g., at provider commit time).
1241    ///
1242    /// # Panics
1243    /// Panics if the provider is in read-only mode.
1244    #[instrument(level = "debug", target = "providers::rocksdb", skip_all, fields(batch_len = batch.len(), batch_size = batch.size_in_bytes()))]
1245    pub fn commit_batch(&self, batch: WriteBatchWithTransaction<true>) -> ProviderResult<()> {
1246        self.0.db_rw().write_opt(batch, &WriteOptions::default()).map_err(|e| {
1247            ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
1248                message: e.to_string().into(),
1249                code: -1,
1250            }))
1251        })
1252    }
1253
1254    /// Writes all `RocksDB` data for multiple blocks in parallel.
1255    ///
1256    /// This handles transaction hash numbers, account history, and storage history based on
1257    /// the provided storage settings. Each operation runs in parallel with its own batch,
1258    /// pushing to `ctx.pending_batches` for later commit.
1259    #[instrument(level = "debug", target = "providers::rocksdb", skip_all, fields(num_blocks = blocks.len(), first_block = ctx.first_block_number))]
1260    pub(crate) fn write_blocks_data<N: reth_node_types::NodePrimitives>(
1261        &self,
1262        blocks: &[ExecutedBlock<N>],
1263        tx_nums: &[TxNumber],
1264        ctx: RocksDBWriteCtx,
1265        runtime: &reth_tasks::Runtime,
1266    ) -> ProviderResult<()> {
1267        if !ctx.storage_settings.storage_v2 {
1268            return Ok(());
1269        }
1270
1271        let mut r_tx_hash = None;
1272        let mut r_account_history = None;
1273        let mut r_storage_history = None;
1274
1275        let write_tx_hash =
1276            ctx.storage_settings.storage_v2 && ctx.prune_tx_lookup.is_none_or(|m| !m.is_full());
1277        let write_account_history = ctx.storage_settings.storage_v2;
1278        let write_storage_history = ctx.storage_settings.storage_v2;
1279
1280        // Propagate tracing context into rayon-spawned threads so that RocksDB
1281        // write spans appear as children of write_blocks_data in traces.
1282        let span = tracing::Span::current();
1283        runtime.storage_pool().in_place_scope(|s| {
1284            if write_tx_hash {
1285                s.spawn(|_| {
1286                    let _guard = span.enter();
1287                    r_tx_hash = Some(self.write_tx_hash_numbers(blocks, tx_nums, &ctx));
1288                });
1289            }
1290
1291            if write_account_history {
1292                s.spawn(|_| {
1293                    let _guard = span.enter();
1294                    r_account_history = Some(self.write_account_history(blocks, &ctx));
1295                });
1296            }
1297
1298            if write_storage_history {
1299                s.spawn(|_| {
1300                    let _guard = span.enter();
1301                    r_storage_history = Some(self.write_storage_history(blocks, &ctx));
1302                });
1303            }
1304        });
1305
1306        if write_tx_hash {
1307            r_tx_hash.ok_or_else(|| {
1308                ProviderError::Database(DatabaseError::Other(
1309                    "rocksdb tx-hash write thread panicked".into(),
1310                ))
1311            })??;
1312        }
1313        if write_account_history {
1314            r_account_history.ok_or_else(|| {
1315                ProviderError::Database(DatabaseError::Other(
1316                    "rocksdb account-history write thread panicked".into(),
1317                ))
1318            })??;
1319        }
1320        if write_storage_history {
1321            r_storage_history.ok_or_else(|| {
1322                ProviderError::Database(DatabaseError::Other(
1323                    "rocksdb storage-history write thread panicked".into(),
1324                ))
1325            })??;
1326        }
1327
1328        Ok(())
1329    }
1330
1331    /// Writes transaction hash to number mappings for the given blocks.
1332    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
1333    fn write_tx_hash_numbers<N: reth_node_types::NodePrimitives>(
1334        &self,
1335        blocks: &[ExecutedBlock<N>],
1336        tx_nums: &[TxNumber],
1337        ctx: &RocksDBWriteCtx,
1338    ) -> ProviderResult<()> {
1339        let mut batch = self.batch();
1340        for (block, &first_tx_num) in blocks.iter().zip(tx_nums) {
1341            let body = block.recovered_block().body();
1342            for (tx_num, transaction) in (first_tx_num..).zip(body.transactions_iter()) {
1343                batch.put::<tables::TransactionHashNumbers>(*transaction.tx_hash(), &tx_num)?;
1344            }
1345        }
1346        ctx.pending_batches.lock().push(batch.into_inner());
1347        Ok(())
1348    }
1349
1350    /// Writes account history indices for the given blocks.
1351    ///
1352    /// Derives history indices from reverts (same source as changesets) to ensure consistency.
1353    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
1354    fn write_account_history<N: reth_node_types::NodePrimitives>(
1355        &self,
1356        blocks: &[ExecutedBlock<N>],
1357        ctx: &RocksDBWriteCtx,
1358    ) -> ProviderResult<()> {
1359        let mut batch = self.batch();
1360        let mut account_history: BTreeMap<Address, Vec<u64>> = BTreeMap::new();
1361
1362        for (block_idx, block) in blocks.iter().enumerate() {
1363            let block_number = ctx.first_block_number + block_idx as u64;
1364            let reverts = block.execution_outcome().state.reverts.to_plain_state_reverts();
1365
1366            // Iterate through account reverts - these are exactly the accounts that have
1367            // changesets written, ensuring history indices match changeset entries.
1368            for account_block_reverts in reverts.accounts {
1369                for (address, _) in account_block_reverts {
1370                    account_history.entry(address).or_default().push(block_number);
1371                }
1372            }
1373        }
1374
1375        // Write account history using proper shard append logic
1376        for (address, indices) in account_history {
1377            batch.append_account_history_shard(address, indices)?;
1378        }
1379        ctx.pending_batches.lock().push(batch.into_inner());
1380        Ok(())
1381    }
1382
1383    /// Writes storage history indices for the given blocks.
1384    ///
1385    /// Derives history indices from reverts (same source as changesets) to ensure consistency.
1386    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
1387    fn write_storage_history<N: reth_node_types::NodePrimitives>(
1388        &self,
1389        blocks: &[ExecutedBlock<N>],
1390        ctx: &RocksDBWriteCtx,
1391    ) -> ProviderResult<()> {
1392        let mut batch = self.batch();
1393        let mut storage_history: BTreeMap<(Address, B256), Vec<u64>> = BTreeMap::new();
1394
1395        for (block_idx, block) in blocks.iter().enumerate() {
1396            let block_number = ctx.first_block_number + block_idx as u64;
1397            let reverts = block.execution_outcome().state.reverts.to_plain_state_reverts();
1398
1399            // Iterate through storage reverts - these are exactly the slots that have
1400            // changesets written, ensuring history indices match changeset entries.
1401            for storage_block_reverts in reverts.storage {
1402                for revert in storage_block_reverts {
1403                    for (slot, _) in revert.storage_revert {
1404                        let plain_key = B256::new(slot.to_be_bytes());
1405                        storage_history
1406                            .entry((revert.address, plain_key))
1407                            .or_default()
1408                            .push(block_number);
1409                    }
1410                }
1411            }
1412        }
1413
1414        // Write storage history using proper shard append logic
1415        for ((address, slot), indices) in storage_history {
1416            batch.append_storage_history_shard(address, slot, indices)?;
1417        }
1418        ctx.pending_batches.lock().push(batch.into_inner());
1419        Ok(())
1420    }
1421}
1422
/// A point-in-time read snapshot of the `RocksDB` database.
///
/// All reads through this snapshot see a consistent view of the database at the point
/// the snapshot was created, regardless of concurrent writes. This is the primary reader
/// used by [`EitherReader::RocksDB`](crate::either_writer::EitherReader) for history lookups.
///
/// Lighter weight than [`RocksTx`] — no transaction overhead, no write support.
pub struct RocksReadSnapshot<'db> {
    /// Mode-specific snapshot handle (read-write snapshot or secondary DB).
    inner: RocksReadSnapshotInner<'db>,
    /// Provider that created the snapshot; used for column family lookups.
    provider: &'db RocksDBProvider,
}
1434
/// Inner enum to hold the snapshot for either read-write or secondary mode.
enum RocksReadSnapshotInner<'db> {
    /// Snapshot from read-write `OptimisticTransactionDB`.
    ReadWrite(SnapshotWithThreadMode<'db, OptimisticTransactionDB>),
    /// Direct reads from a secondary `DB` instance (no snapshot; reads see whatever
    /// state the secondary has caught up to).
    Secondary(&'db DB),
}
1442
1443impl<'db> RocksReadSnapshotInner<'db> {
1444    /// Returns a raw iterator over a column family.
1445    fn raw_iterator_cf(&self, cf: &rocksdb::ColumnFamily) -> RocksDBRawIterEnum<'_> {
1446        match self {
1447            Self::ReadWrite(snap) => RocksDBRawIterEnum::ReadWrite(snap.raw_iterator_cf(cf)),
1448            Self::Secondary(db) => RocksDBRawIterEnum::ReadOnly(db.raw_iterator_cf(cf)),
1449        }
1450    }
1451}
1452
1453impl fmt::Debug for RocksReadSnapshot<'_> {
1454    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1455        f.debug_struct("RocksReadSnapshot")
1456            .field("provider", &self.provider)
1457            .finish_non_exhaustive()
1458    }
1459}
1460
1461impl<'db> RocksReadSnapshot<'db> {
    /// Gets the column family handle for table `T` from the owning provider.
    fn cf_handle<T: Table>(&self) -> Result<&'db rocksdb::ColumnFamily, DatabaseError> {
        self.provider.get_cf_handle::<T>()
    }
1466
1467    /// Gets a value from the specified table.
1468    pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
1469        let encoded_key = key.encode();
1470        let cf = self.cf_handle::<T>()?;
1471        let result = match &self.inner {
1472            RocksReadSnapshotInner::ReadWrite(snap) => snap.get_cf(cf, encoded_key.as_ref()),
1473            RocksReadSnapshotInner::Secondary(db) => db.get_cf(cf, encoded_key.as_ref()),
1474        }
1475        .map_err(|e| {
1476            ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
1477                message: e.to_string().into(),
1478                code: -1,
1479            }))
1480        })?;
1481
1482        Ok(result.and_then(|value| T::Value::decompress(&value).ok()))
1483    }
1484
    /// Lookup account history and return [`HistoryInfo`] directly.
    ///
    /// `visible_tip` is the highest block considered visible from the companion MDBX snapshot.
    /// History entries above it are ignored even if they already exist in `RocksDB`.
    pub fn account_history_info(
        &self,
        address: Address,
        block_number: BlockNumber,
        lowest_available_block_number: Option<BlockNumber>,
        visible_tip: BlockNumber,
    ) -> ProviderResult<HistoryInfo> {
        // Seek key: the shard covering `block_number` for this address.
        let key = ShardedKey::new(address, block_number);
        self.history_info::<tables::AccountsHistory>(
            key.encode().as_ref(),
            block_number,
            lowest_available_block_number,
            visible_tip,
            // Same-entity check: does a decoded key still belong to `address`?
            |key_bytes| Ok(<ShardedKey<Address> as Decode>::decode(key_bytes)?.key == address),
            // Previous-shard check: treats undecodable keys as "different entity".
            |prev_bytes| {
                <ShardedKey<Address> as Decode>::decode(prev_bytes)
                    .map(|k| k.key == address)
                    .unwrap_or(false)
            },
        )
    }
1510
1511    /// Lookup storage history and return [`HistoryInfo`] directly.
1512    ///
1513    /// `visible_tip` is the highest block considered visible from the companion MDBX snapshot.
1514    /// History entries above it are ignored even if they already exist in `RocksDB`.
1515    pub fn storage_history_info(
1516        &self,
1517        address: Address,
1518        storage_key: B256,
1519        block_number: BlockNumber,
1520        lowest_available_block_number: Option<BlockNumber>,
1521        visible_tip: BlockNumber,
1522    ) -> ProviderResult<HistoryInfo> {
1523        let key = StorageShardedKey::new(address, storage_key, block_number);
1524        self.history_info::<tables::StoragesHistory>(
1525            key.encode().as_ref(),
1526            block_number,
1527            lowest_available_block_number,
1528            visible_tip,
1529            |key_bytes| {
1530                let k = <StorageShardedKey as Decode>::decode(key_bytes)?;
1531                Ok(k.address == address && k.sharded_key.key == storage_key)
1532            },
1533            |prev_bytes| {
1534                <StorageShardedKey as Decode>::decode(prev_bytes)
1535                    .map(|k| k.address == address && k.sharded_key.key == storage_key)
1536                    .unwrap_or(false)
1537            },
1538        )
1539    }
1540
1541    /// Generic history lookup using the snapshot's raw iterator.
1542    ///
1543    /// The result is derived from the history that is visible through `visible_tip`, not from the
1544    /// full contents of `RocksDB`. This lets a reader combine an older MDBX snapshot with a newer
1545    /// Rocks snapshot without routing through history entries that MDBX cannot see yet.
1546    fn history_info<T>(
1547        &self,
1548        encoded_key: &[u8],
1549        block_number: BlockNumber,
1550        lowest_available_block_number: Option<BlockNumber>,
1551        visible_tip: BlockNumber,
1552        key_matches: impl FnOnce(&[u8]) -> Result<bool, reth_db_api::DatabaseError>,
1553        prev_key_matches: impl Fn(&[u8]) -> bool,
1554    ) -> ProviderResult<HistoryInfo>
1555    where
1556        T: Table<Value = BlockNumberList>,
1557    {
1558        let is_maybe_pruned = lowest_available_block_number.is_some();
1559        let fallback = || {
1560            Ok(if is_maybe_pruned {
1561                HistoryInfo::MaybeInPlainState
1562            } else {
1563                HistoryInfo::NotYetWritten
1564            })
1565        };
1566
1567        let cf = self.cf_handle::<T>()?;
1568        let mut iter = self.inner.raw_iterator_cf(cf);
1569
1570        iter.seek(encoded_key);
1571        iter.status().map_err(|e| {
1572            ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
1573                message: e.to_string().into(),
1574                code: -1,
1575            }))
1576        })?;
1577
1578        if !iter.valid() {
1579            return fallback();
1580        }
1581
1582        let Some(key_bytes) = iter.key() else {
1583            return fallback();
1584        };
1585        if !key_matches(key_bytes)? {
1586            return fallback();
1587        }
1588
1589        let Some(value_bytes) = iter.value() else {
1590            return fallback();
1591        };
1592        let chunk = BlockNumberList::decompress(value_bytes)?;
1593
1594        let (rank, found_block) = compute_history_rank(&chunk, block_number);
1595        // Ignore later Rocks history that is ahead of the companion MDBX snapshot.
1596        let found_block = found_block.filter(|block| *block <= visible_tip);
1597
1598        let is_before_first_write = if needs_prev_shard_check(rank, found_block, block_number) {
1599            iter.prev();
1600            iter.status().map_err(|e| {
1601                ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
1602                    message: e.to_string().into(),
1603                    code: -1,
1604                }))
1605            })?;
1606            let has_prev = iter.valid() && iter.key().is_some_and(&prev_key_matches);
1607
1608            // If the current shard only contains history above `visible_tip`, there is no usable
1609            // later change. Without a previous shard for the same key, fall back to the existing
1610            // not-written / maybe-pruned result instead of routing into plain state.
1611            if found_block.is_none() && !has_prev {
1612                return fallback()
1613            }
1614
1615            !has_prev
1616        } else {
1617            false
1618        };
1619
1620        Ok(HistoryInfo::from_lookup(
1621            found_block,
1622            is_before_first_write,
1623            lowest_available_block_number,
1624        ))
1625    }
1626}
1627
/// Outcome of pruning a history shard in `RocksDB`.
///
/// Returned by the shard-pruning helpers on [`RocksDBBatch`]; when multiple shards are
/// processed, `Deleted` takes precedence over `Updated`, which takes precedence over
/// `Unchanged`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PruneShardOutcome {
    /// At least one shard was deleted entirely.
    Deleted,
    /// At least one shard was rewritten with filtered block numbers (none deleted).
    Updated,
    /// No shard was modified (no blocks <= `to_block`).
    Unchanged,
}
1638
/// Tracks pruning outcomes for batch operations.
///
/// Aggregates one [`PruneShardOutcome`] per pruning target (address or storage slot)
/// across a batch pruning pass.
#[derive(Debug, Default, Clone, Copy)]
pub struct PrunedIndices {
    /// Number of targets whose shards were completely deleted.
    pub deleted: usize,
    /// Number of targets whose shards were updated (filtered but still have entries).
    pub updated: usize,
    /// Number of targets whose shards were unchanged.
    pub unchanged: usize,
}
1649
/// Handle for building a batch of operations atomically.
///
/// Uses `WriteBatchWithTransaction` for atomic writes without full transaction overhead.
/// Unlike [`RocksTx`], this does NOT support read-your-writes. Use for write-only flows
/// where you don't need to read back uncommitted data within the same operation
/// (e.g., history index writes).
///
/// When `auto_commit_threshold` is set, the batch will automatically commit and reset
/// when the batch size exceeds the threshold. This prevents OOM during large bulk writes.
#[must_use = "batch must be committed"]
pub struct RocksDBBatch<'a> {
    /// Provider owning the database this batch writes to (also used for read-backs).
    provider: &'a RocksDBProvider,
    /// Queued put/delete operations, applied atomically on commit.
    inner: WriteBatchWithTransaction<true>,
    /// Reusable scratch buffer for value compression in `put_encoded`.
    buf: Vec<u8>,
    /// If set, batch auto-commits when size exceeds this threshold (in bytes).
    auto_commit_threshold: Option<usize>,
}
1667
1668impl fmt::Debug for RocksDBBatch<'_> {
1669    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1670        f.debug_struct("RocksDBBatch")
1671            .field("provider", &self.provider)
1672            .field("batch", &"<WriteBatchWithTransaction>")
1673            // Number of operations in this batch
1674            .field("length", &self.inner.len())
1675            // Total serialized size (encoded key + compressed value + metadata) of this batch
1676            // in bytes
1677            .field("size_in_bytes", &self.inner.size_in_bytes())
1678            .finish()
1679    }
1680}
1681
1682impl<'a> RocksDBBatch<'a> {
1683    /// Puts a value into the batch.
1684    ///
1685    /// If auto-commit is enabled and the batch exceeds the threshold, commits and resets.
1686    pub fn put<T: Table>(&mut self, key: T::Key, value: &T::Value) -> ProviderResult<()> {
1687        let encoded_key = key.encode();
1688        self.put_encoded::<T>(&encoded_key, value)
1689    }
1690
    /// Puts a value into the batch using pre-encoded key.
    ///
    /// If auto-commit is enabled and the batch exceeds the threshold, commits and resets.
    pub fn put_encoded<T: Table>(
        &mut self,
        key: &<T::Key as Encode>::Encoded,
        value: &T::Value,
    ) -> ProviderResult<()> {
        // Compress the value, reusing `self.buf` as scratch space. NOTE(review): the
        // macro presumably returns `Some(borrowed bytes)` when no copy is needed and
        // `None` after writing into `self.buf` — macro is defined elsewhere; confirm.
        let value_bytes = compress_to_buf_or_ref!(self.buf, value).unwrap_or(&self.buf);
        self.inner.put_cf(self.provider.get_cf_handle::<T>()?, key, value_bytes);
        self.maybe_auto_commit()?;
        Ok(())
    }
1704
1705    /// Deletes a value from the batch.
1706    ///
1707    /// If auto-commit is enabled and the batch exceeds the threshold, commits and resets.
1708    pub fn delete<T: Table>(&mut self, key: T::Key) -> ProviderResult<()> {
1709        self.inner.delete_cf(self.provider.get_cf_handle::<T>()?, key.encode().as_ref());
1710        self.maybe_auto_commit()?;
1711        Ok(())
1712    }
1713
1714    /// Commits and resets the batch if it exceeds the auto-commit threshold.
1715    ///
1716    /// This is called after each `put` or `delete` operation to prevent unbounded memory growth.
1717    /// Returns immediately if auto-commit is disabled or threshold not reached.
1718    fn maybe_auto_commit(&mut self) -> ProviderResult<()> {
1719        if let Some(threshold) = self.auto_commit_threshold &&
1720            self.inner.size_in_bytes() >= threshold
1721        {
1722            tracing::debug!(
1723                target: "providers::rocksdb",
1724                batch_size = self.inner.size_in_bytes(),
1725                threshold,
1726                "Auto-committing RocksDB batch"
1727            );
1728            let old_batch = std::mem::take(&mut self.inner);
1729            self.provider.0.db_rw().write_opt(old_batch, &WriteOptions::default()).map_err(
1730                |e| {
1731                    ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
1732                        message: e.to_string().into(),
1733                        code: -1,
1734                    }))
1735                },
1736            )?;
1737        }
1738        Ok(())
1739    }
1740
1741    /// Commits the batch to the database.
1742    ///
1743    /// This consumes the batch and writes all operations atomically to `RocksDB`.
1744    ///
1745    /// # Panics
1746    /// Panics if the provider is in read-only mode.
1747    #[instrument(level = "debug", target = "providers::rocksdb", skip_all, fields(batch_len = self.inner.len(), batch_size = self.inner.size_in_bytes()))]
1748    pub fn commit(self) -> ProviderResult<()> {
1749        self.provider.0.db_rw().write_opt(self.inner, &WriteOptions::default()).map_err(|e| {
1750            ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
1751                message: e.to_string().into(),
1752                code: -1,
1753            }))
1754        })
1755    }
1756
    /// Returns the number of write operations (puts + deletes) queued in this batch.
    ///
    /// Delegates to the underlying `WriteBatchWithTransaction`.
    pub fn len(&self) -> usize {
        self.inner.len()
    }
1761
    /// Returns `true` if the batch contains no operations.
    ///
    /// Delegates to the underlying `WriteBatchWithTransaction`.
    pub fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }
1766
    /// Returns the serialized size of the batch in bytes.
    ///
    /// This is the value compared against the auto-commit threshold.
    pub fn size_in_bytes(&self) -> usize {
        self.inner.size_in_bytes()
    }
1771
    /// Returns a reference to the underlying `RocksDB` provider this batch writes to.
    pub const fn provider(&self) -> &RocksDBProvider {
        self.provider
    }
1776
    /// Consumes the batch and returns the underlying `WriteBatchWithTransaction`.
    ///
    /// This is used to defer commits to the provider level; the returned batch carries
    /// all queued operations and has NOT been written to the database.
    pub fn into_inner(self) -> WriteBatchWithTransaction<true> {
        self.inner
    }
1783
    /// Gets a value from the database.
    ///
    /// **Important constraint:** This reads only committed state, not pending writes in this
    /// batch or other pending batches in `pending_rocksdb_batches`. A key queued for `put`
    /// in this batch will therefore not be visible here until a commit.
    pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
        self.provider.get::<T>(key)
    }
1791
1792    /// Appends indices to an account history shard with proper shard management.
1793    ///
1794    /// Loads the existing shard (if any), appends new indices, and rechunks into
1795    /// multiple shards if needed (respecting `NUM_OF_INDICES_IN_SHARD` limit).
1796    ///
1797    /// # Requirements
1798    ///
1799    /// - The `indices` MUST be strictly increasing and contain no duplicates.
1800    /// - This method MUST only be called once per address per batch. The batch reads existing
1801    ///   shards from committed DB state, not from pending writes. Calling twice for the same
1802    ///   address will cause the second call to overwrite the first.
1803    pub fn append_account_history_shard(
1804        &mut self,
1805        address: Address,
1806        indices: impl IntoIterator<Item = u64>,
1807    ) -> ProviderResult<()> {
1808        let indices: Vec<u64> = indices.into_iter().collect();
1809
1810        if indices.is_empty() {
1811            return Ok(());
1812        }
1813
1814        debug_assert!(
1815            indices.windows(2).all(|w| w[0] < w[1]),
1816            "indices must be strictly increasing: {:?}",
1817            indices
1818        );
1819
1820        let last_key = ShardedKey::new(address, u64::MAX);
1821        let last_shard_opt = self.provider.get::<tables::AccountsHistory>(last_key.clone())?;
1822        let mut last_shard = last_shard_opt.unwrap_or_else(BlockNumberList::empty);
1823
1824        last_shard.append(indices).map_err(ProviderError::other)?;
1825
1826        // Fast path: all indices fit in one shard
1827        if last_shard.len() <= NUM_OF_INDICES_IN_SHARD as u64 {
1828            self.put::<tables::AccountsHistory>(last_key, &last_shard)?;
1829            return Ok(());
1830        }
1831
1832        // Slow path: rechunk into multiple shards
1833        let chunks = last_shard.iter().chunks(NUM_OF_INDICES_IN_SHARD);
1834        let mut chunks_peekable = chunks.into_iter().peekable();
1835
1836        while let Some(chunk) = chunks_peekable.next() {
1837            let shard = BlockNumberList::new_pre_sorted(chunk);
1838            let highest_block_number = if chunks_peekable.peek().is_some() {
1839                shard.iter().next_back().expect("`chunks` does not return empty list")
1840            } else {
1841                u64::MAX
1842            };
1843
1844            self.put::<tables::AccountsHistory>(
1845                ShardedKey::new(address, highest_block_number),
1846                &shard,
1847            )?;
1848        }
1849
1850        Ok(())
1851    }
1852
1853    /// Appends indices to a storage history shard with proper shard management.
1854    ///
1855    /// Loads the existing shard (if any), appends new indices, and rechunks into
1856    /// multiple shards if needed (respecting `NUM_OF_INDICES_IN_SHARD` limit).
1857    ///
1858    /// # Requirements
1859    ///
1860    /// - The `indices` MUST be strictly increasing and contain no duplicates.
1861    /// - This method MUST only be called once per (address, `storage_key`) pair per batch. The
1862    ///   batch reads existing shards from committed DB state, not from pending writes. Calling
1863    ///   twice for the same key will cause the second call to overwrite the first.
1864    pub fn append_storage_history_shard(
1865        &mut self,
1866        address: Address,
1867        storage_key: B256,
1868        indices: impl IntoIterator<Item = u64>,
1869    ) -> ProviderResult<()> {
1870        let indices: Vec<u64> = indices.into_iter().collect();
1871
1872        if indices.is_empty() {
1873            return Ok(());
1874        }
1875
1876        debug_assert!(
1877            indices.windows(2).all(|w| w[0] < w[1]),
1878            "indices must be strictly increasing: {:?}",
1879            indices
1880        );
1881
1882        let last_key = StorageShardedKey::last(address, storage_key);
1883        let last_shard_opt = self.provider.get::<tables::StoragesHistory>(last_key.clone())?;
1884        let mut last_shard = last_shard_opt.unwrap_or_else(BlockNumberList::empty);
1885
1886        last_shard.append(indices).map_err(ProviderError::other)?;
1887
1888        // Fast path: all indices fit in one shard
1889        if last_shard.len() <= NUM_OF_INDICES_IN_SHARD as u64 {
1890            self.put::<tables::StoragesHistory>(last_key, &last_shard)?;
1891            return Ok(());
1892        }
1893
1894        // Slow path: rechunk into multiple shards
1895        let chunks = last_shard.iter().chunks(NUM_OF_INDICES_IN_SHARD);
1896        let mut chunks_peekable = chunks.into_iter().peekable();
1897
1898        while let Some(chunk) = chunks_peekable.next() {
1899            let shard = BlockNumberList::new_pre_sorted(chunk);
1900            let highest_block_number = if chunks_peekable.peek().is_some() {
1901                shard.iter().next_back().expect("`chunks` does not return empty list")
1902            } else {
1903                u64::MAX
1904            };
1905
1906            self.put::<tables::StoragesHistory>(
1907                StorageShardedKey::new(address, storage_key, highest_block_number),
1908                &shard,
1909            )?;
1910        }
1911
1912        Ok(())
1913    }
1914
    /// Unwinds account history for the given address, keeping only blocks <= `keep_to`.
    ///
    /// Mirrors MDBX `unwind_history_shards` behavior:
    /// - Deletes shards entirely above `keep_to`
    /// - Truncates boundary shards and re-keys to `u64::MAX` sentinel
    /// - Preserves shards entirely below `keep_to`
    ///
    /// Re-keying is always done as delete-old-key + put-at-sentinel; both operations
    /// are queued in this batch and only become visible on commit.
    pub fn unwind_account_history_to(
        &mut self,
        address: Address,
        keep_to: BlockNumber,
    ) -> ProviderResult<()> {
        let shards = self.provider.account_history_shards(address)?;
        if shards.is_empty() {
            return Ok(());
        }

        // Find the first shard that might contain blocks > keep_to.
        // A shard is affected if it's the sentinel (u64::MAX) or its highest_block_number > keep_to
        let boundary_idx = shards.iter().position(|(key, _)| {
            key.highest_block_number == u64::MAX || key.highest_block_number > keep_to
        });

        // Repair path: no shards affected means all blocks <= keep_to, just ensure sentinel exists
        let Some(boundary_idx) = boundary_idx else {
            let (last_key, last_value) = shards.last().expect("shards is non-empty");
            if last_key.highest_block_number != u64::MAX {
                // Re-key the last shard to the sentinel so future appends find it.
                self.delete::<tables::AccountsHistory>(last_key.clone())?;
                self.put::<tables::AccountsHistory>(
                    ShardedKey::new(address, u64::MAX),
                    last_value,
                )?;
            }
            return Ok(());
        };

        // Delete all shards strictly after the boundary (they are entirely > keep_to)
        for (key, _) in shards.iter().skip(boundary_idx + 1) {
            self.delete::<tables::AccountsHistory>(key.clone())?;
        }

        // Process the boundary shard: filter out blocks > keep_to
        let (boundary_key, boundary_list) = &shards[boundary_idx];

        // Delete the boundary shard (we'll either drop it or rewrite at u64::MAX)
        self.delete::<tables::AccountsHistory>(boundary_key.clone())?;

        // Build truncated list once; check emptiness directly (avoids double iteration).
        // `take_while` is valid because the shard's block list is sorted ascending.
        let new_last =
            BlockNumberList::new_pre_sorted(boundary_list.iter().take_while(|&b| b <= keep_to));

        if new_last.is_empty() {
            // Boundary shard is now empty. Previous shard becomes the last and must be keyed
            // u64::MAX.
            if boundary_idx == 0 {
                // Nothing left for this address
                return Ok(());
            }

            let (prev_key, prev_value) = &shards[boundary_idx - 1];
            if prev_key.highest_block_number != u64::MAX {
                self.delete::<tables::AccountsHistory>(prev_key.clone())?;
                self.put::<tables::AccountsHistory>(
                    ShardedKey::new(address, u64::MAX),
                    prev_value,
                )?;
            }
            return Ok(());
        }

        // Truncated boundary shard survives: re-key it to the sentinel.
        self.put::<tables::AccountsHistory>(ShardedKey::new(address, u64::MAX), &new_last)?;

        Ok(())
    }
1988
    /// Prunes history shards, removing blocks <= `to_block`.
    ///
    /// Generic implementation for both account and storage history pruning.
    /// Mirrors MDBX `prune_shard` semantics. After pruning, the last remaining shard
    /// (if any) will have the sentinel key (`u64::MAX`).
    ///
    /// Closure parameters abstract over the account/storage key types:
    /// * `get_highest` extracts the highest block number from a shard key.
    /// * `is_sentinel` reports whether a key is the `u64::MAX` sentinel.
    /// * `delete_shard` / `put_shard` queue the corresponding batch operations.
    /// * `create_sentinel` builds the sentinel key for the pruned entity.
    ///
    /// Outcome precedence: `Deleted` if any shard was removed, else `Updated` if any was
    /// rewritten or re-keyed, else `Unchanged`.
    #[expect(clippy::too_many_arguments)]
    fn prune_history_shards_inner<K>(
        &mut self,
        shards: Vec<(K, BlockNumberList)>,
        to_block: BlockNumber,
        get_highest: impl Fn(&K) -> u64,
        is_sentinel: impl Fn(&K) -> bool,
        delete_shard: impl Fn(&mut Self, K) -> ProviderResult<()>,
        put_shard: impl Fn(&mut Self, K, &BlockNumberList) -> ProviderResult<()>,
        create_sentinel: impl Fn() -> K,
    ) -> ProviderResult<PruneShardOutcome>
    where
        K: Clone,
    {
        if shards.is_empty() {
            return Ok(PruneShardOutcome::Unchanged);
        }

        let mut deleted = false;
        let mut updated = false;
        // Last shard that survives pruning; may need re-keying to the sentinel afterwards.
        let mut last_remaining: Option<(K, BlockNumberList)> = None;

        for (key, block_list) in shards {
            if !is_sentinel(&key) && get_highest(&key) <= to_block {
                // Entire shard is at or below the cutoff — drop it without filtering.
                delete_shard(self, key)?;
                deleted = true;
            } else {
                // Shard straddles the cutoff (or is the sentinel): keep blocks > to_block.
                let original_len = block_list.len();
                let filtered =
                    BlockNumberList::new_pre_sorted(block_list.iter().filter(|&b| b > to_block));

                if filtered.is_empty() {
                    delete_shard(self, key)?;
                    deleted = true;
                } else if filtered.len() < original_len {
                    put_shard(self, key.clone(), &filtered)?;
                    last_remaining = Some((key, filtered));
                    updated = true;
                } else {
                    // Nothing pruned from this shard; remember it as the current survivor.
                    last_remaining = Some((key, block_list));
                }
            }
        }

        // Ensure the final surviving shard is keyed by the sentinel (`u64::MAX`).
        if let Some((last_key, last_value)) = last_remaining &&
            !is_sentinel(&last_key)
        {
            delete_shard(self, last_key)?;
            put_shard(self, create_sentinel(), &last_value)?;
            updated = true;
        }

        if deleted {
            Ok(PruneShardOutcome::Deleted)
        } else if updated {
            Ok(PruneShardOutcome::Updated)
        } else {
            Ok(PruneShardOutcome::Unchanged)
        }
    }
2054
2055    /// Prunes account history for the given address, removing blocks <= `to_block`.
2056    ///
2057    /// Mirrors MDBX `prune_shard` semantics. After pruning, the last remaining shard
2058    /// (if any) will have the sentinel key (`u64::MAX`).
2059    pub fn prune_account_history_to(
2060        &mut self,
2061        address: Address,
2062        to_block: BlockNumber,
2063    ) -> ProviderResult<PruneShardOutcome> {
2064        let shards = self.provider.account_history_shards(address)?;
2065        self.prune_history_shards_inner(
2066            shards,
2067            to_block,
2068            |key| key.highest_block_number,
2069            |key| key.highest_block_number == u64::MAX,
2070            |batch, key| batch.delete::<tables::AccountsHistory>(key),
2071            |batch, key, value| batch.put::<tables::AccountsHistory>(key, value),
2072            || ShardedKey::new(address, u64::MAX),
2073        )
2074    }
2075
    /// Prunes account history for multiple addresses in a single iterator pass.
    ///
    /// This is more efficient than calling [`Self::prune_account_history_to`] repeatedly
    /// because it reuses a single raw iterator and skips seeks when the iterator is already
    /// positioned correctly (which happens when targets are sorted and adjacent in key order).
    ///
    /// `targets` MUST be sorted by address for correctness and optimal performance
    /// (matches on-disk key order).
    pub fn prune_account_history_batch(
        &mut self,
        targets: &[(Address, BlockNumber)],
    ) -> ProviderResult<PrunedIndices> {
        if targets.is_empty() {
            return Ok(PrunedIndices::default());
        }

        debug_assert!(
            targets.windows(2).all(|w| w[0].0 <= w[1].0),
            "prune_account_history_batch: targets must be sorted by address"
        );

        // ShardedKey<Address> layout: [address: 20][block: 8] = 28 bytes
        // The first 20 bytes are the "prefix" that identifies the address
        const PREFIX_LEN: usize = 20;

        // Note: the iterator reads committed DB state; deletes/puts queued into this
        // batch by earlier targets are not visible to it.
        let cf = self.provider.get_cf_handle::<tables::AccountsHistory>()?;
        let mut iter = self.provider.0.raw_iterator_cf(cf);
        let mut outcomes = PrunedIndices::default();

        for (address, to_block) in targets {
            // Build the target prefix (first 20 bytes = address)
            let start_key = ShardedKey::new(*address, 0u64).encode();
            let target_prefix = &start_key[..PREFIX_LEN];

            // Check if we need to seek or if the iterator is already positioned correctly.
            // After processing the previous target, the iterator is either:
            // 1. Positioned at a key with a different prefix (we iterated past our shards)
            // 2. Invalid (no more keys)
            // If the current key's prefix >= our target prefix, we may be able to skip the seek.
            let needs_seek = if iter.valid() {
                if let Some(current_key) = iter.key() {
                    // If current key's prefix < target prefix, we need to seek forward
                    // If current key's prefix > target prefix, this target has no shards (skip)
                    // If current key's prefix == target prefix, we're already positioned
                    current_key.get(..PREFIX_LEN).is_none_or(|p| p < target_prefix)
                } else {
                    true
                }
            } else {
                true
            };

            if needs_seek {
                iter.seek(start_key);
                iter.status().map_err(|e| {
                    ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
                        message: e.to_string().into(),
                        code: -1,
                    }))
                })?;
            }

            // Collect all shards for this address using raw prefix comparison
            let mut shards = Vec::new();
            while iter.valid() {
                let Some(key_bytes) = iter.key() else { break };

                // Use raw prefix comparison instead of full decode for the prefix check
                let current_prefix = key_bytes.get(..PREFIX_LEN);
                if current_prefix != Some(target_prefix) {
                    break;
                }

                // Now decode the full key (we need the block number)
                let key = ShardedKey::<Address>::decode(key_bytes)
                    .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;

                let Some(value_bytes) = iter.value() else { break };
                let value = BlockNumberList::decompress(value_bytes)
                    .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;

                shards.push((key, value));
                iter.next();
            }

            // Prune this address's shards and tally the per-target outcome.
            match self.prune_history_shards_inner(
                shards,
                *to_block,
                |key| key.highest_block_number,
                |key| key.highest_block_number == u64::MAX,
                |batch, key| batch.delete::<tables::AccountsHistory>(key),
                |batch, key, value| batch.put::<tables::AccountsHistory>(key, value),
                || ShardedKey::new(*address, u64::MAX),
            )? {
                PruneShardOutcome::Deleted => outcomes.deleted += 1,
                PruneShardOutcome::Updated => outcomes.updated += 1,
                PruneShardOutcome::Unchanged => outcomes.unchanged += 1,
            }
        }

        Ok(outcomes)
    }
2178
2179    /// Prunes storage history for the given address and storage key, removing blocks <=
2180    /// `to_block`.
2181    ///
2182    /// Mirrors MDBX `prune_shard` semantics. After pruning, the last remaining shard
2183    /// (if any) will have the sentinel key (`u64::MAX`).
2184    pub fn prune_storage_history_to(
2185        &mut self,
2186        address: Address,
2187        storage_key: B256,
2188        to_block: BlockNumber,
2189    ) -> ProviderResult<PruneShardOutcome> {
2190        let shards = self.provider.storage_history_shards(address, storage_key)?;
2191        self.prune_history_shards_inner(
2192            shards,
2193            to_block,
2194            |key| key.sharded_key.highest_block_number,
2195            |key| key.sharded_key.highest_block_number == u64::MAX,
2196            |batch, key| batch.delete::<tables::StoragesHistory>(key),
2197            |batch, key, value| batch.put::<tables::StoragesHistory>(key, value),
2198            || StorageShardedKey::last(address, storage_key),
2199        )
2200    }
2201
    /// Prunes storage history for multiple (address, `storage_key`) pairs in a single iterator
    /// pass.
    ///
    /// This is more efficient than calling [`Self::prune_storage_history_to`] repeatedly
    /// because it reuses a single raw iterator and skips seeks when the iterator is already
    /// positioned correctly (which happens when targets are sorted and adjacent in key order).
    ///
    /// `targets` MUST be sorted by (address, `storage_key`) for correctness and optimal
    /// performance (matches on-disk key order).
    ///
    /// Returns aggregated per-target shard outcomes (deleted / updated / unchanged counts).
    pub fn prune_storage_history_batch(
        &mut self,
        targets: &[((Address, B256), BlockNumber)],
    ) -> ProviderResult<PrunedIndices> {
        if targets.is_empty() {
            return Ok(PrunedIndices::default());
        }

        // Sortedness is a precondition; only checked in debug builds to keep the hot
        // pruning path free of per-call O(n) validation in release.
        debug_assert!(
            targets.windows(2).all(|w| w[0].0 <= w[1].0),
            "prune_storage_history_batch: targets must be sorted by (address, storage_key)"
        );

        // StorageShardedKey layout: [address: 20][storage_key: 32][block: 8] = 60 bytes
        // The first 52 bytes are the "prefix" that identifies (address, storage_key)
        const PREFIX_LEN: usize = 52;

        let cf = self.provider.get_cf_handle::<tables::StoragesHistory>()?;
        let mut iter = self.provider.0.raw_iterator_cf(cf);
        let mut outcomes = PrunedIndices::default();

        for ((address, storage_key), to_block) in targets {
            // Build the target prefix (first 52 bytes of encoded key)
            let start_key = StorageShardedKey::new(*address, *storage_key, 0u64).encode();
            let target_prefix = &start_key[..PREFIX_LEN];

            // Check if we need to seek or if the iterator is already positioned correctly.
            // After processing the previous target, the iterator is either:
            // 1. Positioned at a key with a different prefix (we iterated past our shards)
            // 2. Invalid (no more keys)
            // If the current key's prefix >= our target prefix, we may be able to skip the seek.
            let needs_seek = if iter.valid() {
                if let Some(current_key) = iter.key() {
                    // If current key's prefix < target prefix, we need to seek forward
                    // If current key's prefix > target prefix, this target has no shards (skip)
                    // If current key's prefix == target prefix, we're already positioned
                    current_key.get(..PREFIX_LEN).is_none_or(|p| p < target_prefix)
                } else {
                    true
                }
            } else {
                true
            };

            if needs_seek {
                iter.seek(start_key);
                // Surface any iterator-level error from the seek before trusting `valid()`.
                iter.status().map_err(|e| {
                    ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
                        message: e.to_string().into(),
                        code: -1,
                    }))
                })?;
            }

            // Collect all shards for this (address, storage_key) pair using prefix comparison
            let mut shards = Vec::new();
            while iter.valid() {
                let Some(key_bytes) = iter.key() else { break };

                // Use raw prefix comparison instead of full decode for the prefix check
                let current_prefix = key_bytes.get(..PREFIX_LEN);
                if current_prefix != Some(target_prefix) {
                    break;
                }

                // Now decode the full key (we need the block number)
                let key = StorageShardedKey::decode(key_bytes)
                    .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;

                let Some(value_bytes) = iter.value() else { break };
                let value = BlockNumberList::decompress(value_bytes)
                    .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;

                shards.push((key, value));
                iter.next();
            }

            // Use existing prune_history_shards_inner logic.
            // NOTE(review): an empty `shards` vec (target with no history) is passed through;
            // presumably the helper reports it as `Unchanged` — confirm in
            // `prune_history_shards_inner`.
            match self.prune_history_shards_inner(
                shards,
                *to_block,
                |key| key.sharded_key.highest_block_number,
                |key| key.sharded_key.highest_block_number == u64::MAX,
                |batch, key| batch.delete::<tables::StoragesHistory>(key),
                |batch, key, value| batch.put::<tables::StoragesHistory>(key, value),
                || StorageShardedKey::last(*address, *storage_key),
            )? {
                PruneShardOutcome::Deleted => outcomes.deleted += 1,
                PruneShardOutcome::Updated => outcomes.updated += 1,
                PruneShardOutcome::Unchanged => outcomes.unchanged += 1,
            }
        }

        Ok(outcomes)
    }
2306
    /// Unwinds storage history to keep only blocks `<= keep_to`.
    ///
    /// Handles multi-shard scenarios by:
    /// 1. Loading all shards for the `(address, storage_key)` pair
    /// 2. Finding the boundary shard containing `keep_to`
    /// 3. Deleting all shards after the boundary
    /// 4. Truncating the boundary shard to keep only indices `<= keep_to`
    /// 5. Ensuring the last shard is keyed with `u64::MAX`
    pub fn unwind_storage_history_to(
        &mut self,
        address: Address,
        storage_key: B256,
        keep_to: BlockNumber,
    ) -> ProviderResult<()> {
        let shards = self.provider.storage_history_shards(address, storage_key)?;
        if shards.is_empty() {
            return Ok(());
        }

        // Find the first shard that might contain blocks > keep_to.
        // A shard is affected if it's the sentinel (u64::MAX) or its highest_block_number > keep_to
        let boundary_idx = shards.iter().position(|(key, _)| {
            key.sharded_key.highest_block_number == u64::MAX ||
                key.sharded_key.highest_block_number > keep_to
        });

        // Repair path: no shards affected means all blocks <= keep_to, just ensure sentinel exists
        let Some(boundary_idx) = boundary_idx else {
            let (last_key, last_value) = shards.last().expect("shards is non-empty");
            if last_key.sharded_key.highest_block_number != u64::MAX {
                // Re-key the final shard under the u64::MAX sentinel (delete + re-put).
                self.delete::<tables::StoragesHistory>(last_key.clone())?;
                self.put::<tables::StoragesHistory>(
                    StorageShardedKey::last(address, storage_key),
                    last_value,
                )?;
            }
            return Ok(());
        };

        // Delete all shards strictly after the boundary (they are entirely > keep_to)
        for (key, _) in shards.iter().skip(boundary_idx + 1) {
            self.delete::<tables::StoragesHistory>(key.clone())?;
        }

        // Process the boundary shard: filter out blocks > keep_to
        let (boundary_key, boundary_list) = &shards[boundary_idx];

        // Delete the boundary shard (we'll either drop it or rewrite at u64::MAX)
        self.delete::<tables::StoragesHistory>(boundary_key.clone())?;

        // Build truncated list once; check emptiness directly (avoids double iteration).
        // `take_while` relies on the shard's block list being sorted ascending
        // (the `new_pre_sorted` constructor encodes that invariant).
        let new_last =
            BlockNumberList::new_pre_sorted(boundary_list.iter().take_while(|&b| b <= keep_to));

        if new_last.is_empty() {
            // Boundary shard is now empty. Previous shard becomes the last and must be keyed
            // u64::MAX.
            if boundary_idx == 0 {
                // Nothing left for this (address, storage_key) pair
                return Ok(());
            }

            let (prev_key, prev_value) = &shards[boundary_idx - 1];
            if prev_key.sharded_key.highest_block_number != u64::MAX {
                self.delete::<tables::StoragesHistory>(prev_key.clone())?;
                self.put::<tables::StoragesHistory>(
                    StorageShardedKey::last(address, storage_key),
                    prev_value,
                )?;
            }
            return Ok(());
        }

        // Truncated boundary shard survives: it becomes the new last shard (sentinel key).
        self.put::<tables::StoragesHistory>(
            StorageShardedKey::last(address, storage_key),
            &new_last,
        )?;

        Ok(())
    }
2387
2388    /// Clears all account history shards for the given address.
2389    ///
2390    /// Used when unwinding from block 0 (i.e., removing all history).
2391    pub fn clear_account_history(&mut self, address: Address) -> ProviderResult<()> {
2392        let shards = self.provider.account_history_shards(address)?;
2393        for (key, _) in shards {
2394            self.delete::<tables::AccountsHistory>(key)?;
2395        }
2396        Ok(())
2397    }
2398
2399    /// Clears all storage history shards for the given `(address, storage_key)` pair.
2400    ///
2401    /// Used when unwinding from block 0 (i.e., removing all history for this storage slot).
2402    pub fn clear_storage_history(
2403        &mut self,
2404        address: Address,
2405        storage_key: B256,
2406    ) -> ProviderResult<()> {
2407        let shards = self.provider.storage_history_shards(address, storage_key)?;
2408        for (key, _) in shards {
2409            self.delete::<tables::StoragesHistory>(key)?;
2410        }
2411        Ok(())
2412    }
2413}
2414
/// `RocksDB` transaction wrapper providing MDBX-like semantics.
///
/// Supports:
/// - Read-your-writes: reads see uncommitted writes within the same transaction
/// - Atomic commit/rollback
/// - Iteration over uncommitted data
///
/// Note: `Transaction` is `Send` but NOT `Sync`. This wrapper does not implement
/// `DbTx`/`DbTxMut` traits directly; use RocksDB-specific methods instead.
pub struct RocksTx<'db> {
    /// The underlying optimistic `RocksDB` transaction.
    inner: Transaction<'db, OptimisticTransactionDB>,
    /// Provider used to resolve typed column-family handles for table access.
    provider: &'db RocksDBProvider,
}
2428
2429impl fmt::Debug for RocksTx<'_> {
2430    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2431        f.debug_struct("RocksTx").field("provider", &self.provider).finish_non_exhaustive()
2432    }
2433}
2434
2435impl<'db> RocksTx<'db> {
2436    /// Gets a value from the specified table. Sees uncommitted writes in this transaction.
2437    pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
2438        let encoded_key = key.encode();
2439        self.get_encoded::<T>(&encoded_key)
2440    }
2441
2442    /// Gets a value using pre-encoded key. Sees uncommitted writes in this transaction.
2443    pub fn get_encoded<T: Table>(
2444        &self,
2445        key: &<T::Key as Encode>::Encoded,
2446    ) -> ProviderResult<Option<T::Value>> {
2447        let cf = self.provider.get_cf_handle::<T>()?;
2448        let result = self.inner.get_cf(cf, key.as_ref()).map_err(|e| {
2449            ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
2450                message: e.to_string().into(),
2451                code: -1,
2452            }))
2453        })?;
2454
2455        Ok(result.and_then(|value| T::Value::decompress(&value).ok()))
2456    }
2457
2458    /// Puts a value into the specified table.
2459    pub fn put<T: Table>(&self, key: T::Key, value: &T::Value) -> ProviderResult<()> {
2460        let encoded_key = key.encode();
2461        self.put_encoded::<T>(&encoded_key, value)
2462    }
2463
2464    /// Puts a value using pre-encoded key.
2465    pub fn put_encoded<T: Table>(
2466        &self,
2467        key: &<T::Key as Encode>::Encoded,
2468        value: &T::Value,
2469    ) -> ProviderResult<()> {
2470        let cf = self.provider.get_cf_handle::<T>()?;
2471        let mut buf = Vec::new();
2472        let value_bytes = compress_to_buf_or_ref!(buf, value).unwrap_or(&buf);
2473
2474        self.inner.put_cf(cf, key.as_ref(), value_bytes).map_err(|e| {
2475            ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
2476                info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
2477                operation: DatabaseWriteOperation::PutUpsert,
2478                table_name: T::NAME,
2479                key: key.as_ref().to_vec(),
2480            })))
2481        })
2482    }
2483
2484    /// Deletes a value from the specified table.
2485    pub fn delete<T: Table>(&self, key: T::Key) -> ProviderResult<()> {
2486        let cf = self.provider.get_cf_handle::<T>()?;
2487        self.inner.delete_cf(cf, key.encode().as_ref()).map_err(|e| {
2488            ProviderError::Database(DatabaseError::Delete(DatabaseErrorInfo {
2489                message: e.to_string().into(),
2490                code: -1,
2491            }))
2492        })
2493    }
2494
2495    /// Creates an iterator for the specified table. Sees uncommitted writes in this transaction.
2496    ///
2497    /// Returns an iterator that yields `(encoded_key, compressed_value)` pairs.
2498    pub fn iter<T: Table>(&self) -> ProviderResult<RocksTxIter<'_, T>> {
2499        let cf = self.provider.get_cf_handle::<T>()?;
2500        let iter = self.inner.iterator_cf(cf, IteratorMode::Start);
2501        Ok(RocksTxIter { inner: iter, _marker: std::marker::PhantomData })
2502    }
2503
2504    /// Creates an iterator starting from the given key (inclusive).
2505    pub fn iter_from<T: Table>(&self, key: T::Key) -> ProviderResult<RocksTxIter<'_, T>> {
2506        let cf = self.provider.get_cf_handle::<T>()?;
2507        let encoded_key = key.encode();
2508        let iter = self
2509            .inner
2510            .iterator_cf(cf, IteratorMode::From(encoded_key.as_ref(), rocksdb::Direction::Forward));
2511        Ok(RocksTxIter { inner: iter, _marker: std::marker::PhantomData })
2512    }
2513
2514    /// Commits the transaction, persisting all changes.
2515    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
2516    pub fn commit(self) -> ProviderResult<()> {
2517        self.inner.commit().map_err(|e| {
2518            ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
2519                message: e.to_string().into(),
2520                code: -1,
2521            }))
2522        })
2523    }
2524
2525    /// Rolls back the transaction, discarding all changes.
2526    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
2527    pub fn rollback(self) -> ProviderResult<()> {
2528        self.inner.rollback().map_err(|e| {
2529            ProviderError::Database(DatabaseError::Other(format!("rollback failed: {e}")))
2530        })
2531    }
2532}
2533
/// Wrapper enum for `RocksDB` iterators that works in both read-write and read-only modes.
///
/// Yields raw `(key, value)` byte pairs via its [`Iterator`] impl; decoding is left to
/// the typed wrappers (e.g. [`RocksDBIter`]).
enum RocksDBIterEnum<'db> {
    /// Iterator from read-write `OptimisticTransactionDB`.
    ReadWrite(rocksdb::DBIteratorWithThreadMode<'db, OptimisticTransactionDB>),
    /// Iterator from read-only `DB`.
    ReadOnly(rocksdb::DBIteratorWithThreadMode<'db, DB>),
}
2541
2542impl Iterator for RocksDBIterEnum<'_> {
2543    type Item = Result<(Box<[u8]>, Box<[u8]>), rocksdb::Error>;
2544
2545    fn next(&mut self) -> Option<Self::Item> {
2546        match self {
2547            Self::ReadWrite(iter) => iter.next(),
2548            Self::ReadOnly(iter) => iter.next(),
2549        }
2550    }
2551}
2552
/// Wrapper enum for raw `RocksDB` iterators that works in both read-write and read-only modes.
///
/// Unlike [`RocksDBIterEnum`], raw iterators expose `seek()` for efficient repositioning
/// without reinitializing the iterator. All accessor methods delegate to the active variant.
enum RocksDBRawIterEnum<'db> {
    /// Raw iterator from read-write `OptimisticTransactionDB`.
    ReadWrite(DBRawIteratorWithThreadMode<'db, OptimisticTransactionDB>),
    /// Raw iterator from read-only `DB`.
    ReadOnly(DBRawIteratorWithThreadMode<'db, DB>),
}
2563
2564impl RocksDBRawIterEnum<'_> {
2565    /// Positions the iterator at the first key >= `key`.
2566    fn seek(&mut self, key: impl AsRef<[u8]>) {
2567        match self {
2568            Self::ReadWrite(iter) => iter.seek(key),
2569            Self::ReadOnly(iter) => iter.seek(key),
2570        }
2571    }
2572
2573    /// Returns true if the iterator is positioned at a valid key-value pair.
2574    fn valid(&self) -> bool {
2575        match self {
2576            Self::ReadWrite(iter) => iter.valid(),
2577            Self::ReadOnly(iter) => iter.valid(),
2578        }
2579    }
2580
2581    /// Returns the current key, if valid.
2582    fn key(&self) -> Option<&[u8]> {
2583        match self {
2584            Self::ReadWrite(iter) => iter.key(),
2585            Self::ReadOnly(iter) => iter.key(),
2586        }
2587    }
2588
2589    /// Returns the current value, if valid.
2590    fn value(&self) -> Option<&[u8]> {
2591        match self {
2592            Self::ReadWrite(iter) => iter.value(),
2593            Self::ReadOnly(iter) => iter.value(),
2594        }
2595    }
2596
2597    /// Advances the iterator to the next key.
2598    fn next(&mut self) {
2599        match self {
2600            Self::ReadWrite(iter) => iter.next(),
2601            Self::ReadOnly(iter) => iter.next(),
2602        }
2603    }
2604
2605    /// Moves the iterator to the previous key.
2606    fn prev(&mut self) {
2607        match self {
2608            Self::ReadWrite(iter) => iter.prev(),
2609            Self::ReadOnly(iter) => iter.prev(),
2610        }
2611    }
2612
2613    /// Returns the status of the iterator.
2614    fn status(&self) -> Result<(), rocksdb::Error> {
2615        match self {
2616            Self::ReadWrite(iter) => iter.status(),
2617            Self::ReadOnly(iter) => iter.status(),
2618        }
2619    }
2620}
2621
/// Iterator over a `RocksDB` table (non-transactional).
///
/// Yields decoded `(Key, Value)` pairs in key order.
pub struct RocksDBIter<'db, T: Table> {
    /// Underlying raw iterator (read-write or read-only backend).
    inner: RocksDBIterEnum<'db>,
    /// Carries the table type for decoding; holds no runtime data.
    _marker: std::marker::PhantomData<T>,
}
2629
2630impl<T: Table> fmt::Debug for RocksDBIter<'_, T> {
2631    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2632        f.debug_struct("RocksDBIter").field("table", &T::NAME).finish_non_exhaustive()
2633    }
2634}
2635
2636impl<T: Table> Iterator for RocksDBIter<'_, T> {
2637    type Item = ProviderResult<(T::Key, T::Value)>;
2638
2639    fn next(&mut self) -> Option<Self::Item> {
2640        Some(decode_iter_item::<T>(self.inner.next()?))
2641    }
2642}
2643
/// Raw iterator over a `RocksDB` table (non-transactional).
///
/// Yields raw `(key_bytes, value_bytes)` pairs without decoding.
pub struct RocksDBRawIter<'db> {
    /// Underlying iterator (read-write or read-only backend).
    inner: RocksDBIterEnum<'db>,
}
2650
2651impl fmt::Debug for RocksDBRawIter<'_> {
2652    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2653        f.debug_struct("RocksDBRawIter").finish_non_exhaustive()
2654    }
2655}
2656
2657impl Iterator for RocksDBRawIter<'_> {
2658    type Item = ProviderResult<(Box<[u8]>, Box<[u8]>)>;
2659
2660    fn next(&mut self) -> Option<Self::Item> {
2661        match self.inner.next()? {
2662            Ok(kv) => Some(Ok(kv)),
2663            Err(e) => Some(Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
2664                message: e.to_string().into(),
2665                code: -1,
2666            })))),
2667        }
2668    }
2669}
2670
/// Iterator over a `RocksDB` table within a transaction.
///
/// Yields decoded `(Key, Value)` pairs. Sees uncommitted writes.
pub struct RocksTxIter<'tx, T: Table> {
    /// Iterator bound to the owning transaction, so staged writes are visible.
    inner: rocksdb::DBIteratorWithThreadMode<'tx, Transaction<'tx, OptimisticTransactionDB>>,
    /// Carries the table type for decoding; holds no runtime data.
    _marker: std::marker::PhantomData<T>,
}
2678
2679impl<T: Table> fmt::Debug for RocksTxIter<'_, T> {
2680    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2681        f.debug_struct("RocksTxIter").field("table", &T::NAME).finish_non_exhaustive()
2682    }
2683}
2684
2685impl<T: Table> Iterator for RocksTxIter<'_, T> {
2686    type Item = ProviderResult<(T::Key, T::Value)>;
2687
2688    fn next(&mut self) -> Option<Self::Item> {
2689        Some(decode_iter_item::<T>(self.inner.next()?))
2690    }
2691}
2692
2693/// Decodes a raw key-value pair from a `RocksDB` iterator into typed table entries.
2694///
2695/// Handles both error propagation from the underlying iterator and
2696/// decoding/decompression of the key and value bytes.
2697fn decode_iter_item<T: Table>(result: RawKVResult) -> ProviderResult<(T::Key, T::Value)> {
2698    let (key_bytes, value_bytes) = result.map_err(|e| {
2699        ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
2700            message: e.to_string().into(),
2701            code: -1,
2702        }))
2703    })?;
2704
2705    let key = <T::Key as reth_db_api::table::Decode>::decode(&key_bytes)
2706        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
2707
2708    let value = T::Value::decompress(&value_bytes)
2709        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
2710
2711    Ok((key, value))
2712}
2713
2714/// Converts Reth's [`LogLevel`] to `RocksDB`'s [`rocksdb::LogLevel`].
2715const fn convert_log_level(level: LogLevel) -> rocksdb::LogLevel {
2716    match level {
2717        LogLevel::Fatal => rocksdb::LogLevel::Fatal,
2718        LogLevel::Error => rocksdb::LogLevel::Error,
2719        LogLevel::Warn => rocksdb::LogLevel::Warn,
2720        LogLevel::Notice | LogLevel::Verbose => rocksdb::LogLevel::Info,
2721        LogLevel::Debug | LogLevel::Trace | LogLevel::Extra => rocksdb::LogLevel::Debug,
2722    }
2723}
2724
2725#[cfg(test)]
2726mod tests {
2727    use super::*;
2728    use crate::providers::HistoryInfo;
2729    use alloy_primitives::{Address, TxHash, B256};
2730    use reth_db_api::{
2731        models::{
2732            sharded_key::{ShardedKey, NUM_OF_INDICES_IN_SHARD},
2733            storage_sharded_key::StorageShardedKey,
2734            IntegerList,
2735        },
2736        table::Table,
2737        tables,
2738    };
2739    use tempfile::TempDir;
2740
2741    #[test]
2742    fn test_with_default_tables_registers_required_column_families() {
2743        let temp_dir = TempDir::new().unwrap();
2744
2745        // Build with default tables
2746        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
2747
2748        // Should be able to write/read TransactionHashNumbers
2749        let tx_hash = TxHash::from(B256::from([1u8; 32]));
2750        provider.put::<tables::TransactionHashNumbers>(tx_hash, &100).unwrap();
2751        assert_eq!(provider.get::<tables::TransactionHashNumbers>(tx_hash).unwrap(), Some(100));
2752
2753        // Should be able to write/read AccountsHistory
2754        let key = ShardedKey::new(Address::ZERO, 100);
2755        let value = IntegerList::default();
2756        provider.put::<tables::AccountsHistory>(key.clone(), &value).unwrap();
2757        assert!(provider.get::<tables::AccountsHistory>(key).unwrap().is_some());
2758
2759        // Should be able to write/read StoragesHistory
2760        let key = StorageShardedKey::new(Address::ZERO, B256::ZERO, 100);
2761        provider.put::<tables::StoragesHistory>(key.clone(), &value).unwrap();
2762        assert!(provider.get::<tables::StoragesHistory>(key).unwrap().is_some());
2763    }
2764
    /// Minimal key-value table used only by the tests in this module.
    #[derive(Debug)]
    struct TestTable;

    impl Table for TestTable {
        const NAME: &'static str = "TestTable";
        // Plain table: no duplicate-key (DUPSORT) semantics.
        const DUPSORT: bool = false;
        type Key = u64;
        type Value = Vec<u8>;
    }
2774
2775    #[test]
2776    fn test_basic_operations() {
2777        let temp_dir = TempDir::new().unwrap();
2778
2779        let provider = RocksDBBuilder::new(temp_dir.path())
2780            .with_table::<TestTable>() // Type-safe!
2781            .build()
2782            .unwrap();
2783
2784        let key = 42u64;
2785        let value = b"test_value".to_vec();
2786
2787        // Test write
2788        provider.put::<TestTable>(key, &value).unwrap();
2789
2790        // Test read
2791        let result = provider.get::<TestTable>(key).unwrap();
2792        assert_eq!(result, Some(value));
2793
2794        // Test delete
2795        provider.delete::<TestTable>(key).unwrap();
2796
2797        // Verify deletion
2798        assert_eq!(provider.get::<TestTable>(key).unwrap(), None);
2799    }
2800
2801    #[test]
2802    fn test_batch_operations() {
2803        let temp_dir = TempDir::new().unwrap();
2804        let provider =
2805            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
2806
2807        // Write multiple entries in a batch
2808        provider
2809            .write_batch(|batch| {
2810                for i in 0..10u64 {
2811                    let value = format!("value_{i}").into_bytes();
2812                    batch.put::<TestTable>(i, &value)?;
2813                }
2814                Ok(())
2815            })
2816            .unwrap();
2817
2818        // Read all entries
2819        for i in 0..10u64 {
2820            let value = format!("value_{i}").into_bytes();
2821            assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
2822        }
2823
2824        // Delete all entries in a batch
2825        provider
2826            .write_batch(|batch| {
2827                for i in 0..10u64 {
2828                    batch.delete::<TestTable>(i)?;
2829                }
2830                Ok(())
2831            })
2832            .unwrap();
2833
2834        // Verify all deleted
2835        for i in 0..10u64 {
2836            assert_eq!(provider.get::<TestTable>(i).unwrap(), None);
2837        }
2838    }
2839
2840    #[test]
2841    fn test_with_real_table() {
2842        let temp_dir = TempDir::new().unwrap();
2843        let provider = RocksDBBuilder::new(temp_dir.path())
2844            .with_table::<tables::TransactionHashNumbers>()
2845            .with_metrics()
2846            .build()
2847            .unwrap();
2848
2849        let tx_hash = TxHash::from(B256::from([1u8; 32]));
2850
2851        // Insert and retrieve
2852        provider.put::<tables::TransactionHashNumbers>(tx_hash, &100).unwrap();
2853        assert_eq!(provider.get::<tables::TransactionHashNumbers>(tx_hash).unwrap(), Some(100));
2854
2855        // Batch insert multiple transactions
2856        provider
2857            .write_batch(|batch| {
2858                for i in 0..10u64 {
2859                    let hash = TxHash::from(B256::from([i as u8; 32]));
2860                    let value = i * 100;
2861                    batch.put::<tables::TransactionHashNumbers>(hash, &value)?;
2862                }
2863                Ok(())
2864            })
2865            .unwrap();
2866
2867        // Verify batch insertions
2868        for i in 0..10u64 {
2869            let hash = TxHash::from(B256::from([i as u8; 32]));
2870            assert_eq!(
2871                provider.get::<tables::TransactionHashNumbers>(hash).unwrap(),
2872                Some(i * 100)
2873            );
2874        }
2875    }
2876    #[test]
2877    fn test_statistics_enabled() {
2878        let temp_dir = TempDir::new().unwrap();
2879        // Just verify that building with statistics doesn't panic
2880        let provider = RocksDBBuilder::new(temp_dir.path())
2881            .with_table::<TestTable>()
2882            .with_statistics()
2883            .build()
2884            .unwrap();
2885
2886        // Do operations - data should be immediately readable with OptimisticTransactionDB
2887        for i in 0..10 {
2888            let value = vec![i as u8];
2889            provider.put::<TestTable>(i, &value).unwrap();
2890            // Verify write is visible
2891            assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
2892        }
2893    }
2894
2895    #[test]
2896    fn test_data_persistence() {
2897        let temp_dir = TempDir::new().unwrap();
2898        let provider =
2899            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
2900
2901        // Insert data - OptimisticTransactionDB writes are immediately visible
2902        let value = vec![42u8; 1000];
2903        for i in 0..100 {
2904            provider.put::<TestTable>(i, &value).unwrap();
2905        }
2906
2907        // Verify data is readable
2908        for i in 0..100 {
2909            assert!(provider.get::<TestTable>(i).unwrap().is_some(), "Data should be readable");
2910        }
2911    }
2912
2913    #[test]
2914    fn test_transaction_read_your_writes() {
2915        let temp_dir = TempDir::new().unwrap();
2916        let provider =
2917            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
2918
2919        // Create a transaction
2920        let tx = provider.tx();
2921
2922        // Write data within the transaction
2923        let key = 42u64;
2924        let value = b"test_value".to_vec();
2925        tx.put::<TestTable>(key, &value).unwrap();
2926
2927        // Read-your-writes: should see uncommitted data in same transaction
2928        let result = tx.get::<TestTable>(key).unwrap();
2929        assert_eq!(
2930            result,
2931            Some(value.clone()),
2932            "Transaction should see its own uncommitted writes"
2933        );
2934
2935        // Data should NOT be visible via provider (outside transaction)
2936        let provider_result = provider.get::<TestTable>(key).unwrap();
2937        assert_eq!(provider_result, None, "Uncommitted data should not be visible outside tx");
2938
2939        // Commit the transaction
2940        tx.commit().unwrap();
2941
2942        // Now data should be visible via provider
2943        let committed_result = provider.get::<TestTable>(key).unwrap();
2944        assert_eq!(committed_result, Some(value), "Committed data should be visible");
2945    }
2946
2947    #[test]
2948    fn test_transaction_rollback() {
2949        let temp_dir = TempDir::new().unwrap();
2950        let provider =
2951            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
2952
2953        // First, put some initial data
2954        let key = 100u64;
2955        let initial_value = b"initial".to_vec();
2956        provider.put::<TestTable>(key, &initial_value).unwrap();
2957
2958        // Create a transaction and modify data
2959        let tx = provider.tx();
2960        let new_value = b"modified".to_vec();
2961        tx.put::<TestTable>(key, &new_value).unwrap();
2962
2963        // Verify modification is visible within transaction
2964        assert_eq!(tx.get::<TestTable>(key).unwrap(), Some(new_value));
2965
2966        // Rollback instead of commit
2967        tx.rollback().unwrap();
2968
2969        // Data should be unchanged (initial value)
2970        let result = provider.get::<TestTable>(key).unwrap();
2971        assert_eq!(result, Some(initial_value), "Rollback should preserve original data");
2972    }
2973
2974    #[test]
2975    fn test_transaction_iterator() {
2976        let temp_dir = TempDir::new().unwrap();
2977        let provider =
2978            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
2979
2980        // Create a transaction
2981        let tx = provider.tx();
2982
2983        // Write multiple entries
2984        for i in 0..5u64 {
2985            let value = format!("value_{i}").into_bytes();
2986            tx.put::<TestTable>(i, &value).unwrap();
2987        }
2988
2989        // Iterate - should see uncommitted writes
2990        let mut count = 0;
2991        for result in tx.iter::<TestTable>().unwrap() {
2992            let (key, value) = result.unwrap();
2993            assert_eq!(value, format!("value_{key}").into_bytes());
2994            count += 1;
2995        }
2996        assert_eq!(count, 5, "Iterator should see all uncommitted writes");
2997
2998        // Commit
2999        tx.commit().unwrap();
3000    }
3001
3002    #[test]
3003    fn test_batch_manual_commit() {
3004        let temp_dir = TempDir::new().unwrap();
3005        let provider =
3006            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
3007
3008        // Create a batch via provider.batch()
3009        let mut batch = provider.batch();
3010
3011        // Add entries
3012        for i in 0..10u64 {
3013            let value = format!("batch_value_{i}").into_bytes();
3014            batch.put::<TestTable>(i, &value).unwrap();
3015        }
3016
3017        // Verify len/is_empty
3018        assert_eq!(batch.len(), 10);
3019        assert!(!batch.is_empty());
3020
3021        // Data should NOT be visible before commit
3022        assert_eq!(provider.get::<TestTable>(0).unwrap(), None);
3023
3024        // Commit the batch
3025        batch.commit().unwrap();
3026
3027        // Now data should be visible
3028        for i in 0..10u64 {
3029            let value = format!("batch_value_{i}").into_bytes();
3030            assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
3031        }
3032    }
3033
3034    #[test]
3035    fn test_first_and_last_entry() {
3036        let temp_dir = TempDir::new().unwrap();
3037        let provider =
3038            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
3039
3040        // Empty table should return None for both
3041        assert_eq!(provider.first::<TestTable>().unwrap(), None);
3042        assert_eq!(provider.last::<TestTable>().unwrap(), None);
3043
3044        // Insert some entries
3045        provider.put::<TestTable>(10, &b"value_10".to_vec()).unwrap();
3046        provider.put::<TestTable>(20, &b"value_20".to_vec()).unwrap();
3047        provider.put::<TestTable>(5, &b"value_5".to_vec()).unwrap();
3048
3049        // First should return the smallest key
3050        let first = provider.first::<TestTable>().unwrap();
3051        assert_eq!(first, Some((5, b"value_5".to_vec())));
3052
3053        // Last should return the largest key
3054        let last = provider.last::<TestTable>().unwrap();
3055        assert_eq!(last, Some((20, b"value_20".to_vec())));
3056    }
3057
    /// Tests the edge case where block < `lowest_available_block_number`.
    /// This case cannot be tested via `HistoricalStateProviderRef` (which errors before lookup),
    /// so we keep this RocksDB-specific test to verify the low-level behavior.
    #[test]
    fn test_account_history_info_pruned_before_first_entry() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);

        // Create a single shard starting at block 100
        let chunk = IntegerList::new([100, 200, 300]).unwrap();
        let shard_key = ShardedKey::new(address, u64::MAX);
        provider.put::<tables::AccountsHistory>(shard_key, &chunk).unwrap();

        // Query for block 50 with lowest_available_block_number = 100
        // This simulates a pruned state where data before block 100 is not available.
        // Since we're before the first write AND pruning boundary is set, we need to
        // check the changeset at the first write block.
        let result =
            provider.snapshot().account_history_info(address, 50, Some(100), u64::MAX).unwrap();
        // The changeset at block 100 (the first write) holds the pre-100 value.
        assert_eq!(result, HistoryInfo::InChangeset(100));
    }
3081
    /// Verifies that a read-only (secondary) provider can catch up with primary writes.
    #[test]
    fn test_account_history_info_read_only_and_catch_up() {
        let temp_dir = TempDir::new().unwrap();
        let address = Address::from([0x42; 20]);
        let chunk = IntegerList::new([100, 200, 300]).unwrap();
        let shard_key = ShardedKey::new(address, u64::MAX);

        // Write data with a read-write provider
        let rw_provider =
            RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
        rw_provider.put::<tables::AccountsHistory>(shard_key, &chunk).unwrap();

        // Open read-only provider — it sees the initial data.
        let ro_provider = RocksDBBuilder::new(temp_dir.path())
            .with_default_tables()
            .with_read_only(true)
            .build()
            .unwrap();

        // Block 200 is indexed, so its changeset holds the historical value.
        let result =
            ro_provider.snapshot().account_history_info(address, 200, None, u64::MAX).unwrap();
        assert_eq!(result, HistoryInfo::InChangeset(200));

        // Block 50 precedes the first indexed write (100) and no pruning boundary is set.
        let result =
            ro_provider.snapshot().account_history_info(address, 50, None, u64::MAX).unwrap();
        assert_eq!(result, HistoryInfo::NotYetWritten);

        // Block 400 is past the last indexed write (300), so plain state is current.
        let result =
            ro_provider.snapshot().account_history_info(address, 400, None, u64::MAX).unwrap();
        assert_eq!(result, HistoryInfo::InPlainState);

        // Write new data via the primary.
        let address2 = Address::from([0x43; 20]);
        let chunk2 = IntegerList::new([500, 600]).unwrap();
        let shard_key2 = ShardedKey::new(address2, u64::MAX);
        rw_provider.put::<tables::AccountsHistory>(shard_key2, &chunk2).unwrap();

        // Read-only doesn't see the new data yet.
        let result =
            ro_provider.snapshot().account_history_info(address2, 500, None, u64::MAX).unwrap();
        assert_eq!(result, HistoryInfo::NotYetWritten);

        // Catch up — now it sees the new data.
        ro_provider.try_catch_up_with_primary().unwrap();

        let result =
            ro_provider.snapshot().account_history_info(address2, 500, None, u64::MAX).unwrap();
        assert_eq!(result, HistoryInfo::InChangeset(500));
    }
3132
    /// History entries above the visible tip must be ignored; with every remaining
    /// index at or below the queried block, the lookup resolves to plain state.
    #[test]
    fn test_account_history_info_ignores_blocks_above_visible_tip() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);

        // Completed shard (keyed by its highest block, 110) covering blocks 100 and 110.
        provider
            .put::<tables::AccountsHistory>(
                ShardedKey::new(address, 110),
                &IntegerList::new([100, 110]).unwrap(),
            )
            .unwrap();
        // Sentinel shard with blocks 200 and 210 — both beyond the visible tip used below.
        provider
            .put::<tables::AccountsHistory>(
                ShardedKey::new(address, u64::MAX),
                &IntegerList::new([200, 210]).unwrap(),
            )
            .unwrap();

        // Query block 150 with visible tip 150: the writes at 200/210 are above the tip,
        // so the only visible writes (100, 110) are in the past and plain state is current.
        let result = provider.snapshot().account_history_info(address, 150, None, 150).unwrap();
        assert_eq!(result, HistoryInfo::InPlainState);
    }
3156
    /// A single shard can hold blocks on both sides of the visible tip; only the
    /// entries at or below the tip participate in the lookup.
    #[test]
    fn test_account_history_info_mixed_shard_respects_visible_tip() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);
        // Sentinel shard mixing visible (100, 150) and invisible (300) blocks for tip 200.
        provider
            .put::<tables::AccountsHistory>(
                ShardedKey::new(address, u64::MAX),
                &IntegerList::new([100, 150, 300]).unwrap(),
            )
            .unwrap();

        // Block 120, tip 200: the next visible write after 120 is 150.
        let result = provider.snapshot().account_history_info(address, 120, None, 200).unwrap();
        assert_eq!(result, HistoryInfo::InChangeset(150));

        // Block 201, tip 200: the only later write (300) is above the tip,
        // so the lookup falls through to plain state.
        let result = provider.snapshot().account_history_info(address, 201, None, 200).unwrap();
        assert_eq!(result, HistoryInfo::InPlainState);
    }
3176
    /// When every indexed block lies above the visible tip, the result depends on the
    /// pruning boundary: with none set, the history simply has not been written yet;
    /// with a boundary set, the value may still live in plain state.
    #[test]
    fn test_account_history_info_only_stale_entries_use_fallback() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);
        // Sentinel shard whose entries (200, 210) all exceed the visible tip of 150.
        provider
            .put::<tables::AccountsHistory>(
                ShardedKey::new(address, u64::MAX),
                &IntegerList::new([200, 210]).unwrap(),
            )
            .unwrap();

        // No pruning boundary: nothing visible has been written for this account.
        let result = provider.snapshot().account_history_info(address, 150, None, 150).unwrap();
        assert_eq!(result, HistoryInfo::NotYetWritten);

        // With a pruning boundary (100), the account may already exist in plain state.
        let result =
            provider.snapshot().account_history_info(address, 150, Some(100), 150).unwrap();
        assert_eq!(result, HistoryInfo::MaybeInPlainState);
    }
3197
3198    #[test]
3199    fn test_account_history_shard_split_at_boundary() {
3200        let temp_dir = TempDir::new().unwrap();
3201        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3202
3203        let address = Address::from([0x42; 20]);
3204        let limit = NUM_OF_INDICES_IN_SHARD;
3205
3206        // Add exactly NUM_OF_INDICES_IN_SHARD + 1 indices to trigger a split
3207        let indices: Vec<u64> = (0..=(limit as u64)).collect();
3208        let mut batch = provider.batch();
3209        batch.append_account_history_shard(address, indices).unwrap();
3210        batch.commit().unwrap();
3211
3212        // Should have 2 shards: one completed shard and one sentinel shard
3213        let completed_key = ShardedKey::new(address, (limit - 1) as u64);
3214        let sentinel_key = ShardedKey::new(address, u64::MAX);
3215
3216        let completed_shard = provider.get::<tables::AccountsHistory>(completed_key).unwrap();
3217        let sentinel_shard = provider.get::<tables::AccountsHistory>(sentinel_key).unwrap();
3218
3219        assert!(completed_shard.is_some(), "completed shard should exist");
3220        assert!(sentinel_shard.is_some(), "sentinel shard should exist");
3221
3222        let completed_shard = completed_shard.unwrap();
3223        let sentinel_shard = sentinel_shard.unwrap();
3224
3225        assert_eq!(completed_shard.len(), limit as u64, "completed shard should be full");
3226        assert_eq!(sentinel_shard.len(), 1, "sentinel shard should have 1 element");
3227    }
3228
    /// Exercises repeated shard splitting across two separate batches: the first
    /// append exactly fills one shard (no split), the second overflows twice.
    #[test]
    fn test_account_history_multiple_shard_splits() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x43; 20]);
        let limit = NUM_OF_INDICES_IN_SHARD;

        // First batch: add NUM_OF_INDICES_IN_SHARD indices
        let first_batch_indices: Vec<u64> = (0..limit as u64).collect();
        let mut batch = provider.batch();
        batch.append_account_history_shard(address, first_batch_indices).unwrap();
        batch.commit().unwrap();

        // Should have just a sentinel shard (exactly at limit, not over)
        let sentinel_key = ShardedKey::new(address, u64::MAX);
        let shard = provider.get::<tables::AccountsHistory>(sentinel_key.clone()).unwrap();
        assert!(shard.is_some());
        assert_eq!(shard.unwrap().len(), limit as u64);

        // Second batch: add another NUM_OF_INDICES_IN_SHARD + 1 indices (causing 2 more shards)
        let second_batch_indices: Vec<u64> = (limit as u64..=(2 * limit) as u64).collect();
        let mut batch = provider.batch();
        batch.append_account_history_shard(address, second_batch_indices).unwrap();
        batch.commit().unwrap();

        // Now we should have: 2 completed shards + 1 sentinel shard.
        // Completed shards are keyed by the highest block number they contain.
        let first_completed = ShardedKey::new(address, (limit - 1) as u64);
        let second_completed = ShardedKey::new(address, (2 * limit - 1) as u64);

        assert!(
            provider.get::<tables::AccountsHistory>(first_completed).unwrap().is_some(),
            "first completed shard should exist"
        );
        assert!(
            provider.get::<tables::AccountsHistory>(second_completed).unwrap().is_some(),
            "second completed shard should exist"
        );
        assert!(
            provider.get::<tables::AccountsHistory>(sentinel_key).unwrap().is_some(),
            "sentinel shard should exist"
        );
    }
3272
3273    #[test]
3274    fn test_storage_history_shard_split_at_boundary() {
3275        let temp_dir = TempDir::new().unwrap();
3276        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3277
3278        let address = Address::from([0x44; 20]);
3279        let slot = B256::from([0x55; 32]);
3280        let limit = NUM_OF_INDICES_IN_SHARD;
3281
3282        // Add exactly NUM_OF_INDICES_IN_SHARD + 1 indices to trigger a split
3283        let indices: Vec<u64> = (0..=(limit as u64)).collect();
3284        let mut batch = provider.batch();
3285        batch.append_storage_history_shard(address, slot, indices).unwrap();
3286        batch.commit().unwrap();
3287
3288        // Should have 2 shards: one completed shard and one sentinel shard
3289        let completed_key = StorageShardedKey::new(address, slot, (limit - 1) as u64);
3290        let sentinel_key = StorageShardedKey::new(address, slot, u64::MAX);
3291
3292        let completed_shard = provider.get::<tables::StoragesHistory>(completed_key).unwrap();
3293        let sentinel_shard = provider.get::<tables::StoragesHistory>(sentinel_key).unwrap();
3294
3295        assert!(completed_shard.is_some(), "completed shard should exist");
3296        assert!(sentinel_shard.is_some(), "sentinel shard should exist");
3297
3298        let completed_shard = completed_shard.unwrap();
3299        let sentinel_shard = sentinel_shard.unwrap();
3300
3301        assert_eq!(completed_shard.len(), limit as u64, "completed shard should be full");
3302        assert_eq!(sentinel_shard.len(), 1, "sentinel shard should have 1 element");
3303    }
3304
    /// Storage-slot counterpart of the repeated-split test: fill one shard exactly,
    /// then overflow it twice with a second batch.
    #[test]
    fn test_storage_history_multiple_shard_splits() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x46; 20]);
        let slot = B256::from([0x57; 32]);
        let limit = NUM_OF_INDICES_IN_SHARD;

        // First batch: add NUM_OF_INDICES_IN_SHARD indices
        let first_batch_indices: Vec<u64> = (0..limit as u64).collect();
        let mut batch = provider.batch();
        batch.append_storage_history_shard(address, slot, first_batch_indices).unwrap();
        batch.commit().unwrap();

        // Should have just a sentinel shard (exactly at limit, not over)
        let sentinel_key = StorageShardedKey::new(address, slot, u64::MAX);
        let shard = provider.get::<tables::StoragesHistory>(sentinel_key.clone()).unwrap();
        assert!(shard.is_some());
        assert_eq!(shard.unwrap().len(), limit as u64);

        // Second batch: add another NUM_OF_INDICES_IN_SHARD + 1 indices (causing 2 more shards)
        let second_batch_indices: Vec<u64> = (limit as u64..=(2 * limit) as u64).collect();
        let mut batch = provider.batch();
        batch.append_storage_history_shard(address, slot, second_batch_indices).unwrap();
        batch.commit().unwrap();

        // Now we should have: 2 completed shards + 1 sentinel shard.
        // Completed shards are keyed by the highest block number they contain.
        let first_completed = StorageShardedKey::new(address, slot, (limit - 1) as u64);
        let second_completed = StorageShardedKey::new(address, slot, (2 * limit - 1) as u64);

        assert!(
            provider.get::<tables::StoragesHistory>(first_completed).unwrap().is_some(),
            "first completed shard should exist"
        );
        assert!(
            provider.get::<tables::StoragesHistory>(second_completed).unwrap().is_some(),
            "second completed shard should exist"
        );
        assert!(
            provider.get::<tables::StoragesHistory>(sentinel_key).unwrap().is_some(),
            "sentinel shard should exist"
        );
    }
3349
3350    #[test]
3351    fn test_clear_table() {
3352        let temp_dir = TempDir::new().unwrap();
3353        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3354
3355        let address = Address::from([0x42; 20]);
3356        let key = ShardedKey::new(address, u64::MAX);
3357        let blocks = BlockNumberList::new_pre_sorted([1, 2, 3]);
3358
3359        provider.put::<tables::AccountsHistory>(key.clone(), &blocks).unwrap();
3360        assert!(provider.get::<tables::AccountsHistory>(key.clone()).unwrap().is_some());
3361
3362        provider.clear::<tables::AccountsHistory>().unwrap();
3363
3364        assert!(
3365            provider.get::<tables::AccountsHistory>(key).unwrap().is_none(),
3366            "table should be empty after clear"
3367        );
3368        assert!(
3369            provider.first::<tables::AccountsHistory>().unwrap().is_none(),
3370            "first() should return None after clear"
3371        );
3372    }
3373
3374    #[test]
3375    fn test_clear_empty_table() {
3376        let temp_dir = TempDir::new().unwrap();
3377        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3378
3379        assert!(provider.first::<tables::AccountsHistory>().unwrap().is_none());
3380
3381        provider.clear::<tables::AccountsHistory>().unwrap();
3382
3383        assert!(provider.first::<tables::AccountsHistory>().unwrap().is_none());
3384    }
3385
3386    #[test]
3387    fn test_unwind_account_history_to_basic() {
3388        let temp_dir = TempDir::new().unwrap();
3389        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3390
3391        let address = Address::from([0x42; 20]);
3392
3393        // Add blocks 0-10
3394        let mut batch = provider.batch();
3395        batch.append_account_history_shard(address, 0..=10).unwrap();
3396        batch.commit().unwrap();
3397
3398        // Verify we have blocks 0-10
3399        let key = ShardedKey::new(address, u64::MAX);
3400        let result = provider.get::<tables::AccountsHistory>(key.clone()).unwrap();
3401        assert!(result.is_some());
3402        let blocks: Vec<u64> = result.unwrap().iter().collect();
3403        assert_eq!(blocks, (0..=10).collect::<Vec<_>>());
3404
3405        // Unwind to block 5 (keep blocks 0-5, remove 6-10)
3406        let mut batch = provider.batch();
3407        batch.unwind_account_history_to(address, 5).unwrap();
3408        batch.commit().unwrap();
3409
3410        // Verify only blocks 0-5 remain
3411        let result = provider.get::<tables::AccountsHistory>(key).unwrap();
3412        assert!(result.is_some());
3413        let blocks: Vec<u64> = result.unwrap().iter().collect();
3414        assert_eq!(blocks, (0..=5).collect::<Vec<_>>());
3415    }
3416
3417    #[test]
3418    fn test_unwind_account_history_to_removes_all() {
3419        let temp_dir = TempDir::new().unwrap();
3420        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3421
3422        let address = Address::from([0x42; 20]);
3423
3424        // Add blocks 5-10
3425        let mut batch = provider.batch();
3426        batch.append_account_history_shard(address, 5..=10).unwrap();
3427        batch.commit().unwrap();
3428
3429        // Unwind to block 4 (removes all blocks since they're all > 4)
3430        let mut batch = provider.batch();
3431        batch.unwind_account_history_to(address, 4).unwrap();
3432        batch.commit().unwrap();
3433
3434        // Verify no data remains for this address
3435        let key = ShardedKey::new(address, u64::MAX);
3436        let result = provider.get::<tables::AccountsHistory>(key).unwrap();
3437        assert!(result.is_none(), "Should have no data after full unwind");
3438    }
3439
3440    #[test]
3441    fn test_unwind_account_history_to_no_op() {
3442        let temp_dir = TempDir::new().unwrap();
3443        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3444
3445        let address = Address::from([0x42; 20]);
3446
3447        // Add blocks 0-5
3448        let mut batch = provider.batch();
3449        batch.append_account_history_shard(address, 0..=5).unwrap();
3450        batch.commit().unwrap();
3451
3452        // Unwind to block 10 (no-op since all blocks are <= 10)
3453        let mut batch = provider.batch();
3454        batch.unwind_account_history_to(address, 10).unwrap();
3455        batch.commit().unwrap();
3456
3457        // Verify blocks 0-5 still remain
3458        let key = ShardedKey::new(address, u64::MAX);
3459        let result = provider.get::<tables::AccountsHistory>(key).unwrap();
3460        assert!(result.is_some());
3461        let blocks: Vec<u64> = result.unwrap().iter().collect();
3462        assert_eq!(blocks, (0..=5).collect::<Vec<_>>());
3463    }
3464
    /// Unwinding to block 0 must keep block 0 itself while removing every later block.
    #[test]
    fn test_unwind_account_history_to_block_zero() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);

        // Add blocks 0-5 (including block 0)
        let mut batch = provider.batch();
        batch.append_account_history_shard(address, 0..=5).unwrap();
        batch.commit().unwrap();

        // Unwind to block 0 (keep only block 0, remove 1-5)
        // This simulates the caller doing: unwind_to = min_block.checked_sub(1) where min_block = 1
        let mut batch = provider.batch();
        batch.unwind_account_history_to(address, 0).unwrap();
        batch.commit().unwrap();

        // Verify only block 0 remains
        let key = ShardedKey::new(address, u64::MAX);
        let result = provider.get::<tables::AccountsHistory>(key).unwrap();
        assert!(result.is_some());
        let blocks: Vec<u64> = result.unwrap().iter().collect();
        assert_eq!(blocks, vec![0]);
    }
3490
    /// Unwind across multiple shards: shards wholly at or below the target stay
    /// untouched, while the shard straddling the boundary is truncated and keeps
    /// the sentinel (`u64::MAX`) key.
    #[test]
    fn test_unwind_account_history_to_multi_shard() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);

        // Create multiple shards by adding more than NUM_OF_INDICES_IN_SHARD entries
        // For testing, we'll manually create shards with specific keys
        let mut batch = provider.batch();

        // First shard: blocks 1-50, keyed by 50
        let shard1 = BlockNumberList::new_pre_sorted(1..=50);
        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, 50), &shard1).unwrap();

        // Second shard: blocks 51-100, keyed by MAX (sentinel)
        let shard2 = BlockNumberList::new_pre_sorted(51..=100);
        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, u64::MAX), &shard2).unwrap();

        batch.commit().unwrap();

        // Verify we have 2 shards
        let shards = provider.account_history_shards(address).unwrap();
        assert_eq!(shards.len(), 2);

        // Unwind to block 75 (keep 1-75, remove 76-100)
        let mut batch = provider.batch();
        batch.unwind_account_history_to(address, 75).unwrap();
        batch.commit().unwrap();

        // Verify: shard1 should be untouched, shard2 should be truncated
        let shards = provider.account_history_shards(address).unwrap();
        assert_eq!(shards.len(), 2);

        // First shard unchanged
        assert_eq!(shards[0].0.highest_block_number, 50);
        assert_eq!(shards[0].1.iter().collect::<Vec<_>>(), (1..=50).collect::<Vec<_>>());

        // Second shard truncated and re-keyed to MAX
        assert_eq!(shards[1].0.highest_block_number, u64::MAX);
        assert_eq!(shards[1].1.iter().collect::<Vec<_>>(), (51..=75).collect::<Vec<_>>());
    }
3533
    /// Unwind where the target falls in the gap between two shards: the upper shard
    /// is removed entirely and the surviving lower shard is promoted to the
    /// sentinel key.
    #[test]
    fn test_unwind_account_history_to_multi_shard_boundary_empty() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);

        // Create two shards
        let mut batch = provider.batch();

        // First shard: blocks 1-50, keyed by 50
        let shard1 = BlockNumberList::new_pre_sorted(1..=50);
        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, 50), &shard1).unwrap();

        // Second shard: blocks 75-100, keyed by MAX
        let shard2 = BlockNumberList::new_pre_sorted(75..=100);
        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, u64::MAX), &shard2).unwrap();

        batch.commit().unwrap();

        // Unwind to block 60 (removes all of shard2 since 75 > 60, promotes shard1 to MAX)
        let mut batch = provider.batch();
        batch.unwind_account_history_to(address, 60).unwrap();
        batch.commit().unwrap();

        // Verify: only shard1 remains, now keyed as MAX
        let shards = provider.account_history_shards(address).unwrap();
        assert_eq!(shards.len(), 1);
        assert_eq!(shards[0].0.highest_block_number, u64::MAX);
        assert_eq!(shards[0].1.iter().collect::<Vec<_>>(), (1..=50).collect::<Vec<_>>());
    }
3565
3566    #[test]
3567    fn test_account_history_shards_iterator() {
3568        let temp_dir = TempDir::new().unwrap();
3569        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3570
3571        let address = Address::from([0x42; 20]);
3572        let other_address = Address::from([0x43; 20]);
3573
3574        // Add data for two addresses
3575        let mut batch = provider.batch();
3576        batch.append_account_history_shard(address, 0..=5).unwrap();
3577        batch.append_account_history_shard(other_address, 10..=15).unwrap();
3578        batch.commit().unwrap();
3579
3580        // Query shards for first address only
3581        let shards = provider.account_history_shards(address).unwrap();
3582        assert_eq!(shards.len(), 1);
3583        assert_eq!(shards[0].0.key, address);
3584
3585        // Query shards for second address only
3586        let shards = provider.account_history_shards(other_address).unwrap();
3587        assert_eq!(shards.len(), 1);
3588        assert_eq!(shards[0].0.key, other_address);
3589
3590        // Query shards for non-existent address
3591        let non_existent = Address::from([0x99; 20]);
3592        let shards = provider.account_history_shards(non_existent).unwrap();
3593        assert!(shards.is_empty());
3594    }
3595
3596    #[test]
3597    fn test_clear_account_history() {
3598        let temp_dir = TempDir::new().unwrap();
3599        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3600
3601        let address = Address::from([0x42; 20]);
3602
3603        // Add blocks 0-10
3604        let mut batch = provider.batch();
3605        batch.append_account_history_shard(address, 0..=10).unwrap();
3606        batch.commit().unwrap();
3607
3608        // Clear all history (simulates unwind from block 0)
3609        let mut batch = provider.batch();
3610        batch.clear_account_history(address).unwrap();
3611        batch.commit().unwrap();
3612
3613        // Verify no data remains
3614        let shards = provider.account_history_shards(address).unwrap();
3615        assert!(shards.is_empty(), "All shards should be deleted");
3616    }
3617
    /// Unwind where the truncation boundary lands in a shard keyed by a concrete
    /// block number (not the sentinel): the truncated shard must be re-keyed to
    /// `u64::MAX` and any shards above it deleted.
    #[test]
    fn test_unwind_non_sentinel_boundary() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);

        // Create three shards with non-sentinel boundary
        let mut batch = provider.batch();

        // Shard 1: blocks 1-50, keyed by 50
        let shard1 = BlockNumberList::new_pre_sorted(1..=50);
        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, 50), &shard1).unwrap();

        // Shard 2: blocks 51-100, keyed by 100 (non-sentinel, will be boundary)
        let shard2 = BlockNumberList::new_pre_sorted(51..=100);
        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, 100), &shard2).unwrap();

        // Shard 3: blocks 101-150, keyed by MAX (will be deleted)
        let shard3 = BlockNumberList::new_pre_sorted(101..=150);
        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, u64::MAX), &shard3).unwrap();

        batch.commit().unwrap();

        // Verify 3 shards
        let shards = provider.account_history_shards(address).unwrap();
        assert_eq!(shards.len(), 3);

        // Unwind to block 75 (truncates shard2, deletes shard3)
        let mut batch = provider.batch();
        batch.unwind_account_history_to(address, 75).unwrap();
        batch.commit().unwrap();

        // Verify: shard1 unchanged, shard2 truncated and re-keyed to MAX, shard3 deleted
        let shards = provider.account_history_shards(address).unwrap();
        assert_eq!(shards.len(), 2);

        // First shard unchanged
        assert_eq!(shards[0].0.highest_block_number, 50);
        assert_eq!(shards[0].1.iter().collect::<Vec<_>>(), (1..=50).collect::<Vec<_>>());

        // Second shard truncated and re-keyed to MAX
        assert_eq!(shards[1].0.highest_block_number, u64::MAX);
        assert_eq!(shards[1].1.iter().collect::<Vec<_>>(), (51..=75).collect::<Vec<_>>());
    }
3663
    /// A batch built with a tiny `auto_commit_threshold` must flush to the database
    /// automatically as writes accumulate, before the explicit final commit.
    #[test]
    fn test_batch_auto_commit_on_threshold() {
        let temp_dir = TempDir::new().unwrap();
        let provider =
            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();

        // Create batch with tiny threshold (1KB) to force auto-commits.
        // Constructed directly so the threshold can be injected.
        let mut batch = RocksDBBatch {
            provider: &provider,
            inner: WriteBatchWithTransaction::<true>::default(),
            buf: Vec::new(),
            auto_commit_threshold: Some(1024), // 1KB
        };

        // Write entries until we exceed threshold multiple times
        // Each entry is ~20 bytes, so 100 entries = ~2KB = 2 auto-commits
        for i in 0..100u64 {
            let value = format!("value_{i:04}").into_bytes();
            batch.put::<TestTable>(i, &value).unwrap();
        }

        // Data should already be visible (auto-committed) even before final commit
        // At least some entries should be readable
        let first_visible = provider.get::<TestTable>(0).unwrap();
        assert!(first_visible.is_some(), "Auto-committed data should be visible");

        // Final commit for remaining batch
        batch.commit().unwrap();

        // All entries should now be visible
        for i in 0..100u64 {
            let value = format!("value_{i:04}").into_bytes();
            assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
        }
    }
3699
3700    // ==================== PARAMETERIZED PRUNE TESTS ====================
3701
    /// Test case for account history pruning.
    struct AccountPruneCase {
        /// Descriptive label identifying the case.
        name: &'static str,
        /// Shards present before pruning: `(highest_block_number, block indices)`.
        initial_shards: &'static [(u64, &'static [u64])],
        /// Block number to prune up to (inclusive, per the cases below).
        prune_to: u64,
        /// Expected result of the prune operation.
        expected_outcome: PruneShardOutcome,
        /// Shards expected to remain after pruning.
        expected_shards: &'static [(u64, &'static [u64])],
    }
3710
    /// Test case for storage history pruning.
    struct StoragePruneCase {
        /// Descriptive label identifying the case.
        name: &'static str,
        /// Shards present before pruning: `(highest_block_number, block indices)`.
        initial_shards: &'static [(u64, &'static [u64])],
        /// Block number to prune up to (inclusive, mirroring the account cases).
        prune_to: u64,
        /// Expected result of the prune operation.
        expected_outcome: PruneShardOutcome,
        /// Shards expected to remain after pruning.
        expected_shards: &'static [(u64, &'static [u64])],
    }
3719
3720    #[test]
3721    fn test_prune_account_history_cases() {
3722        const MAX: u64 = u64::MAX;
3723        const CASES: &[AccountPruneCase] = &[
3724            AccountPruneCase {
3725                name: "single_shard_truncate",
3726                initial_shards: &[(MAX, &[10, 20, 30, 40])],
3727                prune_to: 25,
3728                expected_outcome: PruneShardOutcome::Updated,
3729                expected_shards: &[(MAX, &[30, 40])],
3730            },
3731            AccountPruneCase {
3732                name: "single_shard_delete_all",
3733                initial_shards: &[(MAX, &[10, 20])],
3734                prune_to: 20,
3735                expected_outcome: PruneShardOutcome::Deleted,
3736                expected_shards: &[],
3737            },
3738            AccountPruneCase {
3739                name: "single_shard_noop",
3740                initial_shards: &[(MAX, &[10, 20])],
3741                prune_to: 5,
3742                expected_outcome: PruneShardOutcome::Unchanged,
3743                expected_shards: &[(MAX, &[10, 20])],
3744            },
3745            AccountPruneCase {
3746                name: "no_shards",
3747                initial_shards: &[],
3748                prune_to: 100,
3749                expected_outcome: PruneShardOutcome::Unchanged,
3750                expected_shards: &[],
3751            },
3752            AccountPruneCase {
3753                name: "multi_shard_truncate_first",
3754                initial_shards: &[(30, &[10, 20, 30]), (MAX, &[40, 50, 60])],
3755                prune_to: 25,
3756                expected_outcome: PruneShardOutcome::Updated,
3757                expected_shards: &[(30, &[30]), (MAX, &[40, 50, 60])],
3758            },
3759            AccountPruneCase {
3760                name: "delete_first_shard_sentinel_unchanged",
3761                initial_shards: &[(20, &[10, 20]), (MAX, &[30, 40])],
3762                prune_to: 20,
3763                expected_outcome: PruneShardOutcome::Deleted,
3764                expected_shards: &[(MAX, &[30, 40])],
3765            },
3766            AccountPruneCase {
3767                name: "multi_shard_delete_all_but_last",
3768                initial_shards: &[(10, &[5, 10]), (20, &[15, 20]), (MAX, &[25, 30])],
3769                prune_to: 22,
3770                expected_outcome: PruneShardOutcome::Deleted,
3771                expected_shards: &[(MAX, &[25, 30])],
3772            },
3773            AccountPruneCase {
3774                name: "mid_shard_preserves_key",
3775                initial_shards: &[(50, &[10, 20, 30, 40, 50]), (MAX, &[60, 70])],
3776                prune_to: 25,
3777                expected_outcome: PruneShardOutcome::Updated,
3778                expected_shards: &[(50, &[30, 40, 50]), (MAX, &[60, 70])],
3779            },
3780            // Equivalence tests
3781            AccountPruneCase {
3782                name: "equiv_delete_early_shards_keep_sentinel",
3783                initial_shards: &[(20, &[10, 15, 20]), (50, &[30, 40, 50]), (MAX, &[60, 70])],
3784                prune_to: 55,
3785                expected_outcome: PruneShardOutcome::Deleted,
3786                expected_shards: &[(MAX, &[60, 70])],
3787            },
3788            AccountPruneCase {
3789                name: "equiv_sentinel_becomes_empty_with_prev",
3790                initial_shards: &[(50, &[30, 40, 50]), (MAX, &[35])],
3791                prune_to: 40,
3792                expected_outcome: PruneShardOutcome::Deleted,
3793                expected_shards: &[(MAX, &[50])],
3794            },
3795            AccountPruneCase {
3796                name: "equiv_all_shards_become_empty",
3797                initial_shards: &[(50, &[30, 40, 50]), (MAX, &[51])],
3798                prune_to: 51,
3799                expected_outcome: PruneShardOutcome::Deleted,
3800                expected_shards: &[],
3801            },
3802            AccountPruneCase {
3803                name: "equiv_non_sentinel_last_shard_promoted",
3804                initial_shards: &[(100, &[50, 75, 100])],
3805                prune_to: 60,
3806                expected_outcome: PruneShardOutcome::Updated,
3807                expected_shards: &[(MAX, &[75, 100])],
3808            },
3809            AccountPruneCase {
3810                name: "equiv_filter_within_shard",
3811                initial_shards: &[(MAX, &[10, 20, 30, 40])],
3812                prune_to: 25,
3813                expected_outcome: PruneShardOutcome::Updated,
3814                expected_shards: &[(MAX, &[30, 40])],
3815            },
3816            AccountPruneCase {
3817                name: "equiv_multi_shard_partial_delete",
3818                initial_shards: &[(20, &[10, 20]), (50, &[30, 40, 50]), (MAX, &[60, 70])],
3819                prune_to: 35,
3820                expected_outcome: PruneShardOutcome::Deleted,
3821                expected_shards: &[(50, &[40, 50]), (MAX, &[60, 70])],
3822            },
3823        ];
3824
3825        let address = Address::from([0x42; 20]);
3826
3827        for case in CASES {
3828            let temp_dir = TempDir::new().unwrap();
3829            let provider =
3830                RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3831
3832            // Setup initial shards
3833            let mut batch = provider.batch();
3834            for (highest, blocks) in case.initial_shards {
3835                let shard = BlockNumberList::new_pre_sorted(blocks.iter().copied());
3836                batch
3837                    .put::<tables::AccountsHistory>(ShardedKey::new(address, *highest), &shard)
3838                    .unwrap();
3839            }
3840            batch.commit().unwrap();
3841
3842            // Prune
3843            let mut batch = provider.batch();
3844            let outcome = batch.prune_account_history_to(address, case.prune_to).unwrap();
3845            batch.commit().unwrap();
3846
3847            // Assert outcome
3848            assert_eq!(outcome, case.expected_outcome, "case '{}': wrong outcome", case.name);
3849
3850            // Assert final shards
3851            let shards = provider.account_history_shards(address).unwrap();
3852            assert_eq!(
3853                shards.len(),
3854                case.expected_shards.len(),
3855                "case '{}': wrong shard count",
3856                case.name
3857            );
3858            for (i, ((key, blocks), (exp_key, exp_blocks))) in
3859                shards.iter().zip(case.expected_shards.iter()).enumerate()
3860            {
3861                assert_eq!(
3862                    key.highest_block_number, *exp_key,
3863                    "case '{}': shard {} wrong key",
3864                    case.name, i
3865                );
3866                assert_eq!(
3867                    blocks.iter().collect::<Vec<_>>(),
3868                    *exp_blocks,
3869                    "case '{}': shard {} wrong blocks",
3870                    case.name,
3871                    i
3872                );
3873            }
3874        }
3875    }
3876
3877    #[test]
3878    fn test_prune_storage_history_cases() {
3879        const MAX: u64 = u64::MAX;
3880        const CASES: &[StoragePruneCase] = &[
3881            StoragePruneCase {
3882                name: "single_shard_truncate",
3883                initial_shards: &[(MAX, &[10, 20, 30, 40])],
3884                prune_to: 25,
3885                expected_outcome: PruneShardOutcome::Updated,
3886                expected_shards: &[(MAX, &[30, 40])],
3887            },
3888            StoragePruneCase {
3889                name: "single_shard_delete_all",
3890                initial_shards: &[(MAX, &[10, 20])],
3891                prune_to: 20,
3892                expected_outcome: PruneShardOutcome::Deleted,
3893                expected_shards: &[],
3894            },
3895            StoragePruneCase {
3896                name: "noop",
3897                initial_shards: &[(MAX, &[10, 20])],
3898                prune_to: 5,
3899                expected_outcome: PruneShardOutcome::Unchanged,
3900                expected_shards: &[(MAX, &[10, 20])],
3901            },
3902            StoragePruneCase {
3903                name: "no_shards",
3904                initial_shards: &[],
3905                prune_to: 100,
3906                expected_outcome: PruneShardOutcome::Unchanged,
3907                expected_shards: &[],
3908            },
3909            StoragePruneCase {
3910                name: "mid_shard_preserves_key",
3911                initial_shards: &[(50, &[10, 20, 30, 40, 50]), (MAX, &[60, 70])],
3912                prune_to: 25,
3913                expected_outcome: PruneShardOutcome::Updated,
3914                expected_shards: &[(50, &[30, 40, 50]), (MAX, &[60, 70])],
3915            },
3916            // Equivalence tests
3917            StoragePruneCase {
3918                name: "equiv_sentinel_promotion",
3919                initial_shards: &[(100, &[50, 75, 100])],
3920                prune_to: 60,
3921                expected_outcome: PruneShardOutcome::Updated,
3922                expected_shards: &[(MAX, &[75, 100])],
3923            },
3924            StoragePruneCase {
3925                name: "equiv_delete_early_shards_keep_sentinel",
3926                initial_shards: &[(20, &[10, 15, 20]), (50, &[30, 40, 50]), (MAX, &[60, 70])],
3927                prune_to: 55,
3928                expected_outcome: PruneShardOutcome::Deleted,
3929                expected_shards: &[(MAX, &[60, 70])],
3930            },
3931            StoragePruneCase {
3932                name: "equiv_sentinel_becomes_empty_with_prev",
3933                initial_shards: &[(50, &[30, 40, 50]), (MAX, &[35])],
3934                prune_to: 40,
3935                expected_outcome: PruneShardOutcome::Deleted,
3936                expected_shards: &[(MAX, &[50])],
3937            },
3938            StoragePruneCase {
3939                name: "equiv_all_shards_become_empty",
3940                initial_shards: &[(50, &[30, 40, 50]), (MAX, &[51])],
3941                prune_to: 51,
3942                expected_outcome: PruneShardOutcome::Deleted,
3943                expected_shards: &[],
3944            },
3945            StoragePruneCase {
3946                name: "equiv_filter_within_shard",
3947                initial_shards: &[(MAX, &[10, 20, 30, 40])],
3948                prune_to: 25,
3949                expected_outcome: PruneShardOutcome::Updated,
3950                expected_shards: &[(MAX, &[30, 40])],
3951            },
3952            StoragePruneCase {
3953                name: "equiv_multi_shard_partial_delete",
3954                initial_shards: &[(20, &[10, 20]), (50, &[30, 40, 50]), (MAX, &[60, 70])],
3955                prune_to: 35,
3956                expected_outcome: PruneShardOutcome::Deleted,
3957                expected_shards: &[(50, &[40, 50]), (MAX, &[60, 70])],
3958            },
3959        ];
3960
3961        let address = Address::from([0x42; 20]);
3962        let storage_key = B256::from([0x01; 32]);
3963
3964        for case in CASES {
3965            let temp_dir = TempDir::new().unwrap();
3966            let provider =
3967                RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
3968
3969            // Setup initial shards
3970            let mut batch = provider.batch();
3971            for (highest, blocks) in case.initial_shards {
3972                let shard = BlockNumberList::new_pre_sorted(blocks.iter().copied());
3973                let key = if *highest == MAX {
3974                    StorageShardedKey::last(address, storage_key)
3975                } else {
3976                    StorageShardedKey::new(address, storage_key, *highest)
3977                };
3978                batch.put::<tables::StoragesHistory>(key, &shard).unwrap();
3979            }
3980            batch.commit().unwrap();
3981
3982            // Prune
3983            let mut batch = provider.batch();
3984            let outcome =
3985                batch.prune_storage_history_to(address, storage_key, case.prune_to).unwrap();
3986            batch.commit().unwrap();
3987
3988            // Assert outcome
3989            assert_eq!(outcome, case.expected_outcome, "case '{}': wrong outcome", case.name);
3990
3991            // Assert final shards
3992            let shards = provider.storage_history_shards(address, storage_key).unwrap();
3993            assert_eq!(
3994                shards.len(),
3995                case.expected_shards.len(),
3996                "case '{}': wrong shard count",
3997                case.name
3998            );
3999            for (i, ((key, blocks), (exp_key, exp_blocks))) in
4000                shards.iter().zip(case.expected_shards.iter()).enumerate()
4001            {
4002                assert_eq!(
4003                    key.sharded_key.highest_block_number, *exp_key,
4004                    "case '{}': shard {} wrong key",
4005                    case.name, i
4006                );
4007                assert_eq!(
4008                    blocks.iter().collect::<Vec<_>>(),
4009                    *exp_blocks,
4010                    "case '{}': shard {} wrong blocks",
4011                    case.name,
4012                    i
4013                );
4014            }
4015        }
4016    }
4017
4018    #[test]
4019    fn test_prune_storage_history_does_not_affect_other_slots() {
4020        let temp_dir = TempDir::new().unwrap();
4021        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
4022
4023        let address = Address::from([0x42; 20]);
4024        let slot1 = B256::from([0x01; 32]);
4025        let slot2 = B256::from([0x02; 32]);
4026
4027        // Two different storage slots
4028        let mut batch = provider.batch();
4029        batch
4030            .put::<tables::StoragesHistory>(
4031                StorageShardedKey::last(address, slot1),
4032                &BlockNumberList::new_pre_sorted([10u64, 20]),
4033            )
4034            .unwrap();
4035        batch
4036            .put::<tables::StoragesHistory>(
4037                StorageShardedKey::last(address, slot2),
4038                &BlockNumberList::new_pre_sorted([30u64, 40]),
4039            )
4040            .unwrap();
4041        batch.commit().unwrap();
4042
4043        // Prune slot1 to block 20 (deletes all)
4044        let mut batch = provider.batch();
4045        let outcome = batch.prune_storage_history_to(address, slot1, 20).unwrap();
4046        batch.commit().unwrap();
4047
4048        assert_eq!(outcome, PruneShardOutcome::Deleted);
4049
4050        // slot1 should be empty
4051        let shards1 = provider.storage_history_shards(address, slot1).unwrap();
4052        assert!(shards1.is_empty());
4053
4054        // slot2 should be unchanged
4055        let shards2 = provider.storage_history_shards(address, slot2).unwrap();
4056        assert_eq!(shards2.len(), 1);
4057        assert_eq!(shards2[0].1.iter().collect::<Vec<_>>(), vec![30, 40]);
4058    }
4059
4060    #[test]
4061    fn test_prune_invariants() {
4062        // Test invariants: no empty shards, sentinel is always last
4063        let address = Address::from([0x42; 20]);
4064        let storage_key = B256::from([0x01; 32]);
4065
4066        // Test cases that exercise invariants
4067        #[expect(clippy::type_complexity)]
4068        let invariant_cases: &[(&[(u64, &[u64])], u64)] = &[
4069            // Account: shards where middle becomes empty
4070            (&[(10, &[5, 10]), (20, &[15, 20]), (u64::MAX, &[25, 30])], 20),
4071            // Account: non-sentinel shard only, partial prune -> must become sentinel
4072            (&[(100, &[50, 100])], 60),
4073        ];
4074
4075        for (initial_shards, prune_to) in invariant_cases {
4076            // Test account history invariants
4077            {
4078                let temp_dir = TempDir::new().unwrap();
4079                let provider =
4080                    RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
4081
4082                let mut batch = provider.batch();
4083                for (highest, blocks) in *initial_shards {
4084                    let shard = BlockNumberList::new_pre_sorted(blocks.iter().copied());
4085                    batch
4086                        .put::<tables::AccountsHistory>(ShardedKey::new(address, *highest), &shard)
4087                        .unwrap();
4088                }
4089                batch.commit().unwrap();
4090
4091                let mut batch = provider.batch();
4092                batch.prune_account_history_to(address, *prune_to).unwrap();
4093                batch.commit().unwrap();
4094
4095                let shards = provider.account_history_shards(address).unwrap();
4096
4097                // Invariant 1: no empty shards
4098                for (key, blocks) in &shards {
4099                    assert!(
4100                        !blocks.is_empty(),
4101                        "Account: empty shard at key {}",
4102                        key.highest_block_number
4103                    );
4104                }
4105
4106                // Invariant 2: last shard is sentinel
4107                if !shards.is_empty() {
4108                    let last = shards.last().unwrap();
4109                    assert_eq!(
4110                        last.0.highest_block_number,
4111                        u64::MAX,
4112                        "Account: last shard must be sentinel"
4113                    );
4114                }
4115            }
4116
4117            // Test storage history invariants
4118            {
4119                let temp_dir = TempDir::new().unwrap();
4120                let provider =
4121                    RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
4122
4123                let mut batch = provider.batch();
4124                for (highest, blocks) in *initial_shards {
4125                    let shard = BlockNumberList::new_pre_sorted(blocks.iter().copied());
4126                    let key = if *highest == u64::MAX {
4127                        StorageShardedKey::last(address, storage_key)
4128                    } else {
4129                        StorageShardedKey::new(address, storage_key, *highest)
4130                    };
4131                    batch.put::<tables::StoragesHistory>(key, &shard).unwrap();
4132                }
4133                batch.commit().unwrap();
4134
4135                let mut batch = provider.batch();
4136                batch.prune_storage_history_to(address, storage_key, *prune_to).unwrap();
4137                batch.commit().unwrap();
4138
4139                let shards = provider.storage_history_shards(address, storage_key).unwrap();
4140
4141                // Invariant 1: no empty shards
4142                for (key, blocks) in &shards {
4143                    assert!(
4144                        !blocks.is_empty(),
4145                        "Storage: empty shard at key {}",
4146                        key.sharded_key.highest_block_number
4147                    );
4148                }
4149
4150                // Invariant 2: last shard is sentinel
4151                if !shards.is_empty() {
4152                    let last = shards.last().unwrap();
4153                    assert_eq!(
4154                        last.0.sharded_key.highest_block_number,
4155                        u64::MAX,
4156                        "Storage: last shard must be sentinel"
4157                    );
4158                }
4159            }
4160        }
4161    }
4162
4163    #[test]
4164    fn test_prune_account_history_batch_multiple_sorted_targets() {
4165        let temp_dir = TempDir::new().unwrap();
4166        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
4167
4168        let addr1 = Address::from([0x01; 20]);
4169        let addr2 = Address::from([0x02; 20]);
4170        let addr3 = Address::from([0x03; 20]);
4171
4172        // Setup shards for each address
4173        let mut batch = provider.batch();
4174        batch
4175            .put::<tables::AccountsHistory>(
4176                ShardedKey::new(addr1, u64::MAX),
4177                &BlockNumberList::new_pre_sorted([10, 20, 30]),
4178            )
4179            .unwrap();
4180        batch
4181            .put::<tables::AccountsHistory>(
4182                ShardedKey::new(addr2, u64::MAX),
4183                &BlockNumberList::new_pre_sorted([5, 10, 15]),
4184            )
4185            .unwrap();
4186        batch
4187            .put::<tables::AccountsHistory>(
4188                ShardedKey::new(addr3, u64::MAX),
4189                &BlockNumberList::new_pre_sorted([100, 200]),
4190            )
4191            .unwrap();
4192        batch.commit().unwrap();
4193
4194        // Prune all three (sorted by address)
4195        let mut targets = vec![(addr1, 15), (addr2, 10), (addr3, 50)];
4196        targets.sort_by_key(|(addr, _)| *addr);
4197
4198        let mut batch = provider.batch();
4199        let outcomes = batch.prune_account_history_batch(&targets).unwrap();
4200        batch.commit().unwrap();
4201
4202        // addr1: prune <=15, keep [20, 30] -> updated
4203        // addr2: prune <=10, keep [15] -> updated
4204        // addr3: prune <=50, keep [100, 200] -> unchanged
4205        assert_eq!(outcomes.updated, 2);
4206        assert_eq!(outcomes.unchanged, 1);
4207
4208        let shards1 = provider.account_history_shards(addr1).unwrap();
4209        assert_eq!(shards1[0].1.iter().collect::<Vec<_>>(), vec![20, 30]);
4210
4211        let shards2 = provider.account_history_shards(addr2).unwrap();
4212        assert_eq!(shards2[0].1.iter().collect::<Vec<_>>(), vec![15]);
4213
4214        let shards3 = provider.account_history_shards(addr3).unwrap();
4215        assert_eq!(shards3[0].1.iter().collect::<Vec<_>>(), vec![100, 200]);
4216    }
4217
4218    #[test]
4219    fn test_prune_account_history_batch_target_with_no_shards() {
4220        let temp_dir = TempDir::new().unwrap();
4221        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
4222
4223        let addr1 = Address::from([0x01; 20]);
4224        let addr2 = Address::from([0x02; 20]); // No shards for this one
4225        let addr3 = Address::from([0x03; 20]);
4226
4227        // Only setup shards for addr1 and addr3
4228        let mut batch = provider.batch();
4229        batch
4230            .put::<tables::AccountsHistory>(
4231                ShardedKey::new(addr1, u64::MAX),
4232                &BlockNumberList::new_pre_sorted([10, 20]),
4233            )
4234            .unwrap();
4235        batch
4236            .put::<tables::AccountsHistory>(
4237                ShardedKey::new(addr3, u64::MAX),
4238                &BlockNumberList::new_pre_sorted([30, 40]),
4239            )
4240            .unwrap();
4241        batch.commit().unwrap();
4242
4243        // Prune all three (addr2 has no shards - tests p > target_prefix case)
4244        let mut targets = vec![(addr1, 15), (addr2, 100), (addr3, 35)];
4245        targets.sort_by_key(|(addr, _)| *addr);
4246
4247        let mut batch = provider.batch();
4248        let outcomes = batch.prune_account_history_batch(&targets).unwrap();
4249        batch.commit().unwrap();
4250
4251        // addr1: updated (keep [20])
4252        // addr2: unchanged (no shards)
4253        // addr3: updated (keep [40])
4254        assert_eq!(outcomes.updated, 2);
4255        assert_eq!(outcomes.unchanged, 1);
4256
4257        let shards1 = provider.account_history_shards(addr1).unwrap();
4258        assert_eq!(shards1[0].1.iter().collect::<Vec<_>>(), vec![20]);
4259
4260        let shards3 = provider.account_history_shards(addr3).unwrap();
4261        assert_eq!(shards3[0].1.iter().collect::<Vec<_>>(), vec![40]);
4262    }
4263
4264    #[test]
4265    fn test_prune_storage_history_batch_multiple_sorted_targets() {
4266        let temp_dir = TempDir::new().unwrap();
4267        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
4268
4269        let addr = Address::from([0x42; 20]);
4270        let slot1 = B256::from([0x01; 32]);
4271        let slot2 = B256::from([0x02; 32]);
4272
4273        // Setup shards
4274        let mut batch = provider.batch();
4275        batch
4276            .put::<tables::StoragesHistory>(
4277                StorageShardedKey::new(addr, slot1, u64::MAX),
4278                &BlockNumberList::new_pre_sorted([10, 20, 30]),
4279            )
4280            .unwrap();
4281        batch
4282            .put::<tables::StoragesHistory>(
4283                StorageShardedKey::new(addr, slot2, u64::MAX),
4284                &BlockNumberList::new_pre_sorted([5, 15, 25]),
4285            )
4286            .unwrap();
4287        batch.commit().unwrap();
4288
4289        // Prune both (sorted)
4290        let mut targets = vec![((addr, slot1), 15), ((addr, slot2), 10)];
4291        targets.sort_by_key(|((a, s), _)| (*a, *s));
4292
4293        let mut batch = provider.batch();
4294        let outcomes = batch.prune_storage_history_batch(&targets).unwrap();
4295        batch.commit().unwrap();
4296
4297        assert_eq!(outcomes.updated, 2);
4298
4299        let shards1 = provider.storage_history_shards(addr, slot1).unwrap();
4300        assert_eq!(shards1[0].1.iter().collect::<Vec<_>>(), vec![20, 30]);
4301
4302        let shards2 = provider.storage_history_shards(addr, slot2).unwrap();
4303        assert_eq!(shards2[0].1.iter().collect::<Vec<_>>(), vec![15, 25]);
4304    }
4305}