reth_provider/either_writer.rs

1//! Generic reader and writer abstractions for interacting with either database tables or static
2//! files.
3
4use std::{marker::PhantomData, ops::Range};
5
6#[cfg(all(unix, feature = "rocksdb"))]
7use crate::providers::rocksdb::RocksDBBatch;
8use crate::{
9    providers::{StaticFileProvider, StaticFileProviderRWRefMut},
10    StaticFileProviderFactory,
11};
12use alloy_primitives::{map::HashMap, Address, BlockNumber, TxHash, TxNumber};
13use reth_db::{
14    cursor::DbCursorRO,
15    static_file::TransactionSenderMask,
16    table::Value,
17    transaction::{CursorMutTy, CursorTy, DbTx, DbTxMut},
18};
19use reth_db_api::{
20    cursor::DbCursorRW,
21    models::{storage_sharded_key::StorageShardedKey, ShardedKey},
22    tables,
23    tables::BlockNumberList,
24};
25use reth_errors::ProviderError;
26use reth_node_types::NodePrimitives;
27use reth_primitives_traits::ReceiptTy;
28use reth_static_file_types::StaticFileSegment;
29use reth_storage_api::{DBProvider, NodePrimitivesProvider, StorageSettingsCache};
30use reth_storage_errors::provider::ProviderResult;
31use strum::{Display, EnumIs};
32
/// Type alias for [`EitherReader`] constructors.
///
/// Resolves, for a provider `P` and table `T`, the provider transaction's read-only cursor type
/// together with the provider's node primitives, so constructors can name the concrete reader.
type EitherReaderTy<'a, P, T> =
    EitherReader<'a, CursorTy<<P as DBProvider>::Tx, T>, <P as NodePrimitivesProvider>::Primitives>;

/// Type alias for [`EitherWriter`] constructors.
///
/// Same resolution as [`EitherReaderTy`], but using the read-write cursor type.
type EitherWriterTy<'a, P, T> = EitherWriter<
    'a,
    CursorMutTy<<P as DBProvider>::Tx, T>,
    <P as NodePrimitivesProvider>::Primitives,
>;

// Helper types so constructors stay exported even when RocksDB feature is off.
// Historical data tables use a write-only RocksDB batch (no read-your-writes needed).
#[cfg(all(unix, feature = "rocksdb"))]
type RocksBatchArg<'a> = crate::providers::rocksdb::RocksDBBatch<'a>;
// Degenerates to a unit type when the feature is off, so call sites can always pass an argument.
#[cfg(not(all(unix, feature = "rocksdb")))]
type RocksBatchArg<'a> = ();

// Read-side counterpart of `RocksBatchArg`: a borrowed RocksDB transaction when enabled.
#[cfg(all(unix, feature = "rocksdb"))]
type RocksTxRefArg<'a> = &'a crate::providers::rocksdb::RocksTx<'a>;
#[cfg(not(all(unix, feature = "rocksdb")))]
type RocksTxRefArg<'a> = ();
55
/// Represents a destination for writing data, either to database, static files, or `RocksDB`.
///
/// The variant is chosen at construction time (see the `new_*` constructors) from the provider's
/// cached storage settings; not every operation is supported by every variant, and unsupported
/// combinations return [`ProviderError::UnsupportedProvider`].
#[derive(Debug, Display)]
pub enum EitherWriter<'a, CURSOR, N> {
    /// Write to database table via cursor
    Database(CURSOR),
    /// Write to static file
    StaticFile(StaticFileProviderRWRefMut<'a, N>),
    /// Write to `RocksDB` using a write-only batch (historical tables).
    #[cfg(all(unix, feature = "rocksdb"))]
    RocksDB(RocksDBBatch<'a>),
}
67
68impl<'a> EitherWriter<'a, (), ()> {
69    /// Creates a new [`EitherWriter`] for receipts based on storage settings and prune modes.
70    pub fn new_receipts<P>(
71        provider: &'a P,
72        block_number: BlockNumber,
73    ) -> ProviderResult<EitherWriterTy<'a, P, tables::Receipts<ReceiptTy<P::Primitives>>>>
74    where
75        P: DBProvider + NodePrimitivesProvider + StorageSettingsCache + StaticFileProviderFactory,
76        P::Tx: DbTxMut,
77        ReceiptTy<P::Primitives>: Value,
78    {
79        if Self::receipts_destination(provider).is_static_file() {
80            Ok(EitherWriter::StaticFile(
81                provider.get_static_file_writer(block_number, StaticFileSegment::Receipts)?,
82            ))
83        } else {
84            Ok(EitherWriter::Database(
85                provider.tx_ref().cursor_write::<tables::Receipts<ReceiptTy<P::Primitives>>>()?,
86            ))
87        }
88    }
89
90    /// Returns the destination for writing receipts.
91    ///
92    /// The rules are as follows:
93    /// - If the node should not always write receipts to static files, and any receipt pruning is
94    ///   enabled, write to the database.
95    /// - If the node should always write receipts to static files, but receipt log filter pruning
96    ///   is enabled, write to the database.
97    /// - Otherwise, write to static files.
98    pub fn receipts_destination<P: DBProvider + StorageSettingsCache>(
99        provider: &P,
100    ) -> EitherWriterDestination {
101        let receipts_in_static_files = provider.cached_storage_settings().receipts_in_static_files;
102        let prune_modes = provider.prune_modes_ref();
103
104        if !receipts_in_static_files && prune_modes.has_receipts_pruning() ||
105            // TODO: support writing receipts to static files with log filter pruning enabled
106            receipts_in_static_files && !prune_modes.receipts_log_filter.is_empty()
107        {
108            EitherWriterDestination::Database
109        } else {
110            EitherWriterDestination::StaticFile
111        }
112    }
113
114    /// Creates a new [`EitherWriter`] for senders based on storage settings.
115    pub fn new_senders<P>(
116        provider: &'a P,
117        block_number: BlockNumber,
118    ) -> ProviderResult<EitherWriterTy<'a, P, tables::TransactionSenders>>
119    where
120        P: DBProvider + NodePrimitivesProvider + StorageSettingsCache + StaticFileProviderFactory,
121        P::Tx: DbTxMut,
122    {
123        if EitherWriterDestination::senders(provider).is_static_file() {
124            Ok(EitherWriter::StaticFile(
125                provider
126                    .get_static_file_writer(block_number, StaticFileSegment::TransactionSenders)?,
127            ))
128        } else {
129            Ok(EitherWriter::Database(
130                provider.tx_ref().cursor_write::<tables::TransactionSenders>()?,
131            ))
132        }
133    }
134
135    /// Creates a new [`EitherWriter`] for storages history based on storage settings.
136    pub fn new_storages_history<P>(
137        provider: &P,
138        _rocksdb_batch: RocksBatchArg<'a>,
139    ) -> ProviderResult<EitherWriterTy<'a, P, tables::StoragesHistory>>
140    where
141        P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
142        P::Tx: DbTxMut,
143    {
144        #[cfg(all(unix, feature = "rocksdb"))]
145        if provider.cached_storage_settings().storages_history_in_rocksdb {
146            return Ok(EitherWriter::RocksDB(_rocksdb_batch));
147        }
148
149        Ok(EitherWriter::Database(provider.tx_ref().cursor_write::<tables::StoragesHistory>()?))
150    }
151
152    /// Creates a new [`EitherWriter`] for transaction hash numbers based on storage settings.
153    pub fn new_transaction_hash_numbers<P>(
154        provider: &P,
155        _rocksdb_batch: RocksBatchArg<'a>,
156    ) -> ProviderResult<EitherWriterTy<'a, P, tables::TransactionHashNumbers>>
157    where
158        P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
159        P::Tx: DbTxMut,
160    {
161        #[cfg(all(unix, feature = "rocksdb"))]
162        if provider.cached_storage_settings().transaction_hash_numbers_in_rocksdb {
163            return Ok(EitherWriter::RocksDB(_rocksdb_batch));
164        }
165
166        Ok(EitherWriter::Database(
167            provider.tx_ref().cursor_write::<tables::TransactionHashNumbers>()?,
168        ))
169    }
170
171    /// Creates a new [`EitherWriter`] for account history based on storage settings.
172    pub fn new_accounts_history<P>(
173        provider: &P,
174        _rocksdb_batch: RocksBatchArg<'a>,
175    ) -> ProviderResult<EitherWriterTy<'a, P, tables::AccountsHistory>>
176    where
177        P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
178        P::Tx: DbTxMut,
179    {
180        #[cfg(all(unix, feature = "rocksdb"))]
181        if provider.cached_storage_settings().account_history_in_rocksdb {
182            return Ok(EitherWriter::RocksDB(_rocksdb_batch));
183        }
184
185        Ok(EitherWriter::Database(provider.tx_ref().cursor_write::<tables::AccountsHistory>()?))
186    }
187}
188
189impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N> {
190    /// Extracts the raw `RocksDB` write batch from this writer, if it contains one.
191    ///
192    /// Returns `Some(WriteBatchWithTransaction)` for [`Self::RocksDB`] variant,
193    /// `None` for other variants.
194    ///
195    /// This is used to defer `RocksDB` commits to the provider level, ensuring all
196    /// storage commits (MDBX, static files, `RocksDB`) happen atomically in a single place.
197    #[cfg(all(unix, feature = "rocksdb"))]
198    pub fn into_raw_rocksdb_batch(self) -> Option<rocksdb::WriteBatchWithTransaction<true>> {
199        match self {
200            Self::Database(_) | Self::StaticFile(_) => None,
201            Self::RocksDB(batch) => Some(batch.into_inner()),
202        }
203    }
204
205    /// Increment the block number.
206    ///
207    /// Relevant only for [`Self::StaticFile`]. It is a no-op for [`Self::Database`].
208    pub fn increment_block(&mut self, expected_block_number: BlockNumber) -> ProviderResult<()> {
209        match self {
210            Self::Database(_) => Ok(()),
211            Self::StaticFile(writer) => writer.increment_block(expected_block_number),
212            #[cfg(all(unix, feature = "rocksdb"))]
213            Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
214        }
215    }
216
217    /// Ensures that the writer is positioned at the specified block number.
218    ///
219    /// If the writer is positioned at a greater block number than the specified one, the writer
220    /// will NOT be unwound and the error will be returned.
221    ///
222    /// Relevant only for [`Self::StaticFile`]. It is a no-op for [`Self::Database`].
223    pub fn ensure_at_block(&mut self, block_number: BlockNumber) -> ProviderResult<()> {
224        match self {
225            Self::Database(_) => Ok(()),
226            Self::StaticFile(writer) => writer.ensure_at_block(block_number),
227            #[cfg(all(unix, feature = "rocksdb"))]
228            Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
229        }
230    }
231}
232
233impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>
234where
235    N::Receipt: Value,
236    CURSOR: DbCursorRW<tables::Receipts<N::Receipt>>,
237{
238    /// Append a transaction receipt.
239    pub fn append_receipt(&mut self, tx_num: TxNumber, receipt: &N::Receipt) -> ProviderResult<()> {
240        match self {
241            Self::Database(cursor) => Ok(cursor.append(tx_num, receipt)?),
242            Self::StaticFile(writer) => writer.append_receipt(tx_num, receipt),
243            #[cfg(all(unix, feature = "rocksdb"))]
244            Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
245        }
246    }
247}
248
249impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>
250where
251    CURSOR: DbCursorRW<tables::TransactionSenders>,
252{
253    /// Append a transaction sender to the destination
254    pub fn append_sender(&mut self, tx_num: TxNumber, sender: &Address) -> ProviderResult<()> {
255        match self {
256            Self::Database(cursor) => Ok(cursor.append(tx_num, sender)?),
257            Self::StaticFile(writer) => writer.append_transaction_sender(tx_num, sender),
258            #[cfg(all(unix, feature = "rocksdb"))]
259            Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
260        }
261    }
262
263    /// Append transaction senders to the destination
264    pub fn append_senders<I>(&mut self, senders: I) -> ProviderResult<()>
265    where
266        I: Iterator<Item = (TxNumber, Address)>,
267    {
268        match self {
269            Self::Database(cursor) => {
270                for (tx_num, sender) in senders {
271                    cursor.append(tx_num, &sender)?;
272                }
273                Ok(())
274            }
275            Self::StaticFile(writer) => writer.append_transaction_senders(senders),
276            #[cfg(all(unix, feature = "rocksdb"))]
277            Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
278        }
279    }
280
281    /// Removes all transaction senders above the given transaction number, and stops at the given
282    /// block number.
283    pub fn prune_senders(
284        &mut self,
285        unwind_tx_from: TxNumber,
286        block: BlockNumber,
287    ) -> ProviderResult<()>
288    where
289        CURSOR: DbCursorRO<tables::TransactionSenders>,
290    {
291        match self {
292            Self::Database(cursor) => {
293                let mut walker = cursor.walk_range(unwind_tx_from..)?;
294                while walker.next().transpose()?.is_some() {
295                    walker.delete_current()?;
296                }
297            }
298            Self::StaticFile(writer) => {
299                let static_file_transaction_sender_num = writer
300                    .reader()
301                    .get_highest_static_file_tx(StaticFileSegment::TransactionSenders);
302
303                let to_delete = static_file_transaction_sender_num
304                    .map(|static_num| (static_num + 1).saturating_sub(unwind_tx_from))
305                    .unwrap_or_default();
306
307                writer.prune_transaction_senders(to_delete, block)?;
308            }
309            #[cfg(all(unix, feature = "rocksdb"))]
310            Self::RocksDB(_) => return Err(ProviderError::UnsupportedProvider),
311        }
312
313        Ok(())
314    }
315}
316
317impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>
318where
319    CURSOR: DbCursorRW<tables::TransactionHashNumbers> + DbCursorRO<tables::TransactionHashNumbers>,
320{
321    /// Puts a transaction hash number mapping.
322    ///
323    /// When `append_only` is true, uses `cursor.append()` which is significantly faster
324    /// but requires entries to be inserted in order and the table to be empty.
325    /// When false, uses `cursor.insert()` which handles arbitrary insertion order.
326    pub fn put_transaction_hash_number(
327        &mut self,
328        hash: TxHash,
329        tx_num: TxNumber,
330        append_only: bool,
331    ) -> ProviderResult<()> {
332        match self {
333            Self::Database(cursor) => {
334                if append_only {
335                    Ok(cursor.append(hash, &tx_num)?)
336                } else {
337                    Ok(cursor.insert(hash, &tx_num)?)
338                }
339            }
340            Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
341            #[cfg(all(unix, feature = "rocksdb"))]
342            Self::RocksDB(batch) => batch.put::<tables::TransactionHashNumbers>(hash, &tx_num),
343        }
344    }
345
346    /// Deletes a transaction hash number mapping.
347    pub fn delete_transaction_hash_number(&mut self, hash: TxHash) -> ProviderResult<()> {
348        match self {
349            Self::Database(cursor) => {
350                if cursor.seek_exact(hash)?.is_some() {
351                    cursor.delete_current()?;
352                }
353                Ok(())
354            }
355            Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
356            #[cfg(all(unix, feature = "rocksdb"))]
357            Self::RocksDB(batch) => batch.delete::<tables::TransactionHashNumbers>(hash),
358        }
359    }
360}
361
362impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>
363where
364    CURSOR: DbCursorRW<tables::StoragesHistory> + DbCursorRO<tables::StoragesHistory>,
365{
366    /// Puts a storage history entry.
367    pub fn put_storage_history(
368        &mut self,
369        key: StorageShardedKey,
370        value: &BlockNumberList,
371    ) -> ProviderResult<()> {
372        match self {
373            Self::Database(cursor) => Ok(cursor.upsert(key, value)?),
374            Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
375            #[cfg(all(unix, feature = "rocksdb"))]
376            Self::RocksDB(batch) => batch.put::<tables::StoragesHistory>(key, value),
377        }
378    }
379
380    /// Deletes a storage history entry.
381    pub fn delete_storage_history(&mut self, key: StorageShardedKey) -> ProviderResult<()> {
382        match self {
383            Self::Database(cursor) => {
384                if cursor.seek_exact(key)?.is_some() {
385                    cursor.delete_current()?;
386                }
387                Ok(())
388            }
389            Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
390            #[cfg(all(unix, feature = "rocksdb"))]
391            Self::RocksDB(batch) => batch.delete::<tables::StoragesHistory>(key),
392        }
393    }
394}
395
396impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>
397where
398    CURSOR: DbCursorRW<tables::AccountsHistory> + DbCursorRO<tables::AccountsHistory>,
399{
400    /// Puts an account history entry.
401    pub fn put_account_history(
402        &mut self,
403        key: ShardedKey<Address>,
404        value: &BlockNumberList,
405    ) -> ProviderResult<()> {
406        match self {
407            Self::Database(cursor) => Ok(cursor.upsert(key, value)?),
408            Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
409            #[cfg(all(unix, feature = "rocksdb"))]
410            Self::RocksDB(batch) => batch.put::<tables::AccountsHistory>(key, value),
411        }
412    }
413
414    /// Deletes an account history entry.
415    pub fn delete_account_history(&mut self, key: ShardedKey<Address>) -> ProviderResult<()> {
416        match self {
417            Self::Database(cursor) => {
418                if cursor.seek_exact(key)?.is_some() {
419                    cursor.delete_current()?;
420                }
421                Ok(())
422            }
423            Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
424            #[cfg(all(unix, feature = "rocksdb"))]
425            Self::RocksDB(batch) => batch.delete::<tables::AccountsHistory>(key),
426        }
427    }
428}
429
/// Represents a source for reading data, either from database, static files, or `RocksDB`.
///
/// The `PhantomData<&'a ()>` fields tie the non-`RocksDB` variants to the lifetime `'a` that the
/// `RocksDB` variant borrows, keeping the generic signature uniform across feature flags.
#[derive(Debug, Display)]
pub enum EitherReader<'a, CURSOR, N> {
    /// Read from database table via cursor
    Database(CURSOR, PhantomData<&'a ()>),
    /// Read from static file
    StaticFile(StaticFileProvider<N>, PhantomData<&'a ()>),
    /// Read from `RocksDB` transaction
    #[cfg(all(unix, feature = "rocksdb"))]
    RocksDB(&'a crate::providers::rocksdb::RocksTx<'a>),
}
441
442impl<'a> EitherReader<'a, (), ()> {
443    /// Creates a new [`EitherReader`] for senders based on storage settings.
444    pub fn new_senders<P>(
445        provider: &P,
446    ) -> ProviderResult<EitherReaderTy<'a, P, tables::TransactionSenders>>
447    where
448        P: DBProvider + NodePrimitivesProvider + StorageSettingsCache + StaticFileProviderFactory,
449        P::Tx: DbTx,
450    {
451        if EitherWriterDestination::senders(provider).is_static_file() {
452            Ok(EitherReader::StaticFile(provider.static_file_provider(), PhantomData))
453        } else {
454            Ok(EitherReader::Database(
455                provider.tx_ref().cursor_read::<tables::TransactionSenders>()?,
456                PhantomData,
457            ))
458        }
459    }
460
461    /// Creates a new [`EitherReader`] for storages history based on storage settings.
462    pub fn new_storages_history<P>(
463        provider: &P,
464        _rocksdb_tx: RocksTxRefArg<'a>,
465    ) -> ProviderResult<EitherReaderTy<'a, P, tables::StoragesHistory>>
466    where
467        P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
468        P::Tx: DbTx,
469    {
470        #[cfg(all(unix, feature = "rocksdb"))]
471        if provider.cached_storage_settings().storages_history_in_rocksdb {
472            return Ok(EitherReader::RocksDB(_rocksdb_tx));
473        }
474
475        Ok(EitherReader::Database(
476            provider.tx_ref().cursor_read::<tables::StoragesHistory>()?,
477            PhantomData,
478        ))
479    }
480
481    /// Creates a new [`EitherReader`] for transaction hash numbers based on storage settings.
482    pub fn new_transaction_hash_numbers<P>(
483        provider: &P,
484        _rocksdb_tx: RocksTxRefArg<'a>,
485    ) -> ProviderResult<EitherReaderTy<'a, P, tables::TransactionHashNumbers>>
486    where
487        P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
488        P::Tx: DbTx,
489    {
490        #[cfg(all(unix, feature = "rocksdb"))]
491        if provider.cached_storage_settings().transaction_hash_numbers_in_rocksdb {
492            return Ok(EitherReader::RocksDB(_rocksdb_tx));
493        }
494
495        Ok(EitherReader::Database(
496            provider.tx_ref().cursor_read::<tables::TransactionHashNumbers>()?,
497            PhantomData,
498        ))
499    }
500
501    /// Creates a new [`EitherReader`] for account history based on storage settings.
502    pub fn new_accounts_history<P>(
503        provider: &P,
504        _rocksdb_tx: RocksTxRefArg<'a>,
505    ) -> ProviderResult<EitherReaderTy<'a, P, tables::AccountsHistory>>
506    where
507        P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
508        P::Tx: DbTx,
509    {
510        #[cfg(all(unix, feature = "rocksdb"))]
511        if provider.cached_storage_settings().account_history_in_rocksdb {
512            return Ok(EitherReader::RocksDB(_rocksdb_tx));
513        }
514
515        Ok(EitherReader::Database(
516            provider.tx_ref().cursor_read::<tables::AccountsHistory>()?,
517            PhantomData,
518        ))
519    }
520}
521
impl<CURSOR, N: NodePrimitives> EitherReader<'_, CURSOR, N>
where
    CURSOR: DbCursorRO<tables::TransactionSenders>,
{
    /// Fetches the senders for a range of transactions.
    ///
    /// Transaction numbers without a stored sender are simply absent from the returned map on
    /// both backends: the database walk only yields existing rows, and the static-file path
    /// filters out empty entries below.
    pub fn senders_by_tx_range(
        &mut self,
        range: Range<TxNumber>,
    ) -> ProviderResult<HashMap<TxNumber, Address>> {
        match self {
            Self::Database(cursor, _) => cursor
                .walk_range(range)?
                .map(|result| result.map_err(ProviderError::from))
                .collect::<ProviderResult<HashMap<_, _>>>(),
            // Pair each tx number in `range` with the value fetched from the static file at the
            // same position, then keep only the pairs that actually contain a sender.
            Self::StaticFile(provider, _) => range
                .clone()
                .zip(provider.fetch_range_iter(
                    StaticFileSegment::TransactionSenders,
                    range,
                    |cursor, number| cursor.get_one::<TransactionSenderMask>(number.into()),
                )?)
                .filter_map(|(tx_num, sender)| {
                    // `transpose()` + `?` skips tx numbers with no sender entry while still
                    // propagating read errors through the collected `Result`.
                    let result = sender.transpose()?;
                    Some(result.map(|sender| (tx_num, sender)))
                })
                .collect::<ProviderResult<HashMap<_, _>>>(),
            #[cfg(all(unix, feature = "rocksdb"))]
            Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
        }
    }
}
553
554impl<CURSOR, N: NodePrimitives> EitherReader<'_, CURSOR, N>
555where
556    CURSOR: DbCursorRO<tables::TransactionHashNumbers>,
557{
558    /// Gets a transaction number by its hash.
559    pub fn get_transaction_hash_number(
560        &mut self,
561        hash: TxHash,
562    ) -> ProviderResult<Option<TxNumber>> {
563        match self {
564            Self::Database(cursor, _) => Ok(cursor.seek_exact(hash)?.map(|(_, v)| v)),
565            Self::StaticFile(_, _) => Err(ProviderError::UnsupportedProvider),
566            #[cfg(all(unix, feature = "rocksdb"))]
567            Self::RocksDB(tx) => tx.get::<tables::TransactionHashNumbers>(hash),
568        }
569    }
570}
571
572impl<CURSOR, N: NodePrimitives> EitherReader<'_, CURSOR, N>
573where
574    CURSOR: DbCursorRO<tables::StoragesHistory>,
575{
576    /// Gets a storage history entry.
577    pub fn get_storage_history(
578        &mut self,
579        key: StorageShardedKey,
580    ) -> ProviderResult<Option<BlockNumberList>> {
581        match self {
582            Self::Database(cursor, _) => Ok(cursor.seek_exact(key)?.map(|(_, v)| v)),
583            Self::StaticFile(_, _) => Err(ProviderError::UnsupportedProvider),
584            #[cfg(all(unix, feature = "rocksdb"))]
585            Self::RocksDB(tx) => tx.get::<tables::StoragesHistory>(key),
586        }
587    }
588}
589
590impl<CURSOR, N: NodePrimitives> EitherReader<'_, CURSOR, N>
591where
592    CURSOR: DbCursorRO<tables::AccountsHistory>,
593{
594    /// Gets an account history entry.
595    pub fn get_account_history(
596        &mut self,
597        key: ShardedKey<Address>,
598    ) -> ProviderResult<Option<BlockNumberList>> {
599        match self {
600            Self::Database(cursor, _) => Ok(cursor.seek_exact(key)?.map(|(_, v)| v)),
601            Self::StaticFile(_, _) => Err(ProviderError::UnsupportedProvider),
602            #[cfg(all(unix, feature = "rocksdb"))]
603            Self::RocksDB(tx) => tx.get::<tables::AccountsHistory>(key),
604        }
605    }
606}
607
/// Destination for writing data.
///
/// A lightweight tag returned by destination-resolution helpers (e.g.
/// [`EitherWriter::receipts_destination`]) before an actual writer is constructed. The `EnumIs`
/// derive provides `is_database()` / `is_static_file()` style predicates.
#[derive(Debug, EnumIs)]
pub enum EitherWriterDestination {
    /// Write to database table
    Database,
    /// Write to static file
    StaticFile,
    /// Write to `RocksDB`
    RocksDB,
}
618
619impl EitherWriterDestination {
620    /// Returns the destination for writing senders based on storage settings.
621    pub fn senders<P>(provider: &P) -> Self
622    where
623        P: StorageSettingsCache,
624    {
625        // Write senders to static files only if they're explicitly enabled
626        if provider.cached_storage_settings().transaction_senders_in_static_files {
627            Self::StaticFile
628        } else {
629            Self::Database
630        }
631    }
632}
633
#[cfg(test)]
mod tests {
    use crate::test_utils::create_test_provider_factory;

    use super::*;
    use alloy_primitives::Address;
    use reth_storage_api::{DatabaseProviderFactory, StorageSettings};

    /// Round-trips senders through [`EitherWriter`]/[`EitherReader`] for both the database and
    /// static-file backends, and checks that querying beyond the written range only returns the
    /// entries that exist.
    #[test]
    fn test_reader_senders_by_tx_range() {
        let factory = create_test_provider_factory();

        // Insert senders only from 1 to 4, but we will query from 0 to 5.
        let senders = [
            (1, Address::random()),
            (2, Address::random()),
            (3, Address::random()),
            (4, Address::random()),
        ];

        // Run the same scenario against both backends by toggling the storage setting.
        for transaction_senders_in_static_files in [false, true] {
            factory.set_storage_settings_cache(
                StorageSettings::legacy()
                    .with_transaction_senders_in_static_files(transaction_senders_in_static_files),
            );

            let provider = factory.database_provider_rw().unwrap();
            let mut writer = EitherWriter::new_senders(&provider, 0).unwrap();
            // The constructor must have selected the backend matching the setting.
            if transaction_senders_in_static_files {
                assert!(matches!(writer, EitherWriter::StaticFile(_)));
            } else {
                assert!(matches!(writer, EitherWriter::Database(_)));
            }

            writer.increment_block(0).unwrap();
            writer.append_senders(senders.iter().copied()).unwrap();
            // The writer borrows from `provider`, so it must be dropped before committing.
            drop(writer);
            provider.commit().unwrap();

            let provider = factory.database_provider_ro().unwrap();
            let mut reader = EitherReader::new_senders(&provider).unwrap();
            if transaction_senders_in_static_files {
                assert!(matches!(reader, EitherReader::StaticFile(_, _)));
            } else {
                assert!(matches!(reader, EitherReader::Database(_, _)));
            }

            // Querying 0..6 must yield exactly the four inserted senders.
            assert_eq!(
                reader.senders_by_tx_range(0..6).unwrap(),
                senders.iter().copied().collect::<HashMap<_, _>>(),
                "{reader}"
            );
        }
    }
}
689
690#[cfg(all(test, unix, feature = "rocksdb"))]
691mod rocksdb_tests {
692    use super::*;
693    use crate::{
694        providers::rocksdb::{RocksDBBuilder, RocksDBProvider},
695        test_utils::create_test_provider_factory,
696        RocksDBProviderFactory,
697    };
698    use alloy_primitives::{Address, B256};
699    use reth_db_api::{
700        models::{storage_sharded_key::StorageShardedKey, IntegerList, ShardedKey},
701        tables,
702    };
703    use reth_storage_api::{DatabaseProviderFactory, StorageSettings};
704    use tempfile::TempDir;
705
706    fn create_rocksdb_provider() -> (TempDir, RocksDBProvider) {
707        let temp_dir = TempDir::new().unwrap();
708        let provider = RocksDBBuilder::new(temp_dir.path())
709            .with_table::<tables::TransactionHashNumbers>()
710            .with_table::<tables::StoragesHistory>()
711            .with_table::<tables::AccountsHistory>()
712            .build()
713            .unwrap();
714        (temp_dir, provider)
715    }
716
717    /// Test that `EitherWriter::new_transaction_hash_numbers` creates a `RocksDB` writer
718    /// when the storage setting is enabled, and that put operations followed by commit
719    /// persist the data to `RocksDB`.
720    #[test]
721    fn test_either_writer_transaction_hash_numbers_with_rocksdb() {
722        let factory = create_test_provider_factory();
723
724        // Enable RocksDB for transaction hash numbers
725        factory.set_storage_settings_cache(
726            StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true),
727        );
728
729        let hash1 = B256::from([1u8; 32]);
730        let hash2 = B256::from([2u8; 32]);
731        let tx_num1 = 100u64;
732        let tx_num2 = 200u64;
733
734        // Get the RocksDB batch from the provider
735        let rocksdb = factory.rocksdb_provider();
736        let batch = rocksdb.batch();
737
738        // Create EitherWriter with RocksDB
739        let provider = factory.database_provider_rw().unwrap();
740        let mut writer = EitherWriter::new_transaction_hash_numbers(&provider, batch).unwrap();
741
742        // Verify we got a RocksDB writer
743        assert!(matches!(writer, EitherWriter::RocksDB(_)));
744
745        // Write transaction hash numbers (append_only=false since we're using RocksDB)
746        writer.put_transaction_hash_number(hash1, tx_num1, false).unwrap();
747        writer.put_transaction_hash_number(hash2, tx_num2, false).unwrap();
748
749        // Extract the batch and register with provider for commit
750        if let Some(batch) = writer.into_raw_rocksdb_batch() {
751            provider.set_pending_rocksdb_batch(batch);
752        }
753
754        // Commit via provider - this commits RocksDB batch too
755        provider.commit().unwrap();
756
757        // Verify data was written to RocksDB
758        let rocksdb = factory.rocksdb_provider();
759        assert_eq!(rocksdb.get::<tables::TransactionHashNumbers>(hash1).unwrap(), Some(tx_num1));
760        assert_eq!(rocksdb.get::<tables::TransactionHashNumbers>(hash2).unwrap(), Some(tx_num2));
761    }
762
763    /// Test that `EitherWriter::delete_transaction_hash_number` works with `RocksDB`.
764    #[test]
765    fn test_either_writer_delete_transaction_hash_number_with_rocksdb() {
766        let factory = create_test_provider_factory();
767
768        // Enable RocksDB for transaction hash numbers
769        factory.set_storage_settings_cache(
770            StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true),
771        );
772
773        let hash = B256::from([1u8; 32]);
774        let tx_num = 100u64;
775
776        // First, write a value directly to RocksDB
777        let rocksdb = factory.rocksdb_provider();
778        rocksdb.put::<tables::TransactionHashNumbers>(hash, &tx_num).unwrap();
779        assert_eq!(rocksdb.get::<tables::TransactionHashNumbers>(hash).unwrap(), Some(tx_num));
780
781        // Now delete using EitherWriter
782        let batch = rocksdb.batch();
783        let provider = factory.database_provider_rw().unwrap();
784        let mut writer = EitherWriter::new_transaction_hash_numbers(&provider, batch).unwrap();
785        writer.delete_transaction_hash_number(hash).unwrap();
786
787        // Extract the batch and commit via provider
788        if let Some(batch) = writer.into_raw_rocksdb_batch() {
789            provider.set_pending_rocksdb_batch(batch);
790        }
791        provider.commit().unwrap();
792
793        // Verify deletion
794        let rocksdb = factory.rocksdb_provider();
795        assert_eq!(rocksdb.get::<tables::TransactionHashNumbers>(hash).unwrap(), None);
796    }
797
798    #[test]
799    fn test_rocksdb_batch_transaction_hash_numbers() {
800        let (_temp_dir, provider) = create_rocksdb_provider();
801
802        let hash1 = B256::from([1u8; 32]);
803        let hash2 = B256::from([2u8; 32]);
804        let tx_num1 = 100u64;
805        let tx_num2 = 200u64;
806
807        // Write via RocksDBBatch (same as EitherWriter::RocksDB would use internally)
808        let mut batch = provider.batch();
809        batch.put::<tables::TransactionHashNumbers>(hash1, &tx_num1).unwrap();
810        batch.put::<tables::TransactionHashNumbers>(hash2, &tx_num2).unwrap();
811        batch.commit().unwrap();
812
813        // Read via RocksTx (same as EitherReader::RocksDB would use internally)
814        let tx = provider.tx();
815        assert_eq!(tx.get::<tables::TransactionHashNumbers>(hash1).unwrap(), Some(tx_num1));
816        assert_eq!(tx.get::<tables::TransactionHashNumbers>(hash2).unwrap(), Some(tx_num2));
817
818        // Test missing key
819        let missing_hash = B256::from([99u8; 32]);
820        assert_eq!(tx.get::<tables::TransactionHashNumbers>(missing_hash).unwrap(), None);
821    }
822
823    #[test]
824    fn test_rocksdb_batch_storage_history() {
825        let (_temp_dir, provider) = create_rocksdb_provider();
826
827        let address = Address::random();
828        let storage_key = B256::from([1u8; 32]);
829        let key = StorageShardedKey::new(address, storage_key, 1000);
830        let value = IntegerList::new([1, 5, 10, 50]).unwrap();
831
832        // Write via RocksDBBatch
833        let mut batch = provider.batch();
834        batch.put::<tables::StoragesHistory>(key.clone(), &value).unwrap();
835        batch.commit().unwrap();
836
837        // Read via RocksTx
838        let tx = provider.tx();
839        let result = tx.get::<tables::StoragesHistory>(key).unwrap();
840        assert_eq!(result, Some(value));
841
842        // Test missing key
843        let missing_key = StorageShardedKey::new(Address::random(), B256::ZERO, 0);
844        assert_eq!(tx.get::<tables::StoragesHistory>(missing_key).unwrap(), None);
845    }
846
847    #[test]
848    fn test_rocksdb_batch_account_history() {
849        let (_temp_dir, provider) = create_rocksdb_provider();
850
851        let address = Address::random();
852        let key = ShardedKey::new(address, 1000);
853        let value = IntegerList::new([1, 10, 100, 500]).unwrap();
854
855        // Write via RocksDBBatch
856        let mut batch = provider.batch();
857        batch.put::<tables::AccountsHistory>(key.clone(), &value).unwrap();
858        batch.commit().unwrap();
859
860        // Read via RocksTx
861        let tx = provider.tx();
862        let result = tx.get::<tables::AccountsHistory>(key).unwrap();
863        assert_eq!(result, Some(value));
864
865        // Test missing key
866        let missing_key = ShardedKey::new(Address::random(), 0);
867        assert_eq!(tx.get::<tables::AccountsHistory>(missing_key).unwrap(), None);
868    }
869
870    #[test]
871    fn test_rocksdb_batch_delete_transaction_hash_number() {
872        let (_temp_dir, provider) = create_rocksdb_provider();
873
874        let hash = B256::from([1u8; 32]);
875        let tx_num = 100u64;
876
877        // First write
878        provider.put::<tables::TransactionHashNumbers>(hash, &tx_num).unwrap();
879        assert_eq!(provider.get::<tables::TransactionHashNumbers>(hash).unwrap(), Some(tx_num));
880
881        // Delete via RocksDBBatch
882        let mut batch = provider.batch();
883        batch.delete::<tables::TransactionHashNumbers>(hash).unwrap();
884        batch.commit().unwrap();
885
886        // Verify deletion
887        assert_eq!(provider.get::<tables::TransactionHashNumbers>(hash).unwrap(), None);
888    }
889
890    #[test]
891    fn test_rocksdb_batch_delete_storage_history() {
892        let (_temp_dir, provider) = create_rocksdb_provider();
893
894        let address = Address::random();
895        let storage_key = B256::from([1u8; 32]);
896        let key = StorageShardedKey::new(address, storage_key, 1000);
897        let value = IntegerList::new([1, 5, 10]).unwrap();
898
899        // First write
900        provider.put::<tables::StoragesHistory>(key.clone(), &value).unwrap();
901        assert!(provider.get::<tables::StoragesHistory>(key.clone()).unwrap().is_some());
902
903        // Delete via RocksDBBatch
904        let mut batch = provider.batch();
905        batch.delete::<tables::StoragesHistory>(key.clone()).unwrap();
906        batch.commit().unwrap();
907
908        // Verify deletion
909        assert_eq!(provider.get::<tables::StoragesHistory>(key).unwrap(), None);
910    }
911
912    #[test]
913    fn test_rocksdb_batch_delete_account_history() {
914        let (_temp_dir, provider) = create_rocksdb_provider();
915
916        let address = Address::random();
917        let key = ShardedKey::new(address, 1000);
918        let value = IntegerList::new([1, 10, 100]).unwrap();
919
920        // First write
921        provider.put::<tables::AccountsHistory>(key.clone(), &value).unwrap();
922        assert!(provider.get::<tables::AccountsHistory>(key.clone()).unwrap().is_some());
923
924        // Delete via RocksDBBatch
925        let mut batch = provider.batch();
926        batch.delete::<tables::AccountsHistory>(key.clone()).unwrap();
927        batch.commit().unwrap();
928
929        // Verify deletion
930        assert_eq!(provider.get::<tables::AccountsHistory>(key).unwrap(), None);
931    }
932
933    /// Test that `RocksDB` commits happen at `provider.commit()` level, not at writer level.
934    ///
935    /// This ensures all storage commits (MDBX, static files, `RocksDB`) happen atomically
936    /// in a single place, making it easier to reason about commit ordering and consistency.
937    #[test]
938    fn test_rocksdb_commits_at_provider_level() {
939        let factory = create_test_provider_factory();
940
941        // Enable RocksDB for transaction hash numbers
942        factory.set_storage_settings_cache(
943            StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true),
944        );
945
946        let hash1 = B256::from([1u8; 32]);
947        let hash2 = B256::from([2u8; 32]);
948        let tx_num1 = 100u64;
949        let tx_num2 = 200u64;
950
951        // Get the RocksDB batch from the provider
952        let rocksdb = factory.rocksdb_provider();
953        let batch = rocksdb.batch();
954
955        // Create provider and EitherWriter
956        let provider = factory.database_provider_rw().unwrap();
957        let mut writer = EitherWriter::new_transaction_hash_numbers(&provider, batch).unwrap();
958
959        // Write transaction hash numbers (append_only=false since we're using RocksDB)
960        writer.put_transaction_hash_number(hash1, tx_num1, false).unwrap();
961        writer.put_transaction_hash_number(hash2, tx_num2, false).unwrap();
962
963        // Extract the raw batch from the writer and register it with the provider
964        let raw_batch = writer.into_raw_rocksdb_batch();
965        if let Some(batch) = raw_batch {
966            provider.set_pending_rocksdb_batch(batch);
967        }
968
969        // Data should NOT be visible yet (batch not committed)
970        let rocksdb = factory.rocksdb_provider();
971        assert_eq!(
972            rocksdb.get::<tables::TransactionHashNumbers>(hash1).unwrap(),
973            None,
974            "Data should not be visible before provider.commit()"
975        );
976
977        // Commit the provider - this should commit both MDBX and RocksDB
978        provider.commit().unwrap();
979
980        // Now data should be visible in RocksDB
981        let rocksdb = factory.rocksdb_provider();
982        assert_eq!(
983            rocksdb.get::<tables::TransactionHashNumbers>(hash1).unwrap(),
984            Some(tx_num1),
985            "Data should be visible after provider.commit()"
986        );
987        assert_eq!(
988            rocksdb.get::<tables::TransactionHashNumbers>(hash2).unwrap(),
989            Some(tx_num2),
990            "Data should be visible after provider.commit()"
991        );
992    }
993}