reth_provider/
either_writer.rs

1//! Generic reader and writer abstractions for interacting with either database tables or static
2//! files.
3
4use std::{marker::PhantomData, ops::Range};
5
6#[cfg(all(unix, feature = "rocksdb"))]
7use crate::providers::rocksdb::RocksTx;
8use crate::{
9    providers::{StaticFileProvider, StaticFileProviderRWRefMut},
10    StaticFileProviderFactory,
11};
12use alloy_primitives::{map::HashMap, Address, BlockNumber, TxNumber};
13use reth_db::{
14    cursor::DbCursorRO,
15    static_file::TransactionSenderMask,
16    table::Value,
17    transaction::{CursorMutTy, CursorTy, DbTx, DbTxMut},
18};
19use reth_db_api::{cursor::DbCursorRW, tables};
20use reth_errors::ProviderError;
21use reth_node_types::NodePrimitives;
22use reth_primitives_traits::ReceiptTy;
23use reth_static_file_types::StaticFileSegment;
24use reth_storage_api::{DBProvider, NodePrimitivesProvider, StorageSettingsCache};
25use reth_storage_errors::provider::ProviderResult;
26use strum::{Display, EnumIs};
27
/// Type alias for [`EitherReader`] constructors.
///
/// Spells out the reader type returned for a provider `P` and table `T`: the cursor parameter
/// is the [`CursorTy`] of `P`'s transaction over `T`, and the primitives parameter comes from
/// `P`'s [`NodePrimitivesProvider`] implementation.
type EitherReaderTy<'a, P, T> =
    EitherReader<'a, CursorTy<<P as DBProvider>::Tx, T>, <P as NodePrimitivesProvider>::Primitives>;
31
/// Type alias for [`EitherWriter`] constructors.
///
/// Spells out the writer type returned for a provider `P` and table `T`: the cursor parameter
/// is the [`CursorMutTy`] of `P`'s transaction over `T`, and the primitives parameter comes from
/// `P`'s [`NodePrimitivesProvider`] implementation.
type EitherWriterTy<'a, P, T> = EitherWriter<
    'a,
    CursorMutTy<<P as DBProvider>::Tx, T>,
    <P as NodePrimitivesProvider>::Primitives,
>;
38
// Helper type so constructors stay exported even when RocksDB feature is off.
//
// With the feature enabled (unix only) the argument is an owned `RocksTx`; otherwise it
// collapses to the unit type, so the constructor signatures stay the same in both builds.
#[cfg(all(unix, feature = "rocksdb"))]
type RocksTxArg<'a> = crate::providers::rocksdb::RocksTx<'a>;
#[cfg(not(all(unix, feature = "rocksdb")))]
type RocksTxArg<'a> = ();
44
// Borrowed counterpart of `RocksTxArg`, used by the reader constructors: a shared reference to
// a `RocksTx` when the RocksDB feature is enabled (unix only), and the unit type otherwise.
#[cfg(all(unix, feature = "rocksdb"))]
type RocksTxRefArg<'a> = &'a crate::providers::rocksdb::RocksTx<'a>;
#[cfg(not(all(unix, feature = "rocksdb")))]
type RocksTxRefArg<'a> = ();
49
/// Represents a destination for writing data, either to database, static files, or `RocksDB`.
///
/// `CURSOR` is the database cursor type held by [`Self::Database`], and `N` is the type
/// parameter of the static file writer held by [`Self::StaticFile`].
#[derive(Debug, Display)]
pub enum EitherWriter<'a, CURSOR, N> {
    /// Write to database table via cursor
    Database(CURSOR),
    /// Write to static file
    StaticFile(StaticFileProviderRWRefMut<'a, N>),
    /// Write to `RocksDB` transaction
    ///
    /// Only compiled on unix targets with the `rocksdb` feature enabled.
    #[cfg(all(unix, feature = "rocksdb"))]
    RocksDB(RocksTx<'a>),
}
61
62impl<'a> EitherWriter<'a, (), ()> {
63    /// Creates a new [`EitherWriter`] for receipts based on storage settings and prune modes.
64    pub fn new_receipts<P>(
65        provider: &'a P,
66        block_number: BlockNumber,
67    ) -> ProviderResult<EitherWriterTy<'a, P, tables::Receipts<ReceiptTy<P::Primitives>>>>
68    where
69        P: DBProvider + NodePrimitivesProvider + StorageSettingsCache + StaticFileProviderFactory,
70        P::Tx: DbTxMut,
71        ReceiptTy<P::Primitives>: Value,
72    {
73        if Self::receipts_destination(provider).is_static_file() {
74            Ok(EitherWriter::StaticFile(
75                provider.get_static_file_writer(block_number, StaticFileSegment::Receipts)?,
76            ))
77        } else {
78            Ok(EitherWriter::Database(
79                provider.tx_ref().cursor_write::<tables::Receipts<ReceiptTy<P::Primitives>>>()?,
80            ))
81        }
82    }
83
84    /// Returns the destination for writing receipts.
85    ///
86    /// The rules are as follows:
87    /// - If the node should not always write receipts to static files, and any receipt pruning is
88    ///   enabled, write to the database.
89    /// - If the node should always write receipts to static files, but receipt log filter pruning
90    ///   is enabled, write to the database.
91    /// - Otherwise, write to static files.
92    pub fn receipts_destination<P: DBProvider + StorageSettingsCache>(
93        provider: &P,
94    ) -> EitherWriterDestination {
95        let receipts_in_static_files = provider.cached_storage_settings().receipts_in_static_files;
96        let prune_modes = provider.prune_modes_ref();
97
98        if !receipts_in_static_files && prune_modes.has_receipts_pruning() ||
99            // TODO: support writing receipts to static files with log filter pruning enabled
100            receipts_in_static_files && !prune_modes.receipts_log_filter.is_empty()
101        {
102            EitherWriterDestination::Database
103        } else {
104            EitherWriterDestination::StaticFile
105        }
106    }
107
108    /// Creates a new [`EitherWriter`] for senders based on storage settings.
109    pub fn new_senders<P>(
110        provider: &'a P,
111        block_number: BlockNumber,
112    ) -> ProviderResult<EitherWriterTy<'a, P, tables::TransactionSenders>>
113    where
114        P: DBProvider + NodePrimitivesProvider + StorageSettingsCache + StaticFileProviderFactory,
115        P::Tx: DbTxMut,
116    {
117        if EitherWriterDestination::senders(provider).is_static_file() {
118            Ok(EitherWriter::StaticFile(
119                provider
120                    .get_static_file_writer(block_number, StaticFileSegment::TransactionSenders)?,
121            ))
122        } else {
123            Ok(EitherWriter::Database(
124                provider.tx_ref().cursor_write::<tables::TransactionSenders>()?,
125            ))
126        }
127    }
128
129    /// Creates a new [`EitherWriter`] for storages history based on storage settings.
130    pub fn new_storages_history<P>(
131        provider: &P,
132        _rocksdb_tx: RocksTxArg<'a>,
133    ) -> ProviderResult<EitherWriterTy<'a, P, tables::StoragesHistory>>
134    where
135        P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
136        P::Tx: DbTxMut,
137    {
138        #[cfg(all(unix, feature = "rocksdb"))]
139        if provider.cached_storage_settings().storages_history_in_rocksdb {
140            return Ok(EitherWriter::RocksDB(_rocksdb_tx));
141        }
142
143        Ok(EitherWriter::Database(provider.tx_ref().cursor_write::<tables::StoragesHistory>()?))
144    }
145
146    /// Creates a new [`EitherWriter`] for transaction hash numbers based on storage settings.
147    pub fn new_transaction_hash_numbers<P>(
148        provider: &P,
149        _rocksdb_tx: RocksTxArg<'a>,
150    ) -> ProviderResult<EitherWriterTy<'a, P, tables::TransactionHashNumbers>>
151    where
152        P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
153        P::Tx: DbTxMut,
154    {
155        #[cfg(all(unix, feature = "rocksdb"))]
156        if provider.cached_storage_settings().transaction_hash_numbers_in_rocksdb {
157            return Ok(EitherWriter::RocksDB(_rocksdb_tx));
158        }
159
160        Ok(EitherWriter::Database(
161            provider.tx_ref().cursor_write::<tables::TransactionHashNumbers>()?,
162        ))
163    }
164}
165
166impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N> {
167    /// Increment the block number.
168    ///
169    /// Relevant only for [`Self::StaticFile`]. It is a no-op for [`Self::Database`].
170    pub fn increment_block(&mut self, expected_block_number: BlockNumber) -> ProviderResult<()> {
171        match self {
172            Self::Database(_) => Ok(()),
173            Self::StaticFile(writer) => writer.increment_block(expected_block_number),
174            #[cfg(all(unix, feature = "rocksdb"))]
175            Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
176        }
177    }
178
179    /// Ensures that the writer is positioned at the specified block number.
180    ///
181    /// If the writer is positioned at a greater block number than the specified one, the writer
182    /// will NOT be unwound and the error will be returned.
183    ///
184    /// Relevant only for [`Self::StaticFile`]. It is a no-op for [`Self::Database`].
185    pub fn ensure_at_block(&mut self, block_number: BlockNumber) -> ProviderResult<()> {
186        match self {
187            Self::Database(_) => Ok(()),
188            Self::StaticFile(writer) => writer.ensure_at_block(block_number),
189            #[cfg(all(unix, feature = "rocksdb"))]
190            Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
191        }
192    }
193
194    /// Commits the `RocksDB` transaction if this is a `RocksDB` writer.
195    ///
196    /// For [`Self::Database`] and [`Self::StaticFile`], this is a no-op as they use
197    /// different commit patterns (MDBX transaction commit, static file sync).
198    #[cfg(all(unix, feature = "rocksdb"))]
199    pub fn commit(self) -> ProviderResult<()> {
200        match self {
201            Self::Database(_) | Self::StaticFile(_) => Ok(()),
202            Self::RocksDB(tx) => tx.commit(),
203        }
204    }
205}
206
207impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>
208where
209    N::Receipt: Value,
210    CURSOR: DbCursorRW<tables::Receipts<N::Receipt>>,
211{
212    /// Append a transaction receipt.
213    pub fn append_receipt(&mut self, tx_num: TxNumber, receipt: &N::Receipt) -> ProviderResult<()> {
214        match self {
215            Self::Database(cursor) => Ok(cursor.append(tx_num, receipt)?),
216            Self::StaticFile(writer) => writer.append_receipt(tx_num, receipt),
217            #[cfg(all(unix, feature = "rocksdb"))]
218            Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
219        }
220    }
221}
222
223impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>
224where
225    CURSOR: DbCursorRW<tables::TransactionSenders>,
226{
227    /// Append a transaction sender to the destination
228    pub fn append_sender(&mut self, tx_num: TxNumber, sender: &Address) -> ProviderResult<()> {
229        match self {
230            Self::Database(cursor) => Ok(cursor.append(tx_num, sender)?),
231            Self::StaticFile(writer) => writer.append_transaction_sender(tx_num, sender),
232            #[cfg(all(unix, feature = "rocksdb"))]
233            Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
234        }
235    }
236
237    /// Append transaction senders to the destination
238    pub fn append_senders<I>(&mut self, senders: I) -> ProviderResult<()>
239    where
240        I: Iterator<Item = (TxNumber, Address)>,
241    {
242        match self {
243            Self::Database(cursor) => {
244                for (tx_num, sender) in senders {
245                    cursor.append(tx_num, &sender)?;
246                }
247                Ok(())
248            }
249            Self::StaticFile(writer) => writer.append_transaction_senders(senders),
250            #[cfg(all(unix, feature = "rocksdb"))]
251            Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
252        }
253    }
254
255    /// Removes all transaction senders above the given transaction number, and stops at the given
256    /// block number.
257    pub fn prune_senders(
258        &mut self,
259        unwind_tx_from: TxNumber,
260        block: BlockNumber,
261    ) -> ProviderResult<()>
262    where
263        CURSOR: DbCursorRO<tables::TransactionSenders>,
264    {
265        match self {
266            Self::Database(cursor) => {
267                let mut walker = cursor.walk_range(unwind_tx_from..)?;
268                while walker.next().transpose()?.is_some() {
269                    walker.delete_current()?;
270                }
271            }
272            Self::StaticFile(writer) => {
273                let static_file_transaction_sender_num = writer
274                    .reader()
275                    .get_highest_static_file_tx(StaticFileSegment::TransactionSenders);
276
277                let to_delete = static_file_transaction_sender_num
278                    .map(|static_num| (static_num + 1).saturating_sub(unwind_tx_from))
279                    .unwrap_or_default();
280
281                writer.prune_transaction_senders(to_delete, block)?;
282            }
283            #[cfg(all(unix, feature = "rocksdb"))]
284            Self::RocksDB(_) => return Err(ProviderError::UnsupportedProvider),
285        }
286
287        Ok(())
288    }
289}
290
/// Represents a source for reading data, either from database, static files, or `RocksDB`.
///
/// `CURSOR` is the database cursor type held by [`Self::Database`], and `N` is the type
/// parameter of the static file provider held by [`Self::StaticFile`].
#[derive(Debug, Display)]
pub enum EitherReader<'a, CURSOR, N> {
    /// Read from database table via cursor
    // NOTE(review): the `PhantomData` presumably keeps `'a` in use when the `RocksDB` variant
    // is compiled out — confirm.
    Database(CURSOR, PhantomData<&'a ()>),
    /// Read from static file
    StaticFile(StaticFileProvider<N>, PhantomData<&'a ()>),
    /// Read from `RocksDB` transaction
    ///
    /// Only compiled on unix targets with the `rocksdb` feature enabled.
    #[cfg(all(unix, feature = "rocksdb"))]
    RocksDB(&'a crate::providers::rocksdb::RocksTx<'a>),
}
302
303impl<'a> EitherReader<'a, (), ()> {
304    /// Creates a new [`EitherReader`] for senders based on storage settings.
305    pub fn new_senders<P>(
306        provider: &P,
307    ) -> ProviderResult<EitherReaderTy<'a, P, tables::TransactionSenders>>
308    where
309        P: DBProvider + NodePrimitivesProvider + StorageSettingsCache + StaticFileProviderFactory,
310        P::Tx: DbTx,
311    {
312        if EitherWriterDestination::senders(provider).is_static_file() {
313            Ok(EitherReader::StaticFile(provider.static_file_provider(), PhantomData))
314        } else {
315            Ok(EitherReader::Database(
316                provider.tx_ref().cursor_read::<tables::TransactionSenders>()?,
317                PhantomData,
318            ))
319        }
320    }
321
322    /// Creates a new [`EitherReader`] for storages history based on storage settings.
323    pub fn new_storages_history<P>(
324        provider: &P,
325        _rocksdb_tx: RocksTxRefArg<'a>,
326    ) -> ProviderResult<EitherReaderTy<'a, P, tables::StoragesHistory>>
327    where
328        P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
329        P::Tx: DbTx,
330    {
331        #[cfg(all(unix, feature = "rocksdb"))]
332        if provider.cached_storage_settings().storages_history_in_rocksdb {
333            return Ok(EitherReader::RocksDB(_rocksdb_tx));
334        }
335
336        Ok(EitherReader::Database(
337            provider.tx_ref().cursor_read::<tables::StoragesHistory>()?,
338            PhantomData,
339        ))
340    }
341
342    /// Creates a new [`EitherReader`] for transaction hash numbers based on storage settings.
343    pub fn new_transaction_hash_numbers<P>(
344        provider: &P,
345        _rocksdb_tx: RocksTxRefArg<'a>,
346    ) -> ProviderResult<EitherReaderTy<'a, P, tables::TransactionHashNumbers>>
347    where
348        P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
349        P::Tx: DbTx,
350    {
351        #[cfg(all(unix, feature = "rocksdb"))]
352        if provider.cached_storage_settings().transaction_hash_numbers_in_rocksdb {
353            return Ok(EitherReader::RocksDB(_rocksdb_tx));
354        }
355
356        Ok(EitherReader::Database(
357            provider.tx_ref().cursor_read::<tables::TransactionHashNumbers>()?,
358            PhantomData,
359        ))
360    }
361}
362
363impl<CURSOR, N: NodePrimitives> EitherReader<'_, CURSOR, N>
364where
365    CURSOR: DbCursorRO<tables::TransactionSenders>,
366{
367    /// Fetches the senders for a range of transactions.
368    pub fn senders_by_tx_range(
369        &mut self,
370        range: Range<TxNumber>,
371    ) -> ProviderResult<HashMap<TxNumber, Address>> {
372        match self {
373            Self::Database(cursor, _) => cursor
374                .walk_range(range)?
375                .map(|result| result.map_err(ProviderError::from))
376                .collect::<ProviderResult<HashMap<_, _>>>(),
377            Self::StaticFile(provider, _) => range
378                .clone()
379                .zip(provider.fetch_range_iter(
380                    StaticFileSegment::TransactionSenders,
381                    range,
382                    |cursor, number| cursor.get_one::<TransactionSenderMask>(number.into()),
383                )?)
384                .filter_map(|(tx_num, sender)| {
385                    let result = sender.transpose()?;
386                    Some(result.map(|sender| (tx_num, sender)))
387                })
388                .collect::<ProviderResult<HashMap<_, _>>>(),
389            #[cfg(all(unix, feature = "rocksdb"))]
390            Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
391        }
392    }
393}
394
395/// Destination for writing data.
396#[derive(Debug, EnumIs)]
397pub enum EitherWriterDestination {
398    /// Write to database table
399    Database,
400    /// Write to static file
401    StaticFile,
402    /// Write to `RocksDB`
403    RocksDB,
404}
405
406impl EitherWriterDestination {
407    /// Returns the destination for writing senders based on storage settings.
408    pub fn senders<P>(provider: &P) -> Self
409    where
410        P: StorageSettingsCache,
411    {
412        // Write senders to static files only if they're explicitly enabled
413        if provider.cached_storage_settings().transaction_senders_in_static_files {
414            Self::StaticFile
415        } else {
416            Self::Database
417        }
418    }
419}
420
#[cfg(test)]
mod tests {
    use crate::test_utils::create_test_provider_factory;

    use super::*;
    use alloy_primitives::Address;
    use reth_storage_api::{DatabaseProviderFactory, StorageSettings};

    /// Writes senders through [`EitherWriter`] and reads them back through [`EitherReader`],
    /// once with senders stored in the database and once with senders stored in static files.
    #[test]
    fn test_reader_senders_by_tx_range() {
        let factory = create_test_provider_factory();

        // Insert senders only from 1 to 4, but we will query from 0 to 5.
        let senders = [
            (1, Address::random()),
            (2, Address::random()),
            (3, Address::random()),
            (4, Address::random()),
        ];

        for transaction_senders_in_static_files in [false, true] {
            // Switch the storage settings so the constructors pick the destination under test.
            factory.set_storage_settings_cache(
                StorageSettings::legacy()
                    .with_transaction_senders_in_static_files(transaction_senders_in_static_files),
            );

            let provider = factory.database_provider_rw().unwrap();
            let mut writer = EitherWriter::new_senders(&provider, 0).unwrap();
            // The writer variant must follow the storage setting.
            if transaction_senders_in_static_files {
                assert!(matches!(writer, EitherWriter::StaticFile(_)));
            } else {
                assert!(matches!(writer, EitherWriter::Database(_)));
            }

            writer.increment_block(0).unwrap();
            writer.append_senders(senders.iter().copied()).unwrap();
            // Release the writer before committing the provider.
            drop(writer);
            provider.commit().unwrap();

            let provider = factory.database_provider_ro().unwrap();
            let mut reader = EitherReader::new_senders(&provider).unwrap();
            // The reader variant must follow the storage setting as well.
            if transaction_senders_in_static_files {
                assert!(matches!(reader, EitherReader::StaticFile(_, _)));
            } else {
                assert!(matches!(reader, EitherReader::Database(_, _)));
            }

            // The queried range is wider than the stored one; only stored entries come back.
            assert_eq!(
                reader.senders_by_tx_range(0..6).unwrap(),
                senders.iter().copied().collect::<HashMap<_, _>>(),
                "{reader}"
            );
        }
    }
}