Skip to main content

reth_provider/
either_writer.rs

1//! Generic reader and writer abstractions for interacting with either database tables or static
2//! files.
3
4use std::{
5    collections::BTreeSet,
6    marker::PhantomData,
7    ops::{Range, RangeInclusive},
8};
9
10use crate::{
11    providers::{
12        history_info, rocksdb::RocksDBBatch, HistoryInfo, StaticFileProvider,
13        StaticFileProviderRWRefMut,
14    },
15    StaticFileProviderFactory,
16};
17use alloy_primitives::{map::HashMap, Address, BlockNumber, TxHash, TxNumber, B256};
18use rayon::slice::ParallelSliceMut;
19use reth_db::{
20    cursor::{DbCursorRO, DbDupCursorRW},
21    models::{AccountBeforeTx, StorageBeforeTx},
22    static_file::TransactionSenderMask,
23    table::Value,
24    transaction::{CursorMutTy, CursorTy, DbTx, DbTxMut, DupCursorMutTy, DupCursorTy},
25};
26use reth_db_api::{
27    cursor::DbCursorRW,
28    models::{storage_sharded_key::StorageShardedKey, BlockNumberAddress, ShardedKey},
29    tables,
30    tables::BlockNumberList,
31};
32use reth_errors::ProviderError;
33use reth_node_types::NodePrimitives;
34use reth_primitives_traits::{ReceiptTy, StorageEntry};
35use reth_static_file_types::StaticFileSegment;
36use reth_storage_api::{ChangeSetReader, DBProvider, NodePrimitivesProvider, StorageSettingsCache};
37use reth_storage_errors::provider::ProviderResult;
38use strum::{Display, EnumIs};
39
/// Type alias for [`EitherReader`] constructors that read via a plain (non-dup) table cursor.
type EitherReaderTy<'a, P, T> =
    EitherReader<'a, CursorTy<<P as DBProvider>::Tx, T>, <P as NodePrimitivesProvider>::Primitives>;
/// Type alias for dup [`EitherReader`] constructors (tables read via a duplicate-key cursor).
type DupEitherReaderTy<'a, P, T> = EitherReader<
    'a,
    DupCursorTy<<P as DBProvider>::Tx, T>,
    <P as NodePrimitivesProvider>::Primitives,
>;
50
/// Type alias for dup [`EitherWriter`] constructors (tables written via a duplicate-key cursor).
type DupEitherWriterTy<'a, P, T> = EitherWriter<
    'a,
    DupCursorMutTy<<P as DBProvider>::Tx, T>,
    <P as NodePrimitivesProvider>::Primitives,
>;
57
/// Type alias for [`EitherWriter`] constructors that write via a plain (non-dup) table cursor.
type EitherWriterTy<'a, P, T> = EitherWriter<
    'a,
    CursorMutTy<<P as DBProvider>::Tx, T>,
    <P as NodePrimitivesProvider>::Primitives,
>;
64
/// Helper type for the `RocksDB` batch argument taken by writer constructors.
pub type RocksBatchArg<'a> = crate::providers::rocksdb::RocksDBBatch<'a>;
67
/// The raw `RocksDB` batch type returned by [`EitherWriter::into_raw_rocksdb_batch`].
pub type RawRocksDBBatch = rocksdb::WriteBatchWithTransaction<true>;
70
/// Helper type for the `RocksDB` snapshot argument taken by reader constructors.
///
/// The `Option` allows callers to skip `RocksDB` access when it isn't needed
/// (e.g., on legacy MDBX-only nodes). Constructors that do need it will panic via
/// `expect` when handed `None`.
pub type RocksDBRefArg<'a> = Option<crate::providers::rocksdb::RocksReadSnapshot<'a>>;
76
/// Represents a destination for writing data, either to database, static files, or `RocksDB`.
///
/// Which variant a given constructor returns is decided by the provider's cached storage
/// settings (and, for receipts, the prune configuration).
#[derive(Debug, Display)]
pub enum EitherWriter<'a, CURSOR, N> {
    /// Write to database table via cursor
    Database(CURSOR),
    /// Write to static file
    StaticFile(StaticFileProviderRWRefMut<'a, N>),
    /// Write to `RocksDB` using a write-only batch (historical tables).
    RocksDB(RocksDBBatch<'a>),
}
87
88impl<'a> EitherWriter<'a, (), ()> {
89    /// Creates a new [`EitherWriter`] for receipts based on storage settings and prune modes.
90    pub fn new_receipts<P>(
91        provider: &'a P,
92        block_number: BlockNumber,
93    ) -> ProviderResult<EitherWriterTy<'a, P, tables::Receipts<ReceiptTy<P::Primitives>>>>
94    where
95        P: DBProvider + NodePrimitivesProvider + StorageSettingsCache + StaticFileProviderFactory,
96        P::Tx: DbTxMut,
97        ReceiptTy<P::Primitives>: Value,
98    {
99        if Self::receipts_destination(provider).is_static_file() {
100            Ok(EitherWriter::StaticFile(
101                provider.get_static_file_writer(block_number, StaticFileSegment::Receipts)?,
102            ))
103        } else {
104            Ok(EitherWriter::Database(
105                provider.tx_ref().cursor_write::<tables::Receipts<ReceiptTy<P::Primitives>>>()?,
106            ))
107        }
108    }
109
110    /// Creates a new [`EitherWriter`] for senders based on storage settings.
111    pub fn new_senders<P>(
112        provider: &'a P,
113        block_number: BlockNumber,
114    ) -> ProviderResult<EitherWriterTy<'a, P, tables::TransactionSenders>>
115    where
116        P: DBProvider + NodePrimitivesProvider + StorageSettingsCache + StaticFileProviderFactory,
117        P::Tx: DbTxMut,
118    {
119        if EitherWriterDestination::senders(provider).is_static_file() {
120            Ok(EitherWriter::StaticFile(
121                provider
122                    .get_static_file_writer(block_number, StaticFileSegment::TransactionSenders)?,
123            ))
124        } else {
125            Ok(EitherWriter::Database(
126                provider.tx_ref().cursor_write::<tables::TransactionSenders>()?,
127            ))
128        }
129    }
130
131    /// Creates a new [`EitherWriter`] for account changesets based on storage settings and prune
132    /// modes.
133    pub fn new_account_changesets<P>(
134        provider: &'a P,
135        block_number: BlockNumber,
136    ) -> ProviderResult<DupEitherWriterTy<'a, P, tables::AccountChangeSets>>
137    where
138        P: DBProvider + NodePrimitivesProvider + StorageSettingsCache + StaticFileProviderFactory,
139        P::Tx: DbTxMut,
140    {
141        if provider.cached_storage_settings().storage_v2 {
142            Ok(EitherWriter::StaticFile(
143                provider
144                    .get_static_file_writer(block_number, StaticFileSegment::AccountChangeSets)?,
145            ))
146        } else {
147            Ok(EitherWriter::Database(
148                provider.tx_ref().cursor_dup_write::<tables::AccountChangeSets>()?,
149            ))
150        }
151    }
152
153    /// Creates a new [`EitherWriter`] for storage changesets based on storage settings.
154    pub fn new_storage_changesets<P>(
155        provider: &'a P,
156        block_number: BlockNumber,
157    ) -> ProviderResult<DupEitherWriterTy<'a, P, tables::StorageChangeSets>>
158    where
159        P: DBProvider + NodePrimitivesProvider + StorageSettingsCache + StaticFileProviderFactory,
160        P::Tx: DbTxMut,
161    {
162        if provider.cached_storage_settings().storage_v2 {
163            Ok(EitherWriter::StaticFile(
164                provider
165                    .get_static_file_writer(block_number, StaticFileSegment::StorageChangeSets)?,
166            ))
167        } else {
168            Ok(EitherWriter::Database(
169                provider.tx_ref().cursor_dup_write::<tables::StorageChangeSets>()?,
170            ))
171        }
172    }
173
174    /// Returns the destination for writing receipts.
175    ///
176    /// The rules are as follows:
177    /// - If the node should not always write receipts to static files, and any receipt pruning is
178    ///   enabled, write to the database.
179    /// - If the node should always write receipts to static files, but receipt log filter pruning
180    ///   is enabled, write to the database.
181    /// - Otherwise, write to static files.
182    pub fn receipts_destination<P: DBProvider + StorageSettingsCache>(
183        provider: &P,
184    ) -> EitherWriterDestination {
185        let receipts_in_static_files = provider.cached_storage_settings().storage_v2;
186        let prune_modes = provider.prune_modes_ref();
187
188        if !receipts_in_static_files && prune_modes.has_receipts_pruning() ||
189            // TODO: support writing receipts to static files with log filter pruning enabled
190            receipts_in_static_files && !prune_modes.receipts_log_filter.is_empty()
191        {
192            EitherWriterDestination::Database
193        } else {
194            EitherWriterDestination::StaticFile
195        }
196    }
197
198    /// Returns the destination for writing account changesets.
199    ///
200    /// This determines the destination based solely on storage settings.
201    pub fn account_changesets_destination<P: DBProvider + StorageSettingsCache>(
202        provider: &P,
203    ) -> EitherWriterDestination {
204        if provider.cached_storage_settings().storage_v2 {
205            EitherWriterDestination::StaticFile
206        } else {
207            EitherWriterDestination::Database
208        }
209    }
210
211    /// Returns the destination for writing storage changesets.
212    ///
213    /// This determines the destination based solely on storage settings.
214    pub fn storage_changesets_destination<P: DBProvider + StorageSettingsCache>(
215        provider: &P,
216    ) -> EitherWriterDestination {
217        if provider.cached_storage_settings().storage_v2 {
218            EitherWriterDestination::StaticFile
219        } else {
220            EitherWriterDestination::Database
221        }
222    }
223
224    /// Creates a new [`EitherWriter`] for storages history based on storage settings.
225    pub fn new_storages_history<P>(
226        provider: &P,
227        _rocksdb_batch: RocksBatchArg<'a>,
228    ) -> ProviderResult<EitherWriterTy<'a, P, tables::StoragesHistory>>
229    where
230        P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
231        P::Tx: DbTxMut,
232    {
233        if provider.cached_storage_settings().storage_v2 {
234            return Ok(EitherWriter::RocksDB(_rocksdb_batch));
235        }
236
237        Ok(EitherWriter::Database(provider.tx_ref().cursor_write::<tables::StoragesHistory>()?))
238    }
239
240    /// Creates a new [`EitherWriter`] for transaction hash numbers based on storage settings.
241    pub fn new_transaction_hash_numbers<P>(
242        provider: &P,
243        _rocksdb_batch: RocksBatchArg<'a>,
244    ) -> ProviderResult<EitherWriterTy<'a, P, tables::TransactionHashNumbers>>
245    where
246        P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
247        P::Tx: DbTxMut,
248    {
249        if provider.cached_storage_settings().storage_v2 {
250            return Ok(EitherWriter::RocksDB(_rocksdb_batch));
251        }
252
253        Ok(EitherWriter::Database(
254            provider.tx_ref().cursor_write::<tables::TransactionHashNumbers>()?,
255        ))
256    }
257
258    /// Creates a new [`EitherWriter`] for account history based on storage settings.
259    pub fn new_accounts_history<P>(
260        provider: &P,
261        _rocksdb_batch: RocksBatchArg<'a>,
262    ) -> ProviderResult<EitherWriterTy<'a, P, tables::AccountsHistory>>
263    where
264        P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
265        P::Tx: DbTxMut,
266    {
267        if provider.cached_storage_settings().storage_v2 {
268            return Ok(EitherWriter::RocksDB(_rocksdb_batch));
269        }
270
271        Ok(EitherWriter::Database(provider.tx_ref().cursor_write::<tables::AccountsHistory>()?))
272    }
273}
274
275impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N> {
276    /// Extracts the raw `RocksDB` write batch from this writer, if it contains one.
277    ///
278    /// Returns `Some(WriteBatchWithTransaction)` for [`Self::RocksDB`] variant,
279    /// `None` for other variants.
280    ///
281    /// This is used to defer `RocksDB` commits to the provider level, ensuring all
282    /// storage commits (MDBX, static files, `RocksDB`) happen atomically in a single place.
283    pub fn into_raw_rocksdb_batch(self) -> Option<rocksdb::WriteBatchWithTransaction<true>> {
284        match self {
285            Self::Database(_) | Self::StaticFile(_) => None,
286            Self::RocksDB(batch) => Some(batch.into_inner()),
287        }
288    }
289
290    /// Increment the block number.
291    ///
292    /// Relevant only for [`Self::StaticFile`]. It is a no-op for [`Self::Database`].
293    pub fn increment_block(&mut self, expected_block_number: BlockNumber) -> ProviderResult<()> {
294        match self {
295            Self::Database(_) => Ok(()),
296            Self::StaticFile(writer) => writer.increment_block(expected_block_number),
297            Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
298        }
299    }
300
301    /// Ensures that the writer is positioned at the specified block number.
302    ///
303    /// If the writer is positioned at a greater block number than the specified one, the writer
304    /// will NOT be unwound and the error will be returned.
305    ///
306    /// Relevant only for [`Self::StaticFile`]. It is a no-op for [`Self::Database`].
307    pub fn ensure_at_block(&mut self, block_number: BlockNumber) -> ProviderResult<()> {
308        match self {
309            Self::Database(_) => Ok(()),
310            Self::StaticFile(writer) => writer.ensure_at_block(block_number),
311            Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
312        }
313    }
314}
315
316impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>
317where
318    N::Receipt: Value,
319    CURSOR: DbCursorRW<tables::Receipts<N::Receipt>>,
320{
321    /// Append a transaction receipt.
322    pub fn append_receipt(&mut self, tx_num: TxNumber, receipt: &N::Receipt) -> ProviderResult<()> {
323        match self {
324            Self::Database(cursor) => Ok(cursor.append(tx_num, receipt)?),
325            Self::StaticFile(writer) => writer.append_receipt(tx_num, receipt),
326            Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
327        }
328    }
329}
330
impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>
where
    CURSOR: DbCursorRW<tables::TransactionSenders>,
{
    /// Append a transaction sender to the destination.
    ///
    /// Not supported for the `RocksDB` variant.
    pub fn append_sender(&mut self, tx_num: TxNumber, sender: &Address) -> ProviderResult<()> {
        match self {
            Self::Database(cursor) => Ok(cursor.append(tx_num, sender)?),
            Self::StaticFile(writer) => writer.append_transaction_sender(tx_num, sender),
            Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
        }
    }

    /// Append transaction senders to the destination.
    ///
    /// `append` requires the `(tx_num, sender)` pairs to arrive in ascending key order.
    pub fn append_senders<I>(&mut self, senders: I) -> ProviderResult<()>
    where
        I: Iterator<Item = (TxNumber, Address)>,
    {
        match self {
            Self::Database(cursor) => {
                for (tx_num, sender) in senders {
                    cursor.append(tx_num, &sender)?;
                }
                Ok(())
            }
            Self::StaticFile(writer) => writer.append_transaction_senders(senders),
            Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
        }
    }

    /// Removes all transaction senders above the given transaction number, and stops at the given
    /// block number.
    pub fn prune_senders(
        &mut self,
        unwind_tx_from: TxNumber,
        block: BlockNumber,
    ) -> ProviderResult<()>
    where
        CURSOR: DbCursorRO<tables::TransactionSenders>,
    {
        match self {
            Self::Database(cursor) => {
                // Walk from the unwind point to the end of the table, deleting every entry.
                let mut walker = cursor.walk_range(unwind_tx_from..)?;
                while walker.next().transpose()?.is_some() {
                    walker.delete_current()?;
                }
            }
            Self::StaticFile(writer) => {
                // Highest tx number currently stored in the senders static file, if any.
                let static_file_transaction_sender_num = writer
                    .reader()
                    .get_highest_static_file_tx(StaticFileSegment::TransactionSenders);

                // Number of trailing rows to drop: everything in
                // `unwind_tx_from..=static_num`, i.e. `(static_num + 1) - unwind_tx_from`.
                // Saturates to 0 when the unwind point is already past the highest entry,
                // and defaults to 0 when the segment is empty.
                let to_delete = static_file_transaction_sender_num
                    .map(|static_num| (static_num + 1).saturating_sub(unwind_tx_from))
                    .unwrap_or_default();

                writer.prune_transaction_senders(to_delete, block)?;
            }
            Self::RocksDB(_) => return Err(ProviderError::UnsupportedProvider),
        }

        Ok(())
    }
}
395
396impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>
397where
398    CURSOR: DbCursorRW<tables::TransactionHashNumbers> + DbCursorRO<tables::TransactionHashNumbers>,
399{
400    /// Puts a transaction hash number mapping.
401    ///
402    /// When `append_only` is true, uses `cursor.append()` which is significantly faster
403    /// but requires entries to be inserted in order and the table to be empty.
404    /// When false, uses `cursor.upsert()` which handles arbitrary insertion order and duplicates.
405    pub fn put_transaction_hash_number(
406        &mut self,
407        hash: TxHash,
408        tx_num: TxNumber,
409        append_only: bool,
410    ) -> ProviderResult<()> {
411        match self {
412            Self::Database(cursor) => {
413                if append_only {
414                    Ok(cursor.append(hash, &tx_num)?)
415                } else {
416                    Ok(cursor.upsert(hash, &tx_num)?)
417                }
418            }
419            Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
420            Self::RocksDB(batch) => batch.put::<tables::TransactionHashNumbers>(hash, &tx_num),
421        }
422    }
423
424    /// Puts multiple transaction hash number mappings in a batch.
425    ///
426    /// Accepts a vector of `(TxHash, TxNumber)` tuples and writes them all using the same cursor.
427    /// This is more efficient than calling `put_transaction_hash_number` repeatedly.
428    ///
429    /// When `append_only` is true, uses `cursor.append()` which requires entries to be
430    /// pre-sorted and the table to be empty or have only lower keys.
431    /// When false, uses `cursor.upsert()` which handles arbitrary insertion order.
432    pub fn put_transaction_hash_numbers_batch(
433        &mut self,
434        entries: Vec<(TxHash, TxNumber)>,
435        append_only: bool,
436    ) -> ProviderResult<()> {
437        match self {
438            Self::Database(cursor) => {
439                for (hash, tx_num) in entries {
440                    if append_only {
441                        cursor.append(hash, &tx_num)?;
442                    } else {
443                        cursor.upsert(hash, &tx_num)?;
444                    }
445                }
446                Ok(())
447            }
448            Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
449            Self::RocksDB(batch) => {
450                for (hash, tx_num) in entries {
451                    batch.put::<tables::TransactionHashNumbers>(hash, &tx_num)?;
452                }
453                Ok(())
454            }
455        }
456    }
457
458    /// Deletes a transaction hash number mapping.
459    pub fn delete_transaction_hash_number(&mut self, hash: TxHash) -> ProviderResult<()> {
460        match self {
461            Self::Database(cursor) => {
462                if cursor.seek_exact(hash)?.is_some() {
463                    cursor.delete_current()?;
464                }
465                Ok(())
466            }
467            Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
468            Self::RocksDB(batch) => batch.delete::<tables::TransactionHashNumbers>(hash),
469        }
470    }
471}
472
473impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>
474where
475    CURSOR: DbCursorRW<tables::StoragesHistory> + DbCursorRO<tables::StoragesHistory>,
476{
477    /// Puts a storage history entry.
478    pub fn put_storage_history(
479        &mut self,
480        key: StorageShardedKey,
481        value: &BlockNumberList,
482    ) -> ProviderResult<()> {
483        match self {
484            Self::Database(cursor) => Ok(cursor.upsert(key, value)?),
485            Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
486            Self::RocksDB(batch) => batch.put::<tables::StoragesHistory>(key, value),
487        }
488    }
489
490    /// Deletes a storage history entry.
491    pub fn delete_storage_history(&mut self, key: StorageShardedKey) -> ProviderResult<()> {
492        match self {
493            Self::Database(cursor) => {
494                if cursor.seek_exact(key)?.is_some() {
495                    cursor.delete_current()?;
496                }
497                Ok(())
498            }
499            Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
500            Self::RocksDB(batch) => batch.delete::<tables::StoragesHistory>(key),
501        }
502    }
503
504    /// Appends a storage history entry (for first sync - more efficient).
505    pub fn append_storage_history(
506        &mut self,
507        key: StorageShardedKey,
508        value: &BlockNumberList,
509    ) -> ProviderResult<()> {
510        match self {
511            Self::Database(cursor) => Ok(cursor.append(key, value)?),
512            Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
513            Self::RocksDB(batch) => batch.put::<tables::StoragesHistory>(key, value),
514        }
515    }
516
517    /// Upserts a storage history entry (for incremental sync).
518    pub fn upsert_storage_history(
519        &mut self,
520        key: StorageShardedKey,
521        value: &BlockNumberList,
522    ) -> ProviderResult<()> {
523        match self {
524            Self::Database(cursor) => Ok(cursor.upsert(key, value)?),
525            Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
526            Self::RocksDB(batch) => batch.put::<tables::StoragesHistory>(key, value),
527        }
528    }
529
530    /// Gets the last shard for an address and storage key (keyed with `u64::MAX`).
531    pub fn get_last_storage_history_shard(
532        &mut self,
533        address: Address,
534        storage_key: B256,
535    ) -> ProviderResult<Option<BlockNumberList>> {
536        let key = StorageShardedKey::last(address, storage_key);
537        match self {
538            Self::Database(cursor) => Ok(cursor.seek_exact(key)?.map(|(_, v)| v)),
539            Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
540            Self::RocksDB(batch) => batch.get::<tables::StoragesHistory>(key),
541        }
542    }
543}
544
545impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>
546where
547    CURSOR: DbCursorRW<tables::AccountsHistory> + DbCursorRO<tables::AccountsHistory>,
548{
549    /// Appends an account history entry (for first sync - more efficient).
550    pub fn append_account_history(
551        &mut self,
552        key: ShardedKey<Address>,
553        value: &BlockNumberList,
554    ) -> ProviderResult<()> {
555        match self {
556            Self::Database(cursor) => Ok(cursor.append(key, value)?),
557            Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
558            Self::RocksDB(batch) => batch.put::<tables::AccountsHistory>(key, value),
559        }
560    }
561
562    /// Upserts an account history entry (for incremental sync).
563    pub fn upsert_account_history(
564        &mut self,
565        key: ShardedKey<Address>,
566        value: &BlockNumberList,
567    ) -> ProviderResult<()> {
568        match self {
569            Self::Database(cursor) => Ok(cursor.upsert(key, value)?),
570            Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
571            Self::RocksDB(batch) => batch.put::<tables::AccountsHistory>(key, value),
572        }
573    }
574
575    /// Gets the last shard for an address (keyed with `u64::MAX`).
576    pub fn get_last_account_history_shard(
577        &mut self,
578        address: Address,
579    ) -> ProviderResult<Option<BlockNumberList>> {
580        match self {
581            Self::Database(cursor) => {
582                Ok(cursor.seek_exact(ShardedKey::last(address))?.map(|(_, v)| v))
583            }
584            Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
585            Self::RocksDB(batch) => batch.get::<tables::AccountsHistory>(ShardedKey::last(address)),
586        }
587    }
588
589    /// Deletes an account history entry.
590    pub fn delete_account_history(&mut self, key: ShardedKey<Address>) -> ProviderResult<()> {
591        match self {
592            Self::Database(cursor) => {
593                if cursor.seek_exact(key)?.is_some() {
594                    cursor.delete_current()?;
595                }
596                Ok(())
597            }
598            Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
599            Self::RocksDB(batch) => batch.delete::<tables::AccountsHistory>(key),
600        }
601    }
602}
603
604impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>
605where
606    CURSOR: DbDupCursorRW<tables::AccountChangeSets>,
607{
608    /// Append account changeset for a block.
609    ///
610    /// NOTE: This _sorts_ the changesets by address before appending
611    pub fn append_account_changeset(
612        &mut self,
613        block_number: BlockNumber,
614        mut changeset: Vec<AccountBeforeTx>,
615    ) -> ProviderResult<()> {
616        // First sort the changesets
617        changeset.par_sort_by_key(|a| a.address);
618        match self {
619            Self::Database(cursor) => {
620                for change in changeset {
621                    cursor.append_dup(block_number, change)?;
622                }
623            }
624            Self::StaticFile(writer) => {
625                writer.append_account_changeset(changeset, block_number)?;
626            }
627            Self::RocksDB(_) => return Err(ProviderError::UnsupportedProvider),
628        }
629
630        Ok(())
631    }
632}
633
634impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>
635where
636    CURSOR: DbDupCursorRW<tables::StorageChangeSets>,
637{
638    /// Append storage changeset for a block.
639    ///
640    /// NOTE: This _sorts_ the changesets by address and storage key before appending.
641    pub fn append_storage_changeset(
642        &mut self,
643        block_number: BlockNumber,
644        mut changeset: Vec<StorageBeforeTx>,
645    ) -> ProviderResult<()> {
646        changeset.par_sort_by_key(|change| (change.address, change.key));
647
648        match self {
649            Self::Database(cursor) => {
650                for change in changeset {
651                    let storage_id = BlockNumberAddress((block_number, change.address));
652                    cursor.append_dup(
653                        storage_id,
654                        StorageEntry { key: change.key, value: change.value },
655                    )?;
656                }
657            }
658            Self::StaticFile(writer) => {
659                writer.append_storage_changeset(changeset, block_number)?;
660            }
661            Self::RocksDB(_) => return Err(ProviderError::UnsupportedProvider),
662        }
663
664        Ok(())
665    }
666}
667
/// Represents a source for reading data, either from database, static files, or `RocksDB`.
///
/// Mirrors [`EitherWriter`]; the `PhantomData` carries the `'a` lifetime for variants
/// that don't otherwise borrow.
#[derive(Debug, Display)]
pub enum EitherReader<'a, CURSOR, N> {
    /// Read from database table via cursor
    Database(CURSOR, PhantomData<&'a ()>),
    /// Read from static file
    StaticFile(StaticFileProvider<N>, PhantomData<&'a ()>),
    /// Read from `RocksDB` snapshot (works in both read-only and read-write modes)
    RocksDB(crate::providers::rocksdb::RocksReadSnapshot<'a>),
}
678
679impl<'a> EitherReader<'a, (), ()> {
680    /// Creates a new [`EitherReader`] for senders based on storage settings.
681    pub fn new_senders<P>(
682        provider: &P,
683    ) -> ProviderResult<EitherReaderTy<'a, P, tables::TransactionSenders>>
684    where
685        P: DBProvider + NodePrimitivesProvider + StorageSettingsCache + StaticFileProviderFactory,
686        P::Tx: DbTx,
687    {
688        if EitherWriterDestination::senders(provider).is_static_file() {
689            Ok(EitherReader::StaticFile(provider.static_file_provider(), PhantomData))
690        } else {
691            Ok(EitherReader::Database(
692                provider.tx_ref().cursor_read::<tables::TransactionSenders>()?,
693                PhantomData,
694            ))
695        }
696    }
697
698    /// Creates a new [`EitherReader`] for storages history based on storage settings.
699    pub fn new_storages_history<P>(
700        provider: &P,
701        rocksdb: RocksDBRefArg<'a>,
702    ) -> ProviderResult<EitherReaderTy<'a, P, tables::StoragesHistory>>
703    where
704        P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
705        P::Tx: DbTx,
706    {
707        if provider.cached_storage_settings().storage_v2 {
708            return Ok(EitherReader::RocksDB(
709                rocksdb.expect("storages_history_in_rocksdb requires rocksdb snapshot"),
710            ));
711        }
712
713        Ok(EitherReader::Database(
714            provider.tx_ref().cursor_read::<tables::StoragesHistory>()?,
715            PhantomData,
716        ))
717    }
718
719    /// Creates a new [`EitherReader`] for transaction hash numbers based on storage settings.
720    pub fn new_transaction_hash_numbers<P>(
721        provider: &P,
722        rocksdb: RocksDBRefArg<'a>,
723    ) -> ProviderResult<EitherReaderTy<'a, P, tables::TransactionHashNumbers>>
724    where
725        P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
726        P::Tx: DbTx,
727    {
728        if provider.cached_storage_settings().storage_v2 {
729            return Ok(EitherReader::RocksDB(
730                rocksdb.expect("transaction_hash_numbers_in_rocksdb requires rocksdb snapshot"),
731            ));
732        }
733
734        Ok(EitherReader::Database(
735            provider.tx_ref().cursor_read::<tables::TransactionHashNumbers>()?,
736            PhantomData,
737        ))
738    }
739
740    /// Creates a new [`EitherReader`] for account history based on storage settings.
741    pub fn new_accounts_history<P>(
742        provider: &P,
743        rocksdb: RocksDBRefArg<'a>,
744    ) -> ProviderResult<EitherReaderTy<'a, P, tables::AccountsHistory>>
745    where
746        P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
747        P::Tx: DbTx,
748    {
749        if provider.cached_storage_settings().storage_v2 {
750            return Ok(EitherReader::RocksDB(
751                rocksdb.expect("account_history_in_rocksdb requires rocksdb snapshot"),
752            ));
753        }
754
755        Ok(EitherReader::Database(
756            provider.tx_ref().cursor_read::<tables::AccountsHistory>()?,
757            PhantomData,
758        ))
759    }
760
761    /// Creates a new [`EitherReader`] for account changesets based on storage settings.
762    pub fn new_account_changesets<P>(
763        provider: &P,
764    ) -> ProviderResult<DupEitherReaderTy<'a, P, tables::AccountChangeSets>>
765    where
766        P: DBProvider + NodePrimitivesProvider + StorageSettingsCache + StaticFileProviderFactory,
767        P::Tx: DbTx,
768    {
769        if EitherWriterDestination::account_changesets(provider).is_static_file() {
770            Ok(EitherReader::StaticFile(provider.static_file_provider(), PhantomData))
771        } else {
772            Ok(EitherReader::Database(
773                provider.tx_ref().cursor_dup_read::<tables::AccountChangeSets>()?,
774                PhantomData,
775            ))
776        }
777    }
778}
779
impl<CURSOR, N: NodePrimitives> EitherReader<'_, CURSOR, N>
where
    CURSOR: DbCursorRO<tables::TransactionSenders>,
{
    /// Fetches the senders for a range of transactions.
    ///
    /// Returns a map from transaction number to sender address. For the static file
    /// variant, tx numbers without a stored sender are omitted from the map rather than
    /// treated as errors.
    pub fn senders_by_tx_range(
        &mut self,
        range: Range<TxNumber>,
    ) -> ProviderResult<HashMap<TxNumber, Address>> {
        match self {
            Self::Database(cursor, _) => cursor
                .walk_range(range)?
                .map(|result| result.map_err(ProviderError::from))
                .collect::<ProviderResult<HashMap<_, _>>>(),
            // Pair each tx number in the range with the value fetched for it from the
            // static file, then keep only the entries that actually have a sender.
            Self::StaticFile(provider, _) => range
                .clone()
                .zip(provider.fetch_range_iter(
                    StaticFileSegment::TransactionSenders,
                    range,
                    |cursor, number| cursor.get_one::<TransactionSenderMask>(number.into()),
                )?)
                .filter_map(|(tx_num, sender)| {
                    // `sender` is a `Result<Option<_>>`: transposing flips it so that
                    // `Ok(None)` (missing sender) becomes `None` and is skipped by the
                    // `?`, while errors are preserved and surface via `collect`.
                    let result = sender.transpose()?;
                    Some(result.map(|sender| (tx_num, sender)))
                })
                .collect::<ProviderResult<HashMap<_, _>>>(),
            Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
        }
    }
}
810
811impl<CURSOR, N: NodePrimitives> EitherReader<'_, CURSOR, N>
812where
813    CURSOR: DbCursorRO<tables::TransactionHashNumbers>,
814{
815    /// Gets a transaction number by its hash.
816    pub fn get_transaction_hash_number(
817        &mut self,
818        hash: TxHash,
819    ) -> ProviderResult<Option<TxNumber>> {
820        match self {
821            Self::Database(cursor, _) => Ok(cursor.seek_exact(hash)?.map(|(_, v)| v)),
822            Self::StaticFile(_, _) => Err(ProviderError::UnsupportedProvider),
823            Self::RocksDB(snapshot) => snapshot.get::<tables::TransactionHashNumbers>(hash),
824        }
825    }
826}
827
828impl<CURSOR, N: NodePrimitives> EitherReader<'_, CURSOR, N>
829where
830    CURSOR: DbCursorRO<tables::StoragesHistory>,
831{
832    /// Gets a storage history shard entry for the given [`StorageShardedKey`], if present.
833    pub fn get_storage_history(
834        &mut self,
835        key: StorageShardedKey,
836    ) -> ProviderResult<Option<BlockNumberList>> {
837        match self {
838            Self::Database(cursor, _) => Ok(cursor.seek_exact(key)?.map(|(_, v)| v)),
839            Self::StaticFile(_, _) => Err(ProviderError::UnsupportedProvider),
840            Self::RocksDB(snapshot) => snapshot.get::<tables::StoragesHistory>(key),
841        }
842    }
843
844    /// Lookup storage history and return [`HistoryInfo`].
845    pub fn storage_history_info(
846        &mut self,
847        address: Address,
848        storage_key: alloy_primitives::B256,
849        block_number: BlockNumber,
850        lowest_available_block_number: Option<BlockNumber>,
851        visible_tip: BlockNumber,
852    ) -> ProviderResult<HistoryInfo> {
853        match self {
854            Self::Database(cursor, _) => {
855                let key = StorageShardedKey::new(address, storage_key, block_number);
856                history_info::<tables::StoragesHistory, _, _>(
857                    cursor,
858                    key,
859                    block_number,
860                    |k| k.address == address && k.sharded_key.key == storage_key,
861                    lowest_available_block_number,
862                )
863            }
864            Self::StaticFile(_, _) => Err(ProviderError::UnsupportedProvider),
865            Self::RocksDB(snapshot) => snapshot.storage_history_info(
866                address,
867                storage_key,
868                block_number,
869                lowest_available_block_number,
870                visible_tip,
871            ),
872        }
873    }
874}
875
876impl<CURSOR, N: NodePrimitives> EitherReader<'_, CURSOR, N>
877where
878    CURSOR: DbCursorRO<tables::AccountsHistory>,
879{
880    /// Gets an account history shard entry for the given [`ShardedKey`], if present.
881    pub fn get_account_history(
882        &mut self,
883        key: ShardedKey<Address>,
884    ) -> ProviderResult<Option<BlockNumberList>> {
885        match self {
886            Self::Database(cursor, _) => Ok(cursor.seek_exact(key)?.map(|(_, v)| v)),
887            Self::StaticFile(_, _) => Err(ProviderError::UnsupportedProvider),
888            Self::RocksDB(snapshot) => snapshot.get::<tables::AccountsHistory>(key),
889        }
890    }
891
892    /// Lookup account history and return [`HistoryInfo`].
893    pub fn account_history_info(
894        &mut self,
895        address: Address,
896        block_number: BlockNumber,
897        lowest_available_block_number: Option<BlockNumber>,
898        visible_tip: BlockNumber,
899    ) -> ProviderResult<HistoryInfo> {
900        match self {
901            Self::Database(cursor, _) => {
902                let key = ShardedKey::new(address, block_number);
903                history_info::<tables::AccountsHistory, _, _>(
904                    cursor,
905                    key,
906                    block_number,
907                    |k| k.key == address,
908                    lowest_available_block_number,
909                )
910            }
911            Self::StaticFile(_, _) => Err(ProviderError::UnsupportedProvider),
912            Self::RocksDB(snapshot) => snapshot.account_history_info(
913                address,
914                block_number,
915                lowest_available_block_number,
916                visible_tip,
917            ),
918        }
919    }
920}
921
922impl<CURSOR, N: NodePrimitives> EitherReader<'_, CURSOR, N>
923where
924    CURSOR: DbCursorRO<tables::AccountChangeSets>,
925{
926    /// Iterate over account changesets and return all account address that were changed.
927    pub fn changed_accounts_with_range(
928        &mut self,
929        range: RangeInclusive<BlockNumber>,
930    ) -> ProviderResult<BTreeSet<Address>> {
931        match self {
932            Self::StaticFile(provider, _) => {
933                let highest_static_block =
934                    provider.get_highest_static_file_block(StaticFileSegment::AccountChangeSets);
935
936                let Some(highest) = highest_static_block else {
937                    return Err(ProviderError::MissingHighestStaticFileBlock(
938                        StaticFileSegment::AccountChangeSets,
939                    ))
940                };
941
942                let start = *range.start();
943                let static_end = (*range.end()).min(highest);
944
945                let mut changed_accounts = BTreeSet::default();
946                if start <= static_end {
947                    for block in start..=static_end {
948                        let block_changesets = provider.account_block_changeset(block)?;
949                        for changeset in block_changesets {
950                            changed_accounts.insert(changeset.address);
951                        }
952                    }
953                }
954
955                Ok(changed_accounts)
956            }
957            Self::Database(provider, _) => provider
958                .walk_range(range)?
959                .map(|entry| {
960                    entry.map(|(_, account_before)| account_before.address).map_err(Into::into)
961                })
962                .collect(),
963            Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
964        }
965    }
966}
967
/// Destination for writing data.
///
/// The `EnumIs` derive generates predicate helpers such as `is_static_file()`, which
/// callers use to route reads to the same backend the writer targets.
#[derive(Debug, EnumIs)]
pub enum EitherWriterDestination {
    /// Write to database table
    Database,
    /// Write to static file
    StaticFile,
    /// Write to `RocksDB`
    RocksDB,
}
978
979impl EitherWriterDestination {
980    /// Returns the destination for writing senders based on storage settings.
981    pub fn senders<P>(provider: &P) -> Self
982    where
983        P: StorageSettingsCache,
984    {
985        // Write senders to static files only if they're explicitly enabled
986        if provider.cached_storage_settings().storage_v2 {
987            Self::StaticFile
988        } else {
989            Self::Database
990        }
991    }
992
993    /// Returns the destination for writing account changesets based on storage settings.
994    pub fn account_changesets<P>(provider: &P) -> Self
995    where
996        P: StorageSettingsCache,
997    {
998        // Write account changesets to static files only if they're explicitly enabled
999        if provider.cached_storage_settings().storage_v2 {
1000            Self::StaticFile
1001        } else {
1002            Self::Database
1003        }
1004    }
1005
1006    /// Returns the destination for writing storage changesets based on storage settings.
1007    pub fn storage_changesets<P>(provider: &P) -> Self
1008    where
1009        P: StorageSettingsCache,
1010    {
1011        // Write storage changesets to static files only if they're explicitly enabled
1012        if provider.cached_storage_settings().storage_v2 {
1013            Self::StaticFile
1014        } else {
1015            Self::Database
1016        }
1017    }
1018}
1019
#[cfg(test)]
mod tests {
    use crate::{test_utils::create_test_provider_factory, StaticFileWriter};

    use super::*;
    use alloy_primitives::Address;
    use reth_db::models::AccountBeforeTx;
    use reth_static_file_types::StaticFileSegment;
    use reth_storage_api::{DatabaseProviderFactory, StorageSettings};

    /// Verifies that `changed_accounts_with_range` correctly caps the query range to the
    /// static file tip when the requested range extends beyond it.
    ///
    /// This test documents the fix for an off-by-one bug where the code computed
    /// `static_end = range.end().min(highest + 1)` instead of `min(highest)`.
    /// The bug allowed iteration to attempt reading block `highest + 1` which doesn't
    /// exist (silently returning empty results due to `MissingStaticFileBlock` handling).
    ///
    /// While the bug was masked by error handling, it caused:
    /// 1. Unnecessary iteration/lookup for non-existent blocks
    /// 2. Potential overflow when `highest == u64::MAX`
    #[test]
    fn test_changed_accounts_with_range_caps_at_static_file_tip() {
        let factory = create_test_provider_factory();
        let highest_block = 5u64;

        // One distinct address per block so the expected set is easy to construct.
        let addresses: Vec<Address> = (0..=highest_block)
            .map(|i| {
                let mut addr = Address::ZERO;
                addr.0[0] = i as u8;
                addr
            })
            .collect();

        // Write one changeset per block into the static file segment, then commit.
        {
            let sf_provider = factory.static_file_provider();
            let mut writer =
                sf_provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();

            for block_num in 0..=highest_block {
                let changeset =
                    vec![AccountBeforeTx { address: addresses[block_num as usize], info: None }];
                writer.append_account_changeset(changeset, block_num).unwrap();
            }
            writer.commit().unwrap();
        }

        // v2 settings route account changeset reads to static files.
        factory.set_storage_settings_cache(StorageSettings::v2());

        let provider = factory.database_provider_ro().unwrap();

        let sf_tip = provider
            .static_file_provider()
            .get_highest_static_file_block(StaticFileSegment::AccountChangeSets);
        assert_eq!(sf_tip, Some(highest_block));

        let mut reader = EitherReader::new_account_changesets(&provider).unwrap();
        assert!(matches!(reader, EitherReader::StaticFile(_, _)));

        // Query range 0..=10 when tip is 5 - should return only accounts from blocks 0-5
        let result = reader.changed_accounts_with_range(0..=10).unwrap();

        let expected: BTreeSet<Address> = addresses.into_iter().collect();
        assert_eq!(result, expected);
    }

    #[test]
    fn test_reader_senders_by_tx_range() {
        let factory = create_test_provider_factory();

        // Insert senders only from 1 to 4, but we will query from 0 to 5.
        let senders = [
            (1, Address::random()),
            (2, Address::random()),
            (3, Address::random()),
            (4, Address::random()),
        ];

        // Exercise both routing modes: v1 keeps senders in the database, v2 in static files.
        for transaction_senders_in_static_files in [false, true] {
            factory.set_storage_settings_cache(if transaction_senders_in_static_files {
                StorageSettings::v2()
            } else {
                StorageSettings::v1()
            });

            let provider = factory.database_provider_rw().unwrap();
            let mut writer = EitherWriter::new_senders(&provider, 0).unwrap();
            if transaction_senders_in_static_files {
                assert!(matches!(writer, EitherWriter::StaticFile(_)));
            } else {
                assert!(matches!(writer, EitherWriter::Database(_)));
            }

            writer.increment_block(0).unwrap();
            writer.append_senders(senders.iter().copied()).unwrap();
            drop(writer);
            provider.commit().unwrap();

            let provider = factory.database_provider_ro().unwrap();
            let mut reader = EitherReader::new_senders(&provider).unwrap();
            if transaction_senders_in_static_files {
                assert!(matches!(reader, EitherReader::StaticFile(_, _)));
            } else {
                assert!(matches!(reader, EitherReader::Database(_, _)));
            }

            // Missing senders (tx 0 and 5) are simply absent from the result map.
            assert_eq!(
                reader.senders_by_tx_range(0..6).unwrap(),
                senders.iter().copied().collect::<HashMap<_, _>>(),
                "{reader}"
            );
        }
    }
}
1134
1135#[cfg(test)]
1136mod rocksdb_tests {
1137    use super::*;
1138    use crate::{
1139        providers::rocksdb::{RocksDBBuilder, RocksDBProvider},
1140        test_utils::create_test_provider_factory,
1141        RocksDBProviderFactory,
1142    };
1143    use alloy_primitives::{Address, B256};
1144    use reth_db_api::{
1145        models::{storage_sharded_key::StorageShardedKey, IntegerList, ShardedKey},
1146        tables,
1147        transaction::DbTxMut,
1148    };
1149    use reth_ethereum_primitives::EthPrimitives;
1150    use reth_storage_api::{DatabaseProviderFactory, StorageSettings};
1151    use std::marker::PhantomData;
1152    use tempfile::TempDir;
1153
1154    fn create_rocksdb_provider() -> (TempDir, RocksDBProvider) {
1155        let temp_dir = TempDir::new().unwrap();
1156        let provider = RocksDBBuilder::new(temp_dir.path())
1157            .with_table::<tables::TransactionHashNumbers>()
1158            .with_table::<tables::StoragesHistory>()
1159            .with_table::<tables::AccountsHistory>()
1160            .build()
1161            .unwrap();
1162        (temp_dir, provider)
1163    }
1164
    /// Test that `EitherWriter::new_transaction_hash_numbers` creates a `RocksDB` writer
    /// when the storage setting is enabled, and that put operations followed by commit
    /// persist the data to `RocksDB`.
    #[test]
    fn test_either_writer_transaction_hash_numbers_with_rocksdb() {
        let factory = create_test_provider_factory();

        // Enable RocksDB for transaction hash numbers
        factory.set_storage_settings_cache(StorageSettings::v2());

        let hash1 = B256::from([1u8; 32]);
        let hash2 = B256::from([2u8; 32]);
        let tx_num1 = 100u64;
        let tx_num2 = 200u64;

        // Get the RocksDB batch from the provider
        let rocksdb = factory.rocksdb_provider();
        let batch = rocksdb.batch();

        // Create EitherWriter with RocksDB
        let provider = factory.database_provider_rw().unwrap();
        let mut writer = EitherWriter::new_transaction_hash_numbers(&provider, batch).unwrap();

        // Verify we got a RocksDB writer
        assert!(matches!(writer, EitherWriter::RocksDB(_)));

        // Write transaction hash numbers (append_only=false since we're using RocksDB)
        writer.put_transaction_hash_number(hash1, tx_num1, false).unwrap();
        writer.put_transaction_hash_number(hash2, tx_num2, false).unwrap();

        // Extract the batch and register with provider for commit.
        // Nothing is persisted until the provider commit below flushes this batch.
        if let Some(batch) = writer.into_raw_rocksdb_batch() {
            provider.set_pending_rocksdb_batch(batch);
        }

        // Commit via provider - this commits RocksDB batch too
        provider.commit().unwrap();

        // Verify data was written to RocksDB
        let rocksdb = factory.rocksdb_provider();
        assert_eq!(rocksdb.get::<tables::TransactionHashNumbers>(hash1).unwrap(), Some(tx_num1));
        assert_eq!(rocksdb.get::<tables::TransactionHashNumbers>(hash2).unwrap(), Some(tx_num2));
    }
1208
    /// Test that `EitherWriter::delete_transaction_hash_number` works with `RocksDB`.
    #[test]
    fn test_either_writer_delete_transaction_hash_number_with_rocksdb() {
        let factory = create_test_provider_factory();

        // Enable RocksDB for transaction hash numbers
        factory.set_storage_settings_cache(StorageSettings::v2());

        let hash = B256::from([1u8; 32]);
        let tx_num = 100u64;

        // First, write a value directly to RocksDB
        let rocksdb = factory.rocksdb_provider();
        rocksdb.put::<tables::TransactionHashNumbers>(hash, &tx_num).unwrap();
        assert_eq!(rocksdb.get::<tables::TransactionHashNumbers>(hash).unwrap(), Some(tx_num));

        // Now delete using EitherWriter
        let batch = rocksdb.batch();
        let provider = factory.database_provider_rw().unwrap();
        let mut writer = EitherWriter::new_transaction_hash_numbers(&provider, batch).unwrap();
        writer.delete_transaction_hash_number(hash).unwrap();

        // Extract the batch and commit via provider.
        // The delete only takes effect once the provider commit flushes the batch.
        if let Some(batch) = writer.into_raw_rocksdb_batch() {
            provider.set_pending_rocksdb_batch(batch);
        }
        provider.commit().unwrap();

        // Verify deletion
        let rocksdb = factory.rocksdb_provider();
        assert_eq!(rocksdb.get::<tables::TransactionHashNumbers>(hash).unwrap(), None);
    }
1241
1242    #[test]
1243    fn test_rocksdb_batch_transaction_hash_numbers() {
1244        let (_temp_dir, provider) = create_rocksdb_provider();
1245
1246        let hash1 = B256::from([1u8; 32]);
1247        let hash2 = B256::from([2u8; 32]);
1248        let tx_num1 = 100u64;
1249        let tx_num2 = 200u64;
1250
1251        // Write via RocksDBBatch (same as EitherWriter::RocksDB would use internally)
1252        let mut batch = provider.batch();
1253        batch.put::<tables::TransactionHashNumbers>(hash1, &tx_num1).unwrap();
1254        batch.put::<tables::TransactionHashNumbers>(hash2, &tx_num2).unwrap();
1255        batch.commit().unwrap();
1256
1257        // Read via RocksTx (same as EitherReader::RocksDB would use internally)
1258        let tx = provider.tx();
1259        assert_eq!(tx.get::<tables::TransactionHashNumbers>(hash1).unwrap(), Some(tx_num1));
1260        assert_eq!(tx.get::<tables::TransactionHashNumbers>(hash2).unwrap(), Some(tx_num2));
1261
1262        // Test missing key
1263        let missing_hash = B256::from([99u8; 32]);
1264        assert_eq!(tx.get::<tables::TransactionHashNumbers>(missing_hash).unwrap(), None);
1265    }
1266
1267    #[test]
1268    fn test_rocksdb_batch_storage_history() {
1269        let (_temp_dir, provider) = create_rocksdb_provider();
1270
1271        let address = Address::random();
1272        let storage_key = B256::from([1u8; 32]);
1273        let key = StorageShardedKey::new(address, storage_key, 1000);
1274        let value = IntegerList::new([1, 5, 10, 50]).unwrap();
1275
1276        // Write via RocksDBBatch
1277        let mut batch = provider.batch();
1278        batch.put::<tables::StoragesHistory>(key.clone(), &value).unwrap();
1279        batch.commit().unwrap();
1280
1281        // Read via RocksTx
1282        let tx = provider.tx();
1283        let result = tx.get::<tables::StoragesHistory>(key).unwrap();
1284        assert_eq!(result, Some(value));
1285
1286        // Test missing key
1287        let missing_key = StorageShardedKey::new(Address::random(), B256::ZERO, 0);
1288        assert_eq!(tx.get::<tables::StoragesHistory>(missing_key).unwrap(), None);
1289    }
1290
1291    #[test]
1292    fn test_rocksdb_batch_account_history() {
1293        let (_temp_dir, provider) = create_rocksdb_provider();
1294
1295        let address = Address::random();
1296        let key = ShardedKey::new(address, 1000);
1297        let value = IntegerList::new([1, 10, 100, 500]).unwrap();
1298
1299        // Write via RocksDBBatch
1300        let mut batch = provider.batch();
1301        batch.put::<tables::AccountsHistory>(key.clone(), &value).unwrap();
1302        batch.commit().unwrap();
1303
1304        // Read via RocksTx
1305        let tx = provider.tx();
1306        let result = tx.get::<tables::AccountsHistory>(key).unwrap();
1307        assert_eq!(result, Some(value));
1308
1309        // Test missing key
1310        let missing_key = ShardedKey::new(Address::random(), 0);
1311        assert_eq!(tx.get::<tables::AccountsHistory>(missing_key).unwrap(), None);
1312    }
1313
1314    #[test]
1315    fn test_rocksdb_batch_delete_transaction_hash_number() {
1316        let (_temp_dir, provider) = create_rocksdb_provider();
1317
1318        let hash = B256::from([1u8; 32]);
1319        let tx_num = 100u64;
1320
1321        // First write
1322        provider.put::<tables::TransactionHashNumbers>(hash, &tx_num).unwrap();
1323        assert_eq!(provider.get::<tables::TransactionHashNumbers>(hash).unwrap(), Some(tx_num));
1324
1325        // Delete via RocksDBBatch
1326        let mut batch = provider.batch();
1327        batch.delete::<tables::TransactionHashNumbers>(hash).unwrap();
1328        batch.commit().unwrap();
1329
1330        // Verify deletion
1331        assert_eq!(provider.get::<tables::TransactionHashNumbers>(hash).unwrap(), None);
1332    }
1333
1334    #[test]
1335    fn test_rocksdb_batch_delete_storage_history() {
1336        let (_temp_dir, provider) = create_rocksdb_provider();
1337
1338        let address = Address::random();
1339        let storage_key = B256::from([1u8; 32]);
1340        let key = StorageShardedKey::new(address, storage_key, 1000);
1341        let value = IntegerList::new([1, 5, 10]).unwrap();
1342
1343        // First write
1344        provider.put::<tables::StoragesHistory>(key.clone(), &value).unwrap();
1345        assert!(provider.get::<tables::StoragesHistory>(key.clone()).unwrap().is_some());
1346
1347        // Delete via RocksDBBatch
1348        let mut batch = provider.batch();
1349        batch.delete::<tables::StoragesHistory>(key.clone()).unwrap();
1350        batch.commit().unwrap();
1351
1352        // Verify deletion
1353        assert_eq!(provider.get::<tables::StoragesHistory>(key).unwrap(), None);
1354    }
1355
1356    #[test]
1357    fn test_rocksdb_batch_delete_account_history() {
1358        let (_temp_dir, provider) = create_rocksdb_provider();
1359
1360        let address = Address::random();
1361        let key = ShardedKey::new(address, 1000);
1362        let value = IntegerList::new([1, 10, 100]).unwrap();
1363
1364        // First write
1365        provider.put::<tables::AccountsHistory>(key.clone(), &value).unwrap();
1366        assert!(provider.get::<tables::AccountsHistory>(key.clone()).unwrap().is_some());
1367
1368        // Delete via RocksDBBatch
1369        let mut batch = provider.batch();
1370        batch.delete::<tables::AccountsHistory>(key.clone()).unwrap();
1371        batch.commit().unwrap();
1372
1373        // Verify deletion
1374        assert_eq!(provider.get::<tables::AccountsHistory>(key).unwrap(), None);
1375    }
1376
1377    // ==================== Parametrized Backend Equivalence Tests ====================
1378    //
1379    // These tests verify that MDBX and RocksDB produce identical results for history lookups.
1380    // Each scenario sets up the same data in both backends and asserts identical HistoryInfo.
1381
    /// Query parameters for a history lookup test case.
    struct HistoryQuery {
        /// Block at which the history lookup is performed.
        block_number: BlockNumber,
        /// Passed through as `lowest_available_block_number` to the lookup.
        lowest_available: Option<BlockNumber>,
        /// The [`HistoryInfo`] both backends are expected to return.
        expected: HistoryInfo,
    }
1388
    // Type aliases for cursor types (needed for EitherWriter/EitherReader type inference).
    // `RW` cursors are used when constructing writers, `RO` cursors when constructing readers.
    type AccountsHistoryWriteCursor =
        reth_db::mdbx::cursor::Cursor<reth_db::mdbx::RW, tables::AccountsHistory>;
    type StoragesHistoryWriteCursor =
        reth_db::mdbx::cursor::Cursor<reth_db::mdbx::RW, tables::StoragesHistory>;
    type AccountsHistoryReadCursor =
        reth_db::mdbx::cursor::Cursor<reth_db::mdbx::RO, tables::AccountsHistory>;
    type StoragesHistoryReadCursor =
        reth_db::mdbx::cursor::Cursor<reth_db::mdbx::RO, tables::StoragesHistory>;
1398
    /// Runs the same account history queries against both MDBX and `RocksDB` backends,
    /// asserting they produce identical results.
    ///
    /// `shards` seeds both backends with the same `(shard_highest_block, blocks_in_shard)`
    /// entries for `address`; each [`HistoryQuery`] is then run against both backends and
    /// checked against `query.expected`.
    fn run_account_history_scenario(
        scenario_name: &str,
        address: Address,
        shards: &[(BlockNumber, Vec<BlockNumber>)], // (shard_highest_block, blocks_in_shard)
        queries: &[HistoryQuery],
    ) {
        // Setup MDBX and RocksDB with identical data using EitherWriter
        let factory = create_test_provider_factory();
        let mdbx_provider = factory.database_provider_rw().unwrap();
        let (temp_dir, rocks_provider) = create_rocksdb_provider();

        // Create writers for both backends
        let mut mdbx_writer: EitherWriter<'_, AccountsHistoryWriteCursor, EthPrimitives> =
            EitherWriter::Database(
                mdbx_provider.tx_ref().cursor_write::<tables::AccountsHistory>().unwrap(),
            );
        let mut rocks_writer: EitherWriter<'_, AccountsHistoryWriteCursor, EthPrimitives> =
            EitherWriter::RocksDB(rocks_provider.batch());

        // Write identical data to both backends in a single loop
        for (highest_block, blocks) in shards {
            let key = ShardedKey::new(address, *highest_block);
            let value = IntegerList::new(blocks.clone()).unwrap();
            mdbx_writer.upsert_account_history(key.clone(), &value).unwrap();
            rocks_writer.upsert_account_history(key, &value).unwrap();
        }

        // Commit both backends
        drop(mdbx_writer);
        mdbx_provider.commit().unwrap();
        if let EitherWriter::RocksDB(batch) = rocks_writer {
            batch.commit().unwrap();
        }

        // Run queries against both backends using EitherReader
        let mdbx_ro = factory.database_provider_ro().unwrap();
        let rocks_snapshot = rocks_provider.snapshot();

        for (i, query) in queries.iter().enumerate() {
            // MDBX query via EitherReader. A fresh cursor per query keeps lookups
            // independent of each other.
            let mut mdbx_reader: EitherReader<'_, AccountsHistoryReadCursor, EthPrimitives> =
                EitherReader::Database(
                    mdbx_ro.tx_ref().cursor_read::<tables::AccountsHistory>().unwrap(),
                    PhantomData,
                );
            // NOTE(review): `visible_tip` is pinned to u64::MAX for every query here —
            // presumably so the tip never hides entries; confirm against the lookup impl.
            let mdbx_result = mdbx_reader
                .account_history_info(address, query.block_number, query.lowest_available, u64::MAX)
                .unwrap();

            // RocksDB query via EitherReader — reuse snapshot for consistent view
            let rocks_result = rocks_snapshot
                .account_history_info(address, query.block_number, query.lowest_available, u64::MAX)
                .unwrap();

            // Assert both backends produce identical results
            assert_eq!(
                mdbx_result,
                rocks_result,
                "Backend mismatch in scenario '{}' query {}: block={}, lowest={:?}\n\
                 MDBX: {:?}, RocksDB: {:?}",
                scenario_name,
                i,
                query.block_number,
                query.lowest_available,
                mdbx_result,
                rocks_result
            );

            // Also verify against expected result
            assert_eq!(
                mdbx_result,
                query.expected,
                "Unexpected result in scenario '{}' query {}: block={}, lowest={:?}\n\
                 Got: {:?}, Expected: {:?}",
                scenario_name,
                i,
                query.block_number,
                query.lowest_available,
                mdbx_result,
                query.expected
            );
        }

        // Keep the RocksDB temp directory alive until all queries have run.
        drop(temp_dir);
    }
1486
    /// Runs the same storage history queries against both MDBX and `RocksDB` backends,
    /// asserting they produce identical results.
    ///
    /// `shards` seeds both backends with the same `(shard_highest_block, blocks_in_shard)`
    /// entries for `(address, storage_key)`; each [`HistoryQuery`] is then run against both
    /// backends and checked against `query.expected`.
    fn run_storage_history_scenario(
        scenario_name: &str,
        address: Address,
        storage_key: B256,
        shards: &[(BlockNumber, Vec<BlockNumber>)], // (shard_highest_block, blocks_in_shard)
        queries: &[HistoryQuery],
    ) {
        // Setup MDBX and RocksDB with identical data using EitherWriter
        let factory = create_test_provider_factory();
        let mdbx_provider = factory.database_provider_rw().unwrap();
        let (temp_dir, rocks_provider) = create_rocksdb_provider();

        // Create writers for both backends
        let mut mdbx_writer: EitherWriter<'_, StoragesHistoryWriteCursor, EthPrimitives> =
            EitherWriter::Database(
                mdbx_provider.tx_ref().cursor_write::<tables::StoragesHistory>().unwrap(),
            );
        let mut rocks_writer: EitherWriter<'_, StoragesHistoryWriteCursor, EthPrimitives> =
            EitherWriter::RocksDB(rocks_provider.batch());

        // Write identical data to both backends in a single loop
        for (highest_block, blocks) in shards {
            let key = StorageShardedKey::new(address, storage_key, *highest_block);
            let value = IntegerList::new(blocks.clone()).unwrap();
            mdbx_writer.put_storage_history(key.clone(), &value).unwrap();
            rocks_writer.put_storage_history(key, &value).unwrap();
        }

        // Commit both backends
        drop(mdbx_writer);
        mdbx_provider.commit().unwrap();
        if let EitherWriter::RocksDB(batch) = rocks_writer {
            batch.commit().unwrap();
        }

        // Run queries against both backends using EitherReader
        let mdbx_ro = factory.database_provider_ro().unwrap();
        let rocks_snapshot = rocks_provider.snapshot();

        for (i, query) in queries.iter().enumerate() {
            // MDBX query via EitherReader. A fresh cursor per query keeps lookups
            // independent of each other.
            let mut mdbx_reader: EitherReader<'_, StoragesHistoryReadCursor, EthPrimitives> =
                EitherReader::Database(
                    mdbx_ro.tx_ref().cursor_read::<tables::StoragesHistory>().unwrap(),
                    PhantomData,
                );
            // NOTE(review): `visible_tip` is pinned to u64::MAX for every query here —
            // presumably so the tip never hides entries; confirm against the lookup impl.
            let mdbx_result = mdbx_reader
                .storage_history_info(
                    address,
                    storage_key,
                    query.block_number,
                    query.lowest_available,
                    u64::MAX,
                )
                .unwrap();

            // RocksDB query via snapshot — reuse for consistent view
            let rocks_result = rocks_snapshot
                .storage_history_info(
                    address,
                    storage_key,
                    query.block_number,
                    query.lowest_available,
                    u64::MAX,
                )
                .unwrap();

            // Assert both backends produce identical results
            assert_eq!(
                mdbx_result,
                rocks_result,
                "Backend mismatch in scenario '{}' query {}: block={}, lowest={:?}\n\
                 MDBX: {:?}, RocksDB: {:?}",
                scenario_name,
                i,
                query.block_number,
                query.lowest_available,
                mdbx_result,
                rocks_result
            );

            // Also verify against expected result
            assert_eq!(
                mdbx_result,
                query.expected,
                "Unexpected result in scenario '{}' query {}: block={}, lowest={:?}\n\
                 Got: {:?}, Expected: {:?}",
                scenario_name,
                i,
                query.block_number,
                query.lowest_available,
                mdbx_result,
                query.expected
            );
        }

        // Keep the RocksDB temp directory alive until all queries have run.
        drop(temp_dir);
    }
1587
1588    /// Tests account history lookups across both MDBX and `RocksDB` backends.
1589    ///
1590    /// Covers the following scenarios from PR2's `RocksDB`-only tests:
1591    /// 1. Single shard - basic lookups within one shard
1592    /// 2. Multiple shards - `prev()` shard detection and transitions
1593    /// 3. No history - query address with no entries
1594    /// 4. Pruning boundary - `lowest_available` boundary behavior (block at/after boundary)
1595    #[test]
1596    fn test_account_history_info_both_backends() {
1597        let address = Address::from([0x42; 20]);
1598
1599        // Scenario 1: Single shard with blocks [100, 200, 300]
1600        run_account_history_scenario(
1601            "single_shard",
1602            address,
1603            &[(u64::MAX, vec![100, 200, 300])],
1604            &[
1605                // Before first entry -> NotYetWritten
1606                HistoryQuery {
1607                    block_number: 50,
1608                    lowest_available: None,
1609                    expected: HistoryInfo::NotYetWritten,
1610                },
1611                // Between entries -> InChangeset(next_write)
1612                HistoryQuery {
1613                    block_number: 150,
1614                    lowest_available: None,
1615                    expected: HistoryInfo::InChangeset(200),
1616                },
1617                // Exact match on entry -> InChangeset(same_block)
1618                HistoryQuery {
1619                    block_number: 300,
1620                    lowest_available: None,
1621                    expected: HistoryInfo::InChangeset(300),
1622                },
1623                // After last entry in last shard -> InPlainState
1624                HistoryQuery {
1625                    block_number: 500,
1626                    lowest_available: None,
1627                    expected: HistoryInfo::InPlainState,
1628                },
1629            ],
1630        );
1631
1632        // Scenario 2: Multiple shards - tests prev() shard detection
1633        run_account_history_scenario(
1634            "multiple_shards",
1635            address,
1636            &[
1637                (500, vec![100, 200, 300, 400, 500]), // First shard ends at 500
1638                (u64::MAX, vec![600, 700, 800]),      // Last shard
1639            ],
1640            &[
1641                // Before first shard, no prev -> NotYetWritten
1642                HistoryQuery {
1643                    block_number: 50,
1644                    lowest_available: None,
1645                    expected: HistoryInfo::NotYetWritten,
1646                },
1647                // Within first shard
1648                HistoryQuery {
1649                    block_number: 150,
1650                    lowest_available: None,
1651                    expected: HistoryInfo::InChangeset(200),
1652                },
1653                // Between shards - prev() should find first shard
1654                HistoryQuery {
1655                    block_number: 550,
1656                    lowest_available: None,
1657                    expected: HistoryInfo::InChangeset(600),
1658                },
1659                // After all entries
1660                HistoryQuery {
1661                    block_number: 900,
1662                    lowest_available: None,
1663                    expected: HistoryInfo::InPlainState,
1664                },
1665            ],
1666        );
1667
1668        // Scenario 3: No history for address
1669        let address_without_history = Address::from([0x43; 20]);
1670        run_account_history_scenario(
1671            "no_history",
1672            address_without_history,
1673            &[], // No shards for this address
1674            &[HistoryQuery {
1675                block_number: 150,
1676                lowest_available: None,
1677                expected: HistoryInfo::NotYetWritten,
1678            }],
1679        );
1680
1681        // Scenario 4: Query at pruning boundary
1682        // Note: We test block >= lowest_available because HistoricalStateProviderRef
1683        // errors on blocks below the pruning boundary before doing the lookup.
1684        // The RocksDB implementation doesn't have this check at the same level.
1685        // This tests that when pruning IS available, both backends agree.
1686        run_account_history_scenario(
1687            "with_pruning_boundary",
1688            address,
1689            &[(u64::MAX, vec![100, 200, 300])],
1690            &[
1691                // At pruning boundary -> InChangeset(first entry after block)
1692                HistoryQuery {
1693                    block_number: 100,
1694                    lowest_available: Some(100),
1695                    expected: HistoryInfo::InChangeset(100),
1696                },
1697                // After pruning boundary, between entries
1698                HistoryQuery {
1699                    block_number: 150,
1700                    lowest_available: Some(100),
1701                    expected: HistoryInfo::InChangeset(200),
1702                },
1703            ],
1704        );
1705    }
1706
1707    /// Tests storage history lookups across both MDBX and `RocksDB` backends.
1708    #[test]
1709    fn test_storage_history_info_both_backends() {
1710        let address = Address::from([0x42; 20]);
1711        let storage_key = B256::from([0x01; 32]);
1712        let other_storage_key = B256::from([0x02; 32]);
1713
1714        // Single shard with blocks [100, 200, 300]
1715        run_storage_history_scenario(
1716            "storage_single_shard",
1717            address,
1718            storage_key,
1719            &[(u64::MAX, vec![100, 200, 300])],
1720            &[
1721                // Before first entry -> NotYetWritten
1722                HistoryQuery {
1723                    block_number: 50,
1724                    lowest_available: None,
1725                    expected: HistoryInfo::NotYetWritten,
1726                },
1727                // Between entries -> InChangeset(next_write)
1728                HistoryQuery {
1729                    block_number: 150,
1730                    lowest_available: None,
1731                    expected: HistoryInfo::InChangeset(200),
1732                },
1733                // After last entry -> InPlainState
1734                HistoryQuery {
1735                    block_number: 500,
1736                    lowest_available: None,
1737                    expected: HistoryInfo::InPlainState,
1738                },
1739            ],
1740        );
1741
1742        // No history for different storage key
1743        run_storage_history_scenario(
1744            "storage_no_history",
1745            address,
1746            other_storage_key,
1747            &[], // No shards for this storage key
1748            &[HistoryQuery {
1749                block_number: 150,
1750                lowest_available: None,
1751                expected: HistoryInfo::NotYetWritten,
1752            }],
1753        );
1754    }
1755
1756    /// Test that `RocksDB` batches created via `EitherWriter` are only made visible when
1757    /// `provider.commit()` is called, not when the writer is dropped.
1758    #[test]
1759    fn test_rocksdb_commits_at_provider_level() {
1760        let factory = create_test_provider_factory();
1761
1762        // Enable RocksDB for transaction hash numbers
1763        factory.set_storage_settings_cache(StorageSettings::v2());
1764
1765        let hash1 = B256::from([1u8; 32]);
1766        let hash2 = B256::from([2u8; 32]);
1767        let tx_num1 = 100u64;
1768        let tx_num2 = 200u64;
1769
1770        // Get the RocksDB batch from the provider
1771        let rocksdb = factory.rocksdb_provider();
1772        let batch = rocksdb.batch();
1773
1774        // Create provider and EitherWriter
1775        let provider = factory.database_provider_rw().unwrap();
1776        let mut writer = EitherWriter::new_transaction_hash_numbers(&provider, batch).unwrap();
1777
1778        // Write transaction hash numbers (append_only=false since we're using RocksDB)
1779        writer.put_transaction_hash_number(hash1, tx_num1, false).unwrap();
1780        writer.put_transaction_hash_number(hash2, tx_num2, false).unwrap();
1781
1782        // Extract the raw batch from the writer and register it with the provider
1783        let raw_batch = writer.into_raw_rocksdb_batch();
1784        if let Some(batch) = raw_batch {
1785            provider.set_pending_rocksdb_batch(batch);
1786        }
1787
1788        // Data should NOT be visible yet (batch not committed)
1789        let rocksdb = factory.rocksdb_provider();
1790        assert_eq!(
1791            rocksdb.get::<tables::TransactionHashNumbers>(hash1).unwrap(),
1792            None,
1793            "Data should not be visible before provider.commit()"
1794        );
1795
1796        // Commit the provider - this should commit both MDBX and RocksDB
1797        provider.commit().unwrap();
1798
1799        // Now data should be visible in RocksDB
1800        let rocksdb = factory.rocksdb_provider();
1801        assert_eq!(
1802            rocksdb.get::<tables::TransactionHashNumbers>(hash1).unwrap(),
1803            Some(tx_num1),
1804            "Data should be visible after provider.commit()"
1805        );
1806        assert_eq!(
1807            rocksdb.get::<tables::TransactionHashNumbers>(hash2).unwrap(),
1808            Some(tx_num2),
1809            "Data should be visible after provider.commit()"
1810        );
1811    }
1812
1813    /// Test that `EitherReader::new_accounts_history` panics when settings require
1814    /// `RocksDB` but no snapshot is given (`None`). This is an invariant violation that
1815    /// indicates a bug - `with_rocksdb_snapshot` should always provide a snapshot when needed.
1816    #[test]
1817    #[should_panic(expected = "account_history_in_rocksdb requires rocksdb snapshot")]
1818    fn test_settings_mismatch_panics() {
1819        let factory = create_test_provider_factory();
1820
1821        factory.set_storage_settings_cache(StorageSettings::v2());
1822
1823        let provider = factory.database_provider_ro().unwrap();
1824        let _ = EitherReader::<(), ()>::new_accounts_history(&provider, None);
1825    }
1826}