reth_provider/providers/rocksdb/provider.rs

1use super::metrics::{RocksDBMetrics, RocksDBOperation, ROCKSDB_TABLES};
2use crate::providers::{compute_history_rank, needs_prev_shard_check, HistoryInfo};
3use alloy_consensus::transaction::TxHashRef;
4use alloy_primitives::{Address, BlockNumber, TxNumber, B256};
5use itertools::Itertools;
6use metrics::Label;
7use parking_lot::Mutex;
8use reth_chain_state::ExecutedBlock;
9use reth_db_api::{
10    database_metrics::DatabaseMetrics,
11    models::{
12        sharded_key::NUM_OF_INDICES_IN_SHARD, storage_sharded_key::StorageShardedKey, ShardedKey,
13        StorageSettings,
14    },
15    table::{Compress, Decode, Decompress, Encode, Table},
16    tables, BlockNumberList, DatabaseError,
17};
18use reth_primitives_traits::BlockBody as _;
19use reth_prune_types::PruneMode;
20use reth_storage_errors::{
21    db::{DatabaseErrorInfo, DatabaseWriteError, DatabaseWriteOperation, LogLevel},
22    provider::{ProviderError, ProviderResult},
23};
24use rocksdb::{
25    BlockBasedOptions, Cache, ColumnFamilyDescriptor, CompactionPri, DBCompressionType,
26    DBRawIteratorWithThreadMode, IteratorMode, OptimisticTransactionDB,
27    OptimisticTransactionOptions, Options, Transaction, WriteBatchWithTransaction, WriteOptions,
28    DB,
29};
30use std::{
31    collections::{BTreeMap, HashMap},
32    fmt,
33    path::{Path, PathBuf},
34    sync::Arc,
35    thread,
36    time::Instant,
37};
38use tracing::instrument;
39
40/// Pending `RocksDB` batches type alias.
41pub(crate) type PendingRocksDBBatches = Arc<Mutex<Vec<WriteBatchWithTransaction<true>>>>;
42
43/// Statistics for a single `RocksDB` table (column family).
44#[derive(Debug, Clone)]
45pub struct RocksDBTableStats {
46    /// Size of SST files on disk in bytes.
47    pub sst_size_bytes: u64,
48    /// Size of memtables in memory in bytes.
49    pub memtable_size_bytes: u64,
50    /// Name of the table/column family.
51    pub name: String,
52    /// Estimated number of keys in the table.
53    pub estimated_num_keys: u64,
54    /// Estimated size of live data in bytes (SST files + memtables).
55    pub estimated_size_bytes: u64,
56    /// Estimated bytes pending compaction (reclaimable space).
57    pub pending_compaction_bytes: u64,
58}
59
60/// Database-level statistics for `RocksDB`.
61///
62/// Contains both per-table statistics and DB-level metrics like WAL size.
63#[derive(Debug, Clone)]
64pub struct RocksDBStats {
65    /// Statistics for each table (column family).
66    pub tables: Vec<RocksDBTableStats>,
67    /// Total size of WAL (Write-Ahead Log) files in bytes.
68    ///
69    /// WAL is shared across all tables and not included in per-table metrics.
70    pub wal_size_bytes: u64,
71}
72
73/// Context for `RocksDB` block writes.
74#[derive(Clone)]
75pub(crate) struct RocksDBWriteCtx {
76    /// The first block number being written.
77    pub first_block_number: BlockNumber,
78    /// The prune mode for transaction lookup, if any.
79    pub prune_tx_lookup: Option<PruneMode>,
80    /// Storage settings determining what goes to `RocksDB`.
81    pub storage_settings: StorageSettings,
82    /// Shared list of pending batches that completed write batches are pushed onto.
83    pub pending_batches: PendingRocksDBBatches,
84}
85
86impl fmt::Debug for RocksDBWriteCtx {
87    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
88        f.debug_struct("RocksDBWriteCtx")
89            .field("first_block_number", &self.first_block_number)
90            .field("prune_tx_lookup", &self.prune_tx_lookup)
91            .field("storage_settings", &self.storage_settings)
92            .field("pending_batches", &"<pending batches>")
93            .finish()
94    }
95}
96
97/// Default cache size for `RocksDB` block cache (128 MB).
98const DEFAULT_CACHE_SIZE: usize = 128 << 20;
99
100/// Default block size for `RocksDB` tables (16 KB).
101const DEFAULT_BLOCK_SIZE: usize = 16 * 1024;
102
103/// Default max background jobs for `RocksDB` compaction and flushing.
104const DEFAULT_MAX_BACKGROUND_JOBS: i32 = 6;
105
106/// Default bytes per sync for `RocksDB` WAL writes (1 MB).
107const DEFAULT_BYTES_PER_SYNC: u64 = 1_048_576;
108
109/// Default buffer capacity for compression in batches.
110/// 4 KiB matches common block/page sizes and comfortably holds typical history values,
111/// reducing the first few reallocations without over-allocating.
112const DEFAULT_COMPRESS_BUF_CAPACITY: usize = 4096;
113
114/// Default auto-commit threshold for batch writes (4 GiB).
115///
116/// When a batch exceeds this size, it is automatically committed to prevent OOM
117/// during large bulk writes. The consistency check on startup heals any crash
118/// that occurs between auto-commits.
119const DEFAULT_AUTO_COMMIT_THRESHOLD: usize = 4 * 1024 * 1024 * 1024;
120
121/// Builder for [`RocksDBProvider`].
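///
/// # Example
///
/// A minimal usage sketch (not compiled as a doc-test); the path is hypothetical:
///
/// ```ignore
/// let provider = RocksDBBuilder::new("/tmp/reth-rocksdb")
///     .with_default_tables()
///     .with_metrics()
///     .build()?;
/// assert!(!provider.is_read_only());
/// ```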
122pub struct RocksDBBuilder {
123    path: PathBuf,
124    column_families: Vec<String>,
125    enable_metrics: bool,
126    enable_statistics: bool,
127    log_level: rocksdb::LogLevel,
128    block_cache: Cache,
129    read_only: bool,
130}
131
132impl fmt::Debug for RocksDBBuilder {
133    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
134        f.debug_struct("RocksDBBuilder")
135            .field("path", &self.path)
136            .field("column_families", &self.column_families)
137            .field("enable_metrics", &self.enable_metrics)
138            .finish()
139    }
140}
141
142impl RocksDBBuilder {
143    /// Creates a new builder with optimized default options.
144    pub fn new(path: impl AsRef<Path>) -> Self {
145        let cache = Cache::new_lru_cache(DEFAULT_CACHE_SIZE);
146        Self {
147            path: path.as_ref().to_path_buf(),
148            column_families: Vec::new(),
149            enable_metrics: false,
150            enable_statistics: false,
151            log_level: rocksdb::LogLevel::Info,
152            block_cache: cache,
153            read_only: false,
154        }
155    }
156
157    /// Creates default table options with shared block cache.
158    fn default_table_options(cache: &Cache) -> BlockBasedOptions {
159        let mut table_options = BlockBasedOptions::default();
160        table_options.set_block_size(DEFAULT_BLOCK_SIZE);
161        table_options.set_cache_index_and_filter_blocks(true);
162        table_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
163        // Shared block cache for all column families.
164        table_options.set_block_cache(cache);
165        table_options
166    }
167
168    /// Creates optimized `RocksDB` options per `RocksDB` wiki recommendations.
169    fn default_options(
170        log_level: rocksdb::LogLevel,
171        cache: &Cache,
172        enable_statistics: bool,
173    ) -> Options {
174        // Follow the recommended tuning guide from the RocksDB wiki; see https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning
175        let table_options = Self::default_table_options(cache);
176
177        let mut options = Options::default();
178        options.set_block_based_table_factory(&table_options);
179        options.create_if_missing(true);
180        options.create_missing_column_families(true);
181        options.set_max_background_jobs(DEFAULT_MAX_BACKGROUND_JOBS);
182        options.set_bytes_per_sync(DEFAULT_BYTES_PER_SYNC);
183
184        options.set_bottommost_compression_type(DBCompressionType::Zstd);
185        options.set_bottommost_zstd_max_train_bytes(0, true);
186        options.set_compression_type(DBCompressionType::Lz4);
187        options.set_compaction_pri(CompactionPri::MinOverlappingRatio);
188
189        options.set_log_level(log_level);
190
191        // Delete obsolete WAL files immediately after all column families have flushed.
192        // Setting both to 0 means "delete ASAP, no archival".
193        options.set_wal_ttl_seconds(0);
194        options.set_wal_size_limit_mb(0);
195
196        // Statistics can be viewed in the RocksDB log file.
197        if enable_statistics {
198            options.enable_statistics();
199        }
200
201        options
202    }
203
204    /// Creates optimized column family options.
205    fn default_column_family_options(cache: &Cache) -> Options {
206        // Follow the recommended tuning guide from the RocksDB wiki; see https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning
207        let table_options = Self::default_table_options(cache);
208
209        let mut cf_options = Options::default();
210        cf_options.set_block_based_table_factory(&table_options);
211        cf_options.set_level_compaction_dynamic_level_bytes(true);
212        // Zstd is recommended for bottommost compression and LZ4 for the other levels; see https://github.com/facebook/rocksdb/wiki/Compression#configuration
213        cf_options.set_compression_type(DBCompressionType::Lz4);
214        cf_options.set_bottommost_compression_type(DBCompressionType::Zstd);
215        // Use plain Zstd compression and disable dictionary training.
216        cf_options.set_bottommost_zstd_max_train_bytes(0, true);
217
218        cf_options
219    }
220
221    /// Creates optimized column family options for `TransactionHashNumbers`.
222    ///
223    /// This table stores `B256 -> TxNumber` mappings where:
224    /// - Keys are incompressible 32-byte hashes (compression wastes CPU for zero benefit)
225    /// - Values are varint-encoded `u64` (a few bytes - too small to benefit from compression)
226    /// - Every lookup expects a hit (bloom filters only help when checking non-existent keys)
227    fn tx_hash_numbers_column_family_options(cache: &Cache) -> Options {
228        let mut table_options = BlockBasedOptions::default();
229        table_options.set_block_size(DEFAULT_BLOCK_SIZE);
230        table_options.set_cache_index_and_filter_blocks(true);
231        table_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
232        table_options.set_block_cache(cache);
233        // No bloom filter is configured: every lookup expects a hit, so bloom filters provide
234        // no benefit and only waste memory.
235
236        let mut cf_options = Options::default();
237        cf_options.set_block_based_table_factory(&table_options);
238        cf_options.set_level_compaction_dynamic_level_bytes(true);
239        // Disable compression: B256 keys are incompressible hashes, TxNumber values are
240        // varint-encoded u64 (a few bytes). Compression wastes CPU cycles for zero space savings.
241        cf_options.set_compression_type(DBCompressionType::None);
242        cf_options.set_bottommost_compression_type(DBCompressionType::None);
243
244        cf_options
245    }
246
247    /// Adds a column family for a specific table type.
248    pub fn with_table<T: Table>(mut self) -> Self {
249        self.column_families.push(T::NAME.to_string());
250        self
251    }
252
253    /// Registers the default tables used by reth for `RocksDB` storage.
254    ///
255    /// This registers:
256    /// - [`tables::TransactionHashNumbers`] - Transaction hash to number mapping
257    /// - [`tables::AccountsHistory`] - Account history index
258    /// - [`tables::StoragesHistory`] - Storage history index
259    pub fn with_default_tables(self) -> Self {
260        self.with_table::<tables::TransactionHashNumbers>()
261            .with_table::<tables::AccountsHistory>()
262            .with_table::<tables::StoragesHistory>()
263    }
264
265    /// Enables metrics.
266    pub const fn with_metrics(mut self) -> Self {
267        self.enable_metrics = true;
268        self
269    }
270
271    /// Enables `RocksDB` internal statistics collection.
272    pub const fn with_statistics(mut self) -> Self {
273        self.enable_statistics = true;
274        self
275    }
276
277    /// Sets the log level from `DatabaseArgs` configuration.
278    pub const fn with_database_log_level(mut self, log_level: Option<LogLevel>) -> Self {
279        if let Some(level) = log_level {
280            self.log_level = convert_log_level(level);
281        }
282        self
283    }
284
285    /// Sets a custom block cache size.
286    pub fn with_block_cache_size(mut self, capacity_bytes: usize) -> Self {
287        self.block_cache = Cache::new_lru_cache(capacity_bytes);
288        self
289    }
290
291    /// Sets read-only mode.
292    ///
293    /// Note: Write operations on a read-only provider will panic at runtime.
294    pub const fn with_read_only(mut self, read_only: bool) -> Self {
295        self.read_only = read_only;
296        self
297    }
298
299    /// Builds the [`RocksDBProvider`].
300    pub fn build(self) -> ProviderResult<RocksDBProvider> {
301        let options =
302            Self::default_options(self.log_level, &self.block_cache, self.enable_statistics);
303
304        let cf_descriptors: Vec<ColumnFamilyDescriptor> = self
305            .column_families
306            .iter()
307            .map(|name| {
308                let cf_options = if name == tables::TransactionHashNumbers::NAME {
309                    Self::tx_hash_numbers_column_family_options(&self.block_cache)
310                } else {
311                    Self::default_column_family_options(&self.block_cache)
312                };
313                ColumnFamilyDescriptor::new(name.clone(), cf_options)
314            })
315            .collect();
316
317        let metrics = self.enable_metrics.then(RocksDBMetrics::default);
318
319        if self.read_only {
320            let db = DB::open_cf_descriptors_read_only(&options, &self.path, cf_descriptors, false)
321                .map_err(|e| {
322                    ProviderError::Database(DatabaseError::Open(DatabaseErrorInfo {
323                        message: e.to_string().into(),
324                        code: -1,
325                    }))
326                })?;
327            Ok(RocksDBProvider(Arc::new(RocksDBProviderInner::ReadOnly { db, metrics })))
328        } else {
329            // Use OptimisticTransactionDB for MDBX-like transaction semantics (read-your-writes,
330            // rollback). OptimisticTransactionDB uses optimistic concurrency control (conflict
331            // detection at commit time) and is backed by DBCommon, giving us access to
332            // cancel_all_background_work for a clean shutdown.
333            let db =
334                OptimisticTransactionDB::open_cf_descriptors(&options, &self.path, cf_descriptors)
335                    .map_err(|e| {
336                        ProviderError::Database(DatabaseError::Open(DatabaseErrorInfo {
337                            message: e.to_string().into(),
338                            code: -1,
339                        }))
340                    })?;
341            Ok(RocksDBProvider(Arc::new(RocksDBProviderInner::ReadWrite { db, metrics })))
342        }
343    }
344}
345
346/// Some types don't support compression (e.g. `B256`); rather than copying them into the
347/// allocated buffer, we simply use their reference.
348macro_rules! compress_to_buf_or_ref {
349    ($buf:expr, $value:expr) => {
350        if let Some(value) = $value.uncompressable_ref() {
351            Some(value)
352        } else {
353            $buf.clear();
354            $value.compress_to_buf(&mut $buf);
355            None
356        }
357    };
358}
359
360/// `RocksDB` provider for the auxiliary storage layer alongside the main MDBX database.
361#[derive(Debug)]
362pub struct RocksDBProvider(Arc<RocksDBProviderInner>);
363
364/// Inner state for `RocksDB` provider.
365enum RocksDBProviderInner {
366    /// Read-write mode using `OptimisticTransactionDB`.
367    ReadWrite {
368        /// `RocksDB` database instance with optimistic transaction support.
369        db: OptimisticTransactionDB,
370        /// Metrics for operation latency and counts.
371        metrics: Option<RocksDBMetrics>,
372    },
373    /// Read-only mode using `DB` opened with `open_cf_descriptors_read_only`.
374    /// This doesn't acquire an exclusive lock, allowing concurrent reads.
375    ReadOnly {
376        /// Read-only `RocksDB` database instance.
377        db: DB,
378        /// Metrics for operation latency and counts.
379        metrics: Option<RocksDBMetrics>,
380    },
381}
382
383impl RocksDBProviderInner {
384    /// Returns the metrics for this provider.
385    const fn metrics(&self) -> Option<&RocksDBMetrics> {
386        match self {
387            Self::ReadWrite { metrics, .. } | Self::ReadOnly { metrics, .. } => metrics.as_ref(),
388        }
389    }
390
391    /// Returns the read-write database, panicking if in read-only mode.
392    fn db_rw(&self) -> &OptimisticTransactionDB {
393        match self {
394            Self::ReadWrite { db, .. } => db,
395            Self::ReadOnly { .. } => {
396                panic!("Cannot perform write operation on read-only RocksDB provider")
397            }
398        }
399    }
400
401    /// Gets the column family handle for a table.
402    fn cf_handle<T: Table>(&self) -> Result<&rocksdb::ColumnFamily, DatabaseError> {
403        let cf = match self {
404            Self::ReadWrite { db, .. } => db.cf_handle(T::NAME),
405            Self::ReadOnly { db, .. } => db.cf_handle(T::NAME),
406        };
407        cf.ok_or_else(|| DatabaseError::Other(format!("Column family '{}' not found", T::NAME)))
408    }
409
410    /// Gets the column family handle for a table from the read-write database.
411    ///
412    /// # Panics
413    /// Panics if in read-only mode.
414    fn cf_handle_rw(&self, name: &str) -> Result<&rocksdb::ColumnFamily, DatabaseError> {
415        self.db_rw()
416            .cf_handle(name)
417            .ok_or_else(|| DatabaseError::Other(format!("Column family '{}' not found", name)))
418    }
419
420    /// Gets a value from a column family.
421    fn get_cf(
422        &self,
423        cf: &rocksdb::ColumnFamily,
424        key: impl AsRef<[u8]>,
425    ) -> Result<Option<Vec<u8>>, rocksdb::Error> {
426        match self {
427            Self::ReadWrite { db, .. } => db.get_cf(cf, key),
428            Self::ReadOnly { db, .. } => db.get_cf(cf, key),
429        }
430    }
431
432    /// Puts a value into a column family.
433    fn put_cf(
434        &self,
435        cf: &rocksdb::ColumnFamily,
436        key: impl AsRef<[u8]>,
437        value: impl AsRef<[u8]>,
438    ) -> Result<(), rocksdb::Error> {
439        self.db_rw().put_cf(cf, key, value)
440    }
441
442    /// Deletes a value from a column family.
443    fn delete_cf(
444        &self,
445        cf: &rocksdb::ColumnFamily,
446        key: impl AsRef<[u8]>,
447    ) -> Result<(), rocksdb::Error> {
448        self.db_rw().delete_cf(cf, key)
449    }
450
451    /// Deletes a range of values from a column family.
452    fn delete_range_cf<K: AsRef<[u8]>>(
453        &self,
454        cf: &rocksdb::ColumnFamily,
455        from: K,
456        to: K,
457    ) -> Result<(), rocksdb::Error> {
458        self.db_rw().delete_range_cf(cf, from, to)
459    }
460
461    /// Returns an iterator over a column family.
462    fn iterator_cf(
463        &self,
464        cf: &rocksdb::ColumnFamily,
465        mode: IteratorMode<'_>,
466    ) -> RocksDBIterEnum<'_> {
467        match self {
468            Self::ReadWrite { db, .. } => RocksDBIterEnum::ReadWrite(db.iterator_cf(cf, mode)),
469            Self::ReadOnly { db, .. } => RocksDBIterEnum::ReadOnly(db.iterator_cf(cf, mode)),
470        }
471    }
472
473    /// Returns the path to the database directory.
474    fn path(&self) -> &Path {
475        match self {
476            Self::ReadWrite { db, .. } => db.path(),
477            Self::ReadOnly { db, .. } => db.path(),
478        }
479    }
480
481    /// Returns the total size of WAL (Write-Ahead Log) files in bytes.
482    ///
483    /// WAL files have a `.log` extension in the `RocksDB` directory.
484    fn wal_size_bytes(&self) -> u64 {
485        let path = self.path();
486
487        match std::fs::read_dir(path) {
488            Ok(entries) => entries
489                .filter_map(|e| e.ok())
490                .filter(|e| e.path().extension().is_some_and(|ext| ext == "log"))
491                .filter_map(|e| e.metadata().ok())
492                .map(|m| m.len())
493                .sum(),
494            Err(_) => 0,
495        }
496    }
497
498    /// Returns statistics for all column families in the database.
499    fn table_stats(&self) -> Vec<RocksDBTableStats> {
500        let mut stats = Vec::new();
501
502        macro_rules! collect_stats {
503            ($db:expr) => {
504                for cf_name in ROCKSDB_TABLES {
505                    if let Some(cf) = $db.cf_handle(cf_name) {
506                        let estimated_num_keys = $db
507                            .property_int_value_cf(cf, rocksdb::properties::ESTIMATE_NUM_KEYS)
508                            .ok()
509                            .flatten()
510                            .unwrap_or(0);
511
512                        // SST files size (on-disk) + memtable size (in-memory)
513                        let sst_size = $db
514                            .property_int_value_cf(cf, rocksdb::properties::LIVE_SST_FILES_SIZE)
515                            .ok()
516                            .flatten()
517                            .unwrap_or(0);
518
519                        let memtable_size = $db
520                            .property_int_value_cf(cf, rocksdb::properties::SIZE_ALL_MEM_TABLES)
521                            .ok()
522                            .flatten()
523                            .unwrap_or(0);
524
525                        let estimated_size_bytes = sst_size + memtable_size;
526
527                        let pending_compaction_bytes = $db
528                            .property_int_value_cf(
529                                cf,
530                                rocksdb::properties::ESTIMATE_PENDING_COMPACTION_BYTES,
531                            )
532                            .ok()
533                            .flatten()
534                            .unwrap_or(0);
535
536                        stats.push(RocksDBTableStats {
537                            sst_size_bytes: sst_size,
538                            memtable_size_bytes: memtable_size,
539                            name: cf_name.to_string(),
540                            estimated_num_keys,
541                            estimated_size_bytes,
542                            pending_compaction_bytes,
543                        });
544                    }
545                }
546            };
547        }
548
549        match self {
550            Self::ReadWrite { db, .. } => collect_stats!(db),
551            Self::ReadOnly { db, .. } => collect_stats!(db),
552        }
553
554        stats
555    }
556
557    /// Returns database-level statistics including per-table stats and WAL size.
558    fn db_stats(&self) -> RocksDBStats {
559        RocksDBStats { tables: self.table_stats(), wal_size_bytes: self.wal_size_bytes() }
560    }
561}
562
563impl fmt::Debug for RocksDBProviderInner {
564    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
565        match self {
566            Self::ReadWrite { metrics, .. } => f
567                .debug_struct("RocksDBProviderInner::ReadWrite")
568                .field("db", &"<OptimisticTransactionDB>")
569                .field("metrics", metrics)
570                .finish(),
571            Self::ReadOnly { metrics, .. } => f
572                .debug_struct("RocksDBProviderInner::ReadOnly")
573                .field("db", &"<DB (read-only)>")
574                .field("metrics", metrics)
575                .finish(),
576        }
577    }
578}
579
580impl Drop for RocksDBProviderInner {
581    fn drop(&mut self) {
582        match self {
583            Self::ReadWrite { db, .. } => {
584                // Flush all memtables if possible; if flushing fails, they will be rebuilt from
585                // the WAL on restart.
586                if let Err(e) = db.flush_wal(true) {
587                    tracing::warn!(target: "providers::rocksdb", ?e, "Failed to flush WAL on drop");
588                }
589                for cf_name in ROCKSDB_TABLES {
590                    if let Some(cf) = db.cf_handle(cf_name) &&
591                        let Err(e) = db.flush_cf(&cf)
592                    {
593                        tracing::warn!(target: "providers::rocksdb", cf = cf_name, ?e, "Failed to flush CF on drop");
594                    }
595                }
596                db.cancel_all_background_work(true);
597            }
598            Self::ReadOnly { db, .. } => db.cancel_all_background_work(true),
599        }
600    }
601}
602
603impl Clone for RocksDBProvider {
604    fn clone(&self) -> Self {
605        Self(self.0.clone())
606    }
607}
608
609impl DatabaseMetrics for RocksDBProvider {
610    fn gauge_metrics(&self) -> Vec<(&'static str, f64, Vec<Label>)> {
611        let mut metrics = Vec::new();
612
613        for stat in self.table_stats() {
614            metrics.push((
615                "rocksdb.table_size",
616                stat.estimated_size_bytes as f64,
617                vec![Label::new("table", stat.name.clone())],
618            ));
619            metrics.push((
620                "rocksdb.table_entries",
621                stat.estimated_num_keys as f64,
622                vec![Label::new("table", stat.name.clone())],
623            ));
624            metrics.push((
625                "rocksdb.pending_compaction_bytes",
626                stat.pending_compaction_bytes as f64,
627                vec![Label::new("table", stat.name.clone())],
628            ));
629            metrics.push((
630                "rocksdb.sst_size",
631                stat.sst_size_bytes as f64,
632                vec![Label::new("table", stat.name.clone())],
633            ));
634            metrics.push((
635                "rocksdb.memtable_size",
636                stat.memtable_size_bytes as f64,
637                vec![Label::new("table", stat.name)],
638            ));
639        }
640
641        // WAL size (DB-level, shared across all tables)
642        metrics.push(("rocksdb.wal_size", self.wal_size_bytes() as f64, vec![]));
643
644        metrics
645    }
646}
647
648impl RocksDBProvider {
649    /// Creates a new `RocksDB` provider.
650    pub fn new(path: impl AsRef<Path>) -> ProviderResult<Self> {
651        RocksDBBuilder::new(path).build()
652    }
653
654    /// Creates a new `RocksDB` provider builder.
655    pub fn builder(path: impl AsRef<Path>) -> RocksDBBuilder {
656        RocksDBBuilder::new(path)
657    }
658
659    /// Returns `true` if this provider is in read-only mode.
660    pub fn is_read_only(&self) -> bool {
661        matches!(self.0.as_ref(), RocksDBProviderInner::ReadOnly { .. })
662    }
663
664    /// Creates a new transaction with MDBX-like semantics (read-your-writes, rollback).
665    ///
666    /// Note: With `OptimisticTransactionDB`, commits may fail if there are conflicts.
667    /// Conflict detection happens at commit time, not at write time.
668    ///
669    /// # Panics
670    /// Panics if the provider is in read-only mode.
671    pub fn tx(&self) -> RocksTx<'_> {
672        let write_options = WriteOptions::default();
673        let txn_options = OptimisticTransactionOptions::default();
674        let inner = self.0.db_rw().transaction_opt(&write_options, &txn_options);
675        RocksTx { inner, provider: self }
676    }
677
678    /// Creates a new batch for atomic writes.
679    ///
680    /// Use [`Self::write_batch`] for closure-based atomic writes.
681    /// Use this method when the batch needs to be held by [`crate::EitherWriter`].
682    ///
683    /// # Panics
684    /// Panics if the provider is in read-only mode when attempting to commit.
685    pub fn batch(&self) -> RocksDBBatch<'_> {
686        RocksDBBatch {
687            provider: self,
688            inner: WriteBatchWithTransaction::<true>::default(),
689            buf: Vec::with_capacity(DEFAULT_COMPRESS_BUF_CAPACITY),
690            auto_commit_threshold: None,
691        }
692    }
693
694    /// Creates a new batch with auto-commit enabled.
695    ///
696    /// When the batch size exceeds the threshold (4 GiB), the batch is automatically
697    /// committed and reset. This prevents OOM during large bulk writes while maintaining
698    /// crash-safety via the consistency check on startup.
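    ///
    /// # Example
    ///
    /// A minimal bulk-write sketch (not compiled as a doc-test); `provider` is an existing
    /// [`RocksDBProvider`] and `entries` is a hypothetical iterator of `(B256, TxNumber)` pairs:
    ///
    /// ```ignore
    /// let mut batch = provider.batch_with_auto_commit();
    /// for (hash, tx_num) in entries {
    ///     // Oversized batches are committed and reset transparently inside `put`.
    ///     batch.put::<tables::TransactionHashNumbers>(hash, &tx_num)?;
    /// }
    /// // Commit whatever remains below the threshold.
    /// batch.commit()?;
    /// ```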
699    pub fn batch_with_auto_commit(&self) -> RocksDBBatch<'_> {
700        RocksDBBatch {
701            provider: self,
702            inner: WriteBatchWithTransaction::<true>::default(),
703            buf: Vec::with_capacity(DEFAULT_COMPRESS_BUF_CAPACITY),
704            auto_commit_threshold: Some(DEFAULT_AUTO_COMMIT_THRESHOLD),
705        }
706    }
707
708    /// Gets the column family handle for a table.
709    fn get_cf_handle<T: Table>(&self) -> Result<&rocksdb::ColumnFamily, DatabaseError> {
710        self.0.cf_handle::<T>()
711    }
712
713    /// Executes a function and records metrics with the given operation and table name.
714    fn execute_with_operation_metric<R>(
715        &self,
716        operation: RocksDBOperation,
717        table: &'static str,
718        f: impl FnOnce(&Self) -> R,
719    ) -> R {
720        let start = self.0.metrics().map(|_| Instant::now());
721        let res = f(self);
722
723        if let (Some(start), Some(metrics)) = (start, self.0.metrics()) {
724            metrics.record_operation(operation, table, start.elapsed());
725        }
726
727        res
728    }
729
730    /// Gets a value from the specified table.
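    ///
    /// # Example
    ///
    /// A minimal round-trip sketch (not compiled as a doc-test); `tx_hash` and `tx_num` are
    /// hypothetical values:
    ///
    /// ```ignore
    /// provider.put::<tables::TransactionHashNumbers>(tx_hash, &tx_num)?;
    /// let read = provider.get::<tables::TransactionHashNumbers>(tx_hash)?;
    /// assert_eq!(read, Some(tx_num));
    /// ```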
731    pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
732        self.get_encoded::<T>(&key.encode())
733    }
734
735    /// Gets a value from the specified table using a pre-encoded key.
736    pub fn get_encoded<T: Table>(
737        &self,
738        key: &<T::Key as Encode>::Encoded,
739    ) -> ProviderResult<Option<T::Value>> {
740        self.execute_with_operation_metric(RocksDBOperation::Get, T::NAME, |this| {
741            let result = this.0.get_cf(this.get_cf_handle::<T>()?, key.as_ref()).map_err(|e| {
742                ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
743                    message: e.to_string().into(),
744                    code: -1,
745                }))
746            })?;
747
748            Ok(result.and_then(|value| T::Value::decompress(&value).ok()))
749        })
750    }
751
752    /// Upserts a value into the specified table with the given key.
753    ///
754    /// # Panics
755    /// Panics if the provider is in read-only mode.
756    pub fn put<T: Table>(&self, key: T::Key, value: &T::Value) -> ProviderResult<()> {
757        let encoded_key = key.encode();
758        self.put_encoded::<T>(&encoded_key, value)
759    }
760
761    /// Puts a value into the specified table using a pre-encoded key.
762    ///
763    /// # Panics
764    /// Panics if the provider is in read-only mode.
765    pub fn put_encoded<T: Table>(
766        &self,
767        key: &<T::Key as Encode>::Encoded,
768        value: &T::Value,
769    ) -> ProviderResult<()> {
770        self.execute_with_operation_metric(RocksDBOperation::Put, T::NAME, |this| {
771            // To keep the code simple, a buffer is allocated on each call because
772            // `RocksDBProvider` is thread-safe. Callers that want to avoid the per-call
773            // allocation can use the write_batch API instead.
774            let mut buf = Vec::new();
775            let value_bytes = compress_to_buf_or_ref!(buf, value).unwrap_or(&buf);
776
777            this.0.put_cf(this.get_cf_handle::<T>()?, key, value_bytes).map_err(|e| {
778                ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
779                    info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
780                    operation: DatabaseWriteOperation::PutUpsert,
781                    table_name: T::NAME,
782                    key: key.as_ref().to_vec(),
783                })))
784            })
785        })
786    }
787
788    /// Deletes a value from the specified table.
789    ///
790    /// # Panics
791    /// Panics if the provider is in read-only mode.
792    pub fn delete<T: Table>(&self, key: T::Key) -> ProviderResult<()> {
793        self.execute_with_operation_metric(RocksDBOperation::Delete, T::NAME, |this| {
794            this.0.delete_cf(this.get_cf_handle::<T>()?, key.encode().as_ref()).map_err(|e| {
795                ProviderError::Database(DatabaseError::Delete(DatabaseErrorInfo {
796                    message: e.to_string().into(),
797                    code: -1,
798                }))
799            })
800        })
801    }
802
803    /// Clears all entries from the specified table.
804    ///
805    /// Uses `delete_range_cf` from empty key to a max key (256 bytes of 0xFF).
806    /// This end key must exceed the maximum encoded key size for any table.
807    /// Current max is ~60 bytes (`StorageShardedKey` = 20 + 32 + 8).
808    pub fn clear<T: Table>(&self) -> ProviderResult<()> {
809        let cf = self.get_cf_handle::<T>()?;
810
811        self.0.delete_range_cf(cf, &[] as &[u8], &[0xFF; 256]).map_err(|e| {
812            ProviderError::Database(DatabaseError::Delete(DatabaseErrorInfo {
813                message: e.to_string().into(),
814                code: -1,
815            }))
816        })?;
817
818        Ok(())
819    }
820
821    /// Gets the first (smallest key) entry from the specified table.
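    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doc-test) that reads the smallest and largest keys of
    /// the transaction-hash index:
    ///
    /// ```ignore
    /// if let Some((first_hash, _)) = provider.first::<tables::TransactionHashNumbers>()? {
    ///     println!("smallest key: {first_hash}");
    /// }
    /// if let Some((last_hash, _)) = provider.last::<tables::TransactionHashNumbers>()? {
    ///     println!("largest key: {last_hash}");
    /// }
    /// ```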
822    pub fn first<T: Table>(&self) -> ProviderResult<Option<(T::Key, T::Value)>> {
823        self.execute_with_operation_metric(RocksDBOperation::Get, T::NAME, |this| {
824            let cf = this.get_cf_handle::<T>()?;
825            let mut iter = this.0.iterator_cf(cf, IteratorMode::Start);
826
827            match iter.next() {
828                Some(Ok((key_bytes, value_bytes))) => {
829                    let key = <T::Key as reth_db_api::table::Decode>::decode(&key_bytes)
830                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
831                    let value = T::Value::decompress(&value_bytes)
832                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
833                    Ok(Some((key, value)))
834                }
835                Some(Err(e)) => {
836                    Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
837                        message: e.to_string().into(),
838                        code: -1,
839                    })))
840                }
841                None => Ok(None),
842            }
843        })
844    }
845
846    /// Gets the last (largest key) entry from the specified table.
847    pub fn last<T: Table>(&self) -> ProviderResult<Option<(T::Key, T::Value)>> {
848        self.execute_with_operation_metric(RocksDBOperation::Get, T::NAME, |this| {
849            let cf = this.get_cf_handle::<T>()?;
850            let mut iter = this.0.iterator_cf(cf, IteratorMode::End);
851
852            match iter.next() {
853                Some(Ok((key_bytes, value_bytes))) => {
854                    let key = <T::Key as reth_db_api::table::Decode>::decode(&key_bytes)
855                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
856                    let value = T::Value::decompress(&value_bytes)
857                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
858                    Ok(Some((key, value)))
859                }
860                Some(Err(e)) => {
861                    Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
862                        message: e.to_string().into(),
863                        code: -1,
864                    })))
865                }
866                None => Ok(None),
867            }
868        })
869    }
870
871    /// Creates an iterator over all entries in the specified table.
872    ///
873    /// Returns decoded `(Key, Value)` pairs in key order.
874    pub fn iter<T: Table>(&self) -> ProviderResult<RocksDBIter<'_, T>> {
875        let cf = self.get_cf_handle::<T>()?;
876        let iter = self.0.iterator_cf(cf, IteratorMode::Start);
877        Ok(RocksDBIter { inner: iter, _marker: std::marker::PhantomData })
878    }
879
880    /// Returns statistics for all column families in the database.
881    ///
882    /// Returns one [`RocksDBTableStats`] entry per column family.
883    pub fn table_stats(&self) -> Vec<RocksDBTableStats> {
884        self.0.table_stats()
885    }
886
887    /// Returns the total size of WAL (Write-Ahead Log) files in bytes.
888    ///
889    /// This scans the `RocksDB` directory for `.log` files and sums their sizes.
890    /// WAL files can be significant (e.g., 2.7GB observed) and are not included
891    /// in `table_size`, `sst_size`, or `memtable_size` metrics.
892    pub fn wal_size_bytes(&self) -> u64 {
893        self.0.wal_size_bytes()
894    }
895
896    /// Returns database-level statistics including per-table stats and WAL size.
897    ///
898    /// This combines [`Self::table_stats`] and [`Self::wal_size_bytes`] into a single struct.
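    ///
    /// # Example
    ///
    /// A minimal reporting sketch (not compiled as a doc-test):
    ///
    /// ```ignore
    /// let stats = provider.db_stats();
    /// for table in &stats.tables {
    ///     println!(
    ///         "{}: ~{} keys, {} bytes (sst {} + memtable {})",
    ///         table.name,
    ///         table.estimated_num_keys,
    ///         table.estimated_size_bytes,
    ///         table.sst_size_bytes,
    ///         table.memtable_size_bytes,
    ///     );
    /// }
    /// println!("WAL: {} bytes", stats.wal_size_bytes);
    /// ```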
899    pub fn db_stats(&self) -> RocksDBStats {
900        self.0.db_stats()
901    }
902
903    /// Flushes pending writes for the specified tables to disk.
904    ///
905    /// This performs a flush of:
906    /// 1. The column family memtables for the specified table names to SST files
907    /// 2. The Write-Ahead Log (WAL) with sync
908    ///
909    /// After this call completes, all data for the specified tables is durably persisted to disk.
910    ///
911    /// # Panics
912    /// Panics if the provider is in read-only mode.
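    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doc-test) that flushes only the transaction-hash
    /// index:
    ///
    /// ```ignore
    /// provider.flush(&[tables::TransactionHashNumbers::NAME])?;
    /// ```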
913    #[instrument(level = "debug", target = "providers::rocksdb", skip_all, fields(tables = ?tables))]
914    pub fn flush(&self, tables: &[&'static str]) -> ProviderResult<()> {
915        let db = self.0.db_rw();
916
917        for cf_name in tables {
918            if let Some(cf) = db.cf_handle(cf_name) {
919                db.flush_cf(&cf).map_err(|e| {
920                    ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
921                        info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
922                        operation: DatabaseWriteOperation::Flush,
923                        table_name: cf_name,
924                        key: Vec::new(),
925                    })))
926                })?;
927            }
928        }
929
930        db.flush_wal(true).map_err(|e| {
931            ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
932                info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
933                operation: DatabaseWriteOperation::Flush,
934                table_name: "WAL",
935                key: Vec::new(),
936            })))
937        })?;
938
939        Ok(())
940    }
941
942    /// Creates a raw iterator over all entries in the specified table.
943    ///
944    /// Returns raw `(key_bytes, value_bytes)` pairs without decoding.
945    pub fn raw_iter<T: Table>(&self) -> ProviderResult<RocksDBRawIter<'_>> {
946        let cf = self.get_cf_handle::<T>()?;
947        let iter = self.0.iterator_cf(cf, IteratorMode::Start);
948        Ok(RocksDBRawIter { inner: iter })
949    }
950
951    /// Returns all account history shards for the given address in ascending key order.
952    ///
953    /// This is used for unwind operations where we need to scan all shards for an address
954    /// and potentially delete or truncate them.
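    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doc-test); `address` is a hypothetical account:
    ///
    /// ```ignore
    /// for (sharded_key, _block_list) in provider.account_history_shards(address)? {
    ///     // Shards are keyed by the highest block number they contain.
    ///     println!("shard up to block {}", sharded_key.highest_block_number);
    /// }
    /// ```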
955    pub fn account_history_shards(
956        &self,
957        address: Address,
958    ) -> ProviderResult<Vec<(ShardedKey<Address>, BlockNumberList)>> {
959        // Get the column family handle for the AccountsHistory table.
960        let cf = self.get_cf_handle::<tables::AccountsHistory>()?;
961
962        // Build a seek key starting at the first shard (highest_block_number = 0) for this address.
963        // ShardedKey is (address, highest_block_number) so this positions us at the beginning.
964        let start_key = ShardedKey::new(address, 0u64);
965        let start_bytes = start_key.encode();
966
967        // Create a forward iterator starting from our seek position.
968        let iter = self
969            .0
970            .iterator_cf(cf, IteratorMode::From(start_bytes.as_ref(), rocksdb::Direction::Forward));
971
972        let mut result = Vec::new();
973        for item in iter {
974            match item {
975                Ok((key_bytes, value_bytes)) => {
976                    // Decode the sharded key to check if we're still on the same address.
977                    let key = ShardedKey::<Address>::decode(&key_bytes)
978                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
979
980                    // Stop when we reach a different address (keys are sorted by address first).
981                    if key.key != address {
982                        break;
983                    }
984
985                    // Decompress the block number list stored in this shard.
986                    let value = BlockNumberList::decompress(&value_bytes)
987                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
988
989                    result.push((key, value));
990                }
991                Err(e) => {
992                    return Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
993                        message: e.to_string().into(),
994                        code: -1,
995                    })));
996                }
997            }
998        }
999
1000        Ok(result)
1001    }
1002
1003    /// Returns all storage history shards for the given `(address, storage_key)` pair.
1004    ///
1005    /// Iterates through all shards in ascending `highest_block_number` order until
1006    /// a different `(address, storage_key)` is encountered.
1007    pub fn storage_history_shards(
1008        &self,
1009        address: Address,
1010        storage_key: B256,
1011    ) -> ProviderResult<Vec<(StorageShardedKey, BlockNumberList)>> {
1012        let cf = self.get_cf_handle::<tables::StoragesHistory>()?;
1013
1014        let start_key = StorageShardedKey::new(address, storage_key, 0u64);
1015        let start_bytes = start_key.encode();
1016
1017        let iter = self
1018            .0
1019            .iterator_cf(cf, IteratorMode::From(start_bytes.as_ref(), rocksdb::Direction::Forward));
1020
1021        let mut result = Vec::new();
1022        for item in iter {
1023            match item {
1024                Ok((key_bytes, value_bytes)) => {
1025                    let key = StorageShardedKey::decode(&key_bytes)
1026                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
1027
1028                    if key.address != address || key.sharded_key.key != storage_key {
1029                        break;
1030                    }
1031
1032                    let value = BlockNumberList::decompress(&value_bytes)
1033                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
1034
1035                    result.push((key, value));
1036                }
1037                Err(e) => {
1038                    return Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
1039                        message: e.to_string().into(),
1040                        code: -1,
1041                    })));
1042                }
1043            }
1044        }
1045
1046        Ok(result)
1047    }
1048
1049    /// Unwinds account history indices for the given `(address, block_number)` pairs.
1050    ///
1051    /// Groups addresses by their minimum block number and calls the appropriate unwind
1052    /// operations. For each address, keeps only blocks less than the minimum block
1053    /// (i.e., removes the minimum block and all higher blocks).
1054    ///
1055    /// Returns a `WriteBatchWithTransaction` that can be committed later.
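    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doc-test); `addr` is a hypothetical address whose
    /// history is unwound from block 100 upwards:
    ///
    /// ```ignore
    /// let batch = provider.unwind_account_history_indices(&[(addr, 100)])?;
    /// provider.commit_batch(batch)?;
    /// ```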
1056    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
1057    pub fn unwind_account_history_indices(
1058        &self,
1059        last_indices: &[(Address, BlockNumber)],
1060    ) -> ProviderResult<WriteBatchWithTransaction<true>> {
1061        let mut address_min_block: HashMap<Address, BlockNumber> =
1062            HashMap::with_capacity_and_hasher(last_indices.len(), Default::default());
1063        for &(address, block_number) in last_indices {
1064            address_min_block
1065                .entry(address)
1066                .and_modify(|min| *min = (*min).min(block_number))
1067                .or_insert(block_number);
1068        }
1069
1070        let mut batch = self.batch();
1071        for (address, min_block) in address_min_block {
1072            match min_block.checked_sub(1) {
1073                Some(keep_to) => batch.unwind_account_history_to(address, keep_to)?,
1074                None => batch.clear_account_history(address)?,
1075            }
1076        }
1077
1078        Ok(batch.into_inner())
1079    }
1080
1081    /// Unwinds storage history indices for the given `(address, storage_key, block_number)` tuples.
1082    ///
1083    /// Groups by `(address, storage_key)` and finds the minimum block number for each.
1084    /// For each key, keeps only blocks less than the minimum block
1085    /// (i.e., removes the minimum block and all higher blocks).
1086    ///
1087    /// Returns a `WriteBatchWithTransaction` that can be committed later.
1088    pub fn unwind_storage_history_indices(
1089        &self,
1090        storage_changesets: &[(Address, B256, BlockNumber)],
1091    ) -> ProviderResult<WriteBatchWithTransaction<true>> {
1092        let mut key_min_block: HashMap<(Address, B256), BlockNumber> =
1093            HashMap::with_capacity_and_hasher(storage_changesets.len(), Default::default());
1094        for &(address, storage_key, block_number) in storage_changesets {
1095            key_min_block
1096                .entry((address, storage_key))
1097                .and_modify(|min| *min = (*min).min(block_number))
1098                .or_insert(block_number);
1099        }
1100
1101        let mut batch = self.batch();
1102        for ((address, storage_key), min_block) in key_min_block {
1103            match min_block.checked_sub(1) {
1104                Some(keep_to) => batch.unwind_storage_history_to(address, storage_key, keep_to)?,
1105                None => batch.clear_storage_history(address, storage_key)?,
1106            }
1107        }
1108
1109        Ok(batch.into_inner())
1110    }
1111
1112    /// Writes a batch of operations atomically.
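    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doc-test); `tx_hash`, `tx_num`, and `stale_hash` are
    /// hypothetical values:
    ///
    /// ```ignore
    /// provider.write_batch(|batch| {
    ///     batch.put::<tables::TransactionHashNumbers>(tx_hash, &tx_num)?;
    ///     batch.delete::<tables::TransactionHashNumbers>(stale_hash)?;
    ///     Ok(())
    /// })?;
    /// ```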
1113    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
1114    pub fn write_batch<F>(&self, f: F) -> ProviderResult<()>
1115    where
1116        F: FnOnce(&mut RocksDBBatch<'_>) -> ProviderResult<()>,
1117    {
1118        self.execute_with_operation_metric(RocksDBOperation::BatchWrite, "Batch", |this| {
1119            let mut batch_handle = this.batch();
1120            f(&mut batch_handle)?;
1121            batch_handle.commit()
1122        })
1123    }
1124
1125    /// Commits a raw `WriteBatchWithTransaction` to `RocksDB`.
1126    ///
1127    /// This is used when the batch was extracted via [`RocksDBBatch::into_inner`]
1128    /// and needs to be committed at a later point (e.g., at provider commit time).
1129    ///
1130    /// # Panics
1131    /// Panics if the provider is in read-only mode.
1132    #[instrument(level = "debug", target = "providers::rocksdb", skip_all, fields(batch_len = batch.len(), batch_size = batch.size_in_bytes()))]
1133    pub fn commit_batch(&self, batch: WriteBatchWithTransaction<true>) -> ProviderResult<()> {
1134        self.0.db_rw().write_opt(batch, &WriteOptions::default()).map_err(|e| {
1135            ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
1136                message: e.to_string().into(),
1137                code: -1,
1138            }))
1139        })
1140    }
1141
1142    /// Writes all `RocksDB` data for multiple blocks in parallel.
1143    ///
1144    /// This handles transaction hash numbers, account history, and storage history based on
1145    /// the provided storage settings. Each operation runs in parallel with its own batch,
1146    /// pushing to `ctx.pending_batches` for later commit.
1147    #[instrument(level = "debug", target = "providers::rocksdb", skip_all, fields(num_blocks = blocks.len(), first_block = ctx.first_block_number))]
1148    pub(crate) fn write_blocks_data<N: reth_node_types::NodePrimitives>(
1149        &self,
1150        blocks: &[ExecutedBlock<N>],
1151        tx_nums: &[TxNumber],
1152        ctx: RocksDBWriteCtx,
1153    ) -> ProviderResult<()> {
1154        if !ctx.storage_settings.any_in_rocksdb() {
1155            return Ok(());
1156        }
1157
1158        thread::scope(|s| {
1159            let handles: Vec<_> = [
1160                (ctx.storage_settings.transaction_hash_numbers_in_rocksdb &&
1161                    ctx.prune_tx_lookup.is_none_or(|m| !m.is_full()))
1162                .then(|| s.spawn(|| self.write_tx_hash_numbers(blocks, tx_nums, &ctx))),
1163                ctx.storage_settings
1164                    .account_history_in_rocksdb
1165                    .then(|| s.spawn(|| self.write_account_history(blocks, &ctx))),
1166                ctx.storage_settings
1167                    .storages_history_in_rocksdb
1168                    .then(|| s.spawn(|| self.write_storage_history(blocks, &ctx))),
1169            ]
1170            .into_iter()
1171            .enumerate()
1172            .filter_map(|(i, h)| h.map(|h| (i, h)))
1173            .collect();
1174
1175            for (i, handle) in handles {
1176                handle.join().map_err(|_| {
1177                    ProviderError::Database(DatabaseError::Other(format!(
1178                        "rocksdb write thread {i} panicked"
1179                    )))
1180                })??;
1181            }
1182
1183            Ok(())
1184        })
1185    }
1186
1187    /// Writes transaction hash to number mappings for the given blocks.
1188    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
1189    fn write_tx_hash_numbers<N: reth_node_types::NodePrimitives>(
1190        &self,
1191        blocks: &[ExecutedBlock<N>],
1192        tx_nums: &[TxNumber],
1193        ctx: &RocksDBWriteCtx,
1194    ) -> ProviderResult<()> {
1195        let mut batch = self.batch();
1196        for (block, &first_tx_num) in blocks.iter().zip(tx_nums) {
1197            let body = block.recovered_block().body();
1198            let mut tx_num = first_tx_num;
1199            for transaction in body.transactions_iter() {
1200                batch.put::<tables::TransactionHashNumbers>(*transaction.tx_hash(), &tx_num)?;
1201                tx_num += 1;
1202            }
1203        }
1204        ctx.pending_batches.lock().push(batch.into_inner());
1205        Ok(())
1206    }
1207
1208    /// Writes account history indices for the given blocks.
1209    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
1210    fn write_account_history<N: reth_node_types::NodePrimitives>(
1211        &self,
1212        blocks: &[ExecutedBlock<N>],
1213        ctx: &RocksDBWriteCtx,
1214    ) -> ProviderResult<()> {
1215        let mut batch = self.batch();
1216        let mut account_history: BTreeMap<Address, Vec<u64>> = BTreeMap::new();
1217        for (block_idx, block) in blocks.iter().enumerate() {
1218            let block_number = ctx.first_block_number + block_idx as u64;
1219            let bundle = &block.execution_outcome().state;
1220            for &address in bundle.state().keys() {
1221                account_history.entry(address).or_default().push(block_number);
1222            }
1223        }
1224
1225        // Write account history using proper shard append logic
1226        for (address, indices) in account_history {
1227            batch.append_account_history_shard(address, indices)?;
1228        }
1229        ctx.pending_batches.lock().push(batch.into_inner());
1230        Ok(())
1231    }
1232
1233    /// Writes storage history indices for the given blocks.
1234    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
1235    fn write_storage_history<N: reth_node_types::NodePrimitives>(
1236        &self,
1237        blocks: &[ExecutedBlock<N>],
1238        ctx: &RocksDBWriteCtx,
1239    ) -> ProviderResult<()> {
1240        let mut batch = self.batch();
1241        let mut storage_history: BTreeMap<(Address, B256), Vec<u64>> = BTreeMap::new();
1242        for (block_idx, block) in blocks.iter().enumerate() {
1243            let block_number = ctx.first_block_number + block_idx as u64;
1244            let bundle = &block.execution_outcome().state;
1245            for (&address, account) in bundle.state() {
1246                for &slot in account.storage.keys() {
1247                    let key = B256::new(slot.to_be_bytes());
1248                    storage_history.entry((address, key)).or_default().push(block_number);
1249                }
1250            }
1251        }
1252
1253        // Write storage history using proper shard append logic
1254        for ((address, slot), indices) in storage_history {
1255            batch.append_storage_history_shard(address, slot, indices)?;
1256        }
1257        ctx.pending_batches.lock().push(batch.into_inner());
1258        Ok(())
1259    }
1260}
1261
1262/// Handle for building a batch of operations atomically.
1263///
1264/// Uses `WriteBatchWithTransaction` for atomic writes without full transaction overhead.
1265/// Unlike [`RocksTx`], this does NOT support read-your-writes. Use for write-only flows
1266/// where you don't need to read back uncommitted data within the same operation
1267/// (e.g., history index writes).
1268///
1269/// When `auto_commit_threshold` is set, the batch will automatically commit and reset
1270/// when the batch size exceeds the threshold. This prevents OOM during large bulk writes.
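///
/// # Example
///
/// A minimal sketch (not compiled as a doc-test) of the deferred-commit flow; `tx_hash` and
/// `tx_num` are hypothetical values:
///
/// ```ignore
/// let mut batch = provider.batch();
/// batch.put::<tables::TransactionHashNumbers>(tx_hash, &tx_num)?;
/// // Hand the raw write batch back and commit it at the provider level later.
/// provider.commit_batch(batch.into_inner())?;
/// ```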
1271#[must_use = "batch must be committed"]
1272pub struct RocksDBBatch<'a> {
1273    provider: &'a RocksDBProvider,
1274    inner: WriteBatchWithTransaction<true>,
1275    buf: Vec<u8>,
1276    /// If set, batch auto-commits when size exceeds this threshold (in bytes).
1277    auto_commit_threshold: Option<usize>,
1278}
1279
1280impl fmt::Debug for RocksDBBatch<'_> {
1281    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1282        f.debug_struct("RocksDBBatch")
1283            .field("provider", &self.provider)
1284            .field("batch", &"<WriteBatchWithTransaction>")
1285            // Number of operations in this batch
1286            .field("length", &self.inner.len())
1287            // Total serialized size (encoded key + compressed value + metadata) of this batch
1288            // in bytes
1289            .field("size_in_bytes", &self.inner.size_in_bytes())
1290            .finish()
1291    }
1292}
1293
1294impl<'a> RocksDBBatch<'a> {
1295    /// Puts a value into the batch.
1296    ///
1297    /// If auto-commit is enabled and the batch exceeds the threshold, commits and resets.
1298    pub fn put<T: Table>(&mut self, key: T::Key, value: &T::Value) -> ProviderResult<()> {
1299        let encoded_key = key.encode();
1300        self.put_encoded::<T>(&encoded_key, value)
1301    }
1302
1303    /// Puts a value into the batch using a pre-encoded key.
1304    ///
1305    /// If auto-commit is enabled and the batch exceeds the threshold, commits and resets.
1306    pub fn put_encoded<T: Table>(
1307        &mut self,
1308        key: &<T::Key as Encode>::Encoded,
1309        value: &T::Value,
1310    ) -> ProviderResult<()> {
1311        let value_bytes = compress_to_buf_or_ref!(self.buf, value).unwrap_or(&self.buf);
1312        self.inner.put_cf(self.provider.get_cf_handle::<T>()?, key, value_bytes);
1313        self.maybe_auto_commit()?;
1314        Ok(())
1315    }
1316
1317    /// Deletes a value from the batch.
1318    ///
1319    /// If auto-commit is enabled and the batch exceeds the threshold, commits and resets.
1320    pub fn delete<T: Table>(&mut self, key: T::Key) -> ProviderResult<()> {
1321        self.inner.delete_cf(self.provider.get_cf_handle::<T>()?, key.encode().as_ref());
1322        self.maybe_auto_commit()?;
1323        Ok(())
1324    }
1325
1326    /// Commits and resets the batch if it exceeds the auto-commit threshold.
1327    ///
1328    /// This is called after each `put` or `delete` operation to prevent unbounded memory growth.
1329    /// Returns immediately if auto-commit is disabled or threshold not reached.
1330    fn maybe_auto_commit(&mut self) -> ProviderResult<()> {
1331        if let Some(threshold) = self.auto_commit_threshold &&
1332            self.inner.size_in_bytes() >= threshold
1333        {
1334            tracing::debug!(
1335                target: "providers::rocksdb",
1336                batch_size = self.inner.size_in_bytes(),
1337                threshold,
1338                "Auto-committing RocksDB batch"
1339            );
1340            let old_batch = std::mem::take(&mut self.inner);
1341            self.provider.0.db_rw().write_opt(old_batch, &WriteOptions::default()).map_err(
1342                |e| {
1343                    ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
1344                        message: e.to_string().into(),
1345                        code: -1,
1346                    }))
1347                },
1348            )?;
1349        }
1350        Ok(())
1351    }
1352
1353    /// Commits the batch to the database.
1354    ///
1355    /// This consumes the batch and writes all operations atomically to `RocksDB`.
1356    ///
1357    /// # Panics
1358    /// Panics if the provider is in read-only mode.
1359    #[instrument(level = "debug", target = "providers::rocksdb", skip_all, fields(batch_len = self.inner.len(), batch_size = self.inner.size_in_bytes()))]
1360    pub fn commit(self) -> ProviderResult<()> {
1361        self.provider.0.db_rw().write_opt(self.inner, &WriteOptions::default()).map_err(|e| {
1362            ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
1363                message: e.to_string().into(),
1364                code: -1,
1365            }))
1366        })
1367    }
1368
1369    /// Returns the number of write operations (puts + deletes) queued in this batch.
1370    pub fn len(&self) -> usize {
1371        self.inner.len()
1372    }
1373
1374    /// Returns `true` if the batch contains no operations.
1375    pub fn is_empty(&self) -> bool {
1376        self.inner.is_empty()
1377    }
1378
1379    /// Returns the size of the batch in bytes.
1380    pub fn size_in_bytes(&self) -> usize {
1381        self.inner.size_in_bytes()
1382    }
1383
1384    /// Returns a reference to the underlying `RocksDB` provider.
1385    pub const fn provider(&self) -> &RocksDBProvider {
1386        self.provider
1387    }
1388
1389    /// Consumes the batch and returns the underlying `WriteBatchWithTransaction`.
1390    ///
1391    /// This is used to defer commits to the provider level.
1392    pub fn into_inner(self) -> WriteBatchWithTransaction<true> {
1393        self.inner
1394    }
1395
1396    /// Gets a value from the database.
1397    ///
1398    /// **Important constraint:** This reads only committed state, not pending writes in this
1399    /// batch or other pending batches in `pending_rocksdb_batches`.
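    ///
    /// A short sketch of this constraint (marked `ignore`, not compiled as a doctest;
    /// `MyTable` is a placeholder for a registered table keyed by `u64`):
    ///
    /// ```ignore
    /// let mut batch = provider.batch();
    /// batch.put::<MyTable>(42u64, &value)?;
    /// // Still `None`: the pending put above is not visible to `get`.
    /// assert!(batch.get::<MyTable>(42u64)?.is_none());
    /// ```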
1400    pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
1401        self.provider.get::<T>(key)
1402    }
1403
1404    /// Appends indices to an account history shard with proper shard management.
1405    ///
1406    /// Loads the existing shard (if any), appends new indices, and rechunks into
1407    /// multiple shards if needed (respecting `NUM_OF_INDICES_IN_SHARD` limit).
1408    ///
1409    /// # Requirements
1410    ///
1411    /// - The `indices` MUST be strictly increasing and contain no duplicates.
1412    /// - This method MUST only be called once per address per batch. The batch reads existing
1413    ///   shards from committed DB state, not from pending writes. Calling twice for the same
1414    ///   address will cause the second call to overwrite the first.
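    ///
    /// # Example
    ///
    /// A minimal sketch (marked `ignore`, not compiled as a doctest); `provider` and
    /// `address` are assumed to come from the caller:
    ///
    /// ```ignore
    /// let mut batch = provider.batch();
    /// // Strictly increasing indices, and this address appears only once in the batch.
    /// batch.append_account_history_shard(address, [100u64, 101, 105])?;
    /// batch.commit()?;
    /// ```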
1415    pub fn append_account_history_shard(
1416        &mut self,
1417        address: Address,
1418        indices: impl IntoIterator<Item = u64>,
1419    ) -> ProviderResult<()> {
1420        let indices: Vec<u64> = indices.into_iter().collect();
1421
1422        if indices.is_empty() {
1423            return Ok(());
1424        }
1425
1426        debug_assert!(
1427            indices.windows(2).all(|w| w[0] < w[1]),
1428            "indices must be strictly increasing: {:?}",
1429            indices
1430        );
1431
1432        let last_key = ShardedKey::new(address, u64::MAX);
1433        let last_shard_opt = self.provider.get::<tables::AccountsHistory>(last_key.clone())?;
1434        let mut last_shard = last_shard_opt.unwrap_or_else(BlockNumberList::empty);
1435
1436        last_shard.append(indices).map_err(ProviderError::other)?;
1437
1438        // Fast path: all indices fit in one shard
1439        if last_shard.len() <= NUM_OF_INDICES_IN_SHARD as u64 {
1440            self.put::<tables::AccountsHistory>(last_key, &last_shard)?;
1441            return Ok(());
1442        }
1443
1444        // Slow path: rechunk into multiple shards
1445        let chunks = last_shard.iter().chunks(NUM_OF_INDICES_IN_SHARD);
1446        let mut chunks_peekable = chunks.into_iter().peekable();
1447
1448        while let Some(chunk) = chunks_peekable.next() {
1449            let shard = BlockNumberList::new_pre_sorted(chunk);
1450            let highest_block_number = if chunks_peekable.peek().is_some() {
1451                shard.iter().next_back().expect("`chunks` does not return empty list")
1452            } else {
1453                u64::MAX
1454            };
1455
1456            self.put::<tables::AccountsHistory>(
1457                ShardedKey::new(address, highest_block_number),
1458                &shard,
1459            )?;
1460        }
1461
1462        Ok(())
1463    }
1464
1465    /// Appends indices to a storage history shard with proper shard management.
1466    ///
1467    /// Loads the existing shard (if any), appends new indices, and rechunks into
1468    /// multiple shards if needed (respecting `NUM_OF_INDICES_IN_SHARD` limit).
1469    ///
1470    /// # Requirements
1471    ///
1472    /// - The `indices` MUST be strictly increasing and contain no duplicates.
1473    /// - This method MUST only be called once per (address, `storage_key`) pair per batch. The
1474    ///   batch reads existing shards from committed DB state, not from pending writes. Calling
1475    ///   twice for the same key will cause the second call to overwrite the first.
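    ///
    /// # Example
    ///
    /// A minimal sketch (marked `ignore`, not compiled as a doctest); `provider`,
    /// `address`, and `slot` are assumed to come from the caller:
    ///
    /// ```ignore
    /// let mut batch = provider.batch();
    /// batch.append_storage_history_shard(address, slot, [100u64, 101, 105])?;
    /// batch.commit()?;
    /// ```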
1476    pub fn append_storage_history_shard(
1477        &mut self,
1478        address: Address,
1479        storage_key: B256,
1480        indices: impl IntoIterator<Item = u64>,
1481    ) -> ProviderResult<()> {
1482        let indices: Vec<u64> = indices.into_iter().collect();
1483
1484        if indices.is_empty() {
1485            return Ok(());
1486        }
1487
1488        debug_assert!(
1489            indices.windows(2).all(|w| w[0] < w[1]),
1490            "indices must be strictly increasing: {:?}",
1491            indices
1492        );
1493
1494        let last_key = StorageShardedKey::last(address, storage_key);
1495        let last_shard_opt = self.provider.get::<tables::StoragesHistory>(last_key.clone())?;
1496        let mut last_shard = last_shard_opt.unwrap_or_else(BlockNumberList::empty);
1497
1498        last_shard.append(indices).map_err(ProviderError::other)?;
1499
1500        // Fast path: all indices fit in one shard
1501        if last_shard.len() <= NUM_OF_INDICES_IN_SHARD as u64 {
1502            self.put::<tables::StoragesHistory>(last_key, &last_shard)?;
1503            return Ok(());
1504        }
1505
1506        // Slow path: rechunk into multiple shards
1507        let chunks = last_shard.iter().chunks(NUM_OF_INDICES_IN_SHARD);
1508        let mut chunks_peekable = chunks.into_iter().peekable();
1509
1510        while let Some(chunk) = chunks_peekable.next() {
1511            let shard = BlockNumberList::new_pre_sorted(chunk);
1512            let highest_block_number = if chunks_peekable.peek().is_some() {
1513                shard.iter().next_back().expect("`chunks` does not return empty list")
1514            } else {
1515                u64::MAX
1516            };
1517
1518            self.put::<tables::StoragesHistory>(
1519                StorageShardedKey::new(address, storage_key, highest_block_number),
1520                &shard,
1521            )?;
1522        }
1523
1524        Ok(())
1525    }
1526
1527    /// Unwinds account history for the given address, keeping only blocks <= `keep_to`.
1528    ///
1529    /// Mirrors MDBX `unwind_history_shards` behavior:
1530    /// - Deletes shards entirely above `keep_to`
1531    /// - Truncates boundary shards and re-keys to `u64::MAX` sentinel
1532    /// - Preserves shards entirely below `keep_to`
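    ///
    /// # Example
    ///
    /// A minimal sketch (marked `ignore`, not compiled as a doctest) that drops all
    /// account history above block 5; `provider` and `address` are assumed to exist:
    ///
    /// ```ignore
    /// let mut batch = provider.batch();
    /// // Keeps only blocks <= 5; the surviving last shard ends up keyed at u64::MAX.
    /// batch.unwind_account_history_to(address, 5)?;
    /// batch.commit()?;
    /// ```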
1533    pub fn unwind_account_history_to(
1534        &mut self,
1535        address: Address,
1536        keep_to: BlockNumber,
1537    ) -> ProviderResult<()> {
1538        let shards = self.provider.account_history_shards(address)?;
1539        if shards.is_empty() {
1540            return Ok(());
1541        }
1542
1543        // Find the first shard that might contain blocks > keep_to.
1544        // A shard is affected if it's the sentinel (u64::MAX) or its highest_block_number > keep_to
1545        let boundary_idx = shards.iter().position(|(key, _)| {
1546            key.highest_block_number == u64::MAX || key.highest_block_number > keep_to
1547        });
1548
1549        // Repair path: no shards are affected, so all blocks are <= keep_to; just ensure the sentinel exists.
1550        let Some(boundary_idx) = boundary_idx else {
1551            let (last_key, last_value) = shards.last().expect("shards is non-empty");
1552            if last_key.highest_block_number != u64::MAX {
1553                self.delete::<tables::AccountsHistory>(last_key.clone())?;
1554                self.put::<tables::AccountsHistory>(
1555                    ShardedKey::new(address, u64::MAX),
1556                    last_value,
1557                )?;
1558            }
1559            return Ok(());
1560        };
1561
1562        // Delete all shards strictly after the boundary (they are entirely > keep_to)
1563        for (key, _) in shards.iter().skip(boundary_idx + 1) {
1564            self.delete::<tables::AccountsHistory>(key.clone())?;
1565        }
1566
1567        // Process the boundary shard: filter out blocks > keep_to
1568        let (boundary_key, boundary_list) = &shards[boundary_idx];
1569
1570        // Delete the boundary shard (we'll either drop it or rewrite at u64::MAX)
1571        self.delete::<tables::AccountsHistory>(boundary_key.clone())?;
1572
1573        // Build truncated list once; check emptiness directly (avoids double iteration)
1574        let new_last =
1575            BlockNumberList::new_pre_sorted(boundary_list.iter().take_while(|&b| b <= keep_to));
1576
1577        if new_last.is_empty() {
1578            // Boundary shard is now empty. Previous shard becomes the last and must be keyed
1579            // u64::MAX.
1580            if boundary_idx == 0 {
1581                // Nothing left for this address
1582                return Ok(());
1583            }
1584
1585            let (prev_key, prev_value) = &shards[boundary_idx - 1];
1586            if prev_key.highest_block_number != u64::MAX {
1587                self.delete::<tables::AccountsHistory>(prev_key.clone())?;
1588                self.put::<tables::AccountsHistory>(
1589                    ShardedKey::new(address, u64::MAX),
1590                    prev_value,
1591                )?;
1592            }
1593            return Ok(());
1594        }
1595
1596        self.put::<tables::AccountsHistory>(ShardedKey::new(address, u64::MAX), &new_last)?;
1597
1598        Ok(())
1599    }
1600
1601    /// Unwinds storage history to keep only blocks `<= keep_to`.
1602    ///
1603    /// Handles multi-shard scenarios by:
1604    /// 1. Loading all shards for the `(address, storage_key)` pair
1605    /// 2. Finding the boundary shard containing `keep_to`
1606    /// 3. Deleting all shards after the boundary
1607    /// 4. Truncating the boundary shard to keep only indices `<= keep_to`
1608    /// 5. Ensuring the last shard is keyed with `u64::MAX`
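    ///
    /// # Example
    ///
    /// A minimal sketch (marked `ignore`, not compiled as a doctest); `provider`,
    /// `address`, and `slot` are assumed to come from the caller:
    ///
    /// ```ignore
    /// let mut batch = provider.batch();
    /// batch.unwind_storage_history_to(address, slot, 5)?;
    /// batch.commit()?;
    /// ```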
1609    pub fn unwind_storage_history_to(
1610        &mut self,
1611        address: Address,
1612        storage_key: B256,
1613        keep_to: BlockNumber,
1614    ) -> ProviderResult<()> {
1615        let shards = self.provider.storage_history_shards(address, storage_key)?;
1616        if shards.is_empty() {
1617            return Ok(());
1618        }
1619
1620        // Find the first shard that might contain blocks > keep_to.
1621        // A shard is affected if it's the sentinel (u64::MAX) or its highest_block_number > keep_to
1622        let boundary_idx = shards.iter().position(|(key, _)| {
1623            key.sharded_key.highest_block_number == u64::MAX ||
1624                key.sharded_key.highest_block_number > keep_to
1625        });
1626
1627        // Repair path: no shards are affected, so all blocks are <= keep_to; just ensure the sentinel exists.
1628        let Some(boundary_idx) = boundary_idx else {
1629            let (last_key, last_value) = shards.last().expect("shards is non-empty");
1630            if last_key.sharded_key.highest_block_number != u64::MAX {
1631                self.delete::<tables::StoragesHistory>(last_key.clone())?;
1632                self.put::<tables::StoragesHistory>(
1633                    StorageShardedKey::last(address, storage_key),
1634                    last_value,
1635                )?;
1636            }
1637            return Ok(());
1638        };
1639
1640        // Delete all shards strictly after the boundary (they are entirely > keep_to)
1641        for (key, _) in shards.iter().skip(boundary_idx + 1) {
1642            self.delete::<tables::StoragesHistory>(key.clone())?;
1643        }
1644
1645        // Process the boundary shard: filter out blocks > keep_to
1646        let (boundary_key, boundary_list) = &shards[boundary_idx];
1647
1648        // Delete the boundary shard (we'll either drop it or rewrite at u64::MAX)
1649        self.delete::<tables::StoragesHistory>(boundary_key.clone())?;
1650
1651        // Build truncated list once; check emptiness directly (avoids double iteration)
1652        let new_last =
1653            BlockNumberList::new_pre_sorted(boundary_list.iter().take_while(|&b| b <= keep_to));
1654
1655        if new_last.is_empty() {
1656            // Boundary shard is now empty. Previous shard becomes the last and must be keyed
1657            // u64::MAX.
1658            if boundary_idx == 0 {
1659                // Nothing left for this (address, storage_key) pair
1660                return Ok(());
1661            }
1662
1663            let (prev_key, prev_value) = &shards[boundary_idx - 1];
1664            if prev_key.sharded_key.highest_block_number != u64::MAX {
1665                self.delete::<tables::StoragesHistory>(prev_key.clone())?;
1666                self.put::<tables::StoragesHistory>(
1667                    StorageShardedKey::last(address, storage_key),
1668                    prev_value,
1669                )?;
1670            }
1671            return Ok(());
1672        }
1673
1674        self.put::<tables::StoragesHistory>(
1675            StorageShardedKey::last(address, storage_key),
1676            &new_last,
1677        )?;
1678
1679        Ok(())
1680    }
1681
1682    /// Clears all account history shards for the given address.
1683    ///
1684    /// Used when unwinding from block 0 (i.e., removing all history).
1685    pub fn clear_account_history(&mut self, address: Address) -> ProviderResult<()> {
1686        let shards = self.provider.account_history_shards(address)?;
1687        for (key, _) in shards {
1688            self.delete::<tables::AccountsHistory>(key)?;
1689        }
1690        Ok(())
1691    }
1692
1693    /// Clears all storage history shards for the given `(address, storage_key)` pair.
1694    ///
1695    /// Used when unwinding from block 0 (i.e., removing all history for this storage slot).
1696    pub fn clear_storage_history(
1697        &mut self,
1698        address: Address,
1699        storage_key: B256,
1700    ) -> ProviderResult<()> {
1701        let shards = self.provider.storage_history_shards(address, storage_key)?;
1702        for (key, _) in shards {
1703            self.delete::<tables::StoragesHistory>(key)?;
1704        }
1705        Ok(())
1706    }
1707}
1708
1709/// `RocksDB` transaction wrapper providing MDBX-like semantics.
1710///
1711/// Supports:
1712/// - Read-your-writes: reads see uncommitted writes within the same transaction
1713/// - Atomic commit/rollback
1714/// - Iteration over uncommitted data
1715///
1716/// Note: `Transaction` is `Send` but NOT `Sync`. This wrapper does not implement
1717/// `DbTx`/`DbTxMut` traits directly; use RocksDB-specific methods instead.
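///
/// # Example
///
/// A read-your-writes sketch (marked `ignore`, so it is not compiled as a doctest);
/// `provider` is an already-built [`RocksDBProvider`] and `MyTable` is a placeholder
/// for a registered table keyed by `u64`:
///
/// ```ignore
/// let tx = provider.tx();
/// tx.put::<MyTable>(42u64, &value)?;
/// // The uncommitted write is visible inside the transaction...
/// assert!(tx.get::<MyTable>(42u64)?.is_some());
/// // ...but not outside of it until commit.
/// assert!(provider.get::<MyTable>(42u64)?.is_none());
/// tx.commit()?;
/// ```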
1718pub struct RocksTx<'db> {
1719    inner: Transaction<'db, OptimisticTransactionDB>,
1720    provider: &'db RocksDBProvider,
1721}
1722
1723impl fmt::Debug for RocksTx<'_> {
1724    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1725        f.debug_struct("RocksTx").field("provider", &self.provider).finish_non_exhaustive()
1726    }
1727}
1728
1729impl<'db> RocksTx<'db> {
1730    /// Gets a value from the specified table. Sees uncommitted writes in this transaction.
1731    pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
1732        let encoded_key = key.encode();
1733        self.get_encoded::<T>(&encoded_key)
1734    }
1735
1736    /// Gets a value using a pre-encoded key. Sees uncommitted writes in this transaction.
1737    pub fn get_encoded<T: Table>(
1738        &self,
1739        key: &<T::Key as Encode>::Encoded,
1740    ) -> ProviderResult<Option<T::Value>> {
1741        let cf = self.provider.get_cf_handle::<T>()?;
1742        let result = self.inner.get_cf(cf, key.as_ref()).map_err(|e| {
1743            ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
1744                message: e.to_string().into(),
1745                code: -1,
1746            }))
1747        })?;
1748
1749        Ok(result.and_then(|value| T::Value::decompress(&value).ok()))
1750    }
1751
1752    /// Puts a value into the specified table.
1753    pub fn put<T: Table>(&self, key: T::Key, value: &T::Value) -> ProviderResult<()> {
1754        let encoded_key = key.encode();
1755        self.put_encoded::<T>(&encoded_key, value)
1756    }
1757
1758    /// Puts a value using a pre-encoded key.
1759    pub fn put_encoded<T: Table>(
1760        &self,
1761        key: &<T::Key as Encode>::Encoded,
1762        value: &T::Value,
1763    ) -> ProviderResult<()> {
1764        let cf = self.provider.get_cf_handle::<T>()?;
1765        let mut buf = Vec::new();
1766        let value_bytes = compress_to_buf_or_ref!(buf, value).unwrap_or(&buf);
1767
1768        self.inner.put_cf(cf, key.as_ref(), value_bytes).map_err(|e| {
1769            ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
1770                info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
1771                operation: DatabaseWriteOperation::PutUpsert,
1772                table_name: T::NAME,
1773                key: key.as_ref().to_vec(),
1774            })))
1775        })
1776    }
1777
1778    /// Deletes a value from the specified table.
1779    pub fn delete<T: Table>(&self, key: T::Key) -> ProviderResult<()> {
1780        let cf = self.provider.get_cf_handle::<T>()?;
1781        self.inner.delete_cf(cf, key.encode().as_ref()).map_err(|e| {
1782            ProviderError::Database(DatabaseError::Delete(DatabaseErrorInfo {
1783                message: e.to_string().into(),
1784                code: -1,
1785            }))
1786        })
1787    }
1788
1789    /// Creates an iterator for the specified table. Sees uncommitted writes in this transaction.
1790    ///
1791    /// Returns an iterator that yields decoded `(Key, Value)` pairs in key order.
1792    pub fn iter<T: Table>(&self) -> ProviderResult<RocksTxIter<'_, T>> {
1793        let cf = self.provider.get_cf_handle::<T>()?;
1794        let iter = self.inner.iterator_cf(cf, IteratorMode::Start);
1795        Ok(RocksTxIter { inner: iter, _marker: std::marker::PhantomData })
1796    }
1797
1798    /// Creates an iterator starting from the given key (inclusive).
1799    pub fn iter_from<T: Table>(&self, key: T::Key) -> ProviderResult<RocksTxIter<'_, T>> {
1800        let cf = self.provider.get_cf_handle::<T>()?;
1801        let encoded_key = key.encode();
1802        let iter = self
1803            .inner
1804            .iterator_cf(cf, IteratorMode::From(encoded_key.as_ref(), rocksdb::Direction::Forward));
1805        Ok(RocksTxIter { inner: iter, _marker: std::marker::PhantomData })
1806    }
1807
1808    /// Commits the transaction, persisting all changes.
1809    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
1810    pub fn commit(self) -> ProviderResult<()> {
1811        self.inner.commit().map_err(|e| {
1812            ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
1813                message: e.to_string().into(),
1814                code: -1,
1815            }))
1816        })
1817    }
1818
1819    /// Rolls back the transaction, discarding all changes.
1820    #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
1821    pub fn rollback(self) -> ProviderResult<()> {
1822        self.inner.rollback().map_err(|e| {
1823            ProviderError::Database(DatabaseError::Other(format!("rollback failed: {e}")))
1824        })
1825    }
1826
1827    /// Lookup account history and return [`HistoryInfo`] directly.
1828    ///
1829    /// This is a thin wrapper around `history_info` that:
1830    /// - Builds the `ShardedKey` for the address + target block.
1831    /// - Validates that the found shard belongs to the same address.
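    ///
    /// # Example
    ///
    /// A minimal sketch (marked `ignore`, not compiled as a doctest); `tx` is a
    /// [`RocksTx`] and `address` comes from the caller:
    ///
    /// ```ignore
    /// // Where was `address` at block 50, given that history below block 100 is pruned?
    /// match tx.account_history_info(address, 50, Some(100))? {
    ///     HistoryInfo::InChangeset(block) => { /* read the account changeset at `block` */ }
    ///     HistoryInfo::NotYetWritten => { /* the account has never been written */ }
    ///     _ => { /* fall back to a plain state lookup */ }
    /// }
    /// ```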
1832    pub fn account_history_info(
1833        &self,
1834        address: Address,
1835        block_number: BlockNumber,
1836        lowest_available_block_number: Option<BlockNumber>,
1837    ) -> ProviderResult<HistoryInfo> {
1838        let key = ShardedKey::new(address, block_number);
1839        self.history_info::<tables::AccountsHistory>(
1840            key.encode().as_ref(),
1841            block_number,
1842            lowest_available_block_number,
1843            |key_bytes| Ok(<ShardedKey<Address> as Decode>::decode(key_bytes)?.key == address),
1844            |prev_bytes| {
1845                <ShardedKey<Address> as Decode>::decode(prev_bytes)
1846                    .map(|k| k.key == address)
1847                    .unwrap_or(false)
1848            },
1849        )
1850    }
1851
1852    /// Lookup storage history and return [`HistoryInfo`] directly.
1853    ///
1854    /// This is a thin wrapper around `history_info` that:
1855    /// - Builds the `StorageShardedKey` for address + storage key + target block.
1856    /// - Validates that the found shard belongs to the same address and storage slot.
1857    pub fn storage_history_info(
1858        &self,
1859        address: Address,
1860        storage_key: B256,
1861        block_number: BlockNumber,
1862        lowest_available_block_number: Option<BlockNumber>,
1863    ) -> ProviderResult<HistoryInfo> {
1864        let key = StorageShardedKey::new(address, storage_key, block_number);
1865        self.history_info::<tables::StoragesHistory>(
1866            key.encode().as_ref(),
1867            block_number,
1868            lowest_available_block_number,
1869            |key_bytes| {
1870                let k = <StorageShardedKey as Decode>::decode(key_bytes)?;
1871                Ok(k.address == address && k.sharded_key.key == storage_key)
1872            },
1873            |prev_bytes| {
1874                <StorageShardedKey as Decode>::decode(prev_bytes)
1875                    .map(|k| k.address == address && k.sharded_key.key == storage_key)
1876                    .unwrap_or(false)
1877            },
1878        )
1879    }
1880
1881    /// Generic history lookup for sharded history tables.
1882    ///
1883    /// Seeks to the shard containing `block_number`, checks if the key matches via `key_matches`,
1884    /// and uses `prev_key_matches` to detect if a previous shard exists for the same key.
1885    fn history_info<T>(
1886        &self,
1887        encoded_key: &[u8],
1888        block_number: BlockNumber,
1889        lowest_available_block_number: Option<BlockNumber>,
1890        key_matches: impl FnOnce(&[u8]) -> Result<bool, reth_db_api::DatabaseError>,
1891        prev_key_matches: impl Fn(&[u8]) -> bool,
1892    ) -> ProviderResult<HistoryInfo>
1893    where
1894        T: Table<Value = BlockNumberList>,
1895    {
1896        // History may be pruned if a lowest available block is set.
1897        let is_maybe_pruned = lowest_available_block_number.is_some();
1898        let fallback = || {
1899            Ok(if is_maybe_pruned {
1900                HistoryInfo::MaybeInPlainState
1901            } else {
1902                HistoryInfo::NotYetWritten
1903            })
1904        };
1905
1906        let cf = self.provider.0.cf_handle_rw(T::NAME)?;
1907
1908        // Create a raw iterator to access key bytes directly.
1909        let mut iter: DBRawIteratorWithThreadMode<'_, Transaction<'_, OptimisticTransactionDB>> =
1910            self.inner.raw_iterator_cf(&cf);
1911
1912        // Seek to the smallest key >= encoded_key.
1913        iter.seek(encoded_key);
1914        Self::raw_iter_status_ok(&iter)?;
1915
1916        if !iter.valid() {
1917            // No shard found at or after target block.
1918            //
1919            // (`HistoryInfo::MaybeInPlainState`) The key may have been written, but pruning may have
1920            // removed the changesets and history, so we need to fall back to a plain state lookup.
1921            // (`HistoryInfo::NotYetWritten`) The key has not been written to at all.
1922            return fallback();
1923        }
1924
1925        // Check if the found key matches our target entity.
1926        let Some(key_bytes) = iter.key() else {
1927            return fallback();
1928        };
1929        if !key_matches(key_bytes)? {
1930            // The found key is for a different entity.
1931            return fallback();
1932        }
1933
1934        // Decompress the block list for this shard.
1935        let Some(value_bytes) = iter.value() else {
1936            return fallback();
1937        };
1938        let chunk = BlockNumberList::decompress(value_bytes)?;
1939        let (rank, found_block) = compute_history_rank(&chunk, block_number);
1940
1941        // Lazily check for a previous shard, only when needed.
1942        // If we can step to a previous shard for this same key, history already exists,
1943        // so the target block is not before the first write.
1944        let is_before_first_write = if needs_prev_shard_check(rank, found_block, block_number) {
1945            iter.prev();
1946            Self::raw_iter_status_ok(&iter)?;
1947            let has_prev = iter.valid() && iter.key().is_some_and(&prev_key_matches);
1948            !has_prev
1949        } else {
1950            false
1951        };
1952
1953        Ok(HistoryInfo::from_lookup(
1954            found_block,
1955            is_before_first_write,
1956            lowest_available_block_number,
1957        ))
1958    }
1959
1960    /// Returns an error if the raw iterator is in an invalid state due to an I/O error.
1961    fn raw_iter_status_ok(
1962        iter: &DBRawIteratorWithThreadMode<'_, Transaction<'_, OptimisticTransactionDB>>,
1963    ) -> ProviderResult<()> {
1964        iter.status().map_err(|e| {
1965            ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
1966                message: e.to_string().into(),
1967                code: -1,
1968            }))
1969        })
1970    }
1971}
1972
1973/// Wrapper enum for `RocksDB` iterators that works in both read-write and read-only modes.
1974enum RocksDBIterEnum<'db> {
1975    /// Iterator from read-write `OptimisticTransactionDB`.
1976    ReadWrite(rocksdb::DBIteratorWithThreadMode<'db, OptimisticTransactionDB>),
1977    /// Iterator from read-only `DB`.
1978    ReadOnly(rocksdb::DBIteratorWithThreadMode<'db, DB>),
1979}
1980
1981impl Iterator for RocksDBIterEnum<'_> {
1982    type Item = Result<(Box<[u8]>, Box<[u8]>), rocksdb::Error>;
1983
1984    fn next(&mut self) -> Option<Self::Item> {
1985        match self {
1986            Self::ReadWrite(iter) => iter.next(),
1987            Self::ReadOnly(iter) => iter.next(),
1988        }
1989    }
1990}
1991
1992/// Iterator over a `RocksDB` table (non-transactional).
1993///
1994/// Yields decoded `(Key, Value)` pairs in key order.
1995pub struct RocksDBIter<'db, T: Table> {
1996    inner: RocksDBIterEnum<'db>,
1997    _marker: std::marker::PhantomData<T>,
1998}
1999
2000impl<T: Table> fmt::Debug for RocksDBIter<'_, T> {
2001    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2002        f.debug_struct("RocksDBIter").field("table", &T::NAME).finish_non_exhaustive()
2003    }
2004}
2005
2006impl<T: Table> Iterator for RocksDBIter<'_, T> {
2007    type Item = ProviderResult<(T::Key, T::Value)>;
2008
2009    fn next(&mut self) -> Option<Self::Item> {
2010        let (key_bytes, value_bytes) = match self.inner.next()? {
2011            Ok(kv) => kv,
2012            Err(e) => {
2013                return Some(Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
2014                    message: e.to_string().into(),
2015                    code: -1,
2016                }))))
2017            }
2018        };
2019
2020        // Decode key
2021        let key = match <T::Key as reth_db_api::table::Decode>::decode(&key_bytes) {
2022            Ok(k) => k,
2023            Err(_) => return Some(Err(ProviderError::Database(DatabaseError::Decode))),
2024        };
2025
2026        // Decompress value
2027        let value = match T::Value::decompress(&value_bytes) {
2028            Ok(v) => v,
2029            Err(_) => return Some(Err(ProviderError::Database(DatabaseError::Decode))),
2030        };
2031
2032        Some(Ok((key, value)))
2033    }
2034}
2035
2036/// Raw iterator over a `RocksDB` table (non-transactional).
2037///
2038/// Yields raw `(key_bytes, value_bytes)` pairs without decoding.
2039pub struct RocksDBRawIter<'db> {
2040    inner: RocksDBIterEnum<'db>,
2041}
2042
2043impl fmt::Debug for RocksDBRawIter<'_> {
2044    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2045        f.debug_struct("RocksDBRawIter").finish_non_exhaustive()
2046    }
2047}
2048
2049impl Iterator for RocksDBRawIter<'_> {
2050    type Item = ProviderResult<(Box<[u8]>, Box<[u8]>)>;
2051
2052    fn next(&mut self) -> Option<Self::Item> {
2053        match self.inner.next()? {
2054            Ok(kv) => Some(Ok(kv)),
2055            Err(e) => Some(Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
2056                message: e.to_string().into(),
2057                code: -1,
2058            })))),
2059        }
2060    }
2061}
2062
2063/// Iterator over a `RocksDB` table within a transaction.
2064///
2065/// Yields decoded `(Key, Value)` pairs. Sees uncommitted writes.
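///
/// # Example
///
/// A minimal sketch (marked `ignore`, not compiled as a doctest); `tx` is a [`RocksTx`]
/// and `MyTable` is a placeholder for a registered table:
///
/// ```ignore
/// for entry in tx.iter::<MyTable>()? {
///     let (key, value) = entry?;
///     // Entries arrive in key order and include this transaction's uncommitted writes.
/// }
/// ```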
2066pub struct RocksTxIter<'tx, T: Table> {
2067    inner: rocksdb::DBIteratorWithThreadMode<'tx, Transaction<'tx, OptimisticTransactionDB>>,
2068    _marker: std::marker::PhantomData<T>,
2069}
2070
2071impl<T: Table> fmt::Debug for RocksTxIter<'_, T> {
2072    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2073        f.debug_struct("RocksTxIter").field("table", &T::NAME).finish_non_exhaustive()
2074    }
2075}
2076
2077impl<T: Table> Iterator for RocksTxIter<'_, T> {
2078    type Item = ProviderResult<(T::Key, T::Value)>;
2079
2080    fn next(&mut self) -> Option<Self::Item> {
2081        let (key_bytes, value_bytes) = match self.inner.next()? {
2082            Ok(kv) => kv,
2083            Err(e) => {
2084                return Some(Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
2085                    message: e.to_string().into(),
2086                    code: -1,
2087                }))))
2088            }
2089        };
2090
2091        // Decode key
2092        let key = match <T::Key as reth_db_api::table::Decode>::decode(&key_bytes) {
2093            Ok(k) => k,
2094            Err(_) => return Some(Err(ProviderError::Database(DatabaseError::Decode))),
2095        };
2096
2097        // Decompress value
2098        let value = match T::Value::decompress(&value_bytes) {
2099            Ok(v) => v,
2100            Err(_) => return Some(Err(ProviderError::Database(DatabaseError::Decode))),
2101        };
2102
2103        Some(Ok((key, value)))
2104    }
2105}
2106
2107/// Converts Reth's [`LogLevel`] to `RocksDB`'s [`rocksdb::LogLevel`].
2108const fn convert_log_level(level: LogLevel) -> rocksdb::LogLevel {
2109    match level {
2110        LogLevel::Fatal => rocksdb::LogLevel::Fatal,
2111        LogLevel::Error => rocksdb::LogLevel::Error,
2112        LogLevel::Warn => rocksdb::LogLevel::Warn,
2113        LogLevel::Notice | LogLevel::Verbose => rocksdb::LogLevel::Info,
2114        LogLevel::Debug | LogLevel::Trace | LogLevel::Extra => rocksdb::LogLevel::Debug,
2115    }
2116}
2117
2118#[cfg(test)]
2119mod tests {
2120    use super::*;
2121    use crate::providers::HistoryInfo;
2122    use alloy_primitives::{Address, TxHash, B256};
2123    use reth_db_api::{
2124        models::{
2125            sharded_key::{ShardedKey, NUM_OF_INDICES_IN_SHARD},
2126            storage_sharded_key::StorageShardedKey,
2127            IntegerList,
2128        },
2129        table::Table,
2130        tables,
2131    };
2132    use tempfile::TempDir;
2133
2134    #[test]
2135    fn test_with_default_tables_registers_required_column_families() {
2136        let temp_dir = TempDir::new().unwrap();
2137
2138        // Build with default tables
2139        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
2140
2141        // Should be able to write/read TransactionHashNumbers
2142        let tx_hash = TxHash::from(B256::from([1u8; 32]));
2143        provider.put::<tables::TransactionHashNumbers>(tx_hash, &100).unwrap();
2144        assert_eq!(provider.get::<tables::TransactionHashNumbers>(tx_hash).unwrap(), Some(100));
2145
2146        // Should be able to write/read AccountsHistory
2147        let key = ShardedKey::new(Address::ZERO, 100);
2148        let value = IntegerList::default();
2149        provider.put::<tables::AccountsHistory>(key.clone(), &value).unwrap();
2150        assert!(provider.get::<tables::AccountsHistory>(key).unwrap().is_some());
2151
2152        // Should be able to write/read StoragesHistory
2153        let key = StorageShardedKey::new(Address::ZERO, B256::ZERO, 100);
2154        provider.put::<tables::StoragesHistory>(key.clone(), &value).unwrap();
2155        assert!(provider.get::<tables::StoragesHistory>(key).unwrap().is_some());
2156    }
2157
2158    #[derive(Debug)]
2159    struct TestTable;
2160
2161    impl Table for TestTable {
2162        const NAME: &'static str = "TestTable";
2163        const DUPSORT: bool = false;
2164        type Key = u64;
2165        type Value = Vec<u8>;
2166    }
2167
2168    #[test]
2169    fn test_basic_operations() {
2170        let temp_dir = TempDir::new().unwrap();
2171
2172        let provider = RocksDBBuilder::new(temp_dir.path())
2173            .with_table::<TestTable>() // Type-safe!
2174            .build()
2175            .unwrap();
2176
2177        let key = 42u64;
2178        let value = b"test_value".to_vec();
2179
2180        // Test write
2181        provider.put::<TestTable>(key, &value).unwrap();
2182
2183        // Test read
2184        let result = provider.get::<TestTable>(key).unwrap();
2185        assert_eq!(result, Some(value));
2186
2187        // Test delete
2188        provider.delete::<TestTable>(key).unwrap();
2189
2190        // Verify deletion
2191        assert_eq!(provider.get::<TestTable>(key).unwrap(), None);
2192    }
2193
2194    #[test]
2195    fn test_batch_operations() {
2196        let temp_dir = TempDir::new().unwrap();
2197        let provider =
2198            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
2199
2200        // Write multiple entries in a batch
2201        provider
2202            .write_batch(|batch| {
2203                for i in 0..10u64 {
2204                    let value = format!("value_{i}").into_bytes();
2205                    batch.put::<TestTable>(i, &value)?;
2206                }
2207                Ok(())
2208            })
2209            .unwrap();
2210
2211        // Read all entries
2212        for i in 0..10u64 {
2213            let value = format!("value_{i}").into_bytes();
2214            assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
2215        }
2216
2217        // Delete all entries in a batch
2218        provider
2219            .write_batch(|batch| {
2220                for i in 0..10u64 {
2221                    batch.delete::<TestTable>(i)?;
2222                }
2223                Ok(())
2224            })
2225            .unwrap();
2226
2227        // Verify all deleted
2228        for i in 0..10u64 {
2229            assert_eq!(provider.get::<TestTable>(i).unwrap(), None);
2230        }
2231    }
2232
2233    #[test]
2234    fn test_with_real_table() {
2235        let temp_dir = TempDir::new().unwrap();
2236        let provider = RocksDBBuilder::new(temp_dir.path())
2237            .with_table::<tables::TransactionHashNumbers>()
2238            .with_metrics()
2239            .build()
2240            .unwrap();
2241
2242        let tx_hash = TxHash::from(B256::from([1u8; 32]));
2243
2244        // Insert and retrieve
2245        provider.put::<tables::TransactionHashNumbers>(tx_hash, &100).unwrap();
2246        assert_eq!(provider.get::<tables::TransactionHashNumbers>(tx_hash).unwrap(), Some(100));
2247
2248        // Batch insert multiple transactions
2249        provider
2250            .write_batch(|batch| {
2251                for i in 0..10u64 {
2252                    let hash = TxHash::from(B256::from([i as u8; 32]));
2253                    let value = i * 100;
2254                    batch.put::<tables::TransactionHashNumbers>(hash, &value)?;
2255                }
2256                Ok(())
2257            })
2258            .unwrap();
2259
2260        // Verify batch insertions
2261        for i in 0..10u64 {
2262            let hash = TxHash::from(B256::from([i as u8; 32]));
2263            assert_eq!(
2264                provider.get::<tables::TransactionHashNumbers>(hash).unwrap(),
2265                Some(i * 100)
2266            );
2267        }
2268    }

2269    #[test]
2270    fn test_statistics_enabled() {
2271        let temp_dir = TempDir::new().unwrap();
2272        // Just verify that building with statistics doesn't panic
2273        let provider = RocksDBBuilder::new(temp_dir.path())
2274            .with_table::<TestTable>()
2275            .with_statistics()
2276            .build()
2277            .unwrap();
2278
2279        // Do operations - data should be immediately readable with OptimisticTransactionDB
2280        for i in 0..10 {
2281            let value = vec![i as u8];
2282            provider.put::<TestTable>(i, &value).unwrap();
2283            // Verify write is visible
2284            assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
2285        }
2286    }
2287
2288    #[test]
2289    fn test_data_persistence() {
2290        let temp_dir = TempDir::new().unwrap();
2291        let provider =
2292            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
2293
2294        // Insert data - OptimisticTransactionDB writes are immediately visible
2295        let value = vec![42u8; 1000];
2296        for i in 0..100 {
2297            provider.put::<TestTable>(i, &value).unwrap();
2298        }
2299
2300        // Verify data is readable
2301        for i in 0..100 {
2302            assert!(provider.get::<TestTable>(i).unwrap().is_some(), "Data should be readable");
2303        }
2304    }
2305
2306    #[test]
2307    fn test_transaction_read_your_writes() {
2308        let temp_dir = TempDir::new().unwrap();
2309        let provider =
2310            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
2311
2312        // Create a transaction
2313        let tx = provider.tx();
2314
2315        // Write data within the transaction
2316        let key = 42u64;
2317        let value = b"test_value".to_vec();
2318        tx.put::<TestTable>(key, &value).unwrap();
2319
2320        // Read-your-writes: should see uncommitted data in same transaction
2321        let result = tx.get::<TestTable>(key).unwrap();
2322        assert_eq!(
2323            result,
2324            Some(value.clone()),
2325            "Transaction should see its own uncommitted writes"
2326        );
2327
2328        // Data should NOT be visible via provider (outside transaction)
2329        let provider_result = provider.get::<TestTable>(key).unwrap();
2330        assert_eq!(provider_result, None, "Uncommitted data should not be visible outside tx");
2331
2332        // Commit the transaction
2333        tx.commit().unwrap();
2334
2335        // Now data should be visible via provider
2336        let committed_result = provider.get::<TestTable>(key).unwrap();
2337        assert_eq!(committed_result, Some(value), "Committed data should be visible");
2338    }
2339
2340    #[test]
2341    fn test_transaction_rollback() {
2342        let temp_dir = TempDir::new().unwrap();
2343        let provider =
2344            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
2345
2346        // First, put some initial data
2347        let key = 100u64;
2348        let initial_value = b"initial".to_vec();
2349        provider.put::<TestTable>(key, &initial_value).unwrap();
2350
2351        // Create a transaction and modify data
2352        let tx = provider.tx();
2353        let new_value = b"modified".to_vec();
2354        tx.put::<TestTable>(key, &new_value).unwrap();
2355
2356        // Verify modification is visible within transaction
2357        assert_eq!(tx.get::<TestTable>(key).unwrap(), Some(new_value));
2358
2359        // Rollback instead of commit
2360        tx.rollback().unwrap();
2361
2362        // Data should be unchanged (initial value)
2363        let result = provider.get::<TestTable>(key).unwrap();
2364        assert_eq!(result, Some(initial_value), "Rollback should preserve original data");
2365    }
2366
2367    #[test]
2368    fn test_transaction_iterator() {
2369        let temp_dir = TempDir::new().unwrap();
2370        let provider =
2371            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
2372
2373        // Create a transaction
2374        let tx = provider.tx();
2375
2376        // Write multiple entries
2377        for i in 0..5u64 {
2378            let value = format!("value_{i}").into_bytes();
2379            tx.put::<TestTable>(i, &value).unwrap();
2380        }
2381
2382        // Iterate - should see uncommitted writes
2383        let mut count = 0;
2384        for result in tx.iter::<TestTable>().unwrap() {
2385            let (key, value) = result.unwrap();
2386            assert_eq!(value, format!("value_{key}").into_bytes());
2387            count += 1;
2388        }
2389        assert_eq!(count, 5, "Iterator should see all uncommitted writes");
2390
2391        // Commit
2392        tx.commit().unwrap();
2393    }
2394
2395    #[test]
2396    fn test_batch_manual_commit() {
2397        let temp_dir = TempDir::new().unwrap();
2398        let provider =
2399            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
2400
2401        // Create a batch via provider.batch()
2402        let mut batch = provider.batch();
2403
2404        // Add entries
2405        for i in 0..10u64 {
2406            let value = format!("batch_value_{i}").into_bytes();
2407            batch.put::<TestTable>(i, &value).unwrap();
2408        }
2409
2410        // Verify len/is_empty
2411        assert_eq!(batch.len(), 10);
2412        assert!(!batch.is_empty());
2413
2414        // Data should NOT be visible before commit
2415        assert_eq!(provider.get::<TestTable>(0).unwrap(), None);
2416
2417        // Commit the batch
2418        batch.commit().unwrap();
2419
2420        // Now data should be visible
2421        for i in 0..10u64 {
2422            let value = format!("batch_value_{i}").into_bytes();
2423            assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
2424        }
2425    }
2426
2427    #[test]
2428    fn test_first_and_last_entry() {
2429        let temp_dir = TempDir::new().unwrap();
2430        let provider =
2431            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
2432
2433        // Empty table should return None for both
2434        assert_eq!(provider.first::<TestTable>().unwrap(), None);
2435        assert_eq!(provider.last::<TestTable>().unwrap(), None);
2436
2437        // Insert some entries
2438        provider.put::<TestTable>(10, &b"value_10".to_vec()).unwrap();
2439        provider.put::<TestTable>(20, &b"value_20".to_vec()).unwrap();
2440        provider.put::<TestTable>(5, &b"value_5".to_vec()).unwrap();
2441
2442        // First should return the smallest key
2443        let first = provider.first::<TestTable>().unwrap();
2444        assert_eq!(first, Some((5, b"value_5".to_vec())));
2445
2446        // Last should return the largest key
2447        let last = provider.last::<TestTable>().unwrap();
2448        assert_eq!(last, Some((20, b"value_20".to_vec())));
2449    }
2450
2451    /// Tests the edge case where block < `lowest_available_block_number`.
2452    /// This case cannot be tested via `HistoricalStateProviderRef` (which errors before lookup),
2453    /// so we keep this RocksDB-specific test to verify the low-level behavior.
2454    #[test]
2455    fn test_account_history_info_pruned_before_first_entry() {
2456        let temp_dir = TempDir::new().unwrap();
2457        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
2458
2459        let address = Address::from([0x42; 20]);
2460
2461        // Create a single shard starting at block 100
2462        let chunk = IntegerList::new([100, 200, 300]).unwrap();
2463        let shard_key = ShardedKey::new(address, u64::MAX);
2464        provider.put::<tables::AccountsHistory>(shard_key, &chunk).unwrap();
2465
2466        let tx = provider.tx();
2467
2468        // Query for block 50 with lowest_available_block_number = 100
2469        // This simulates a pruned state where data before block 100 is not available.
2470        // Since we're before the first write AND pruning boundary is set, we need to
2471        // check the changeset at the first write block.
2472        let result = tx.account_history_info(address, 50, Some(100)).unwrap();
2473        assert_eq!(result, HistoryInfo::InChangeset(100));
2474
2475        tx.rollback().unwrap();
2476    }
2477
2478    #[test]
2479    fn test_account_history_shard_split_at_boundary() {
2480        let temp_dir = TempDir::new().unwrap();
2481        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
2482
2483        let address = Address::from([0x42; 20]);
2484        let limit = NUM_OF_INDICES_IN_SHARD;
2485
2486        // Add exactly NUM_OF_INDICES_IN_SHARD + 1 indices to trigger a split
2487        let indices: Vec<u64> = (0..=(limit as u64)).collect();
2488        let mut batch = provider.batch();
2489        batch.append_account_history_shard(address, indices).unwrap();
2490        batch.commit().unwrap();
2491
2492        // Should have 2 shards: one completed shard and one sentinel shard
2493        let completed_key = ShardedKey::new(address, (limit - 1) as u64);
2494        let sentinel_key = ShardedKey::new(address, u64::MAX);
2495
2496        let completed_shard = provider.get::<tables::AccountsHistory>(completed_key).unwrap();
2497        let sentinel_shard = provider.get::<tables::AccountsHistory>(sentinel_key).unwrap();
2498
2499        assert!(completed_shard.is_some(), "completed shard should exist");
2500        assert!(sentinel_shard.is_some(), "sentinel shard should exist");
2501
2502        let completed_shard = completed_shard.unwrap();
2503        let sentinel_shard = sentinel_shard.unwrap();
2504
2505        assert_eq!(completed_shard.len(), limit as u64, "completed shard should be full");
2506        assert_eq!(sentinel_shard.len(), 1, "sentinel shard should have 1 element");
2507    }
2508
2509    #[test]
2510    fn test_account_history_multiple_shard_splits() {
2511        let temp_dir = TempDir::new().unwrap();
2512        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
2513
2514        let address = Address::from([0x43; 20]);
2515        let limit = NUM_OF_INDICES_IN_SHARD;
2516
2517        // First batch: add NUM_OF_INDICES_IN_SHARD indices
2518        let first_batch_indices: Vec<u64> = (0..limit as u64).collect();
2519        let mut batch = provider.batch();
2520        batch.append_account_history_shard(address, first_batch_indices).unwrap();
2521        batch.commit().unwrap();
2522
2523        // Should have just a sentinel shard (exactly at limit, not over)
2524        let sentinel_key = ShardedKey::new(address, u64::MAX);
2525        let shard = provider.get::<tables::AccountsHistory>(sentinel_key.clone()).unwrap();
2526        assert!(shard.is_some());
2527        assert_eq!(shard.unwrap().len(), limit as u64);
2528
2529        // Second batch: add another NUM_OF_INDICES_IN_SHARD + 1 indices (causing 2 more shards)
2530        let second_batch_indices: Vec<u64> = (limit as u64..=(2 * limit) as u64).collect();
2531        let mut batch = provider.batch();
2532        batch.append_account_history_shard(address, second_batch_indices).unwrap();
2533        batch.commit().unwrap();
2534
2535        // Now we should have: 2 completed shards + 1 sentinel shard
2536        let first_completed = ShardedKey::new(address, (limit - 1) as u64);
2537        let second_completed = ShardedKey::new(address, (2 * limit - 1) as u64);
2538
2539        assert!(
2540            provider.get::<tables::AccountsHistory>(first_completed).unwrap().is_some(),
2541            "first completed shard should exist"
2542        );
2543        assert!(
2544            provider.get::<tables::AccountsHistory>(second_completed).unwrap().is_some(),
2545            "second completed shard should exist"
2546        );
2547        assert!(
2548            provider.get::<tables::AccountsHistory>(sentinel_key).unwrap().is_some(),
2549            "sentinel shard should exist"
2550        );
2551    }
2552
2553    #[test]
2554    fn test_storage_history_shard_split_at_boundary() {
2555        let temp_dir = TempDir::new().unwrap();
2556        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
2557
2558        let address = Address::from([0x44; 20]);
2559        let slot = B256::from([0x55; 32]);
2560        let limit = NUM_OF_INDICES_IN_SHARD;
2561
2562        // Add exactly NUM_OF_INDICES_IN_SHARD + 1 indices to trigger a split
2563        let indices: Vec<u64> = (0..=(limit as u64)).collect();
2564        let mut batch = provider.batch();
2565        batch.append_storage_history_shard(address, slot, indices).unwrap();
2566        batch.commit().unwrap();
2567
2568        // Should have 2 shards: one completed shard and one sentinel shard
2569        let completed_key = StorageShardedKey::new(address, slot, (limit - 1) as u64);
2570        let sentinel_key = StorageShardedKey::new(address, slot, u64::MAX);
2571
2572        let completed_shard = provider.get::<tables::StoragesHistory>(completed_key).unwrap();
2573        let sentinel_shard = provider.get::<tables::StoragesHistory>(sentinel_key).unwrap();
2574
2575        assert!(completed_shard.is_some(), "completed shard should exist");
2576        assert!(sentinel_shard.is_some(), "sentinel shard should exist");
2577
2578        let completed_shard = completed_shard.unwrap();
2579        let sentinel_shard = sentinel_shard.unwrap();
2580
2581        assert_eq!(completed_shard.len(), limit as u64, "completed shard should be full");
2582        assert_eq!(sentinel_shard.len(), 1, "sentinel shard should have 1 element");
2583    }
2584
2585    #[test]
2586    fn test_storage_history_multiple_shard_splits() {
2587        let temp_dir = TempDir::new().unwrap();
2588        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
2589
2590        let address = Address::from([0x46; 20]);
2591        let slot = B256::from([0x57; 32]);
2592        let limit = NUM_OF_INDICES_IN_SHARD;
2593
2594        // First batch: add NUM_OF_INDICES_IN_SHARD indices
2595        let first_batch_indices: Vec<u64> = (0..limit as u64).collect();
2596        let mut batch = provider.batch();
2597        batch.append_storage_history_shard(address, slot, first_batch_indices).unwrap();
2598        batch.commit().unwrap();
2599
2600        // Should have just a sentinel shard (exactly at limit, not over)
2601        let sentinel_key = StorageShardedKey::new(address, slot, u64::MAX);
2602        let shard = provider.get::<tables::StoragesHistory>(sentinel_key.clone()).unwrap();
2603        assert!(shard.is_some());
2604        assert_eq!(shard.unwrap().len(), limit as u64);
2605
2606        // Second batch: add another NUM_OF_INDICES_IN_SHARD + 1 indices (causing 2 more shards)
2607        let second_batch_indices: Vec<u64> = (limit as u64..=(2 * limit) as u64).collect();
2608        let mut batch = provider.batch();
2609        batch.append_storage_history_shard(address, slot, second_batch_indices).unwrap();
2610        batch.commit().unwrap();
2611
2612        // Now we should have: 2 completed shards + 1 sentinel shard
2613        let first_completed = StorageShardedKey::new(address, slot, (limit - 1) as u64);
2614        let second_completed = StorageShardedKey::new(address, slot, (2 * limit - 1) as u64);
2615
2616        assert!(
2617            provider.get::<tables::StoragesHistory>(first_completed).unwrap().is_some(),
2618            "first completed shard should exist"
2619        );
2620        assert!(
2621            provider.get::<tables::StoragesHistory>(second_completed).unwrap().is_some(),
2622            "second completed shard should exist"
2623        );
2624        assert!(
2625            provider.get::<tables::StoragesHistory>(sentinel_key).unwrap().is_some(),
2626            "sentinel shard should exist"
2627        );
2628    }

    #[test]
    fn test_clear_table() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);
        let key = ShardedKey::new(address, u64::MAX);
        let blocks = BlockNumberList::new_pre_sorted([1, 2, 3]);

        provider.put::<tables::AccountsHistory>(key.clone(), &blocks).unwrap();
        assert!(provider.get::<tables::AccountsHistory>(key.clone()).unwrap().is_some());

        provider.clear::<tables::AccountsHistory>().unwrap();

        assert!(
            provider.get::<tables::AccountsHistory>(key).unwrap().is_none(),
            "table should be empty after clear"
        );
        assert!(
            provider.first::<tables::AccountsHistory>().unwrap().is_none(),
            "first() should return None after clear"
        );
    }

    #[test]
    fn test_clear_empty_table() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        assert!(provider.first::<tables::AccountsHistory>().unwrap().is_none());

        provider.clear::<tables::AccountsHistory>().unwrap();

        assert!(provider.first::<tables::AccountsHistory>().unwrap().is_none());
    }

    #[test]
    fn test_unwind_account_history_to_basic() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);

        // Add blocks 0-10
        let mut batch = provider.batch();
        batch.append_account_history_shard(address, 0..=10).unwrap();
        batch.commit().unwrap();

        // Verify we have blocks 0-10
        let key = ShardedKey::new(address, u64::MAX);
        let result = provider.get::<tables::AccountsHistory>(key.clone()).unwrap();
        assert!(result.is_some());
        let blocks: Vec<u64> = result.unwrap().iter().collect();
        assert_eq!(blocks, (0..=10).collect::<Vec<_>>());

        // Unwind to block 5 (keep blocks 0-5, remove 6-10)
        let mut batch = provider.batch();
        batch.unwind_account_history_to(address, 5).unwrap();
        batch.commit().unwrap();

        // Verify only blocks 0-5 remain
        let result = provider.get::<tables::AccountsHistory>(key).unwrap();
        assert!(result.is_some());
        let blocks: Vec<u64> = result.unwrap().iter().collect();
        assert_eq!(blocks, (0..=5).collect::<Vec<_>>());
    }

    #[test]
    fn test_unwind_account_history_to_removes_all() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);

        // Add blocks 5-10
        let mut batch = provider.batch();
        batch.append_account_history_shard(address, 5..=10).unwrap();
        batch.commit().unwrap();

        // Unwind to block 4 (removes all blocks since they're all > 4)
        let mut batch = provider.batch();
        batch.unwind_account_history_to(address, 4).unwrap();
        batch.commit().unwrap();

        // Verify no data remains for this address
        let key = ShardedKey::new(address, u64::MAX);
        let result = provider.get::<tables::AccountsHistory>(key).unwrap();
        assert!(result.is_none(), "Should have no data after full unwind");
    }

    #[test]
    fn test_unwind_account_history_to_no_op() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);

        // Add blocks 0-5
        let mut batch = provider.batch();
        batch.append_account_history_shard(address, 0..=5).unwrap();
        batch.commit().unwrap();

        // Unwind to block 10 (no-op since all blocks are <= 10)
        let mut batch = provider.batch();
        batch.unwind_account_history_to(address, 10).unwrap();
        batch.commit().unwrap();

        // Verify blocks 0-5 still remain
        let key = ShardedKey::new(address, u64::MAX);
        let result = provider.get::<tables::AccountsHistory>(key).unwrap();
        assert!(result.is_some());
        let blocks: Vec<u64> = result.unwrap().iter().collect();
        assert_eq!(blocks, (0..=5).collect::<Vec<_>>());
    }

    #[test]
    fn test_unwind_account_history_to_block_zero() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);

        // Add blocks 0-5 (including block 0)
        let mut batch = provider.batch();
        batch.append_account_history_shard(address, 0..=5).unwrap();
        batch.commit().unwrap();

        // Unwind to block 0 (keep only block 0, remove 1-5)
        // This simulates the caller doing: unwind_to = min_block.checked_sub(1) where min_block = 1
        let mut batch = provider.batch();
        batch.unwind_account_history_to(address, 0).unwrap();
        batch.commit().unwrap();

        // Verify only block 0 remains
        let key = ShardedKey::new(address, u64::MAX);
        let result = provider.get::<tables::AccountsHistory>(key).unwrap();
        assert!(result.is_some());
        let blocks: Vec<u64> = result.unwrap().iter().collect();
        assert_eq!(blocks, vec![0]);
    }

    #[test]
    fn test_unwind_account_history_to_multi_shard() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);

        // Multiple shards normally only appear once more than NUM_OF_INDICES_IN_SHARD entries
        // exist; for this test we create the shards manually with specific keys instead.
        let mut batch = provider.batch();

        // First shard: blocks 1-50, keyed by 50
        let shard1 = BlockNumberList::new_pre_sorted(1..=50);
        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, 50), &shard1).unwrap();

        // Second shard: blocks 51-100, keyed by MAX (sentinel)
        let shard2 = BlockNumberList::new_pre_sorted(51..=100);
        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, u64::MAX), &shard2).unwrap();

        batch.commit().unwrap();

        // Verify we have 2 shards
        let shards = provider.account_history_shards(address).unwrap();
        assert_eq!(shards.len(), 2);

        // Unwind to block 75 (keep 1-75, remove 76-100)
        let mut batch = provider.batch();
        batch.unwind_account_history_to(address, 75).unwrap();
        batch.commit().unwrap();

        // Verify: shard1 should be untouched, shard2 should be truncated
        let shards = provider.account_history_shards(address).unwrap();
        assert_eq!(shards.len(), 2);

        // First shard unchanged
        assert_eq!(shards[0].0.highest_block_number, 50);
        assert_eq!(shards[0].1.iter().collect::<Vec<_>>(), (1..=50).collect::<Vec<_>>());

        // Second shard truncated and still stored under the MAX sentinel key
        assert_eq!(shards[1].0.highest_block_number, u64::MAX);
        assert_eq!(shards[1].1.iter().collect::<Vec<_>>(), (51..=75).collect::<Vec<_>>());
    }

    #[test]
    fn test_unwind_account_history_to_multi_shard_boundary_empty() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);

        // Create two shards
        let mut batch = provider.batch();

        // First shard: blocks 1-50, keyed by 50
        let shard1 = BlockNumberList::new_pre_sorted(1..=50);
        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, 50), &shard1).unwrap();

        // Second shard: blocks 75-100, keyed by MAX
        let shard2 = BlockNumberList::new_pre_sorted(75..=100);
        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, u64::MAX), &shard2).unwrap();

        batch.commit().unwrap();

        // Unwind to block 60 (removes all of shard2 since its lowest block 75 > 60, and
        // promotes shard1 to MAX)
        let mut batch = provider.batch();
        batch.unwind_account_history_to(address, 60).unwrap();
        batch.commit().unwrap();

        // Verify: only shard1 remains, now keyed as MAX
        let shards = provider.account_history_shards(address).unwrap();
        assert_eq!(shards.len(), 1);
        assert_eq!(shards[0].0.highest_block_number, u64::MAX);
        assert_eq!(shards[0].1.iter().collect::<Vec<_>>(), (1..=50).collect::<Vec<_>>());
    }
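
    // Hedged follow-up sketch (not in the original suite): unwinding exactly to the highest
    // block of the lower shard should mirror the boundary-empty case above: every block in
    // the upper shard is greater than the target, so it is dropped and the lower shard is
    // promoted to the u64::MAX sentinel key. The test name is illustrative.
    #[test]
    fn test_unwind_account_history_to_exact_shard_boundary_sketch() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);

        let mut batch = provider.batch();
        // Lower shard: blocks 1-50, keyed by 50; upper shard: blocks 51-100, keyed by MAX.
        let shard1 = BlockNumberList::new_pre_sorted(1..=50);
        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, 50), &shard1).unwrap();
        let shard2 = BlockNumberList::new_pre_sorted(51..=100);
        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, u64::MAX), &shard2).unwrap();
        batch.commit().unwrap();

        // Unwind to block 50: blocks 51-100 are all above the target.
        let mut batch = provider.batch();
        batch.unwind_account_history_to(address, 50).unwrap();
        batch.commit().unwrap();

        // Only the lower shard remains, now stored under the sentinel key.
        let shards = provider.account_history_shards(address).unwrap();
        assert_eq!(shards.len(), 1);
        assert_eq!(shards[0].0.highest_block_number, u64::MAX);
        assert_eq!(shards[0].1.iter().collect::<Vec<_>>(), (1..=50).collect::<Vec<_>>());
    }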

    #[test]
    fn test_account_history_shards_iterator() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);
        let other_address = Address::from([0x43; 20]);

        // Add data for two addresses
        let mut batch = provider.batch();
        batch.append_account_history_shard(address, 0..=5).unwrap();
        batch.append_account_history_shard(other_address, 10..=15).unwrap();
        batch.commit().unwrap();

        // Query shards for first address only
        let shards = provider.account_history_shards(address).unwrap();
        assert_eq!(shards.len(), 1);
        assert_eq!(shards[0].0.key, address);

        // Query shards for second address only
        let shards = provider.account_history_shards(other_address).unwrap();
        assert_eq!(shards.len(), 1);
        assert_eq!(shards[0].0.key, other_address);

        // Query shards for non-existent address
        let non_existent = Address::from([0x99; 20]);
        let shards = provider.account_history_shards(non_existent).unwrap();
        assert!(shards.is_empty());
    }
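
    // Hedged ordering sketch (not in the original suite): the multi-shard tests above rely
    // on shards coming back ordered by ascending highest_block_number, with the sentinel
    // shard last; this isolates that property. The test name is illustrative.
    #[test]
    fn test_account_history_shards_ordering_sketch() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);

        let mut batch = provider.batch();
        // One completed shard keyed by its highest block, plus the sentinel shard.
        let completed = BlockNumberList::new_pre_sorted(1..=50);
        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, 50), &completed).unwrap();
        let sentinel = BlockNumberList::new_pre_sorted(51..=60);
        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, u64::MAX), &sentinel).unwrap();
        batch.commit().unwrap();

        let shards = provider.account_history_shards(address).unwrap();
        assert_eq!(shards.len(), 2);
        // Completed shard first, sentinel shard last.
        assert_eq!(shards[0].0.highest_block_number, 50);
        assert_eq!(shards[1].0.highest_block_number, u64::MAX);
    }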

    #[test]
    fn test_clear_account_history() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);

        // Add blocks 0-10
        let mut batch = provider.batch();
        batch.append_account_history_shard(address, 0..=10).unwrap();
        batch.commit().unwrap();

        // Clear all history (simulates unwind from block 0)
        let mut batch = provider.batch();
        batch.clear_account_history(address).unwrap();
        batch.commit().unwrap();

        // Verify no data remains
        let shards = provider.account_history_shards(address).unwrap();
        assert!(shards.is_empty(), "All shards should be deleted");
    }
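
    // Hedged sketch (not in the original suite): `clear_account_history` takes a single
    // address, so clearing one account should leave another account's shards untouched.
    // The test name is illustrative; it uses only APIs already exercised in this module.
    #[test]
    fn test_clear_account_history_is_per_address_sketch() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let cleared = Address::from([0x42; 20]);
        let kept = Address::from([0x43; 20]);

        let mut batch = provider.batch();
        batch.append_account_history_shard(cleared, 0..=10).unwrap();
        batch.append_account_history_shard(kept, 0..=10).unwrap();
        batch.commit().unwrap();

        let mut batch = provider.batch();
        batch.clear_account_history(cleared).unwrap();
        batch.commit().unwrap();

        // Only the cleared address loses its shards.
        assert!(provider.account_history_shards(cleared).unwrap().is_empty());
        assert_eq!(provider.account_history_shards(kept).unwrap().len(), 1);
    }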

    #[test]
    fn test_unwind_non_sentinel_boundary() {
        let temp_dir = TempDir::new().unwrap();
        let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

        let address = Address::from([0x42; 20]);

        // Create three shards where the boundary shard has a non-sentinel key
        let mut batch = provider.batch();

        // Shard 1: blocks 1-50, keyed by 50
        let shard1 = BlockNumberList::new_pre_sorted(1..=50);
        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, 50), &shard1).unwrap();

        // Shard 2: blocks 51-100, keyed by 100 (non-sentinel, will be the boundary shard)
        let shard2 = BlockNumberList::new_pre_sorted(51..=100);
        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, 100), &shard2).unwrap();

        // Shard 3: blocks 101-150, keyed by MAX (will be deleted)
        let shard3 = BlockNumberList::new_pre_sorted(101..=150);
        batch.put::<tables::AccountsHistory>(ShardedKey::new(address, u64::MAX), &shard3).unwrap();

        batch.commit().unwrap();

        // Verify 3 shards
        let shards = provider.account_history_shards(address).unwrap();
        assert_eq!(shards.len(), 3);

        // Unwind to block 75 (truncates shard2, deletes shard3)
        let mut batch = provider.batch();
        batch.unwind_account_history_to(address, 75).unwrap();
        batch.commit().unwrap();

        // Verify: shard1 unchanged, shard2 truncated and re-keyed to MAX, shard3 deleted
        let shards = provider.account_history_shards(address).unwrap();
        assert_eq!(shards.len(), 2);

        // First shard unchanged
        assert_eq!(shards[0].0.highest_block_number, 50);
        assert_eq!(shards[0].1.iter().collect::<Vec<_>>(), (1..=50).collect::<Vec<_>>());

        // Second shard truncated and re-keyed to MAX
        assert_eq!(shards[1].0.highest_block_number, u64::MAX);
        assert_eq!(shards[1].1.iter().collect::<Vec<_>>(), (51..=75).collect::<Vec<_>>());
    }

    #[test]
    fn test_batch_auto_commit_on_threshold() {
        let temp_dir = TempDir::new().unwrap();
        let provider =
            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();

        // Create a batch with a tiny threshold (1 KiB) to force auto-commits
        let mut batch = RocksDBBatch {
            provider: &provider,
            inner: WriteBatchWithTransaction::<true>::default(),
            buf: Vec::new(),
            auto_commit_threshold: Some(1024), // 1 KiB
        };

        // Write entries until we exceed the threshold multiple times.
        // Each entry is ~20 bytes, so 100 entries is roughly 2 KiB and should trigger
        // multiple auto-commits along the way.
        for i in 0..100u64 {
            let value = format!("value_{i:04}").into_bytes();
            batch.put::<TestTable>(i, &value).unwrap();
        }

        // Auto-committed data should already be visible even before the final commit;
        // at least the earliest entries should be readable.
        let first_visible = provider.get::<TestTable>(0).unwrap();
        assert!(first_visible.is_some(), "Auto-committed data should be visible");

        // Final commit for the remaining buffered entries
        batch.commit().unwrap();

        // All entries should now be visible
        for i in 0..100u64 {
            let value = format!("value_{i:04}").into_bytes();
            assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
        }
    }
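
    // Hedged sketch (assumption flagged: `auto_commit_threshold: None` is taken to mean
    // "never auto-commit", which the threshold-based test above implies but does not state).
    // Buffered writes should then stay invisible to readers until `commit` is called.
    // The test name is illustrative.
    #[test]
    fn test_batch_no_auto_commit_before_commit_sketch() {
        let temp_dir = TempDir::new().unwrap();
        let provider =
            RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();

        let mut batch = RocksDBBatch {
            provider: &provider,
            inner: WriteBatchWithTransaction::<true>::default(),
            buf: Vec::new(),
            auto_commit_threshold: None,
        };

        for i in 0..10u64 {
            let value = format!("value_{i:02}").into_bytes();
            batch.put::<TestTable>(i, &value).unwrap();
        }

        // Nothing has been committed yet, so reads should not observe the buffered writes.
        assert!(provider.get::<TestTable>(0).unwrap().is_none());

        batch.commit().unwrap();

        // After the explicit commit, all entries are visible.
        for i in 0..10u64 {
            let value = format!("value_{i:02}").into_bytes();
            assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
        }
    }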
}