// reth_provider/either_writer.rs

1//! Generic reader and writer abstractions for interacting with either database tables or static
2//! files.
3
4use std::{
5    collections::BTreeSet,
6    marker::PhantomData,
7    ops::{Range, RangeInclusive},
8};
9
10use crate::{
11    providers::{
12        history_info, rocksdb::RocksDBBatch, HistoryInfo, StaticFileProvider,
13        StaticFileProviderRWRefMut,
14    },
15    StaticFileProviderFactory,
16};
17use alloy_primitives::{map::HashMap, Address, BlockNumber, TxHash, TxNumber, B256};
18use rayon::slice::ParallelSliceMut;
19use reth_db::{
20    cursor::{DbCursorRO, DbDupCursorRW},
21    models::{AccountBeforeTx, StorageBeforeTx},
22    static_file::TransactionSenderMask,
23    table::Value,
24    transaction::{CursorMutTy, CursorTy, DbTx, DbTxMut, DupCursorMutTy, DupCursorTy},
25};
26use reth_db_api::{
27    cursor::DbCursorRW,
28    models::{storage_sharded_key::StorageShardedKey, BlockNumberAddress, ShardedKey},
29    tables,
30    tables::BlockNumberList,
31};
32use reth_errors::ProviderError;
33use reth_node_types::NodePrimitives;
34use reth_primitives_traits::{ReceiptTy, StorageEntry};
35use reth_static_file_types::StaticFileSegment;
36use reth_storage_api::{ChangeSetReader, DBProvider, NodePrimitivesProvider, StorageSettingsCache};
37use reth_storage_errors::provider::ProviderResult;
38use strum::{Display, EnumIs};
39
/// Type alias for [`EitherReader`] constructors using a plain read cursor.
type EitherReaderTy<'a, P, T> =
    EitherReader<'a, CursorTy<<P as DBProvider>::Tx, T>, <P as NodePrimitivesProvider>::Primitives>;

/// Type alias for [`EitherReader`] constructors using a dup-sorted read cursor.
type DupEitherReaderTy<'a, P, T> = EitherReader<
    'a,
    DupCursorTy<<P as DBProvider>::Tx, T>,
    <P as NodePrimitivesProvider>::Primitives,
>;

/// Type alias for dup [`EitherWriter`] constructors.
type DupEitherWriterTy<'a, P, T> = EitherWriter<
    'a,
    DupCursorMutTy<<P as DBProvider>::Tx, T>,
    <P as NodePrimitivesProvider>::Primitives,
>;

/// Type alias for [`EitherWriter`] constructors.
type EitherWriterTy<'a, P, T> = EitherWriter<
    'a,
    CursorMutTy<<P as DBProvider>::Tx, T>,
    <P as NodePrimitivesProvider>::Primitives,
>;

/// Helper type for `RocksDB` batch argument in writer constructors.
pub type RocksBatchArg<'a> = crate::providers::rocksdb::RocksDBBatch<'a>;

/// The raw `RocksDB` batch type returned by [`EitherWriter::into_raw_rocksdb_batch`].
pub type RawRocksDBBatch = rocksdb::WriteBatchWithTransaction<true>;

/// Helper type for `RocksDB` snapshot argument in reader constructors.
///
/// The `Option` allows callers to skip `RocksDB` access when it isn't needed
/// (e.g., on legacy MDBX-only nodes).
pub type RocksDBRefArg<'a> = Option<crate::providers::rocksdb::RocksReadSnapshot<'a>>;
76
/// Represents a destination for writing data, either to database, static files, or `RocksDB`.
///
/// Which variant is appropriate for a given table is decided by the `new_*` constructors on
/// this type, based on the provider's cached storage settings and prune configuration.
#[derive(Debug, Display)]
pub enum EitherWriter<'a, CURSOR, N> {
    /// Write to database table via cursor
    Database(CURSOR),
    /// Write to static file
    StaticFile(StaticFileProviderRWRefMut<'a, N>),
    /// Write to `RocksDB` using a write-only batch (historical tables).
    RocksDB(RocksDBBatch<'a>),
}
87
88impl<'a> EitherWriter<'a, (), ()> {
89    /// Creates a new [`EitherWriter`] for receipts based on storage settings and prune modes.
90    pub fn new_receipts<P>(
91        provider: &'a P,
92        block_number: BlockNumber,
93    ) -> ProviderResult<EitherWriterTy<'a, P, tables::Receipts<ReceiptTy<P::Primitives>>>>
94    where
95        P: DBProvider + NodePrimitivesProvider + StorageSettingsCache + StaticFileProviderFactory,
96        P::Tx: DbTxMut,
97        ReceiptTy<P::Primitives>: Value,
98    {
99        if Self::receipts_destination(provider).is_static_file() {
100            Ok(EitherWriter::StaticFile(
101                provider.get_static_file_writer(block_number, StaticFileSegment::Receipts)?,
102            ))
103        } else {
104            Ok(EitherWriter::Database(
105                provider.tx_ref().cursor_write::<tables::Receipts<ReceiptTy<P::Primitives>>>()?,
106            ))
107        }
108    }
109
110    /// Creates a new [`EitherWriter`] for senders based on storage settings.
111    pub fn new_senders<P>(
112        provider: &'a P,
113        block_number: BlockNumber,
114    ) -> ProviderResult<EitherWriterTy<'a, P, tables::TransactionSenders>>
115    where
116        P: DBProvider + NodePrimitivesProvider + StorageSettingsCache + StaticFileProviderFactory,
117        P::Tx: DbTxMut,
118    {
119        if EitherWriterDestination::senders(provider).is_static_file() {
120            Ok(EitherWriter::StaticFile(
121                provider
122                    .get_static_file_writer(block_number, StaticFileSegment::TransactionSenders)?,
123            ))
124        } else {
125            Ok(EitherWriter::Database(
126                provider.tx_ref().cursor_write::<tables::TransactionSenders>()?,
127            ))
128        }
129    }
130
131    /// Creates a new [`EitherWriter`] for account changesets based on storage settings and prune
132    /// modes.
133    pub fn new_account_changesets<P>(
134        provider: &'a P,
135        block_number: BlockNumber,
136    ) -> ProviderResult<DupEitherWriterTy<'a, P, tables::AccountChangeSets>>
137    where
138        P: DBProvider + NodePrimitivesProvider + StorageSettingsCache + StaticFileProviderFactory,
139        P::Tx: DbTxMut,
140    {
141        if provider.cached_storage_settings().storage_v2 {
142            Ok(EitherWriter::StaticFile(
143                provider
144                    .get_static_file_writer(block_number, StaticFileSegment::AccountChangeSets)?,
145            ))
146        } else {
147            Ok(EitherWriter::Database(
148                provider.tx_ref().cursor_dup_write::<tables::AccountChangeSets>()?,
149            ))
150        }
151    }
152
153    /// Creates a new [`EitherWriter`] for storage changesets based on storage settings.
154    pub fn new_storage_changesets<P>(
155        provider: &'a P,
156        block_number: BlockNumber,
157    ) -> ProviderResult<DupEitherWriterTy<'a, P, tables::StorageChangeSets>>
158    where
159        P: DBProvider + NodePrimitivesProvider + StorageSettingsCache + StaticFileProviderFactory,
160        P::Tx: DbTxMut,
161    {
162        if provider.cached_storage_settings().storage_v2 {
163            Ok(EitherWriter::StaticFile(
164                provider
165                    .get_static_file_writer(block_number, StaticFileSegment::StorageChangeSets)?,
166            ))
167        } else {
168            Ok(EitherWriter::Database(
169                provider.tx_ref().cursor_dup_write::<tables::StorageChangeSets>()?,
170            ))
171        }
172    }
173
174    /// Returns the destination for writing receipts.
175    ///
176    /// The rules are as follows:
177    /// - If the node should not always write receipts to static files, and any receipt pruning is
178    ///   enabled, write to the database.
179    /// - If the node should always write receipts to static files, but receipt log filter pruning
180    ///   is enabled, write to the database.
181    /// - Otherwise, write to static files.
182    pub fn receipts_destination<P: DBProvider + StorageSettingsCache>(
183        provider: &P,
184    ) -> EitherWriterDestination {
185        let receipts_in_static_files = provider.cached_storage_settings().storage_v2;
186        let prune_modes = provider.prune_modes_ref();
187
188        if !receipts_in_static_files && prune_modes.has_receipts_pruning() ||
189            // TODO: support writing receipts to static files with log filter pruning enabled
190            receipts_in_static_files && !prune_modes.receipts_log_filter.is_empty()
191        {
192            EitherWriterDestination::Database
193        } else {
194            EitherWriterDestination::StaticFile
195        }
196    }
197
198    /// Returns the destination for writing account changesets.
199    ///
200    /// This determines the destination based solely on storage settings.
201    pub fn account_changesets_destination<P: DBProvider + StorageSettingsCache>(
202        provider: &P,
203    ) -> EitherWriterDestination {
204        if provider.cached_storage_settings().storage_v2 {
205            EitherWriterDestination::StaticFile
206        } else {
207            EitherWriterDestination::Database
208        }
209    }
210
211    /// Returns the destination for writing storage changesets.
212    ///
213    /// This determines the destination based solely on storage settings.
214    pub fn storage_changesets_destination<P: DBProvider + StorageSettingsCache>(
215        provider: &P,
216    ) -> EitherWriterDestination {
217        if provider.cached_storage_settings().storage_v2 {
218            EitherWriterDestination::StaticFile
219        } else {
220            EitherWriterDestination::Database
221        }
222    }
223
224    /// Creates a new [`EitherWriter`] for storages history based on storage settings.
225    pub fn new_storages_history<P>(
226        provider: &P,
227        _rocksdb_batch: RocksBatchArg<'a>,
228    ) -> ProviderResult<EitherWriterTy<'a, P, tables::StoragesHistory>>
229    where
230        P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
231        P::Tx: DbTxMut,
232    {
233        if provider.cached_storage_settings().storage_v2 {
234            return Ok(EitherWriter::RocksDB(_rocksdb_batch));
235        }
236
237        Ok(EitherWriter::Database(provider.tx_ref().cursor_write::<tables::StoragesHistory>()?))
238    }
239
240    /// Creates a new [`EitherWriter`] for transaction hash numbers based on storage settings.
241    pub fn new_transaction_hash_numbers<P>(
242        provider: &P,
243        _rocksdb_batch: RocksBatchArg<'a>,
244    ) -> ProviderResult<EitherWriterTy<'a, P, tables::TransactionHashNumbers>>
245    where
246        P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
247        P::Tx: DbTxMut,
248    {
249        if provider.cached_storage_settings().storage_v2 {
250            return Ok(EitherWriter::RocksDB(_rocksdb_batch));
251        }
252
253        Ok(EitherWriter::Database(
254            provider.tx_ref().cursor_write::<tables::TransactionHashNumbers>()?,
255        ))
256    }
257
258    /// Creates a new [`EitherWriter`] for account history based on storage settings.
259    pub fn new_accounts_history<P>(
260        provider: &P,
261        _rocksdb_batch: RocksBatchArg<'a>,
262    ) -> ProviderResult<EitherWriterTy<'a, P, tables::AccountsHistory>>
263    where
264        P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
265        P::Tx: DbTxMut,
266    {
267        if provider.cached_storage_settings().storage_v2 {
268            return Ok(EitherWriter::RocksDB(_rocksdb_batch));
269        }
270
271        Ok(EitherWriter::Database(provider.tx_ref().cursor_write::<tables::AccountsHistory>()?))
272    }
273}
274
275impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N> {
276    /// Extracts the raw `RocksDB` write batch from this writer, if it contains one.
277    ///
278    /// Returns `Some(WriteBatchWithTransaction)` for [`Self::RocksDB`] variant,
279    /// `None` for other variants.
280    ///
281    /// This is used to defer `RocksDB` commits to the provider level, ensuring all
282    /// storage commits (MDBX, static files, `RocksDB`) happen atomically in a single place.
283    pub fn into_raw_rocksdb_batch(self) -> Option<rocksdb::WriteBatchWithTransaction<true>> {
284        match self {
285            Self::Database(_) | Self::StaticFile(_) => None,
286            Self::RocksDB(batch) => Some(batch.into_inner()),
287        }
288    }
289
290    /// Increment the block number.
291    ///
292    /// Relevant only for [`Self::StaticFile`]. It is a no-op for [`Self::Database`].
293    pub fn increment_block(&mut self, expected_block_number: BlockNumber) -> ProviderResult<()> {
294        match self {
295            Self::Database(_) => Ok(()),
296            Self::StaticFile(writer) => writer.increment_block(expected_block_number),
297            Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
298        }
299    }
300
301    /// Ensures that the writer is positioned at the specified block number.
302    ///
303    /// If the writer is positioned at a greater block number than the specified one, the writer
304    /// will NOT be unwound and the error will be returned.
305    ///
306    /// Relevant only for [`Self::StaticFile`]. It is a no-op for [`Self::Database`].
307    pub fn ensure_at_block(&mut self, block_number: BlockNumber) -> ProviderResult<()> {
308        match self {
309            Self::Database(_) => Ok(()),
310            Self::StaticFile(writer) => writer.ensure_at_block(block_number),
311            Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
312        }
313    }
314}
315
316impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>
317where
318    N::Receipt: Value,
319    CURSOR: DbCursorRW<tables::Receipts<N::Receipt>>,
320{
321    /// Append a transaction receipt.
322    pub fn append_receipt(&mut self, tx_num: TxNumber, receipt: &N::Receipt) -> ProviderResult<()> {
323        match self {
324            Self::Database(cursor) => Ok(cursor.append(tx_num, receipt)?),
325            Self::StaticFile(writer) => writer.append_receipt(tx_num, receipt),
326            Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
327        }
328    }
329}
330
331impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>
332where
333    CURSOR: DbCursorRW<tables::TransactionSenders>,
334{
335    /// Append a transaction sender to the destination
336    pub fn append_sender(&mut self, tx_num: TxNumber, sender: &Address) -> ProviderResult<()> {
337        match self {
338            Self::Database(cursor) => Ok(cursor.append(tx_num, sender)?),
339            Self::StaticFile(writer) => writer.append_transaction_sender(tx_num, sender),
340            Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
341        }
342    }
343
344    /// Append transaction senders to the destination
345    pub fn append_senders<I>(&mut self, senders: I) -> ProviderResult<()>
346    where
347        I: Iterator<Item = (TxNumber, Address)>,
348    {
349        match self {
350            Self::Database(cursor) => {
351                for (tx_num, sender) in senders {
352                    cursor.append(tx_num, &sender)?;
353                }
354                Ok(())
355            }
356            Self::StaticFile(writer) => writer.append_transaction_senders(senders),
357            Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
358        }
359    }
360
361    /// Removes all transaction senders above the given transaction number, and stops at the given
362    /// block number.
363    pub fn prune_senders(
364        &mut self,
365        unwind_tx_from: TxNumber,
366        block: BlockNumber,
367    ) -> ProviderResult<()>
368    where
369        CURSOR: DbCursorRO<tables::TransactionSenders>,
370    {
371        match self {
372            Self::Database(cursor) => {
373                let mut walker = cursor.walk_range(unwind_tx_from..)?;
374                while walker.next().transpose()?.is_some() {
375                    walker.delete_current()?;
376                }
377            }
378            Self::StaticFile(writer) => {
379                let static_file_transaction_sender_num = writer
380                    .reader()
381                    .get_highest_static_file_tx(StaticFileSegment::TransactionSenders);
382
383                let to_delete = static_file_transaction_sender_num
384                    .map(|static_num| (static_num + 1).saturating_sub(unwind_tx_from))
385                    .unwrap_or_default();
386
387                writer.prune_transaction_senders(to_delete, block)?;
388            }
389            Self::RocksDB(_) => return Err(ProviderError::UnsupportedProvider),
390        }
391
392        Ok(())
393    }
394}
395
396impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>
397where
398    CURSOR: DbCursorRW<tables::TransactionHashNumbers> + DbCursorRO<tables::TransactionHashNumbers>,
399{
400    /// Puts a transaction hash number mapping.
401    ///
402    /// When `append_only` is true, uses `cursor.append()` which is significantly faster
403    /// but requires entries to be inserted in order and the table to be empty.
404    /// When false, uses `cursor.upsert()` which handles arbitrary insertion order and duplicates.
405    pub fn put_transaction_hash_number(
406        &mut self,
407        hash: TxHash,
408        tx_num: TxNumber,
409        append_only: bool,
410    ) -> ProviderResult<()> {
411        match self {
412            Self::Database(cursor) => {
413                if append_only {
414                    Ok(cursor.append(hash, &tx_num)?)
415                } else {
416                    Ok(cursor.upsert(hash, &tx_num)?)
417                }
418            }
419            Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
420            Self::RocksDB(batch) => batch.put::<tables::TransactionHashNumbers>(hash, &tx_num),
421        }
422    }
423
424    /// Puts multiple transaction hash number mappings in a batch.
425    ///
426    /// Accepts a vector of `(TxHash, TxNumber)` tuples and writes them all using the same cursor.
427    /// This is more efficient than calling `put_transaction_hash_number` repeatedly.
428    ///
429    /// When `append_only` is true, uses `cursor.append()` which requires entries to be
430    /// pre-sorted and the table to be empty or have only lower keys.
431    /// When false, uses `cursor.upsert()` which handles arbitrary insertion order.
432    pub fn put_transaction_hash_numbers_batch(
433        &mut self,
434        entries: Vec<(TxHash, TxNumber)>,
435        append_only: bool,
436    ) -> ProviderResult<()> {
437        match self {
438            Self::Database(cursor) => {
439                for (hash, tx_num) in entries {
440                    if append_only {
441                        cursor.append(hash, &tx_num)?;
442                    } else {
443                        cursor.upsert(hash, &tx_num)?;
444                    }
445                }
446                Ok(())
447            }
448            Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
449            Self::RocksDB(batch) => {
450                for (hash, tx_num) in entries {
451                    batch.put::<tables::TransactionHashNumbers>(hash, &tx_num)?;
452                }
453                Ok(())
454            }
455        }
456    }
457
458    /// Deletes a transaction hash number mapping.
459    pub fn delete_transaction_hash_number(&mut self, hash: TxHash) -> ProviderResult<()> {
460        match self {
461            Self::Database(cursor) => {
462                if cursor.seek_exact(hash)?.is_some() {
463                    cursor.delete_current()?;
464                }
465                Ok(())
466            }
467            Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
468            Self::RocksDB(batch) => batch.delete::<tables::TransactionHashNumbers>(hash),
469        }
470    }
471}
472
473impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>
474where
475    CURSOR: DbCursorRW<tables::StoragesHistory> + DbCursorRO<tables::StoragesHistory>,
476{
477    /// Puts a storage history entry.
478    pub fn put_storage_history(
479        &mut self,
480        key: StorageShardedKey,
481        value: &BlockNumberList,
482    ) -> ProviderResult<()> {
483        match self {
484            Self::Database(cursor) => Ok(cursor.upsert(key, value)?),
485            Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
486            Self::RocksDB(batch) => batch.put::<tables::StoragesHistory>(key, value),
487        }
488    }
489
490    /// Deletes a storage history entry.
491    pub fn delete_storage_history(&mut self, key: StorageShardedKey) -> ProviderResult<()> {
492        match self {
493            Self::Database(cursor) => {
494                if cursor.seek_exact(key)?.is_some() {
495                    cursor.delete_current()?;
496                }
497                Ok(())
498            }
499            Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
500            Self::RocksDB(batch) => batch.delete::<tables::StoragesHistory>(key),
501        }
502    }
503
504    /// Appends a storage history entry (for first sync - more efficient).
505    pub fn append_storage_history(
506        &mut self,
507        key: StorageShardedKey,
508        value: &BlockNumberList,
509    ) -> ProviderResult<()> {
510        match self {
511            Self::Database(cursor) => Ok(cursor.append(key, value)?),
512            Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
513            Self::RocksDB(batch) => batch.put::<tables::StoragesHistory>(key, value),
514        }
515    }
516
517    /// Upserts a storage history entry (for incremental sync).
518    pub fn upsert_storage_history(
519        &mut self,
520        key: StorageShardedKey,
521        value: &BlockNumberList,
522    ) -> ProviderResult<()> {
523        match self {
524            Self::Database(cursor) => Ok(cursor.upsert(key, value)?),
525            Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
526            Self::RocksDB(batch) => batch.put::<tables::StoragesHistory>(key, value),
527        }
528    }
529
530    /// Gets the last shard for an address and storage key (keyed with `u64::MAX`).
531    pub fn get_last_storage_history_shard(
532        &mut self,
533        address: Address,
534        storage_key: B256,
535    ) -> ProviderResult<Option<BlockNumberList>> {
536        let key = StorageShardedKey::last(address, storage_key);
537        match self {
538            Self::Database(cursor) => Ok(cursor.seek_exact(key)?.map(|(_, v)| v)),
539            Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
540            Self::RocksDB(batch) => batch.get::<tables::StoragesHistory>(key),
541        }
542    }
543}
544
545impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>
546where
547    CURSOR: DbCursorRW<tables::AccountsHistory> + DbCursorRO<tables::AccountsHistory>,
548{
549    /// Appends an account history entry (for first sync - more efficient).
550    pub fn append_account_history(
551        &mut self,
552        key: ShardedKey<Address>,
553        value: &BlockNumberList,
554    ) -> ProviderResult<()> {
555        match self {
556            Self::Database(cursor) => Ok(cursor.append(key, value)?),
557            Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
558            Self::RocksDB(batch) => batch.put::<tables::AccountsHistory>(key, value),
559        }
560    }
561
562    /// Upserts an account history entry (for incremental sync).
563    pub fn upsert_account_history(
564        &mut self,
565        key: ShardedKey<Address>,
566        value: &BlockNumberList,
567    ) -> ProviderResult<()> {
568        match self {
569            Self::Database(cursor) => Ok(cursor.upsert(key, value)?),
570            Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
571            Self::RocksDB(batch) => batch.put::<tables::AccountsHistory>(key, value),
572        }
573    }
574
575    /// Gets the last shard for an address (keyed with `u64::MAX`).
576    pub fn get_last_account_history_shard(
577        &mut self,
578        address: Address,
579    ) -> ProviderResult<Option<BlockNumberList>> {
580        match self {
581            Self::Database(cursor) => {
582                Ok(cursor.seek_exact(ShardedKey::last(address))?.map(|(_, v)| v))
583            }
584            Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
585            Self::RocksDB(batch) => batch.get::<tables::AccountsHistory>(ShardedKey::last(address)),
586        }
587    }
588
589    /// Deletes an account history entry.
590    pub fn delete_account_history(&mut self, key: ShardedKey<Address>) -> ProviderResult<()> {
591        match self {
592            Self::Database(cursor) => {
593                if cursor.seek_exact(key)?.is_some() {
594                    cursor.delete_current()?;
595                }
596                Ok(())
597            }
598            Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
599            Self::RocksDB(batch) => batch.delete::<tables::AccountsHistory>(key),
600        }
601    }
602}
603
604impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>
605where
606    CURSOR: DbDupCursorRW<tables::AccountChangeSets>,
607{
608    /// Append account changeset for a block.
609    ///
610    /// NOTE: This _sorts_ the changesets by address before appending
611    pub fn append_account_changeset(
612        &mut self,
613        block_number: BlockNumber,
614        mut changeset: Vec<AccountBeforeTx>,
615    ) -> ProviderResult<()> {
616        // First sort the changesets
617        changeset.par_sort_by_key(|a| a.address);
618        match self {
619            Self::Database(cursor) => {
620                for change in changeset {
621                    cursor.append_dup(block_number, change)?;
622                }
623            }
624            Self::StaticFile(writer) => {
625                writer.append_account_changeset(changeset, block_number)?;
626            }
627            Self::RocksDB(_) => return Err(ProviderError::UnsupportedProvider),
628        }
629
630        Ok(())
631    }
632}
633
634impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>
635where
636    CURSOR: DbDupCursorRW<tables::StorageChangeSets>,
637{
638    /// Append storage changeset for a block.
639    ///
640    /// NOTE: This _sorts_ the changesets by address and storage key before appending.
641    pub fn append_storage_changeset(
642        &mut self,
643        block_number: BlockNumber,
644        mut changeset: Vec<StorageBeforeTx>,
645    ) -> ProviderResult<()> {
646        changeset.par_sort_by_key(|change| (change.address, change.key));
647
648        match self {
649            Self::Database(cursor) => {
650                for change in changeset {
651                    let storage_id = BlockNumberAddress((block_number, change.address));
652                    cursor.append_dup(
653                        storage_id,
654                        StorageEntry { key: change.key, value: change.value },
655                    )?;
656                }
657            }
658            Self::StaticFile(writer) => {
659                writer.append_storage_changeset(changeset, block_number)?;
660            }
661            Self::RocksDB(_) => return Err(ProviderError::UnsupportedProvider),
662        }
663
664        Ok(())
665    }
666}
667
/// Represents a source for reading data, either from database, static files, or `RocksDB`.
///
/// The read-side counterpart of [`EitherWriter`]; the `new_*` constructors pick the same
/// destination the corresponding writer would use, so reads and writes stay consistent.
#[derive(Debug, Display)]
pub enum EitherReader<'a, CURSOR, N> {
    /// Read from database table via cursor
    Database(CURSOR, PhantomData<&'a ()>),
    /// Read from static file
    StaticFile(StaticFileProvider<N>, PhantomData<&'a ()>),
    /// Read from `RocksDB` snapshot (works in both read-only and read-write modes)
    RocksDB(crate::providers::rocksdb::RocksReadSnapshot<'a>),
}
678
679impl<'a> EitherReader<'a, (), ()> {
680    /// Creates a new [`EitherReader`] for senders based on storage settings.
681    pub fn new_senders<P>(
682        provider: &P,
683    ) -> ProviderResult<EitherReaderTy<'a, P, tables::TransactionSenders>>
684    where
685        P: DBProvider + NodePrimitivesProvider + StorageSettingsCache + StaticFileProviderFactory,
686        P::Tx: DbTx,
687    {
688        if EitherWriterDestination::senders(provider).is_static_file() {
689            Ok(EitherReader::StaticFile(provider.static_file_provider(), PhantomData))
690        } else {
691            Ok(EitherReader::Database(
692                provider.tx_ref().cursor_read::<tables::TransactionSenders>()?,
693                PhantomData,
694            ))
695        }
696    }
697
698    /// Creates a new [`EitherReader`] for storages history based on storage settings.
699    pub fn new_storages_history<P>(
700        provider: &P,
701        rocksdb: RocksDBRefArg<'a>,
702    ) -> ProviderResult<EitherReaderTy<'a, P, tables::StoragesHistory>>
703    where
704        P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
705        P::Tx: DbTx,
706    {
707        if provider.cached_storage_settings().storage_v2 {
708            return Ok(EitherReader::RocksDB(
709                rocksdb.expect("storages_history_in_rocksdb requires rocksdb snapshot"),
710            ));
711        }
712
713        Ok(EitherReader::Database(
714            provider.tx_ref().cursor_read::<tables::StoragesHistory>()?,
715            PhantomData,
716        ))
717    }
718
719    /// Creates a new [`EitherReader`] for transaction hash numbers based on storage settings.
720    pub fn new_transaction_hash_numbers<P>(
721        provider: &P,
722        rocksdb: RocksDBRefArg<'a>,
723    ) -> ProviderResult<EitherReaderTy<'a, P, tables::TransactionHashNumbers>>
724    where
725        P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
726        P::Tx: DbTx,
727    {
728        if provider.cached_storage_settings().storage_v2 {
729            return Ok(EitherReader::RocksDB(
730                rocksdb.expect("transaction_hash_numbers_in_rocksdb requires rocksdb snapshot"),
731            ));
732        }
733
734        Ok(EitherReader::Database(
735            provider.tx_ref().cursor_read::<tables::TransactionHashNumbers>()?,
736            PhantomData,
737        ))
738    }
739
740    /// Creates a new [`EitherReader`] for account history based on storage settings.
741    pub fn new_accounts_history<P>(
742        provider: &P,
743        rocksdb: RocksDBRefArg<'a>,
744    ) -> ProviderResult<EitherReaderTy<'a, P, tables::AccountsHistory>>
745    where
746        P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
747        P::Tx: DbTx,
748    {
749        if provider.cached_storage_settings().storage_v2 {
750            return Ok(EitherReader::RocksDB(
751                rocksdb.expect("account_history_in_rocksdb requires rocksdb snapshot"),
752            ));
753        }
754
755        Ok(EitherReader::Database(
756            provider.tx_ref().cursor_read::<tables::AccountsHistory>()?,
757            PhantomData,
758        ))
759    }
760
761    /// Creates a new [`EitherReader`] for account changesets based on storage settings.
762    pub fn new_account_changesets<P>(
763        provider: &P,
764    ) -> ProviderResult<DupEitherReaderTy<'a, P, tables::AccountChangeSets>>
765    where
766        P: DBProvider + NodePrimitivesProvider + StorageSettingsCache + StaticFileProviderFactory,
767        P::Tx: DbTx,
768    {
769        if EitherWriterDestination::account_changesets(provider).is_static_file() {
770            Ok(EitherReader::StaticFile(provider.static_file_provider(), PhantomData))
771        } else {
772            Ok(EitherReader::Database(
773                provider.tx_ref().cursor_dup_read::<tables::AccountChangeSets>()?,
774                PhantomData,
775            ))
776        }
777    }
778}
779
impl<CURSOR, N: NodePrimitives> EitherReader<'_, CURSOR, N>
where
    CURSOR: DbCursorRO<tables::TransactionSenders>,
{
    /// Fetches the senders for a range of transactions.
    ///
    /// Returns a map from transaction number to sender address. Transaction numbers with no
    /// stored sender are simply absent from the map rather than causing an error.
    ///
    /// # Errors
    ///
    /// Returns [`ProviderError::UnsupportedProvider`] for the [`Self::RocksDB`] variant, and
    /// propagates any error raised while walking the database cursor or the static-file range.
    pub fn senders_by_tx_range(
        &mut self,
        range: Range<TxNumber>,
    ) -> ProviderResult<HashMap<TxNumber, Address>> {
        match self {
            Self::Database(cursor, _) => cursor
                .walk_range(range)?
                .map(|result| result.map_err(ProviderError::from))
                .collect::<ProviderResult<HashMap<_, _>>>(),
            // Static files are addressed by transaction number, so pair each number in the
            // range with the sender fetched at that position.
            Self::StaticFile(provider, _) => range
                .clone()
                .zip(provider.fetch_range_iter(
                    StaticFileSegment::TransactionSenders,
                    range,
                    |cursor, number| cursor.get_one::<TransactionSenderMask>(number.into()),
                )?)
                // `transpose` drops positions with no entry while keeping errors, so that
                // any failure still surfaces through the final `collect`.
                .filter_map(|(tx_num, sender)| {
                    let result = sender.transpose()?;
                    Some(result.map(|sender| (tx_num, sender)))
                })
                .collect::<ProviderResult<HashMap<_, _>>>(),
            Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
        }
    }
}
810
811impl<CURSOR, N: NodePrimitives> EitherReader<'_, CURSOR, N>
812where
813    CURSOR: DbCursorRO<tables::TransactionHashNumbers>,
814{
815    /// Gets a transaction number by its hash.
816    pub fn get_transaction_hash_number(
817        &mut self,
818        hash: TxHash,
819    ) -> ProviderResult<Option<TxNumber>> {
820        match self {
821            Self::Database(cursor, _) => Ok(cursor.seek_exact(hash)?.map(|(_, v)| v)),
822            Self::StaticFile(_, _) => Err(ProviderError::UnsupportedProvider),
823            Self::RocksDB(snapshot) => snapshot.get::<tables::TransactionHashNumbers>(hash),
824        }
825    }
826}
827
828impl<CURSOR, N: NodePrimitives> EitherReader<'_, CURSOR, N>
829where
830    CURSOR: DbCursorRO<tables::StoragesHistory>,
831{
832    /// Gets a storage history shard entry for the given [`StorageShardedKey`], if present.
833    pub fn get_storage_history(
834        &mut self,
835        key: StorageShardedKey,
836    ) -> ProviderResult<Option<BlockNumberList>> {
837        match self {
838            Self::Database(cursor, _) => Ok(cursor.seek_exact(key)?.map(|(_, v)| v)),
839            Self::StaticFile(_, _) => Err(ProviderError::UnsupportedProvider),
840            Self::RocksDB(snapshot) => snapshot.get::<tables::StoragesHistory>(key),
841        }
842    }
843
844    /// Lookup storage history and return [`HistoryInfo`].
845    pub fn storage_history_info(
846        &mut self,
847        address: Address,
848        storage_key: alloy_primitives::B256,
849        block_number: BlockNumber,
850        lowest_available_block_number: Option<BlockNumber>,
851    ) -> ProviderResult<HistoryInfo> {
852        match self {
853            Self::Database(cursor, _) => {
854                let key = StorageShardedKey::new(address, storage_key, block_number);
855                history_info::<tables::StoragesHistory, _, _>(
856                    cursor,
857                    key,
858                    block_number,
859                    |k| k.address == address && k.sharded_key.key == storage_key,
860                    lowest_available_block_number,
861                )
862            }
863            Self::StaticFile(_, _) => Err(ProviderError::UnsupportedProvider),
864            Self::RocksDB(snapshot) => snapshot.storage_history_info(
865                address,
866                storage_key,
867                block_number,
868                lowest_available_block_number,
869            ),
870        }
871    }
872}
873
874impl<CURSOR, N: NodePrimitives> EitherReader<'_, CURSOR, N>
875where
876    CURSOR: DbCursorRO<tables::AccountsHistory>,
877{
878    /// Gets an account history shard entry for the given [`ShardedKey`], if present.
879    pub fn get_account_history(
880        &mut self,
881        key: ShardedKey<Address>,
882    ) -> ProviderResult<Option<BlockNumberList>> {
883        match self {
884            Self::Database(cursor, _) => Ok(cursor.seek_exact(key)?.map(|(_, v)| v)),
885            Self::StaticFile(_, _) => Err(ProviderError::UnsupportedProvider),
886            Self::RocksDB(snapshot) => snapshot.get::<tables::AccountsHistory>(key),
887        }
888    }
889
890    /// Lookup account history and return [`HistoryInfo`].
891    pub fn account_history_info(
892        &mut self,
893        address: Address,
894        block_number: BlockNumber,
895        lowest_available_block_number: Option<BlockNumber>,
896    ) -> ProviderResult<HistoryInfo> {
897        match self {
898            Self::Database(cursor, _) => {
899                let key = ShardedKey::new(address, block_number);
900                history_info::<tables::AccountsHistory, _, _>(
901                    cursor,
902                    key,
903                    block_number,
904                    |k| k.key == address,
905                    lowest_available_block_number,
906                )
907            }
908            Self::StaticFile(_, _) => Err(ProviderError::UnsupportedProvider),
909            Self::RocksDB(snapshot) => {
910                snapshot.account_history_info(address, block_number, lowest_available_block_number)
911            }
912        }
913    }
914}
915
916impl<CURSOR, N: NodePrimitives> EitherReader<'_, CURSOR, N>
917where
918    CURSOR: DbCursorRO<tables::AccountChangeSets>,
919{
920    /// Iterate over account changesets and return all account address that were changed.
921    pub fn changed_accounts_with_range(
922        &mut self,
923        range: RangeInclusive<BlockNumber>,
924    ) -> ProviderResult<BTreeSet<Address>> {
925        match self {
926            Self::StaticFile(provider, _) => {
927                let highest_static_block =
928                    provider.get_highest_static_file_block(StaticFileSegment::AccountChangeSets);
929
930                let Some(highest) = highest_static_block else {
931                    return Err(ProviderError::MissingHighestStaticFileBlock(
932                        StaticFileSegment::AccountChangeSets,
933                    ))
934                };
935
936                let start = *range.start();
937                let static_end = (*range.end()).min(highest);
938
939                let mut changed_accounts = BTreeSet::default();
940                if start <= static_end {
941                    for block in start..=static_end {
942                        let block_changesets = provider.account_block_changeset(block)?;
943                        for changeset in block_changesets {
944                            changed_accounts.insert(changeset.address);
945                        }
946                    }
947                }
948
949                Ok(changed_accounts)
950            }
951            Self::Database(provider, _) => provider
952                .walk_range(range)?
953                .map(|entry| {
954                    entry.map(|(_, account_before)| account_before.address).map_err(Into::into)
955                })
956                .collect(),
957            Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
958        }
959    }
960}
961
/// Destination for writing data.
///
/// Which variant applies for a given table is decided from the node's storage
/// settings by the per-table constructors on this type.
#[derive(Debug, EnumIs)]
pub enum EitherWriterDestination {
    /// Write to database table
    Database,
    /// Write to static file
    StaticFile,
    /// Write to `RocksDB`
    RocksDB,
}
972
973impl EitherWriterDestination {
974    /// Returns the destination for writing senders based on storage settings.
975    pub fn senders<P>(provider: &P) -> Self
976    where
977        P: StorageSettingsCache,
978    {
979        // Write senders to static files only if they're explicitly enabled
980        if provider.cached_storage_settings().storage_v2 {
981            Self::StaticFile
982        } else {
983            Self::Database
984        }
985    }
986
987    /// Returns the destination for writing account changesets based on storage settings.
988    pub fn account_changesets<P>(provider: &P) -> Self
989    where
990        P: StorageSettingsCache,
991    {
992        // Write account changesets to static files only if they're explicitly enabled
993        if provider.cached_storage_settings().storage_v2 {
994            Self::StaticFile
995        } else {
996            Self::Database
997        }
998    }
999
1000    /// Returns the destination for writing storage changesets based on storage settings.
1001    pub fn storage_changesets<P>(provider: &P) -> Self
1002    where
1003        P: StorageSettingsCache,
1004    {
1005        // Write storage changesets to static files only if they're explicitly enabled
1006        if provider.cached_storage_settings().storage_v2 {
1007            Self::StaticFile
1008        } else {
1009            Self::Database
1010        }
1011    }
1012}
1013
#[cfg(test)]
mod tests {
    use crate::{test_utils::create_test_provider_factory, StaticFileWriter};

    use super::*;
    use alloy_primitives::Address;
    use reth_db::models::AccountBeforeTx;
    use reth_static_file_types::StaticFileSegment;
    use reth_storage_api::{DatabaseProviderFactory, StorageSettings};

    /// Verifies that `changed_accounts_with_range` correctly caps the query range to the
    /// static file tip when the requested range extends beyond it.
    ///
    /// This test documents the fix for an off-by-one bug where the code computed
    /// `static_end = range.end().min(highest + 1)` instead of `min(highest)`.
    /// The bug allowed iteration to attempt reading block `highest + 1` which doesn't
    /// exist (silently returning empty results due to `MissingStaticFileBlock` handling).
    ///
    /// While the bug was masked by error handling, it caused:
    /// 1. Unnecessary iteration/lookup for non-existent blocks
    /// 2. Potential overflow when `highest == u64::MAX`
    #[test]
    fn test_changed_accounts_with_range_caps_at_static_file_tip() {
        let factory = create_test_provider_factory();
        let highest_block = 5u64;

        // One distinct, deterministic address per block (first byte = block number).
        let addresses: Vec<Address> = (0..=highest_block)
            .map(|i| {
                let mut addr = Address::ZERO;
                addr.0[0] = i as u8;
                addr
            })
            .collect();

        // Populate the AccountChangeSets static file segment: one changeset per block.
        {
            let sf_provider = factory.static_file_provider();
            let mut writer =
                sf_provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();

            for block_num in 0..=highest_block {
                let changeset =
                    vec![AccountBeforeTx { address: addresses[block_num as usize], info: None }];
                writer.append_account_changeset(changeset, block_num).unwrap();
            }
            writer.commit().unwrap();
        }

        // v2 settings route changeset reads to static files (see
        // `EitherWriterDestination::account_changesets`).
        factory.set_storage_settings_cache(StorageSettings::v2());

        let provider = factory.database_provider_ro().unwrap();

        let sf_tip = provider
            .static_file_provider()
            .get_highest_static_file_block(StaticFileSegment::AccountChangeSets);
        assert_eq!(sf_tip, Some(highest_block));

        let mut reader = EitherReader::new_account_changesets(&provider).unwrap();
        assert!(matches!(reader, EitherReader::StaticFile(_, _)));

        // Query range 0..=10 when tip is 5 - should return only accounts from blocks 0-5
        let result = reader.changed_accounts_with_range(0..=10).unwrap();

        let expected: BTreeSet<Address> = addresses.into_iter().collect();
        assert_eq!(result, expected);
    }

    /// Exercises `senders_by_tx_range` through both backends (database via v1
    /// settings, static files via v2) and checks that missing senders (tx numbers
    /// 0 and 5 here) are skipped rather than reported as errors.
    #[test]
    fn test_reader_senders_by_tx_range() {
        let factory = create_test_provider_factory();

        // Insert senders only from 1 to 4, but we will query from 0 to 5.
        let senders = [
            (1, Address::random()),
            (2, Address::random()),
            (3, Address::random()),
            (4, Address::random()),
        ];

        for transaction_senders_in_static_files in [false, true] {
            factory.set_storage_settings_cache(if transaction_senders_in_static_files {
                StorageSettings::v2()
            } else {
                StorageSettings::v1()
            });

            let provider = factory.database_provider_rw().unwrap();
            let mut writer = EitherWriter::new_senders(&provider, 0).unwrap();
            if transaction_senders_in_static_files {
                assert!(matches!(writer, EitherWriter::StaticFile(_)));
            } else {
                assert!(matches!(writer, EitherWriter::Database(_)));
            }

            writer.increment_block(0).unwrap();
            writer.append_senders(senders.iter().copied()).unwrap();
            // Drop the writer before committing so the transaction is not borrowed.
            drop(writer);
            provider.commit().unwrap();

            let provider = factory.database_provider_ro().unwrap();
            let mut reader = EitherReader::new_senders(&provider).unwrap();
            if transaction_senders_in_static_files {
                assert!(matches!(reader, EitherReader::StaticFile(_, _)));
            } else {
                assert!(matches!(reader, EitherReader::Database(_, _)));
            }

            assert_eq!(
                reader.senders_by_tx_range(0..6).unwrap(),
                senders.iter().copied().collect::<HashMap<_, _>>(),
                "{reader}"
            );
        }
    }
}
1128
1129#[cfg(test)]
1130mod rocksdb_tests {
1131    use super::*;
1132    use crate::{
1133        providers::rocksdb::{RocksDBBuilder, RocksDBProvider},
1134        test_utils::create_test_provider_factory,
1135        RocksDBProviderFactory,
1136    };
1137    use alloy_primitives::{Address, B256};
1138    use reth_db_api::{
1139        models::{storage_sharded_key::StorageShardedKey, IntegerList, ShardedKey},
1140        tables,
1141        transaction::DbTxMut,
1142    };
1143    use reth_ethereum_primitives::EthPrimitives;
1144    use reth_storage_api::{DatabaseProviderFactory, StorageSettings};
1145    use std::marker::PhantomData;
1146    use tempfile::TempDir;
1147
1148    fn create_rocksdb_provider() -> (TempDir, RocksDBProvider) {
1149        let temp_dir = TempDir::new().unwrap();
1150        let provider = RocksDBBuilder::new(temp_dir.path())
1151            .with_table::<tables::TransactionHashNumbers>()
1152            .with_table::<tables::StoragesHistory>()
1153            .with_table::<tables::AccountsHistory>()
1154            .build()
1155            .unwrap();
1156        (temp_dir, provider)
1157    }
1158
    /// Test that `EitherWriter::new_transaction_hash_numbers` creates a `RocksDB` writer
    /// when the storage setting is enabled, and that put operations followed by commit
    /// persist the data to `RocksDB`.
    #[test]
    fn test_either_writer_transaction_hash_numbers_with_rocksdb() {
        let factory = create_test_provider_factory();

        // Enable RocksDB for transaction hash numbers
        factory.set_storage_settings_cache(StorageSettings::v2());

        let hash1 = B256::from([1u8; 32]);
        let hash2 = B256::from([2u8; 32]);
        let tx_num1 = 100u64;
        let tx_num2 = 200u64;

        // Get the RocksDB batch from the provider
        let rocksdb = factory.rocksdb_provider();
        let batch = rocksdb.batch();

        // Create EitherWriter with RocksDB
        let provider = factory.database_provider_rw().unwrap();
        let mut writer = EitherWriter::new_transaction_hash_numbers(&provider, batch).unwrap();

        // Verify we got a RocksDB writer
        assert!(matches!(writer, EitherWriter::RocksDB(_)));

        // Write transaction hash numbers (append_only=false since we're using RocksDB)
        writer.put_transaction_hash_number(hash1, tx_num1, false).unwrap();
        writer.put_transaction_hash_number(hash2, tx_num2, false).unwrap();

        // Extract the batch and register with provider for commit. The batch must be
        // registered as pending BEFORE `provider.commit()`, otherwise the staged
        // writes would be dropped with the writer.
        if let Some(batch) = writer.into_raw_rocksdb_batch() {
            provider.set_pending_rocksdb_batch(batch);
        }

        // Commit via provider - this commits RocksDB batch too
        provider.commit().unwrap();

        // Verify data was written to RocksDB
        let rocksdb = factory.rocksdb_provider();
        assert_eq!(rocksdb.get::<tables::TransactionHashNumbers>(hash1).unwrap(), Some(tx_num1));
        assert_eq!(rocksdb.get::<tables::TransactionHashNumbers>(hash2).unwrap(), Some(tx_num2));
    }
1202
    /// Test that `EitherWriter::delete_transaction_hash_number` works with `RocksDB`.
    #[test]
    fn test_either_writer_delete_transaction_hash_number_with_rocksdb() {
        let factory = create_test_provider_factory();

        // Enable RocksDB for transaction hash numbers
        factory.set_storage_settings_cache(StorageSettings::v2());

        let hash = B256::from([1u8; 32]);
        let tx_num = 100u64;

        // First, write a value directly to RocksDB
        let rocksdb = factory.rocksdb_provider();
        rocksdb.put::<tables::TransactionHashNumbers>(hash, &tx_num).unwrap();
        assert_eq!(rocksdb.get::<tables::TransactionHashNumbers>(hash).unwrap(), Some(tx_num));

        // Now delete using EitherWriter
        let batch = rocksdb.batch();
        let provider = factory.database_provider_rw().unwrap();
        let mut writer = EitherWriter::new_transaction_hash_numbers(&provider, batch).unwrap();
        writer.delete_transaction_hash_number(hash).unwrap();

        // Extract the batch and commit via provider. Registering the batch as pending
        // before `commit` is what actually applies the staged delete.
        if let Some(batch) = writer.into_raw_rocksdb_batch() {
            provider.set_pending_rocksdb_batch(batch);
        }
        provider.commit().unwrap();

        // Verify deletion
        let rocksdb = factory.rocksdb_provider();
        assert_eq!(rocksdb.get::<tables::TransactionHashNumbers>(hash).unwrap(), None);
    }
1235
1236    #[test]
1237    fn test_rocksdb_batch_transaction_hash_numbers() {
1238        let (_temp_dir, provider) = create_rocksdb_provider();
1239
1240        let hash1 = B256::from([1u8; 32]);
1241        let hash2 = B256::from([2u8; 32]);
1242        let tx_num1 = 100u64;
1243        let tx_num2 = 200u64;
1244
1245        // Write via RocksDBBatch (same as EitherWriter::RocksDB would use internally)
1246        let mut batch = provider.batch();
1247        batch.put::<tables::TransactionHashNumbers>(hash1, &tx_num1).unwrap();
1248        batch.put::<tables::TransactionHashNumbers>(hash2, &tx_num2).unwrap();
1249        batch.commit().unwrap();
1250
1251        // Read via RocksTx (same as EitherReader::RocksDB would use internally)
1252        let tx = provider.tx();
1253        assert_eq!(tx.get::<tables::TransactionHashNumbers>(hash1).unwrap(), Some(tx_num1));
1254        assert_eq!(tx.get::<tables::TransactionHashNumbers>(hash2).unwrap(), Some(tx_num2));
1255
1256        // Test missing key
1257        let missing_hash = B256::from([99u8; 32]);
1258        assert_eq!(tx.get::<tables::TransactionHashNumbers>(missing_hash).unwrap(), None);
1259    }
1260
1261    #[test]
1262    fn test_rocksdb_batch_storage_history() {
1263        let (_temp_dir, provider) = create_rocksdb_provider();
1264
1265        let address = Address::random();
1266        let storage_key = B256::from([1u8; 32]);
1267        let key = StorageShardedKey::new(address, storage_key, 1000);
1268        let value = IntegerList::new([1, 5, 10, 50]).unwrap();
1269
1270        // Write via RocksDBBatch
1271        let mut batch = provider.batch();
1272        batch.put::<tables::StoragesHistory>(key.clone(), &value).unwrap();
1273        batch.commit().unwrap();
1274
1275        // Read via RocksTx
1276        let tx = provider.tx();
1277        let result = tx.get::<tables::StoragesHistory>(key).unwrap();
1278        assert_eq!(result, Some(value));
1279
1280        // Test missing key
1281        let missing_key = StorageShardedKey::new(Address::random(), B256::ZERO, 0);
1282        assert_eq!(tx.get::<tables::StoragesHistory>(missing_key).unwrap(), None);
1283    }
1284
1285    #[test]
1286    fn test_rocksdb_batch_account_history() {
1287        let (_temp_dir, provider) = create_rocksdb_provider();
1288
1289        let address = Address::random();
1290        let key = ShardedKey::new(address, 1000);
1291        let value = IntegerList::new([1, 10, 100, 500]).unwrap();
1292
1293        // Write via RocksDBBatch
1294        let mut batch = provider.batch();
1295        batch.put::<tables::AccountsHistory>(key.clone(), &value).unwrap();
1296        batch.commit().unwrap();
1297
1298        // Read via RocksTx
1299        let tx = provider.tx();
1300        let result = tx.get::<tables::AccountsHistory>(key).unwrap();
1301        assert_eq!(result, Some(value));
1302
1303        // Test missing key
1304        let missing_key = ShardedKey::new(Address::random(), 0);
1305        assert_eq!(tx.get::<tables::AccountsHistory>(missing_key).unwrap(), None);
1306    }
1307
1308    #[test]
1309    fn test_rocksdb_batch_delete_transaction_hash_number() {
1310        let (_temp_dir, provider) = create_rocksdb_provider();
1311
1312        let hash = B256::from([1u8; 32]);
1313        let tx_num = 100u64;
1314
1315        // First write
1316        provider.put::<tables::TransactionHashNumbers>(hash, &tx_num).unwrap();
1317        assert_eq!(provider.get::<tables::TransactionHashNumbers>(hash).unwrap(), Some(tx_num));
1318
1319        // Delete via RocksDBBatch
1320        let mut batch = provider.batch();
1321        batch.delete::<tables::TransactionHashNumbers>(hash).unwrap();
1322        batch.commit().unwrap();
1323
1324        // Verify deletion
1325        assert_eq!(provider.get::<tables::TransactionHashNumbers>(hash).unwrap(), None);
1326    }
1327
1328    #[test]
1329    fn test_rocksdb_batch_delete_storage_history() {
1330        let (_temp_dir, provider) = create_rocksdb_provider();
1331
1332        let address = Address::random();
1333        let storage_key = B256::from([1u8; 32]);
1334        let key = StorageShardedKey::new(address, storage_key, 1000);
1335        let value = IntegerList::new([1, 5, 10]).unwrap();
1336
1337        // First write
1338        provider.put::<tables::StoragesHistory>(key.clone(), &value).unwrap();
1339        assert!(provider.get::<tables::StoragesHistory>(key.clone()).unwrap().is_some());
1340
1341        // Delete via RocksDBBatch
1342        let mut batch = provider.batch();
1343        batch.delete::<tables::StoragesHistory>(key.clone()).unwrap();
1344        batch.commit().unwrap();
1345
1346        // Verify deletion
1347        assert_eq!(provider.get::<tables::StoragesHistory>(key).unwrap(), None);
1348    }
1349
1350    #[test]
1351    fn test_rocksdb_batch_delete_account_history() {
1352        let (_temp_dir, provider) = create_rocksdb_provider();
1353
1354        let address = Address::random();
1355        let key = ShardedKey::new(address, 1000);
1356        let value = IntegerList::new([1, 10, 100]).unwrap();
1357
1358        // First write
1359        provider.put::<tables::AccountsHistory>(key.clone(), &value).unwrap();
1360        assert!(provider.get::<tables::AccountsHistory>(key.clone()).unwrap().is_some());
1361
1362        // Delete via RocksDBBatch
1363        let mut batch = provider.batch();
1364        batch.delete::<tables::AccountsHistory>(key.clone()).unwrap();
1365        batch.commit().unwrap();
1366
1367        // Verify deletion
1368        assert_eq!(provider.get::<tables::AccountsHistory>(key).unwrap(), None);
1369    }
1370
1371    // ==================== Parametrized Backend Equivalence Tests ====================
1372    //
1373    // These tests verify that MDBX and RocksDB produce identical results for history lookups.
1374    // Each scenario sets up the same data in both backends and asserts identical HistoryInfo.
1375
    /// Query parameters for a history lookup test case.
    struct HistoryQuery {
        /// Block number the history lookup is performed at.
        block_number: BlockNumber,
        /// Pruning floor passed to the lookup (`lowest_available_block_number`), if any.
        lowest_available: Option<BlockNumber>,
        /// Result both backends are expected to return.
        expected: HistoryInfo,
    }
1382
    // Type aliases for cursor types (needed for EitherWriter/EitherReader type inference)
    /// Read-write MDBX cursor over [`tables::AccountsHistory`].
    type AccountsHistoryWriteCursor =
        reth_db::mdbx::cursor::Cursor<reth_db::mdbx::RW, tables::AccountsHistory>;
    /// Read-write MDBX cursor over [`tables::StoragesHistory`].
    type StoragesHistoryWriteCursor =
        reth_db::mdbx::cursor::Cursor<reth_db::mdbx::RW, tables::StoragesHistory>;
    /// Read-only MDBX cursor over [`tables::AccountsHistory`].
    type AccountsHistoryReadCursor =
        reth_db::mdbx::cursor::Cursor<reth_db::mdbx::RO, tables::AccountsHistory>;
    /// Read-only MDBX cursor over [`tables::StoragesHistory`].
    type StoragesHistoryReadCursor =
        reth_db::mdbx::cursor::Cursor<reth_db::mdbx::RO, tables::StoragesHistory>;
1392
    /// Runs the same account history queries against both MDBX and `RocksDB` backends,
    /// asserting they produce identical results.
    fn run_account_history_scenario(
        scenario_name: &str,
        address: Address,
        shards: &[(BlockNumber, Vec<BlockNumber>)], // (shard_highest_block, blocks_in_shard)
        queries: &[HistoryQuery],
    ) {
        // Setup MDBX and RocksDB with identical data using EitherWriter
        let factory = create_test_provider_factory();
        let mdbx_provider = factory.database_provider_rw().unwrap();
        let (temp_dir, rocks_provider) = create_rocksdb_provider();

        // Create writers for both backends
        let mut mdbx_writer: EitherWriter<'_, AccountsHistoryWriteCursor, EthPrimitives> =
            EitherWriter::Database(
                mdbx_provider.tx_ref().cursor_write::<tables::AccountsHistory>().unwrap(),
            );
        let mut rocks_writer: EitherWriter<'_, AccountsHistoryWriteCursor, EthPrimitives> =
            EitherWriter::RocksDB(rocks_provider.batch());

        // Write identical data to both backends in a single loop
        for (highest_block, blocks) in shards {
            let key = ShardedKey::new(address, *highest_block);
            let value = IntegerList::new(blocks.clone()).unwrap();
            mdbx_writer.upsert_account_history(key.clone(), &value).unwrap();
            rocks_writer.upsert_account_history(key, &value).unwrap();
        }

        // Commit both backends. Dropping the MDBX writer first releases its cursor's
        // borrow of the transaction so `commit` can proceed.
        drop(mdbx_writer);
        mdbx_provider.commit().unwrap();
        if let EitherWriter::RocksDB(batch) = rocks_writer {
            batch.commit().unwrap();
        }

        // Run queries against both backends using EitherReader
        let mdbx_ro = factory.database_provider_ro().unwrap();
        let rocks_snapshot = rocks_provider.snapshot();

        for (i, query) in queries.iter().enumerate() {
            // MDBX query via EitherReader (fresh cursor per query)
            let mut mdbx_reader: EitherReader<'_, AccountsHistoryReadCursor, EthPrimitives> =
                EitherReader::Database(
                    mdbx_ro.tx_ref().cursor_read::<tables::AccountsHistory>().unwrap(),
                    PhantomData,
                );
            let mdbx_result = mdbx_reader
                .account_history_info(address, query.block_number, query.lowest_available)
                .unwrap();

            // RocksDB query via EitherReader — reuse snapshot for consistent view
            let rocks_result = rocks_snapshot
                .account_history_info(address, query.block_number, query.lowest_available)
                .unwrap();

            // Assert both backends produce identical results
            assert_eq!(
                mdbx_result,
                rocks_result,
                "Backend mismatch in scenario '{}' query {}: block={}, lowest={:?}\n\
                 MDBX: {:?}, RocksDB: {:?}",
                scenario_name,
                i,
                query.block_number,
                query.lowest_available,
                mdbx_result,
                rocks_result
            );

            // Also verify against expected result
            assert_eq!(
                mdbx_result,
                query.expected,
                "Unexpected result in scenario '{}' query {}: block={}, lowest={:?}\n\
                 Got: {:?}, Expected: {:?}",
                scenario_name,
                i,
                query.block_number,
                query.lowest_available,
                mdbx_result,
                query.expected
            );
        }

        // Keep the RocksDB directory alive until all queries have completed.
        drop(temp_dir);
    }
1480
    /// Runs the same storage history queries against both MDBX and `RocksDB` backends,
    /// asserting they produce identical results.
    fn run_storage_history_scenario(
        scenario_name: &str,
        address: Address,
        storage_key: B256,
        shards: &[(BlockNumber, Vec<BlockNumber>)], // (shard_highest_block, blocks_in_shard)
        queries: &[HistoryQuery],
    ) {
        // Setup MDBX and RocksDB with identical data using EitherWriter
        let factory = create_test_provider_factory();
        let mdbx_provider = factory.database_provider_rw().unwrap();
        let (temp_dir, rocks_provider) = create_rocksdb_provider();

        // Create writers for both backends
        let mut mdbx_writer: EitherWriter<'_, StoragesHistoryWriteCursor, EthPrimitives> =
            EitherWriter::Database(
                mdbx_provider.tx_ref().cursor_write::<tables::StoragesHistory>().unwrap(),
            );
        let mut rocks_writer: EitherWriter<'_, StoragesHistoryWriteCursor, EthPrimitives> =
            EitherWriter::RocksDB(rocks_provider.batch());

        // Write identical data to both backends in a single loop
        for (highest_block, blocks) in shards {
            let key = StorageShardedKey::new(address, storage_key, *highest_block);
            let value = IntegerList::new(blocks.clone()).unwrap();
            mdbx_writer.put_storage_history(key.clone(), &value).unwrap();
            rocks_writer.put_storage_history(key, &value).unwrap();
        }

        // Commit both backends. Dropping the MDBX writer first releases its cursor's
        // borrow of the transaction so `commit` can proceed.
        drop(mdbx_writer);
        mdbx_provider.commit().unwrap();
        if let EitherWriter::RocksDB(batch) = rocks_writer {
            batch.commit().unwrap();
        }

        // Run queries against both backends using EitherReader
        let mdbx_ro = factory.database_provider_ro().unwrap();
        let rocks_snapshot = rocks_provider.snapshot();

        for (i, query) in queries.iter().enumerate() {
            // MDBX query via EitherReader (fresh cursor per query)
            let mut mdbx_reader: EitherReader<'_, StoragesHistoryReadCursor, EthPrimitives> =
                EitherReader::Database(
                    mdbx_ro.tx_ref().cursor_read::<tables::StoragesHistory>().unwrap(),
                    PhantomData,
                );
            let mdbx_result = mdbx_reader
                .storage_history_info(
                    address,
                    storage_key,
                    query.block_number,
                    query.lowest_available,
                )
                .unwrap();

            // RocksDB query via snapshot — reuse for consistent view
            let rocks_result = rocks_snapshot
                .storage_history_info(
                    address,
                    storage_key,
                    query.block_number,
                    query.lowest_available,
                )
                .unwrap();

            // Assert both backends produce identical results
            assert_eq!(
                mdbx_result,
                rocks_result,
                "Backend mismatch in scenario '{}' query {}: block={}, lowest={:?}\n\
                 MDBX: {:?}, RocksDB: {:?}",
                scenario_name,
                i,
                query.block_number,
                query.lowest_available,
                mdbx_result,
                rocks_result
            );

            // Also verify against expected result
            assert_eq!(
                mdbx_result,
                query.expected,
                "Unexpected result in scenario '{}' query {}: block={}, lowest={:?}\n\
                 Got: {:?}, Expected: {:?}",
                scenario_name,
                i,
                query.block_number,
                query.lowest_available,
                mdbx_result,
                query.expected
            );
        }

        // Keep the RocksDB directory alive until all queries have completed.
        drop(temp_dir);
    }
1579
1580    /// Tests account history lookups across both MDBX and `RocksDB` backends.
1581    ///
1582    /// Covers the following scenarios from PR2's `RocksDB`-only tests:
1583    /// 1. Single shard - basic lookups within one shard
1584    /// 2. Multiple shards - `prev()` shard detection and transitions
1585    /// 3. No history - query address with no entries
1586    /// 4. Pruning boundary - `lowest_available` boundary behavior (block at/after boundary)
1587    #[test]
1588    fn test_account_history_info_both_backends() {
1589        let address = Address::from([0x42; 20]);
1590
1591        // Scenario 1: Single shard with blocks [100, 200, 300]
1592        run_account_history_scenario(
1593            "single_shard",
1594            address,
1595            &[(u64::MAX, vec![100, 200, 300])],
1596            &[
1597                // Before first entry -> NotYetWritten
1598                HistoryQuery {
1599                    block_number: 50,
1600                    lowest_available: None,
1601                    expected: HistoryInfo::NotYetWritten,
1602                },
1603                // Between entries -> InChangeset(next_write)
1604                HistoryQuery {
1605                    block_number: 150,
1606                    lowest_available: None,
1607                    expected: HistoryInfo::InChangeset(200),
1608                },
1609                // Exact match on entry -> InChangeset(same_block)
1610                HistoryQuery {
1611                    block_number: 300,
1612                    lowest_available: None,
1613                    expected: HistoryInfo::InChangeset(300),
1614                },
1615                // After last entry in last shard -> InPlainState
1616                HistoryQuery {
1617                    block_number: 500,
1618                    lowest_available: None,
1619                    expected: HistoryInfo::InPlainState,
1620                },
1621            ],
1622        );
1623
1624        // Scenario 2: Multiple shards - tests prev() shard detection
1625        run_account_history_scenario(
1626            "multiple_shards",
1627            address,
1628            &[
1629                (500, vec![100, 200, 300, 400, 500]), // First shard ends at 500
1630                (u64::MAX, vec![600, 700, 800]),      // Last shard
1631            ],
1632            &[
1633                // Before first shard, no prev -> NotYetWritten
1634                HistoryQuery {
1635                    block_number: 50,
1636                    lowest_available: None,
1637                    expected: HistoryInfo::NotYetWritten,
1638                },
1639                // Within first shard
1640                HistoryQuery {
1641                    block_number: 150,
1642                    lowest_available: None,
1643                    expected: HistoryInfo::InChangeset(200),
1644                },
1645                // Between shards - prev() should find first shard
1646                HistoryQuery {
1647                    block_number: 550,
1648                    lowest_available: None,
1649                    expected: HistoryInfo::InChangeset(600),
1650                },
1651                // After all entries
1652                HistoryQuery {
1653                    block_number: 900,
1654                    lowest_available: None,
1655                    expected: HistoryInfo::InPlainState,
1656                },
1657            ],
1658        );
1659
1660        // Scenario 3: No history for address
1661        let address_without_history = Address::from([0x43; 20]);
1662        run_account_history_scenario(
1663            "no_history",
1664            address_without_history,
1665            &[], // No shards for this address
1666            &[HistoryQuery {
1667                block_number: 150,
1668                lowest_available: None,
1669                expected: HistoryInfo::NotYetWritten,
1670            }],
1671        );
1672
1673        // Scenario 4: Query at pruning boundary
1674        // Note: We test block >= lowest_available because HistoricalStateProviderRef
1675        // errors on blocks below the pruning boundary before doing the lookup.
1676        // The RocksDB implementation doesn't have this check at the same level.
1677        // This tests that when pruning IS available, both backends agree.
1678        run_account_history_scenario(
1679            "with_pruning_boundary",
1680            address,
1681            &[(u64::MAX, vec![100, 200, 300])],
1682            &[
1683                // At pruning boundary -> InChangeset(first entry after block)
1684                HistoryQuery {
1685                    block_number: 100,
1686                    lowest_available: Some(100),
1687                    expected: HistoryInfo::InChangeset(100),
1688                },
1689                // After pruning boundary, between entries
1690                HistoryQuery {
1691                    block_number: 150,
1692                    lowest_available: Some(100),
1693                    expected: HistoryInfo::InChangeset(200),
1694                },
1695            ],
1696        );
1697    }
1698
1699    /// Tests storage history lookups across both MDBX and `RocksDB` backends.
1700    #[test]
1701    fn test_storage_history_info_both_backends() {
1702        let address = Address::from([0x42; 20]);
1703        let storage_key = B256::from([0x01; 32]);
1704        let other_storage_key = B256::from([0x02; 32]);
1705
1706        // Single shard with blocks [100, 200, 300]
1707        run_storage_history_scenario(
1708            "storage_single_shard",
1709            address,
1710            storage_key,
1711            &[(u64::MAX, vec![100, 200, 300])],
1712            &[
1713                // Before first entry -> NotYetWritten
1714                HistoryQuery {
1715                    block_number: 50,
1716                    lowest_available: None,
1717                    expected: HistoryInfo::NotYetWritten,
1718                },
1719                // Between entries -> InChangeset(next_write)
1720                HistoryQuery {
1721                    block_number: 150,
1722                    lowest_available: None,
1723                    expected: HistoryInfo::InChangeset(200),
1724                },
1725                // After last entry -> InPlainState
1726                HistoryQuery {
1727                    block_number: 500,
1728                    lowest_available: None,
1729                    expected: HistoryInfo::InPlainState,
1730                },
1731            ],
1732        );
1733
1734        // No history for different storage key
1735        run_storage_history_scenario(
1736            "storage_no_history",
1737            address,
1738            other_storage_key,
1739            &[], // No shards for this storage key
1740            &[HistoryQuery {
1741                block_number: 150,
1742                lowest_available: None,
1743                expected: HistoryInfo::NotYetWritten,
1744            }],
1745        );
1746    }
1747
1748    /// Test that `RocksDB` batches created via `EitherWriter` are only made visible when
1749    /// `provider.commit()` is called, not when the writer is dropped.
1750    #[test]
1751    fn test_rocksdb_commits_at_provider_level() {
1752        let factory = create_test_provider_factory();
1753
1754        // Enable RocksDB for transaction hash numbers
1755        factory.set_storage_settings_cache(StorageSettings::v2());
1756
1757        let hash1 = B256::from([1u8; 32]);
1758        let hash2 = B256::from([2u8; 32]);
1759        let tx_num1 = 100u64;
1760        let tx_num2 = 200u64;
1761
1762        // Get the RocksDB batch from the provider
1763        let rocksdb = factory.rocksdb_provider();
1764        let batch = rocksdb.batch();
1765
1766        // Create provider and EitherWriter
1767        let provider = factory.database_provider_rw().unwrap();
1768        let mut writer = EitherWriter::new_transaction_hash_numbers(&provider, batch).unwrap();
1769
1770        // Write transaction hash numbers (append_only=false since we're using RocksDB)
1771        writer.put_transaction_hash_number(hash1, tx_num1, false).unwrap();
1772        writer.put_transaction_hash_number(hash2, tx_num2, false).unwrap();
1773
1774        // Extract the raw batch from the writer and register it with the provider
1775        let raw_batch = writer.into_raw_rocksdb_batch();
1776        if let Some(batch) = raw_batch {
1777            provider.set_pending_rocksdb_batch(batch);
1778        }
1779
1780        // Data should NOT be visible yet (batch not committed)
1781        let rocksdb = factory.rocksdb_provider();
1782        assert_eq!(
1783            rocksdb.get::<tables::TransactionHashNumbers>(hash1).unwrap(),
1784            None,
1785            "Data should not be visible before provider.commit()"
1786        );
1787
1788        // Commit the provider - this should commit both MDBX and RocksDB
1789        provider.commit().unwrap();
1790
1791        // Now data should be visible in RocksDB
1792        let rocksdb = factory.rocksdb_provider();
1793        assert_eq!(
1794            rocksdb.get::<tables::TransactionHashNumbers>(hash1).unwrap(),
1795            Some(tx_num1),
1796            "Data should be visible after provider.commit()"
1797        );
1798        assert_eq!(
1799            rocksdb.get::<tables::TransactionHashNumbers>(hash2).unwrap(),
1800            Some(tx_num2),
1801            "Data should be visible after provider.commit()"
1802        );
1803    }
1804
1805    /// Test that `EitherReader::new_accounts_history` panics when settings require
1806    /// `RocksDB` but no snapshot is given (`None`). This is an invariant violation that
1807    /// indicates a bug - `with_rocksdb_snapshot` should always provide a snapshot when needed.
1808    #[test]
1809    #[should_panic(expected = "account_history_in_rocksdb requires rocksdb snapshot")]
1810    fn test_settings_mismatch_panics() {
1811        let factory = create_test_provider_factory();
1812
1813        factory.set_storage_settings_cache(StorageSettings::v2());
1814
1815        let provider = factory.database_provider_ro().unwrap();
1816        let _ = EitherReader::<(), ()>::new_accounts_history(&provider, None);
1817    }
1818}