// reth_provider/either_writer.rs

//! Generic reader and writer abstractions for interacting with either database tables or static
//! files.
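//!
//! # Example
//!
//! A minimal sketch of the intended flow (hypothetical `provider`, `first_block`, and
//! `senders`; not compiled as a doctest):
//!
//! ```ignore
//! // Resolves to a database cursor or a static file writer based on the
//! // provider's cached storage settings.
//! let mut writer = EitherWriter::new_senders(&provider, first_block)?;
//! writer.increment_block(first_block)?;
//! writer.append_senders(senders.into_iter())?;
//! ```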

use std::{
    collections::BTreeSet,
    marker::PhantomData,
    ops::{Range, RangeInclusive},
};

#[cfg(all(unix, feature = "rocksdb"))]
use crate::providers::rocksdb::RocksDBBatch;
use crate::{
    providers::{history_info, HistoryInfo, StaticFileProvider, StaticFileProviderRWRefMut},
    StaticFileProviderFactory,
};
use alloy_primitives::{map::HashMap, Address, BlockNumber, TxHash, TxNumber, B256};
use rayon::slice::ParallelSliceMut;
use reth_db::{
    cursor::{DbCursorRO, DbDupCursorRW},
    models::{AccountBeforeTx, StorageBeforeTx},
    static_file::TransactionSenderMask,
    table::Value,
    transaction::{CursorMutTy, CursorTy, DbTx, DbTxMut, DupCursorMutTy, DupCursorTy},
};
use reth_db_api::{
    cursor::DbCursorRW,
    models::{storage_sharded_key::StorageShardedKey, BlockNumberAddress, ShardedKey},
    tables,
    tables::BlockNumberList,
};
use reth_errors::ProviderError;
use reth_node_types::NodePrimitives;
use reth_primitives_traits::{ReceiptTy, StorageEntry};
use reth_static_file_types::StaticFileSegment;
use reth_storage_api::{ChangeSetReader, DBProvider, NodePrimitivesProvider, StorageSettingsCache};
use reth_storage_errors::provider::ProviderResult;
use strum::{Display, EnumIs};

/// Type alias for [`EitherReader`] constructors.
type EitherReaderTy<'a, P, T> =
    EitherReader<'a, CursorTy<<P as DBProvider>::Tx, T>, <P as NodePrimitivesProvider>::Primitives>;

/// Type alias for dup [`EitherReader`] constructors.
type DupEitherReaderTy<'a, P, T> = EitherReader<
    'a,
    DupCursorTy<<P as DBProvider>::Tx, T>,
    <P as NodePrimitivesProvider>::Primitives,
>;

/// Type alias for dup [`EitherWriter`] constructors.
type DupEitherWriterTy<'a, P, T> = EitherWriter<
    'a,
    DupCursorMutTy<<P as DBProvider>::Tx, T>,
    <P as NodePrimitivesProvider>::Primitives,
>;

/// Type alias for [`EitherWriter`] constructors.
type EitherWriterTy<'a, P, T> = EitherWriter<
    'a,
    CursorMutTy<<P as DBProvider>::Tx, T>,
    <P as NodePrimitivesProvider>::Primitives,
>;

/// Helper type for the `RocksDB` batch argument in writer constructors.
///
/// When the `rocksdb` feature is enabled, this is a real `RocksDB` batch.
/// Otherwise, it's `()` (unit type) to allow the same API without feature gates.
#[cfg(all(unix, feature = "rocksdb"))]
pub type RocksBatchArg<'a> = crate::providers::rocksdb::RocksDBBatch<'a>;
/// Helper type for the `RocksDB` batch argument in writer constructors.
///
/// When the `rocksdb` feature is enabled, this is a real `RocksDB` batch.
/// Otherwise, it's `()` (unit type) to allow the same API without feature gates.
#[cfg(not(all(unix, feature = "rocksdb")))]
pub type RocksBatchArg<'a> = ();

/// The raw `RocksDB` batch type returned by [`EitherWriter::into_raw_rocksdb_batch`].
#[cfg(all(unix, feature = "rocksdb"))]
pub type RawRocksDBBatch = rocksdb::WriteBatchWithTransaction<true>;
/// The raw `RocksDB` batch type returned by [`EitherWriter::into_raw_rocksdb_batch`].
#[cfg(not(all(unix, feature = "rocksdb")))]
pub type RawRocksDBBatch = ();

/// Helper type for the `RocksDB` transaction reference argument in reader constructors.
///
/// When the `rocksdb` feature is enabled, this is an optional reference to a `RocksDB`
/// transaction. The `Option` allows callers to skip transaction creation when `RocksDB` isn't
/// needed (e.g., on legacy MDBX-only nodes).
/// When the `rocksdb` feature is disabled, it's `()` (unit type) to allow the same API without
/// feature gates.
#[cfg(all(unix, feature = "rocksdb"))]
pub type RocksTxRefArg<'a> = Option<&'a crate::providers::rocksdb::RocksTx<'a>>;
/// Helper type for the `RocksDB` transaction reference argument in reader constructors.
///
/// When the `rocksdb` feature is disabled, it's `()` (unit type) to allow the same API without
/// feature gates.
#[cfg(not(all(unix, feature = "rocksdb")))]
pub type RocksTxRefArg<'a> = ();

/// Represents a destination for writing data: a database table, a static file, or `RocksDB`.
#[derive(Debug, Display)]
pub enum EitherWriter<'a, CURSOR, N> {
    /// Write to database table via cursor
    Database(CURSOR),
    /// Write to static file
    StaticFile(StaticFileProviderRWRefMut<'a, N>),
    /// Write to `RocksDB` using a write-only batch (historical tables).
    #[cfg(all(unix, feature = "rocksdb"))]
    RocksDB(RocksDBBatch<'a>),
}

impl<'a> EitherWriter<'a, (), ()> {
    /// Creates a new [`EitherWriter`] for receipts based on storage settings and prune modes.
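    ///
    /// A minimal usage sketch (hypothetical `provider`, `block_number`, `tx_num`, and
    /// `receipt`; not compiled as a doctest):
    ///
    /// ```ignore
    /// let mut writer = EitherWriter::new_receipts(&provider, block_number)?;
    /// writer.append_receipt(tx_num, &receipt)?;
    /// ```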
    pub fn new_receipts<P>(
        provider: &'a P,
        block_number: BlockNumber,
    ) -> ProviderResult<EitherWriterTy<'a, P, tables::Receipts<ReceiptTy<P::Primitives>>>>
    where
        P: DBProvider + NodePrimitivesProvider + StorageSettingsCache + StaticFileProviderFactory,
        P::Tx: DbTxMut,
        ReceiptTy<P::Primitives>: Value,
    {
        if Self::receipts_destination(provider).is_static_file() {
            Ok(EitherWriter::StaticFile(
                provider.get_static_file_writer(block_number, StaticFileSegment::Receipts)?,
            ))
        } else {
            Ok(EitherWriter::Database(
                provider.tx_ref().cursor_write::<tables::Receipts<ReceiptTy<P::Primitives>>>()?,
            ))
        }
    }

    /// Creates a new [`EitherWriter`] for senders based on storage settings.
    pub fn new_senders<P>(
        provider: &'a P,
        block_number: BlockNumber,
    ) -> ProviderResult<EitherWriterTy<'a, P, tables::TransactionSenders>>
    where
        P: DBProvider + NodePrimitivesProvider + StorageSettingsCache + StaticFileProviderFactory,
        P::Tx: DbTxMut,
    {
        if EitherWriterDestination::senders(provider).is_static_file() {
            Ok(EitherWriter::StaticFile(
                provider
                    .get_static_file_writer(block_number, StaticFileSegment::TransactionSenders)?,
            ))
        } else {
            Ok(EitherWriter::Database(
                provider.tx_ref().cursor_write::<tables::TransactionSenders>()?,
            ))
        }
    }

    /// Creates a new [`EitherWriter`] for account changesets based on storage settings.
    pub fn new_account_changesets<P>(
        provider: &'a P,
        block_number: BlockNumber,
    ) -> ProviderResult<DupEitherWriterTy<'a, P, tables::AccountChangeSets>>
    where
        P: DBProvider + NodePrimitivesProvider + StorageSettingsCache + StaticFileProviderFactory,
        P::Tx: DbTxMut,
    {
        if provider.cached_storage_settings().storage_v2 {
            Ok(EitherWriter::StaticFile(
                provider
                    .get_static_file_writer(block_number, StaticFileSegment::AccountChangeSets)?,
            ))
        } else {
            Ok(EitherWriter::Database(
                provider.tx_ref().cursor_dup_write::<tables::AccountChangeSets>()?,
            ))
        }
    }

    /// Creates a new [`EitherWriter`] for storage changesets based on storage settings.
    pub fn new_storage_changesets<P>(
        provider: &'a P,
        block_number: BlockNumber,
    ) -> ProviderResult<DupEitherWriterTy<'a, P, tables::StorageChangeSets>>
    where
        P: DBProvider + NodePrimitivesProvider + StorageSettingsCache + StaticFileProviderFactory,
        P::Tx: DbTxMut,
    {
        if provider.cached_storage_settings().storage_v2 {
            Ok(EitherWriter::StaticFile(
                provider
                    .get_static_file_writer(block_number, StaticFileSegment::StorageChangeSets)?,
            ))
        } else {
            Ok(EitherWriter::Database(
                provider.tx_ref().cursor_dup_write::<tables::StorageChangeSets>()?,
            ))
        }
    }

    /// Returns the destination for writing receipts.
    ///
    /// The rules are as follows:
    /// - If the node should not always write receipts to static files, and any receipt pruning is
    ///   enabled, write to the database.
    /// - If the node should always write receipts to static files, but receipt log filter pruning
    ///   is enabled, write to the database.
    /// - Otherwise, write to static files.
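    ///
    /// Callers typically branch on the result, e.g. (sketch):
    ///
    /// ```ignore
    /// if EitherWriter::receipts_destination(provider).is_static_file() {
    ///     // open a static file writer for the receipts segment
    /// } else {
    ///     // open a database cursor over the receipts table
    /// }
    /// ```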
    pub fn receipts_destination<P: DBProvider + StorageSettingsCache>(
        provider: &P,
    ) -> EitherWriterDestination {
        let receipts_in_static_files = provider.cached_storage_settings().storage_v2;
        let prune_modes = provider.prune_modes_ref();

        if !receipts_in_static_files && prune_modes.has_receipts_pruning() ||
            // TODO: support writing receipts to static files with log filter pruning enabled
            receipts_in_static_files && !prune_modes.receipts_log_filter.is_empty()
        {
            EitherWriterDestination::Database
        } else {
            EitherWriterDestination::StaticFile
        }
    }

    /// Returns the destination for writing account changesets.
    ///
    /// This determines the destination based solely on storage settings.
    pub fn account_changesets_destination<P: DBProvider + StorageSettingsCache>(
        provider: &P,
    ) -> EitherWriterDestination {
        if provider.cached_storage_settings().storage_v2 {
            EitherWriterDestination::StaticFile
        } else {
            EitherWriterDestination::Database
        }
    }

    /// Returns the destination for writing storage changesets.
    ///
    /// This determines the destination based solely on storage settings.
    pub fn storage_changesets_destination<P: DBProvider + StorageSettingsCache>(
        provider: &P,
    ) -> EitherWriterDestination {
        if provider.cached_storage_settings().storage_v2 {
            EitherWriterDestination::StaticFile
        } else {
            EitherWriterDestination::Database
        }
    }

    /// Creates a new [`EitherWriter`] for storages history based on storage settings.
    pub fn new_storages_history<P>(
        provider: &P,
        _rocksdb_batch: RocksBatchArg<'a>,
    ) -> ProviderResult<EitherWriterTy<'a, P, tables::StoragesHistory>>
    where
        P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
        P::Tx: DbTxMut,
    {
        #[cfg(all(unix, feature = "rocksdb"))]
        if provider.cached_storage_settings().storage_v2 {
            return Ok(EitherWriter::RocksDB(_rocksdb_batch));
        }

        Ok(EitherWriter::Database(provider.tx_ref().cursor_write::<tables::StoragesHistory>()?))
    }

    /// Creates a new [`EitherWriter`] for transaction hash numbers based on storage settings.
    pub fn new_transaction_hash_numbers<P>(
        provider: &P,
        _rocksdb_batch: RocksBatchArg<'a>,
    ) -> ProviderResult<EitherWriterTy<'a, P, tables::TransactionHashNumbers>>
    where
        P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
        P::Tx: DbTxMut,
    {
        #[cfg(all(unix, feature = "rocksdb"))]
        if provider.cached_storage_settings().storage_v2 {
            return Ok(EitherWriter::RocksDB(_rocksdb_batch));
        }

        Ok(EitherWriter::Database(
            provider.tx_ref().cursor_write::<tables::TransactionHashNumbers>()?,
        ))
    }

    /// Creates a new [`EitherWriter`] for account history based on storage settings.
    pub fn new_accounts_history<P>(
        provider: &P,
        _rocksdb_batch: RocksBatchArg<'a>,
    ) -> ProviderResult<EitherWriterTy<'a, P, tables::AccountsHistory>>
    where
        P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
        P::Tx: DbTxMut,
    {
        #[cfg(all(unix, feature = "rocksdb"))]
        if provider.cached_storage_settings().storage_v2 {
            return Ok(EitherWriter::RocksDB(_rocksdb_batch));
        }

        Ok(EitherWriter::Database(provider.tx_ref().cursor_write::<tables::AccountsHistory>()?))
    }
}

impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N> {
    /// Extracts the raw `RocksDB` write batch from this writer, if it contains one.
    ///
    /// Returns `Some(WriteBatchWithTransaction)` for the [`Self::RocksDB`] variant and
    /// `None` for the other variants.
    ///
    /// This is used to defer `RocksDB` commits to the provider level, ensuring all
    /// storage commits (MDBX, static files, `RocksDB`) happen atomically in a single place.
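    ///
    /// Sketch of the deferred-commit pattern (mirroring the tests below; `provider` is
    /// hypothetical):
    ///
    /// ```ignore
    /// if let Some(batch) = writer.into_raw_rocksdb_batch() {
    ///     provider.set_pending_rocksdb_batch(batch);
    /// }
    /// provider.commit()?; // commits MDBX, static files, and the RocksDB batch together
    /// ```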
    #[cfg(all(unix, feature = "rocksdb"))]
    pub fn into_raw_rocksdb_batch(self) -> Option<rocksdb::WriteBatchWithTransaction<true>> {
        match self {
            Self::Database(_) | Self::StaticFile(_) => None,
            Self::RocksDB(batch) => Some(batch.into_inner()),
        }
    }

    /// Extracts the raw `RocksDB` write batch from this writer, if it contains one.
    ///
    /// Without the `rocksdb` feature, this always returns `None`.
    #[cfg(not(all(unix, feature = "rocksdb")))]
    pub fn into_raw_rocksdb_batch(self) -> Option<RawRocksDBBatch> {
        match self {
            Self::Database(_) | Self::StaticFile(_) => None,
        }
    }

    /// Increment the block number.
    ///
    /// Relevant only for [`Self::StaticFile`]. It is a no-op for [`Self::Database`].
    pub fn increment_block(&mut self, expected_block_number: BlockNumber) -> ProviderResult<()> {
        match self {
            Self::Database(_) => Ok(()),
            Self::StaticFile(writer) => writer.increment_block(expected_block_number),
            #[cfg(all(unix, feature = "rocksdb"))]
            Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
        }
    }

    /// Ensures that the writer is positioned at the specified block number.
    ///
    /// If the writer is positioned at a greater block number than the specified one, the writer
    /// will NOT be unwound and an error will be returned.
    ///
    /// Relevant only for [`Self::StaticFile`]. It is a no-op for [`Self::Database`].
    pub fn ensure_at_block(&mut self, block_number: BlockNumber) -> ProviderResult<()> {
        match self {
            Self::Database(_) => Ok(()),
            Self::StaticFile(writer) => writer.ensure_at_block(block_number),
            #[cfg(all(unix, feature = "rocksdb"))]
            Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
        }
    }
}

impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>
where
    N::Receipt: Value,
    CURSOR: DbCursorRW<tables::Receipts<N::Receipt>>,
{
    /// Append a transaction receipt.
    pub fn append_receipt(&mut self, tx_num: TxNumber, receipt: &N::Receipt) -> ProviderResult<()> {
        match self {
            Self::Database(cursor) => Ok(cursor.append(tx_num, receipt)?),
            Self::StaticFile(writer) => writer.append_receipt(tx_num, receipt),
            #[cfg(all(unix, feature = "rocksdb"))]
            Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
        }
    }
}

impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>
where
    CURSOR: DbCursorRW<tables::TransactionSenders>,
{
    /// Append a transaction sender to the destination.
    pub fn append_sender(&mut self, tx_num: TxNumber, sender: &Address) -> ProviderResult<()> {
        match self {
            Self::Database(cursor) => Ok(cursor.append(tx_num, sender)?),
            Self::StaticFile(writer) => writer.append_transaction_sender(tx_num, sender),
            #[cfg(all(unix, feature = "rocksdb"))]
            Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
        }
    }

    /// Append transaction senders to the destination.
    pub fn append_senders<I>(&mut self, senders: I) -> ProviderResult<()>
    where
        I: Iterator<Item = (TxNumber, Address)>,
    {
        match self {
            Self::Database(cursor) => {
                for (tx_num, sender) in senders {
                    cursor.append(tx_num, &sender)?;
                }
                Ok(())
            }
            Self::StaticFile(writer) => writer.append_transaction_senders(senders),
            #[cfg(all(unix, feature = "rocksdb"))]
            Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
        }
    }

    /// Removes all transaction senders at or above the given transaction number, and stops at the
    /// given block number.
    pub fn prune_senders(
        &mut self,
        unwind_tx_from: TxNumber,
        block: BlockNumber,
    ) -> ProviderResult<()>
    where
        CURSOR: DbCursorRO<tables::TransactionSenders>,
    {
        match self {
            Self::Database(cursor) => {
                let mut walker = cursor.walk_range(unwind_tx_from..)?;
                while walker.next().transpose()?.is_some() {
                    walker.delete_current()?;
                }
            }
            Self::StaticFile(writer) => {
                let static_file_transaction_sender_num = writer
                    .reader()
                    .get_highest_static_file_tx(StaticFileSegment::TransactionSenders);

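                // Number of rows to remove from the static file: everything from
                // `unwind_tx_from` through the highest stored tx number, i.e.
                // `static_num + 1 - unwind_tx_from`, saturating to zero when the unwind
                // point is already past the static file tip.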
                let to_delete = static_file_transaction_sender_num
                    .map(|static_num| (static_num + 1).saturating_sub(unwind_tx_from))
                    .unwrap_or_default();

                writer.prune_transaction_senders(to_delete, block)?;
            }
            #[cfg(all(unix, feature = "rocksdb"))]
            Self::RocksDB(_) => return Err(ProviderError::UnsupportedProvider),
        }

        Ok(())
    }
}

impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>
where
    CURSOR: DbCursorRW<tables::TransactionHashNumbers> + DbCursorRO<tables::TransactionHashNumbers>,
{
    /// Puts a transaction hash number mapping.
    ///
    /// When `append_only` is true, uses `cursor.append()`, which is significantly faster
    /// but requires keys to be inserted in increasing order (greater than any existing key).
    /// When false, uses `cursor.upsert()`, which handles arbitrary insertion order and duplicates.
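    ///
    /// Sketch of both modes (hypothetical `hash` and `tx_num`):
    ///
    /// ```ignore
    /// writer.put_transaction_hash_number(hash, tx_num, true)?;  // fast path, ordered keys
    /// writer.put_transaction_hash_number(hash, tx_num, false)?; // safe path, any order
    /// ```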
    pub fn put_transaction_hash_number(
        &mut self,
        hash: TxHash,
        tx_num: TxNumber,
        append_only: bool,
    ) -> ProviderResult<()> {
        match self {
            Self::Database(cursor) => {
                if append_only {
                    Ok(cursor.append(hash, &tx_num)?)
                } else {
                    Ok(cursor.upsert(hash, &tx_num)?)
                }
            }
            Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
            #[cfg(all(unix, feature = "rocksdb"))]
            Self::RocksDB(batch) => batch.put::<tables::TransactionHashNumbers>(hash, &tx_num),
        }
    }

    /// Puts multiple transaction hash number mappings in a batch.
    ///
    /// Accepts a vector of `(TxHash, TxNumber)` tuples and writes them all using the same cursor.
    /// This is more efficient than calling `put_transaction_hash_number` repeatedly.
    ///
    /// When `append_only` is true, uses `cursor.append()` which requires entries to be
    /// pre-sorted and the table to be empty or have only lower keys.
    /// When false, uses `cursor.upsert()` which handles arbitrary insertion order.
    pub fn put_transaction_hash_numbers_batch(
        &mut self,
        entries: Vec<(TxHash, TxNumber)>,
        append_only: bool,
    ) -> ProviderResult<()> {
        match self {
            Self::Database(cursor) => {
                for (hash, tx_num) in entries {
                    if append_only {
                        cursor.append(hash, &tx_num)?;
                    } else {
                        cursor.upsert(hash, &tx_num)?;
                    }
                }
                Ok(())
            }
            Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
            #[cfg(all(unix, feature = "rocksdb"))]
            Self::RocksDB(batch) => {
                for (hash, tx_num) in entries {
                    batch.put::<tables::TransactionHashNumbers>(hash, &tx_num)?;
                }
                Ok(())
            }
        }
    }

    /// Deletes a transaction hash number mapping.
    pub fn delete_transaction_hash_number(&mut self, hash: TxHash) -> ProviderResult<()> {
        match self {
            Self::Database(cursor) => {
                if cursor.seek_exact(hash)?.is_some() {
                    cursor.delete_current()?;
                }
                Ok(())
            }
            Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
            #[cfg(all(unix, feature = "rocksdb"))]
            Self::RocksDB(batch) => batch.delete::<tables::TransactionHashNumbers>(hash),
        }
    }
}

impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>
where
    CURSOR: DbCursorRW<tables::StoragesHistory> + DbCursorRO<tables::StoragesHistory>,
{
    /// Puts a storage history entry.
    pub fn put_storage_history(
        &mut self,
        key: StorageShardedKey,
        value: &BlockNumberList,
    ) -> ProviderResult<()> {
        match self {
            Self::Database(cursor) => Ok(cursor.upsert(key, value)?),
            Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
            #[cfg(all(unix, feature = "rocksdb"))]
            Self::RocksDB(batch) => batch.put::<tables::StoragesHistory>(key, value),
        }
    }

    /// Deletes a storage history entry.
    pub fn delete_storage_history(&mut self, key: StorageShardedKey) -> ProviderResult<()> {
        match self {
            Self::Database(cursor) => {
                if cursor.seek_exact(key)?.is_some() {
                    cursor.delete_current()?;
                }
                Ok(())
            }
            Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
            #[cfg(all(unix, feature = "rocksdb"))]
            Self::RocksDB(batch) => batch.delete::<tables::StoragesHistory>(key),
        }
    }

    /// Appends a storage history entry (for first sync - more efficient).
    pub fn append_storage_history(
        &mut self,
        key: StorageShardedKey,
        value: &BlockNumberList,
    ) -> ProviderResult<()> {
        match self {
            Self::Database(cursor) => Ok(cursor.append(key, value)?),
            Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
            #[cfg(all(unix, feature = "rocksdb"))]
            Self::RocksDB(batch) => batch.put::<tables::StoragesHistory>(key, value),
        }
    }

    /// Upserts a storage history entry (for incremental sync).
    pub fn upsert_storage_history(
        &mut self,
        key: StorageShardedKey,
        value: &BlockNumberList,
    ) -> ProviderResult<()> {
        match self {
            Self::Database(cursor) => Ok(cursor.upsert(key, value)?),
            Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
            #[cfg(all(unix, feature = "rocksdb"))]
            Self::RocksDB(batch) => batch.put::<tables::StoragesHistory>(key, value),
        }
    }

    /// Gets the last shard for an address and storage key (keyed with `u64::MAX`).
    pub fn get_last_storage_history_shard(
        &mut self,
        address: Address,
        storage_key: B256,
    ) -> ProviderResult<Option<BlockNumberList>> {
        let key = StorageShardedKey::last(address, storage_key);
        match self {
            Self::Database(cursor) => Ok(cursor.seek_exact(key)?.map(|(_, v)| v)),
            Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
            #[cfg(all(unix, feature = "rocksdb"))]
            Self::RocksDB(batch) => batch.get::<tables::StoragesHistory>(key),
        }
    }
}

impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>
where
    CURSOR: DbCursorRW<tables::AccountsHistory> + DbCursorRO<tables::AccountsHistory>,
{
    /// Appends an account history entry (for first sync - more efficient).
    pub fn append_account_history(
        &mut self,
        key: ShardedKey<Address>,
        value: &BlockNumberList,
    ) -> ProviderResult<()> {
        match self {
            Self::Database(cursor) => Ok(cursor.append(key, value)?),
            Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
            #[cfg(all(unix, feature = "rocksdb"))]
            Self::RocksDB(batch) => batch.put::<tables::AccountsHistory>(key, value),
        }
    }

    /// Upserts an account history entry (for incremental sync).
    pub fn upsert_account_history(
        &mut self,
        key: ShardedKey<Address>,
        value: &BlockNumberList,
    ) -> ProviderResult<()> {
        match self {
            Self::Database(cursor) => Ok(cursor.upsert(key, value)?),
            Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
            #[cfg(all(unix, feature = "rocksdb"))]
            Self::RocksDB(batch) => batch.put::<tables::AccountsHistory>(key, value),
        }
    }

    /// Gets the last shard for an address (keyed with `u64::MAX`).
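    ///
    /// History indices are sharded: each shard covers a range of block numbers, and the most
    /// recent shard is stored under `ShardedKey::last(address)` (block number `u64::MAX`), so a
    /// single exact-key lookup retrieves it.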
    pub fn get_last_account_history_shard(
        &mut self,
        address: Address,
    ) -> ProviderResult<Option<BlockNumberList>> {
        match self {
            Self::Database(cursor) => {
                Ok(cursor.seek_exact(ShardedKey::last(address))?.map(|(_, v)| v))
            }
            Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
            #[cfg(all(unix, feature = "rocksdb"))]
            Self::RocksDB(batch) => batch.get::<tables::AccountsHistory>(ShardedKey::last(address)),
        }
    }

    /// Deletes an account history entry.
    pub fn delete_account_history(&mut self, key: ShardedKey<Address>) -> ProviderResult<()> {
        match self {
            Self::Database(cursor) => {
                if cursor.seek_exact(key)?.is_some() {
                    cursor.delete_current()?;
                }
                Ok(())
            }
            Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
            #[cfg(all(unix, feature = "rocksdb"))]
            Self::RocksDB(batch) => batch.delete::<tables::AccountsHistory>(key),
        }
    }
}

impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>
where
    CURSOR: DbDupCursorRW<tables::AccountChangeSets>,
{
    /// Append account changeset for a block.
    ///
    /// NOTE: This _sorts_ the changesets by address before appending.
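    ///
    /// Sketch (hypothetical `address` and `block_number`; `info` holds the pre-block account
    /// state, `None` if the account did not previously exist):
    ///
    /// ```ignore
    /// let changeset = vec![AccountBeforeTx { address, info: None }];
    /// writer.append_account_changeset(block_number, changeset)?;
    /// ```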
    pub fn append_account_changeset(
        &mut self,
        block_number: BlockNumber,
        mut changeset: Vec<AccountBeforeTx>,
    ) -> ProviderResult<()> {
        // First sort the changesets
        changeset.par_sort_by_key(|a| a.address);
        match self {
            Self::Database(cursor) => {
                for change in changeset {
                    cursor.append_dup(block_number, change)?;
                }
            }
            Self::StaticFile(writer) => {
                writer.append_account_changeset(changeset, block_number)?;
            }
            #[cfg(all(unix, feature = "rocksdb"))]
            Self::RocksDB(_) => return Err(ProviderError::UnsupportedProvider),
        }

        Ok(())
    }
}

impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>
where
    CURSOR: DbDupCursorRW<tables::StorageChangeSets>,
{
    /// Append storage changeset for a block.
    ///
    /// NOTE: This _sorts_ the changesets by address and storage key before appending.
    pub fn append_storage_changeset(
        &mut self,
        block_number: BlockNumber,
        mut changeset: Vec<StorageBeforeTx>,
    ) -> ProviderResult<()> {
        changeset.par_sort_by_key(|change| (change.address, change.key));

        match self {
            Self::Database(cursor) => {
                for change in changeset {
                    let storage_id = BlockNumberAddress((block_number, change.address));
                    cursor.append_dup(
                        storage_id,
                        StorageEntry { key: change.key, value: change.value },
                    )?;
                }
            }
            Self::StaticFile(writer) => {
                writer.append_storage_changeset(changeset, block_number)?;
            }
            #[cfg(all(unix, feature = "rocksdb"))]
            Self::RocksDB(_) => return Err(ProviderError::UnsupportedProvider),
        }

        Ok(())
    }
}

/// Represents a source for reading data: a database table, a static file, or `RocksDB`.
#[derive(Debug, Display)]
pub enum EitherReader<'a, CURSOR, N> {
    /// Read from database table via cursor
    Database(CURSOR, PhantomData<&'a ()>),
    /// Read from static file
    StaticFile(StaticFileProvider<N>, PhantomData<&'a ()>),
    /// Read from `RocksDB` transaction
    #[cfg(all(unix, feature = "rocksdb"))]
    RocksDB(&'a crate::providers::rocksdb::RocksTx<'a>),
}

impl<'a> EitherReader<'a, (), ()> {
    /// Creates a new [`EitherReader`] for senders based on storage settings.
    pub fn new_senders<P>(
        provider: &P,
    ) -> ProviderResult<EitherReaderTy<'a, P, tables::TransactionSenders>>
    where
        P: DBProvider + NodePrimitivesProvider + StorageSettingsCache + StaticFileProviderFactory,
        P::Tx: DbTx,
    {
        if EitherWriterDestination::senders(provider).is_static_file() {
            Ok(EitherReader::StaticFile(provider.static_file_provider(), PhantomData))
        } else {
            Ok(EitherReader::Database(
                provider.tx_ref().cursor_read::<tables::TransactionSenders>()?,
                PhantomData,
            ))
        }
    }

    /// Creates a new [`EitherReader`] for storages history based on storage settings.
    pub fn new_storages_history<P>(
        provider: &P,
        _rocksdb_tx: RocksTxRefArg<'a>,
    ) -> ProviderResult<EitherReaderTy<'a, P, tables::StoragesHistory>>
    where
        P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
        P::Tx: DbTx,
    {
        #[cfg(all(unix, feature = "rocksdb"))]
        if provider.cached_storage_settings().storage_v2 {
            return Ok(EitherReader::RocksDB(
                _rocksdb_tx.expect("storages_history_in_rocksdb requires rocksdb tx"),
            ));
        }

        Ok(EitherReader::Database(
            provider.tx_ref().cursor_read::<tables::StoragesHistory>()?,
            PhantomData,
        ))
    }

    /// Creates a new [`EitherReader`] for transaction hash numbers based on storage settings.
    pub fn new_transaction_hash_numbers<P>(
        provider: &P,
        _rocksdb_tx: RocksTxRefArg<'a>,
    ) -> ProviderResult<EitherReaderTy<'a, P, tables::TransactionHashNumbers>>
    where
        P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
        P::Tx: DbTx,
    {
        #[cfg(all(unix, feature = "rocksdb"))]
        if provider.cached_storage_settings().storage_v2 {
            return Ok(EitherReader::RocksDB(
                _rocksdb_tx.expect("transaction_hash_numbers_in_rocksdb requires rocksdb tx"),
            ));
        }

        Ok(EitherReader::Database(
            provider.tx_ref().cursor_read::<tables::TransactionHashNumbers>()?,
            PhantomData,
        ))
    }

    /// Creates a new [`EitherReader`] for account history based on storage settings.
    pub fn new_accounts_history<P>(
        provider: &P,
        _rocksdb_tx: RocksTxRefArg<'a>,
    ) -> ProviderResult<EitherReaderTy<'a, P, tables::AccountsHistory>>
    where
        P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
        P::Tx: DbTx,
    {
        #[cfg(all(unix, feature = "rocksdb"))]
        if provider.cached_storage_settings().storage_v2 {
            return Ok(EitherReader::RocksDB(
                _rocksdb_tx.expect("account_history_in_rocksdb requires rocksdb tx"),
            ));
        }

        Ok(EitherReader::Database(
            provider.tx_ref().cursor_read::<tables::AccountsHistory>()?,
            PhantomData,
        ))
    }

    /// Creates a new [`EitherReader`] for account changesets based on storage settings.
    pub fn new_account_changesets<P>(
        provider: &P,
    ) -> ProviderResult<DupEitherReaderTy<'a, P, tables::AccountChangeSets>>
    where
        P: DBProvider + NodePrimitivesProvider + StorageSettingsCache + StaticFileProviderFactory,
        P::Tx: DbTx,
    {
        if EitherWriterDestination::account_changesets(provider).is_static_file() {
            Ok(EitherReader::StaticFile(provider.static_file_provider(), PhantomData))
        } else {
            Ok(EitherReader::Database(
                provider.tx_ref().cursor_dup_read::<tables::AccountChangeSets>()?,
                PhantomData,
            ))
        }
    }
}

impl<CURSOR, N: NodePrimitives> EitherReader<'_, CURSOR, N>
where
    CURSOR: DbCursorRO<tables::TransactionSenders>,
{
    /// Fetches the senders for a range of transactions.
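    ///
    /// Transaction numbers without a stored sender are simply absent from the returned map, so
    /// querying a range wider than the stored data is safe (see `test_reader_senders_by_tx_range`
    /// below).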
    pub fn senders_by_tx_range(
        &mut self,
        range: Range<TxNumber>,
    ) -> ProviderResult<HashMap<TxNumber, Address>> {
        match self {
            Self::Database(cursor, _) => cursor
                .walk_range(range)?
                .map(|result| result.map_err(ProviderError::from))
                .collect::<ProviderResult<HashMap<_, _>>>(),
            Self::StaticFile(provider, _) => range
                .clone()
                .zip(provider.fetch_range_iter(
                    StaticFileSegment::TransactionSenders,
                    range,
                    |cursor, number| cursor.get_one::<TransactionSenderMask>(number.into()),
                )?)
                .filter_map(|(tx_num, sender)| {
                    let result = sender.transpose()?;
                    Some(result.map(|sender| (tx_num, sender)))
                })
                .collect::<ProviderResult<HashMap<_, _>>>(),
            #[cfg(all(unix, feature = "rocksdb"))]
            Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
        }
    }
}

impl<CURSOR, N: NodePrimitives> EitherReader<'_, CURSOR, N>
where
    CURSOR: DbCursorRO<tables::TransactionHashNumbers>,
{
    /// Gets a transaction number by its hash.
    pub fn get_transaction_hash_number(
        &mut self,
        hash: TxHash,
    ) -> ProviderResult<Option<TxNumber>> {
        match self {
            Self::Database(cursor, _) => Ok(cursor.seek_exact(hash)?.map(|(_, v)| v)),
            Self::StaticFile(_, _) => Err(ProviderError::UnsupportedProvider),
            #[cfg(all(unix, feature = "rocksdb"))]
            Self::RocksDB(tx) => tx.get::<tables::TransactionHashNumbers>(hash),
        }
    }
}

impl<CURSOR, N: NodePrimitives> EitherReader<'_, CURSOR, N>
where
    CURSOR: DbCursorRO<tables::StoragesHistory>,
{
    /// Gets a storage history shard entry for the given [`StorageShardedKey`], if present.
    pub fn get_storage_history(
        &mut self,
        key: StorageShardedKey,
    ) -> ProviderResult<Option<BlockNumberList>> {
        match self {
            Self::Database(cursor, _) => Ok(cursor.seek_exact(key)?.map(|(_, v)| v)),
            Self::StaticFile(_, _) => Err(ProviderError::UnsupportedProvider),
            #[cfg(all(unix, feature = "rocksdb"))]
            Self::RocksDB(tx) => tx.get::<tables::StoragesHistory>(key),
        }
    }

    /// Lookup storage history and return [`HistoryInfo`].
    pub fn storage_history_info(
        &mut self,
        address: Address,
        storage_key: alloy_primitives::B256,
        block_number: BlockNumber,
        lowest_available_block_number: Option<BlockNumber>,
    ) -> ProviderResult<HistoryInfo> {
        match self {
            Self::Database(cursor, _) => {
                let key = StorageShardedKey::new(address, storage_key, block_number);
                history_info::<tables::StoragesHistory, _, _>(
                    cursor,
                    key,
                    block_number,
                    |k| k.address == address && k.sharded_key.key == storage_key,
                    lowest_available_block_number,
                )
            }
            Self::StaticFile(_, _) => Err(ProviderError::UnsupportedProvider),
            #[cfg(all(unix, feature = "rocksdb"))]
            Self::RocksDB(tx) => tx.storage_history_info(
                address,
                storage_key,
                block_number,
                lowest_available_block_number,
            ),
        }
    }
}

impl<CURSOR, N: NodePrimitives> EitherReader<'_, CURSOR, N>
where
    CURSOR: DbCursorRO<tables::AccountsHistory>,
{
    /// Gets an account history shard entry for the given [`ShardedKey`], if present.
    pub fn get_account_history(
        &mut self,
        key: ShardedKey<Address>,
    ) -> ProviderResult<Option<BlockNumberList>> {
        match self {
            Self::Database(cursor, _) => Ok(cursor.seek_exact(key)?.map(|(_, v)| v)),
            Self::StaticFile(_, _) => Err(ProviderError::UnsupportedProvider),
            #[cfg(all(unix, feature = "rocksdb"))]
            Self::RocksDB(tx) => tx.get::<tables::AccountsHistory>(key),
        }
    }

    /// Lookup account history and return [`HistoryInfo`].
    pub fn account_history_info(
        &mut self,
        address: Address,
        block_number: BlockNumber,
        lowest_available_block_number: Option<BlockNumber>,
    ) -> ProviderResult<HistoryInfo> {
        match self {
            Self::Database(cursor, _) => {
                let key = ShardedKey::new(address, block_number);
                history_info::<tables::AccountsHistory, _, _>(
                    cursor,
                    key,
                    block_number,
                    |k| k.key == address,
                    lowest_available_block_number,
                )
            }
            Self::StaticFile(_, _) => Err(ProviderError::UnsupportedProvider),
            #[cfg(all(unix, feature = "rocksdb"))]
            Self::RocksDB(tx) => {
                tx.account_history_info(address, block_number, lowest_available_block_number)
            }
        }
    }
}

impl<CURSOR, N: NodePrimitives> EitherReader<'_, CURSOR, N>
where
    CURSOR: DbCursorRO<tables::AccountChangeSets>,
{
    /// Iterate over account changesets and return all account addresses that were changed.
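    ///
    /// For the static file backend the requested range is capped at the highest available
    /// changeset block, so querying past the tip returns only what exists (see the test below).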
    pub fn changed_accounts_with_range(
        &mut self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<BTreeSet<Address>> {
        match self {
            Self::StaticFile(provider, _) => {
                let highest_static_block =
                    provider.get_highest_static_file_block(StaticFileSegment::AccountChangeSets);

                let Some(highest) = highest_static_block else {
                    return Err(ProviderError::MissingHighestStaticFileBlock(
                        StaticFileSegment::AccountChangeSets,
                    ))
                };

                let start = *range.start();
                let static_end = (*range.end()).min(highest);

                let mut changed_accounts = BTreeSet::default();
                if start <= static_end {
                    for block in start..=static_end {
                        let block_changesets = provider.account_block_changeset(block)?;
                        for changeset in block_changesets {
                            changed_accounts.insert(changeset.address);
                        }
                    }
                }

                Ok(changed_accounts)
            }
            Self::Database(provider, _) => provider
                .walk_range(range)?
                .map(|entry| {
                    entry.map(|(_, account_before)| account_before.address).map_err(Into::into)
                })
                .collect(),
            #[cfg(all(unix, feature = "rocksdb"))]
            Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
        }
    }
}

/// Destination for writing data.
#[derive(Debug, EnumIs)]
pub enum EitherWriterDestination {
    /// Write to database table
    Database,
    /// Write to static file
    StaticFile,
    /// Write to `RocksDB`
    RocksDB,
}

impl EitherWriterDestination {
    /// Returns the destination for writing senders based on storage settings.
    pub fn senders<P>(provider: &P) -> Self
    where
        P: StorageSettingsCache,
    {
        // Write senders to static files only if they're explicitly enabled
        if provider.cached_storage_settings().storage_v2 {
            Self::StaticFile
        } else {
            Self::Database
        }
    }

    /// Returns the destination for writing account changesets based on storage settings.
    pub fn account_changesets<P>(provider: &P) -> Self
    where
        P: StorageSettingsCache,
    {
        // Write account changesets to static files only if they're explicitly enabled
        if provider.cached_storage_settings().storage_v2 {
            Self::StaticFile
        } else {
            Self::Database
        }
    }

    /// Returns the destination for writing storage changesets based on storage settings.
    pub fn storage_changesets<P>(provider: &P) -> Self
    where
        P: StorageSettingsCache,
    {
        // Write storage changesets to static files only if they're explicitly enabled
        if provider.cached_storage_settings().storage_v2 {
            Self::StaticFile
        } else {
            Self::Database
        }
    }
}

#[cfg(test)]
mod tests {
    use crate::{test_utils::create_test_provider_factory, StaticFileWriter};

    use super::*;
    use alloy_primitives::Address;
    use reth_db::models::AccountBeforeTx;
    use reth_static_file_types::StaticFileSegment;
    use reth_storage_api::{DatabaseProviderFactory, StorageSettings};

    /// Verifies that `changed_accounts_with_range` correctly caps the query range to the
    /// static file tip when the requested range extends beyond it.
    ///
    /// This test documents the fix for an off-by-one bug where the code computed
    /// `static_end = range.end().min(highest + 1)` instead of `min(highest)`.
    /// The bug allowed iteration to attempt reading block `highest + 1` which doesn't
    /// exist (silently returning empty results due to `MissingStaticFileBlock` handling).
    ///
    /// While the bug was masked by error handling, it caused:
    /// 1. Unnecessary iteration/lookup for non-existent blocks
    /// 2. Potential overflow when `highest == u64::MAX`
    #[test]
    fn test_changed_accounts_with_range_caps_at_static_file_tip() {
        let factory = create_test_provider_factory();
        let highest_block = 5u64;

        let addresses: Vec<Address> = (0..=highest_block)
            .map(|i| {
                let mut addr = Address::ZERO;
                addr.0[0] = i as u8;
                addr
            })
            .collect();

        {
            let sf_provider = factory.static_file_provider();
            let mut writer =
                sf_provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();

            for block_num in 0..=highest_block {
                let changeset =
                    vec![AccountBeforeTx { address: addresses[block_num as usize], info: None }];
                writer.append_account_changeset(changeset, block_num).unwrap();
            }
            writer.commit().unwrap();
        }

        factory.set_storage_settings_cache(StorageSettings::v2());

        let provider = factory.database_provider_ro().unwrap();

        let sf_tip = provider
            .static_file_provider()
            .get_highest_static_file_block(StaticFileSegment::AccountChangeSets);
        assert_eq!(sf_tip, Some(highest_block));

        let mut reader = EitherReader::new_account_changesets(&provider).unwrap();
        assert!(matches!(reader, EitherReader::StaticFile(_, _)));

        // Query range 0..=10 when tip is 5 - should return only accounts from blocks 0-5
        let result = reader.changed_accounts_with_range(0..=10).unwrap();

        let expected: BTreeSet<Address> = addresses.into_iter().collect();
        assert_eq!(result, expected);
    }

    #[test]
    fn test_reader_senders_by_tx_range() {
        let factory = create_test_provider_factory();

        // Insert senders only from 1 to 4, but we will query from 0 to 5.
        let senders = [
            (1, Address::random()),
            (2, Address::random()),
            (3, Address::random()),
            (4, Address::random()),
        ];

        for transaction_senders_in_static_files in [false, true] {
            factory.set_storage_settings_cache(if transaction_senders_in_static_files {
                StorageSettings::v2()
            } else {
                StorageSettings::v1()
            });

            let provider = factory.database_provider_rw().unwrap();
            let mut writer = EitherWriter::new_senders(&provider, 0).unwrap();
            if transaction_senders_in_static_files {
                assert!(matches!(writer, EitherWriter::StaticFile(_)));
            } else {
                assert!(matches!(writer, EitherWriter::Database(_)));
            }

            writer.increment_block(0).unwrap();
            writer.append_senders(senders.iter().copied()).unwrap();
            drop(writer);
            provider.commit().unwrap();

            let provider = factory.database_provider_ro().unwrap();
            let mut reader = EitherReader::new_senders(&provider).unwrap();
            if transaction_senders_in_static_files {
                assert!(matches!(reader, EitherReader::StaticFile(_, _)));
            } else {
                assert!(matches!(reader, EitherReader::Database(_, _)));
            }

            assert_eq!(
                reader.senders_by_tx_range(0..6).unwrap(),
                senders.iter().copied().collect::<HashMap<_, _>>(),
                "{reader}"
            );
        }
    }
}
1197
1198#[cfg(all(test, unix, feature = "rocksdb"))]
1199mod rocksdb_tests {
1200    use super::*;
1201    use crate::{
1202        providers::rocksdb::{RocksDBBuilder, RocksDBProvider},
1203        test_utils::create_test_provider_factory,
1204        RocksDBProviderFactory,
1205    };
1206    use alloy_primitives::{Address, B256};
1207    use reth_db_api::{
1208        models::{storage_sharded_key::StorageShardedKey, IntegerList, ShardedKey},
1209        tables,
1210        transaction::DbTxMut,
1211    };
1212    use reth_ethereum_primitives::EthPrimitives;
1213    use reth_storage_api::{DatabaseProviderFactory, StorageSettings};
1214    use std::marker::PhantomData;
1215    use tempfile::TempDir;
1216
1217    fn create_rocksdb_provider() -> (TempDir, RocksDBProvider) {
1218        let temp_dir = TempDir::new().unwrap();
1219        let provider = RocksDBBuilder::new(temp_dir.path())
1220            .with_table::<tables::TransactionHashNumbers>()
1221            .with_table::<tables::StoragesHistory>()
1222            .with_table::<tables::AccountsHistory>()
1223            .build()
1224            .unwrap();
1225        (temp_dir, provider)
1226    }
1227
1228    /// Test that `EitherWriter::new_transaction_hash_numbers` creates a `RocksDB` writer
1229    /// when the storage setting is enabled, and that put operations followed by commit
1230    /// persist the data to `RocksDB`.
1231    #[test]
1232    fn test_either_writer_transaction_hash_numbers_with_rocksdb() {
1233        let factory = create_test_provider_factory();
1234
1235        // Enable RocksDB for transaction hash numbers
1236        factory.set_storage_settings_cache(StorageSettings::v2());
1237
1238        let hash1 = B256::from([1u8; 32]);
1239        let hash2 = B256::from([2u8; 32]);
1240        let tx_num1 = 100u64;
1241        let tx_num2 = 200u64;
1242
1243        // Get the RocksDB batch from the provider
1244        let rocksdb = factory.rocksdb_provider();
1245        let batch = rocksdb.batch();
1246
1247        // Create EitherWriter with RocksDB
1248        let provider = factory.database_provider_rw().unwrap();
1249        let mut writer = EitherWriter::new_transaction_hash_numbers(&provider, batch).unwrap();
1250
1251        // Verify we got a RocksDB writer
1252        assert!(matches!(writer, EitherWriter::RocksDB(_)));
1253
1254        // Write transaction hash numbers (append_only=false since we're using RocksDB)
1255        writer.put_transaction_hash_number(hash1, tx_num1, false).unwrap();
1256        writer.put_transaction_hash_number(hash2, tx_num2, false).unwrap();
1257
1258        // Extract the batch and register with provider for commit
1259        if let Some(batch) = writer.into_raw_rocksdb_batch() {
1260            provider.set_pending_rocksdb_batch(batch);
1261        }
1262
1263        // Commit via provider - this commits RocksDB batch too
1264        provider.commit().unwrap();
1265
1266        // Verify data was written to RocksDB
1267        let rocksdb = factory.rocksdb_provider();
1268        assert_eq!(rocksdb.get::<tables::TransactionHashNumbers>(hash1).unwrap(), Some(tx_num1));
1269        assert_eq!(rocksdb.get::<tables::TransactionHashNumbers>(hash2).unwrap(), Some(tx_num2));
1270    }
1271
1272    /// Test that `EitherWriter::delete_transaction_hash_number` works with `RocksDB`.
1273    #[test]
1274    fn test_either_writer_delete_transaction_hash_number_with_rocksdb() {
1275        let factory = create_test_provider_factory();
1276
1277        // Enable RocksDB for transaction hash numbers
1278        factory.set_storage_settings_cache(StorageSettings::v2());
1279
1280        let hash = B256::from([1u8; 32]);
1281        let tx_num = 100u64;
1282
1283        // First, write a value directly to RocksDB
1284        let rocksdb = factory.rocksdb_provider();
1285        rocksdb.put::<tables::TransactionHashNumbers>(hash, &tx_num).unwrap();
1286        assert_eq!(rocksdb.get::<tables::TransactionHashNumbers>(hash).unwrap(), Some(tx_num));
1287
1288        // Now delete using EitherWriter
1289        let batch = rocksdb.batch();
        let provider = factory.database_provider_rw().unwrap();
        let mut writer = EitherWriter::new_transaction_hash_numbers(&provider, batch).unwrap();
        writer.delete_transaction_hash_number(hash).unwrap();

        // Extract the batch and commit via provider
        if let Some(batch) = writer.into_raw_rocksdb_batch() {
            provider.set_pending_rocksdb_batch(batch);
        }
        provider.commit().unwrap();

        // Verify deletion
        let rocksdb = factory.rocksdb_provider();
        assert_eq!(rocksdb.get::<tables::TransactionHashNumbers>(hash).unwrap(), None);
    }

    #[test]
    fn test_rocksdb_batch_transaction_hash_numbers() {
        let (_temp_dir, provider) = create_rocksdb_provider();

        let hash1 = B256::from([1u8; 32]);
        let hash2 = B256::from([2u8; 32]);
        let tx_num1 = 100u64;
        let tx_num2 = 200u64;

        // Write via RocksDBBatch (same as EitherWriter::RocksDB would use internally)
        let mut batch = provider.batch();
        batch.put::<tables::TransactionHashNumbers>(hash1, &tx_num1).unwrap();
        batch.put::<tables::TransactionHashNumbers>(hash2, &tx_num2).unwrap();
        batch.commit().unwrap();

        // Read via RocksTx (same as EitherReader::RocksDB would use internally)
        let tx = provider.tx();
        assert_eq!(tx.get::<tables::TransactionHashNumbers>(hash1).unwrap(), Some(tx_num1));
        assert_eq!(tx.get::<tables::TransactionHashNumbers>(hash2).unwrap(), Some(tx_num2));

        // Test missing key
        let missing_hash = B256::from([99u8; 32]);
        assert_eq!(tx.get::<tables::TransactionHashNumbers>(missing_hash).unwrap(), None);
    }

    #[test]
    fn test_rocksdb_batch_storage_history() {
        let (_temp_dir, provider) = create_rocksdb_provider();

        let address = Address::random();
        let storage_key = B256::from([1u8; 32]);
        let key = StorageShardedKey::new(address, storage_key, 1000);
        let value = IntegerList::new([1, 5, 10, 50]).unwrap();

        // Write via RocksDBBatch
        let mut batch = provider.batch();
        batch.put::<tables::StoragesHistory>(key.clone(), &value).unwrap();
        batch.commit().unwrap();

        // Read via RocksTx
        let tx = provider.tx();
        let result = tx.get::<tables::StoragesHistory>(key).unwrap();
        assert_eq!(result, Some(value));

        // Test missing key
        let missing_key = StorageShardedKey::new(Address::random(), B256::ZERO, 0);
        assert_eq!(tx.get::<tables::StoragesHistory>(missing_key).unwrap(), None);
    }

    #[test]
    fn test_rocksdb_batch_account_history() {
        let (_temp_dir, provider) = create_rocksdb_provider();

        let address = Address::random();
        let key = ShardedKey::new(address, 1000);
        let value = IntegerList::new([1, 10, 100, 500]).unwrap();

        // Write via RocksDBBatch
        let mut batch = provider.batch();
        batch.put::<tables::AccountsHistory>(key.clone(), &value).unwrap();
        batch.commit().unwrap();

        // Read via RocksTx
        let tx = provider.tx();
        let result = tx.get::<tables::AccountsHistory>(key).unwrap();
        assert_eq!(result, Some(value));

        // Test missing key
        let missing_key = ShardedKey::new(Address::random(), 0);
        assert_eq!(tx.get::<tables::AccountsHistory>(missing_key).unwrap(), None);
    }

    #[test]
    fn test_rocksdb_batch_delete_transaction_hash_number() {
        let (_temp_dir, provider) = create_rocksdb_provider();

        let hash = B256::from([1u8; 32]);
        let tx_num = 100u64;

        // First write
        provider.put::<tables::TransactionHashNumbers>(hash, &tx_num).unwrap();
        assert_eq!(provider.get::<tables::TransactionHashNumbers>(hash).unwrap(), Some(tx_num));

        // Delete via RocksDBBatch
        let mut batch = provider.batch();
        batch.delete::<tables::TransactionHashNumbers>(hash).unwrap();
        batch.commit().unwrap();

        // Verify deletion
        assert_eq!(provider.get::<tables::TransactionHashNumbers>(hash).unwrap(), None);
    }

    #[test]
    fn test_rocksdb_batch_delete_storage_history() {
        let (_temp_dir, provider) = create_rocksdb_provider();

        let address = Address::random();
        let storage_key = B256::from([1u8; 32]);
        let key = StorageShardedKey::new(address, storage_key, 1000);
        let value = IntegerList::new([1, 5, 10]).unwrap();

        // First write
        provider.put::<tables::StoragesHistory>(key.clone(), &value).unwrap();
        assert!(provider.get::<tables::StoragesHistory>(key.clone()).unwrap().is_some());

        // Delete via RocksDBBatch
        let mut batch = provider.batch();
        batch.delete::<tables::StoragesHistory>(key.clone()).unwrap();
        batch.commit().unwrap();

        // Verify deletion
        assert_eq!(provider.get::<tables::StoragesHistory>(key).unwrap(), None);
    }

    #[test]
    fn test_rocksdb_batch_delete_account_history() {
        let (_temp_dir, provider) = create_rocksdb_provider();

        let address = Address::random();
        let key = ShardedKey::new(address, 1000);
        let value = IntegerList::new([1, 10, 100]).unwrap();

        // First write
        provider.put::<tables::AccountsHistory>(key.clone(), &value).unwrap();
        assert!(provider.get::<tables::AccountsHistory>(key.clone()).unwrap().is_some());

        // Delete via RocksDBBatch
        let mut batch = provider.batch();
        batch.delete::<tables::AccountsHistory>(key.clone()).unwrap();
        batch.commit().unwrap();

        // Verify deletion
        assert_eq!(provider.get::<tables::AccountsHistory>(key).unwrap(), None);
    }

    // ==================== Parametrized Backend Equivalence Tests ====================
    //
    // These tests verify that MDBX and RocksDB produce identical results for history lookups.
    // Each scenario sets up the same data in both backends and asserts identical HistoryInfo.
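    //
    // `HistoryInfo` (as exercised below) encodes where the value for a queried block must
    // be read from: `NotYetWritten` when the block precedes the first indexed changeset,
    // `InChangeset(n)` when the changeset written at block `n` holds it, and
    // `InPlainState` when the block is past the last indexed changeset.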

    /// Query parameters for a history lookup test case.
    struct HistoryQuery {
        /// Block number the history lookup is performed at.
        block_number: BlockNumber,
        /// Lowest block number still available (the pruning boundary), if any.
        lowest_available: Option<BlockNumber>,
        /// `HistoryInfo` that both backends are expected to return.
        expected: HistoryInfo,
    }
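
    // For example, a query asserting that a lookup at block 150 must read the changeset
    // written at block 200 (the next indexed entry) looks like:
    //   HistoryQuery { block_number: 150, lowest_available: None, expected: HistoryInfo::InChangeset(200) }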

    // Type aliases for cursor types (needed for EitherWriter/EitherReader type inference)
    type AccountsHistoryWriteCursor =
        reth_db::mdbx::cursor::Cursor<reth_db::mdbx::RW, tables::AccountsHistory>;
    type StoragesHistoryWriteCursor =
        reth_db::mdbx::cursor::Cursor<reth_db::mdbx::RW, tables::StoragesHistory>;
    type AccountsHistoryReadCursor =
        reth_db::mdbx::cursor::Cursor<reth_db::mdbx::RO, tables::AccountsHistory>;
    type StoragesHistoryReadCursor =
        reth_db::mdbx::cursor::Cursor<reth_db::mdbx::RO, tables::StoragesHistory>;

    /// Runs the same account history queries against both MDBX and `RocksDB` backends,
    /// asserting they produce identical results.
    fn run_account_history_scenario(
        scenario_name: &str,
        address: Address,
        shards: &[(BlockNumber, Vec<BlockNumber>)], // (shard_highest_block, blocks_in_shard)
        queries: &[HistoryQuery],
    ) {
        // Set up MDBX and RocksDB with identical data using EitherWriter
        let factory = create_test_provider_factory();
        let mdbx_provider = factory.database_provider_rw().unwrap();
        let (temp_dir, rocks_provider) = create_rocksdb_provider();

        // Create writers for both backends
        let mut mdbx_writer: EitherWriter<'_, AccountsHistoryWriteCursor, EthPrimitives> =
            EitherWriter::Database(
                mdbx_provider.tx_ref().cursor_write::<tables::AccountsHistory>().unwrap(),
            );
        let mut rocks_writer: EitherWriter<'_, AccountsHistoryWriteCursor, EthPrimitives> =
            EitherWriter::RocksDB(rocks_provider.batch());

        // Write identical data to both backends in a single loop
        for (highest_block, blocks) in shards {
            let key = ShardedKey::new(address, *highest_block);
            let value = IntegerList::new(blocks.clone()).unwrap();
            mdbx_writer.upsert_account_history(key.clone(), &value).unwrap();
            rocks_writer.upsert_account_history(key, &value).unwrap();
        }

        // Commit both backends. The MDBX writer is dropped first so its cursor releases
        // the borrow on the transaction before the provider commits.
        drop(mdbx_writer);
        mdbx_provider.commit().unwrap();
        if let EitherWriter::RocksDB(batch) = rocks_writer {
            batch.commit().unwrap();
        }

        // Run queries against both backends using EitherReader
        let mdbx_ro = factory.database_provider_ro().unwrap();
        let rocks_tx = rocks_provider.tx();

        for (i, query) in queries.iter().enumerate() {
            // MDBX query via EitherReader
            let mut mdbx_reader: EitherReader<'_, AccountsHistoryReadCursor, EthPrimitives> =
                EitherReader::Database(
                    mdbx_ro.tx_ref().cursor_read::<tables::AccountsHistory>().unwrap(),
                    PhantomData,
                );
            let mdbx_result = mdbx_reader
                .account_history_info(address, query.block_number, query.lowest_available)
                .unwrap();

            // RocksDB query via EitherReader
            let mut rocks_reader: EitherReader<'_, AccountsHistoryReadCursor, EthPrimitives> =
                EitherReader::RocksDB(&rocks_tx);
            let rocks_result = rocks_reader
                .account_history_info(address, query.block_number, query.lowest_available)
                .unwrap();

            // Assert both backends produce identical results
            assert_eq!(
                mdbx_result,
                rocks_result,
                "Backend mismatch in scenario '{}' query {}: block={}, lowest={:?}\n\
                 MDBX: {:?}, RocksDB: {:?}",
                scenario_name,
                i,
                query.block_number,
                query.lowest_available,
                mdbx_result,
                rocks_result
            );

            // Also verify against expected result
            assert_eq!(
                mdbx_result,
                query.expected,
                "Unexpected result in scenario '{}' query {}: block={}, lowest={:?}\n\
                 Got: {:?}, Expected: {:?}",
                scenario_name,
                i,
                query.block_number,
                query.lowest_available,
                mdbx_result,
                query.expected
            );
        }

        rocks_tx.rollback().unwrap();
        drop(temp_dir);
    }

    /// Runs the same storage history queries against both MDBX and `RocksDB` backends,
    /// asserting they produce identical results.
    fn run_storage_history_scenario(
        scenario_name: &str,
        address: Address,
        storage_key: B256,
        shards: &[(BlockNumber, Vec<BlockNumber>)], // (shard_highest_block, blocks_in_shard)
        queries: &[HistoryQuery],
    ) {
        // Set up MDBX and RocksDB with identical data using EitherWriter
        let factory = create_test_provider_factory();
        let mdbx_provider = factory.database_provider_rw().unwrap();
        let (temp_dir, rocks_provider) = create_rocksdb_provider();

        // Create writers for both backends
        let mut mdbx_writer: EitherWriter<'_, StoragesHistoryWriteCursor, EthPrimitives> =
            EitherWriter::Database(
                mdbx_provider.tx_ref().cursor_write::<tables::StoragesHistory>().unwrap(),
            );
        let mut rocks_writer: EitherWriter<'_, StoragesHistoryWriteCursor, EthPrimitives> =
            EitherWriter::RocksDB(rocks_provider.batch());

        // Write identical data to both backends in a single loop
        for (highest_block, blocks) in shards {
            let key = StorageShardedKey::new(address, storage_key, *highest_block);
            let value = IntegerList::new(blocks.clone()).unwrap();
            mdbx_writer.put_storage_history(key.clone(), &value).unwrap();
            rocks_writer.put_storage_history(key, &value).unwrap();
        }

        // Commit both backends. The MDBX writer is dropped first so its cursor releases
        // the borrow on the transaction before the provider commits.
        drop(mdbx_writer);
        mdbx_provider.commit().unwrap();
        if let EitherWriter::RocksDB(batch) = rocks_writer {
            batch.commit().unwrap();
        }

        // Run queries against both backends using EitherReader
        let mdbx_ro = factory.database_provider_ro().unwrap();
        let rocks_tx = rocks_provider.tx();

        for (i, query) in queries.iter().enumerate() {
            // MDBX query via EitherReader
            let mut mdbx_reader: EitherReader<'_, StoragesHistoryReadCursor, EthPrimitives> =
                EitherReader::Database(
                    mdbx_ro.tx_ref().cursor_read::<tables::StoragesHistory>().unwrap(),
                    PhantomData,
                );
            let mdbx_result = mdbx_reader
                .storage_history_info(
                    address,
                    storage_key,
                    query.block_number,
                    query.lowest_available,
                )
                .unwrap();

            // RocksDB query via EitherReader
            let mut rocks_reader: EitherReader<'_, StoragesHistoryReadCursor, EthPrimitives> =
                EitherReader::RocksDB(&rocks_tx);
            let rocks_result = rocks_reader
                .storage_history_info(
                    address,
                    storage_key,
                    query.block_number,
                    query.lowest_available,
                )
                .unwrap();

            // Assert both backends produce identical results
            assert_eq!(
                mdbx_result,
                rocks_result,
                "Backend mismatch in scenario '{}' query {}: block={}, lowest={:?}\n\
                 MDBX: {:?}, RocksDB: {:?}",
                scenario_name,
                i,
                query.block_number,
                query.lowest_available,
                mdbx_result,
                rocks_result
            );

            // Also verify against expected result
            assert_eq!(
                mdbx_result,
                query.expected,
                "Unexpected result in scenario '{}' query {}: block={}, lowest={:?}\n\
                 Got: {:?}, Expected: {:?}",
                scenario_name,
                i,
                query.block_number,
                query.lowest_available,
                mdbx_result,
                query.expected
            );
        }

        rocks_tx.rollback().unwrap();
        drop(temp_dir);
    }

    /// Tests account history lookups across both MDBX and `RocksDB` backends.
    ///
    /// Covers the following scenarios from PR2's `RocksDB`-only tests:
    /// 1. Single shard - basic lookups within one shard
    /// 2. Multiple shards - `prev()` shard detection and transitions
    /// 3. No history - querying an address with no entries
    /// 4. Pruning boundary - `lowest_available` boundary behavior (block at/after boundary)
    #[test]
    fn test_account_history_info_both_backends() {
        let address = Address::from([0x42; 20]);

        // Scenario 1: Single shard with blocks [100, 200, 300]
        run_account_history_scenario(
            "single_shard",
            address,
            &[(u64::MAX, vec![100, 200, 300])],
            &[
                // Before first entry -> NotYetWritten
                HistoryQuery {
                    block_number: 50,
                    lowest_available: None,
                    expected: HistoryInfo::NotYetWritten,
                },
                // Between entries -> InChangeset(next_write)
                HistoryQuery {
                    block_number: 150,
                    lowest_available: None,
                    expected: HistoryInfo::InChangeset(200),
                },
                // Exact match on entry -> InChangeset(same_block)
                HistoryQuery {
                    block_number: 300,
                    lowest_available: None,
                    expected: HistoryInfo::InChangeset(300),
                },
                // After last entry in last shard -> InPlainState
                HistoryQuery {
                    block_number: 500,
                    lowest_available: None,
                    expected: HistoryInfo::InPlainState,
                },
            ],
        );

        // Scenario 2: Multiple shards - tests prev() shard detection
        run_account_history_scenario(
            "multiple_shards",
            address,
            &[
                (500, vec![100, 200, 300, 400, 500]), // First shard ends at 500
                (u64::MAX, vec![600, 700, 800]),      // Last shard
            ],
            &[
                // Before first shard, no prev -> NotYetWritten
                HistoryQuery {
                    block_number: 50,
                    lowest_available: None,
                    expected: HistoryInfo::NotYetWritten,
                },
                // Within first shard
                HistoryQuery {
                    block_number: 150,
                    lowest_available: None,
                    expected: HistoryInfo::InChangeset(200),
                },
                // Between shards - prev() should find first shard
                HistoryQuery {
                    block_number: 550,
                    lowest_available: None,
                    expected: HistoryInfo::InChangeset(600),
                },
                // After all entries
                HistoryQuery {
                    block_number: 900,
                    lowest_available: None,
                    expected: HistoryInfo::InPlainState,
                },
            ],
        );

        // Scenario 3: No history for address
        let address_without_history = Address::from([0x43; 20]);
        run_account_history_scenario(
            "no_history",
            address_without_history,
            &[], // No shards for this address
            &[HistoryQuery {
                block_number: 150,
                lowest_available: None,
                expected: HistoryInfo::NotYetWritten,
            }],
        );

        // Scenario 4: Query at pruning boundary
        // Note: We only test block >= lowest_available because HistoricalStateProviderRef
        // errors on blocks below the pruning boundary before doing the lookup, while the
        // RocksDB implementation doesn't perform this check at the same level. This
        // verifies that when a pruning boundary is set and the queried block is at or
        // above it, both backends agree.
        run_account_history_scenario(
            "with_pruning_boundary",
            address,
            &[(u64::MAX, vec![100, 200, 300])],
            &[
                // At pruning boundary -> InChangeset(first entry at or after the block)
                HistoryQuery {
                    block_number: 100,
                    lowest_available: Some(100),
                    expected: HistoryInfo::InChangeset(100),
                },
                // After pruning boundary, between entries
                HistoryQuery {
                    block_number: 150,
                    lowest_available: Some(100),
                    expected: HistoryInfo::InChangeset(200),
                },
            ],
        );
    }

    /// Tests storage history lookups across both MDBX and `RocksDB` backends.
    #[test]
    fn test_storage_history_info_both_backends() {
        let address = Address::from([0x42; 20]);
        let storage_key = B256::from([0x01; 32]);
        let other_storage_key = B256::from([0x02; 32]);

        // Single shard with blocks [100, 200, 300]
        run_storage_history_scenario(
            "storage_single_shard",
            address,
            storage_key,
            &[(u64::MAX, vec![100, 200, 300])],
            &[
                // Before first entry -> NotYetWritten
                HistoryQuery {
                    block_number: 50,
                    lowest_available: None,
                    expected: HistoryInfo::NotYetWritten,
                },
                // Between entries -> InChangeset(next_write)
                HistoryQuery {
                    block_number: 150,
                    lowest_available: None,
                    expected: HistoryInfo::InChangeset(200),
                },
                // After last entry -> InPlainState
                HistoryQuery {
                    block_number: 500,
                    lowest_available: None,
                    expected: HistoryInfo::InPlainState,
                },
            ],
        );

        // No history for different storage key
        run_storage_history_scenario(
            "storage_no_history",
            address,
            other_storage_key,
            &[], // No shards for this storage key
            &[HistoryQuery {
                block_number: 150,
                lowest_available: None,
                expected: HistoryInfo::NotYetWritten,
            }],
        );
    }
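
    // The following scenario is an illustrative addition (a sketch, not part of the
    // original test matrix): it extends storage coverage to the multi-shard case,
    // mirroring the "multiple_shards" account scenario above. The address, storage key,
    // shard layout, and expected values are chosen here and follow the shard-transition
    // semantics established by the scenarios above.
    #[test]
    fn test_storage_history_info_multiple_shards() {
        let address = Address::from([0x44; 20]);
        let storage_key = B256::from([0x03; 32]);

        run_storage_history_scenario(
            "storage_multiple_shards",
            address,
            storage_key,
            &[
                (500, vec![100, 300, 500]), // First shard ends at 500
                (u64::MAX, vec![700, 900]), // Last shard
            ],
            &[
                // Before first shard, no prev -> NotYetWritten
                HistoryQuery {
                    block_number: 50,
                    lowest_available: None,
                    expected: HistoryInfo::NotYetWritten,
                },
                // Between shards - prev() should find the first shard
                HistoryQuery {
                    block_number: 600,
                    lowest_available: None,
                    expected: HistoryInfo::InChangeset(700),
                },
                // After all entries -> InPlainState
                HistoryQuery {
                    block_number: 1000,
                    lowest_available: None,
                    expected: HistoryInfo::InPlainState,
                },
            ],
        );
    }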

    /// Test that `RocksDB` batches created via `EitherWriter` are only made visible when
    /// `provider.commit()` is called, not when the writer is dropped.
    #[test]
    fn test_rocksdb_commits_at_provider_level() {
        let factory = create_test_provider_factory();

        // Enable RocksDB for transaction hash numbers
        factory.set_storage_settings_cache(StorageSettings::v2());

        let hash1 = B256::from([1u8; 32]);
        let hash2 = B256::from([2u8; 32]);
        let tx_num1 = 100u64;
        let tx_num2 = 200u64;

        // Get the RocksDB batch from the provider
        let rocksdb = factory.rocksdb_provider();
        let batch = rocksdb.batch();

        // Create provider and EitherWriter
        let provider = factory.database_provider_rw().unwrap();
        let mut writer = EitherWriter::new_transaction_hash_numbers(&provider, batch).unwrap();

        // Write transaction hash numbers (append_only=false since we're using RocksDB)
        writer.put_transaction_hash_number(hash1, tx_num1, false).unwrap();
        writer.put_transaction_hash_number(hash2, tx_num2, false).unwrap();

        // Extract the raw batch from the writer and register it with the provider
        let raw_batch = writer.into_raw_rocksdb_batch();
        if let Some(batch) = raw_batch {
            provider.set_pending_rocksdb_batch(batch);
        }

        // Data should NOT be visible yet (batch not committed)
        let rocksdb = factory.rocksdb_provider();
        assert_eq!(
            rocksdb.get::<tables::TransactionHashNumbers>(hash1).unwrap(),
            None,
            "Data should not be visible before provider.commit()"
        );

        // Commit the provider - this should commit both MDBX and RocksDB
        provider.commit().unwrap();

        // Now data should be visible in RocksDB
        let rocksdb = factory.rocksdb_provider();
        assert_eq!(
            rocksdb.get::<tables::TransactionHashNumbers>(hash1).unwrap(),
            Some(tx_num1),
            "Data should be visible after provider.commit()"
        );
        assert_eq!(
            rocksdb.get::<tables::TransactionHashNumbers>(hash2).unwrap(),
            Some(tx_num2),
            "Data should be visible after provider.commit()"
        );
    }

    /// Test that `EitherReader::new_accounts_history` panics when settings require
    /// `RocksDB` but no tx is provided (`None`). This is an invariant violation that
    /// indicates a bug - `with_rocksdb_tx` should always provide a tx when needed.
    #[test]
    #[should_panic(expected = "account_history_in_rocksdb requires rocksdb tx")]
    fn test_settings_mismatch_panics() {
        let factory = create_test_provider_factory();

        factory.set_storage_settings_cache(StorageSettings::v2());

        let provider = factory.database_provider_ro().unwrap();
        let _ = EitherReader::<(), ()>::new_accounts_history(&provider, None);
    }
}