reth_provider/
either_writer.rs

1//! Generic reader and writer abstractions for interacting with either database tables or static
2//! files.
3
4use std::ops::Range;
5
6use crate::{
7    providers::{StaticFileProvider, StaticFileProviderRWRefMut},
8    StaticFileProviderFactory,
9};
10use alloy_primitives::{map::HashMap, Address, BlockNumber, TxNumber};
11use reth_db::{
12    cursor::DbCursorRO,
13    static_file::TransactionSenderMask,
14    table::Value,
15    transaction::{CursorMutTy, CursorTy, DbTx, DbTxMut},
16};
17use reth_db_api::{cursor::DbCursorRW, tables};
18use reth_errors::ProviderError;
19use reth_node_types::NodePrimitives;
20use reth_primitives_traits::ReceiptTy;
21use reth_static_file_types::StaticFileSegment;
22use reth_storage_api::{DBProvider, NodePrimitivesProvider, StorageSettingsCache};
23use reth_storage_errors::provider::ProviderResult;
24use strum::{Display, EnumIs};
25
26/// Type alias for [`EitherReader`] constructors.
27type EitherReaderTy<P, T> =
28    EitherReader<CursorTy<<P as DBProvider>::Tx, T>, <P as NodePrimitivesProvider>::Primitives>;
29
30/// Type alias for [`EitherWriter`] constructors.
31type EitherWriterTy<'a, P, T> = EitherWriter<
32    'a,
33    CursorMutTy<<P as DBProvider>::Tx, T>,
34    <P as NodePrimitivesProvider>::Primitives,
35>;
36
37/// Represents a destination for writing data, either to database or static files.
38#[derive(Debug, Display)]
39pub enum EitherWriter<'a, CURSOR, N> {
40    /// Write to database table via cursor
41    Database(CURSOR),
42    /// Write to static file
43    StaticFile(StaticFileProviderRWRefMut<'a, N>),
44}
45
46impl<'a> EitherWriter<'a, (), ()> {
47    /// Creates a new [`EitherWriter`] for receipts based on storage settings and prune modes.
48    pub fn new_receipts<P>(
49        provider: &'a P,
50        block_number: BlockNumber,
51    ) -> ProviderResult<EitherWriterTy<'a, P, tables::Receipts<ReceiptTy<P::Primitives>>>>
52    where
53        P: DBProvider + NodePrimitivesProvider + StorageSettingsCache + StaticFileProviderFactory,
54        P::Tx: DbTxMut,
55        ReceiptTy<P::Primitives>: Value,
56    {
57        if Self::receipts_destination(provider).is_static_file() {
58            Ok(EitherWriter::StaticFile(
59                provider.get_static_file_writer(block_number, StaticFileSegment::Receipts)?,
60            ))
61        } else {
62            Ok(EitherWriter::Database(
63                provider.tx_ref().cursor_write::<tables::Receipts<ReceiptTy<P::Primitives>>>()?,
64            ))
65        }
66    }
67
68    /// Returns the destination for writing receipts.
69    ///
70    /// The rules are as follows:
71    /// - If the node should not always write receipts to static files, and any receipt pruning is
72    ///   enabled, write to the database.
73    /// - If the node should always write receipts to static files, but receipt log filter pruning
74    ///   is enabled, write to the database.
75    /// - Otherwise, write to static files.
76    pub fn receipts_destination<P: DBProvider + StorageSettingsCache>(
77        provider: &P,
78    ) -> EitherWriterDestination {
79        let receipts_in_static_files = provider.cached_storage_settings().receipts_in_static_files;
80        let prune_modes = provider.prune_modes_ref();
81
82        if !receipts_in_static_files && prune_modes.has_receipts_pruning() ||
83            // TODO: support writing receipts to static files with log filter pruning enabled
84            receipts_in_static_files && !prune_modes.receipts_log_filter.is_empty()
85        {
86            EitherWriterDestination::Database
87        } else {
88            EitherWriterDestination::StaticFile
89        }
90    }
91
92    /// Creates a new [`EitherWriter`] for senders based on storage settings.
93    pub fn new_senders<P>(
94        provider: &'a P,
95        block_number: BlockNumber,
96    ) -> ProviderResult<EitherWriterTy<'a, P, tables::TransactionSenders>>
97    where
98        P: DBProvider + NodePrimitivesProvider + StorageSettingsCache + StaticFileProviderFactory,
99        P::Tx: DbTxMut,
100    {
101        if EitherWriterDestination::senders(provider).is_static_file() {
102            Ok(EitherWriter::StaticFile(
103                provider
104                    .get_static_file_writer(block_number, StaticFileSegment::TransactionSenders)?,
105            ))
106        } else {
107            Ok(EitherWriter::Database(
108                provider.tx_ref().cursor_write::<tables::TransactionSenders>()?,
109            ))
110        }
111    }
112}
113
114impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N> {
115    /// Increment the block number.
116    ///
117    /// Relevant only for [`Self::StaticFile`]. It is a no-op for [`Self::Database`].
118    pub fn increment_block(&mut self, expected_block_number: BlockNumber) -> ProviderResult<()> {
119        match self {
120            Self::Database(_) => Ok(()),
121            Self::StaticFile(writer) => writer.increment_block(expected_block_number),
122        }
123    }
124
125    /// Ensures that the writer is positioned at the specified block number.
126    ///
127    /// If the writer is positioned at a greater block number than the specified one, the writer
128    /// will NOT be unwound and the error will be returned.
129    ///
130    /// Relevant only for [`Self::StaticFile`]. It is a no-op for [`Self::Database`].
131    pub fn ensure_at_block(&mut self, block_number: BlockNumber) -> ProviderResult<()> {
132        match self {
133            Self::Database(_) => Ok(()),
134            Self::StaticFile(writer) => writer.ensure_at_block(block_number),
135        }
136    }
137}
138
139impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>
140where
141    N::Receipt: Value,
142    CURSOR: DbCursorRW<tables::Receipts<N::Receipt>>,
143{
144    /// Append a transaction receipt.
145    pub fn append_receipt(&mut self, tx_num: TxNumber, receipt: &N::Receipt) -> ProviderResult<()> {
146        match self {
147            Self::Database(cursor) => Ok(cursor.append(tx_num, receipt)?),
148            Self::StaticFile(writer) => writer.append_receipt(tx_num, receipt),
149        }
150    }
151}
152
153impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>
154where
155    CURSOR: DbCursorRW<tables::TransactionSenders>,
156{
157    /// Append a transaction sender to the destination
158    pub fn append_sender(&mut self, tx_num: TxNumber, sender: &Address) -> ProviderResult<()> {
159        match self {
160            Self::Database(cursor) => Ok(cursor.append(tx_num, sender)?),
161            Self::StaticFile(writer) => writer.append_transaction_sender(tx_num, sender),
162        }
163    }
164
165    /// Append transaction senders to the destination
166    pub fn append_senders<I>(&mut self, senders: I) -> ProviderResult<()>
167    where
168        I: Iterator<Item = (TxNumber, Address)>,
169    {
170        match self {
171            Self::Database(cursor) => {
172                for (tx_num, sender) in senders {
173                    cursor.append(tx_num, &sender)?;
174                }
175                Ok(())
176            }
177            Self::StaticFile(writer) => writer.append_transaction_senders(senders),
178        }
179    }
180
181    /// Removes all transaction senders above the given transaction number, and stops at the given
182    /// block number.
183    pub fn prune_senders(
184        &mut self,
185        unwind_tx_from: TxNumber,
186        block: BlockNumber,
187    ) -> ProviderResult<()>
188    where
189        CURSOR: DbCursorRO<tables::TransactionSenders>,
190    {
191        match self {
192            Self::Database(cursor) => {
193                let mut walker = cursor.walk_range(unwind_tx_from..)?;
194                while walker.next().transpose()?.is_some() {
195                    walker.delete_current()?;
196                }
197            }
198            Self::StaticFile(writer) => {
199                let static_file_transaction_sender_num = writer
200                    .reader()
201                    .get_highest_static_file_tx(StaticFileSegment::TransactionSenders);
202
203                let to_delete = static_file_transaction_sender_num
204                    .map(|static_num| (static_num + 1).saturating_sub(unwind_tx_from))
205                    .unwrap_or_default();
206
207                writer.prune_transaction_senders(to_delete, block)?;
208            }
209        }
210
211        Ok(())
212    }
213}
214
215/// Represents a source for reading data, either from database or static files.
216#[derive(Debug, Display)]
217pub enum EitherReader<CURSOR, N> {
218    /// Read from database table via cursor
219    Database(CURSOR),
220    /// Read from static file
221    StaticFile(StaticFileProvider<N>),
222}
223
224impl EitherReader<(), ()> {
225    /// Creates a new [`EitherReader`] for senders based on storage settings.
226    pub fn new_senders<P>(
227        provider: &P,
228    ) -> ProviderResult<EitherReaderTy<P, tables::TransactionSenders>>
229    where
230        P: DBProvider + NodePrimitivesProvider + StorageSettingsCache + StaticFileProviderFactory,
231        P::Tx: DbTx,
232    {
233        if EitherWriterDestination::senders(provider).is_static_file() {
234            Ok(EitherReader::StaticFile(provider.static_file_provider()))
235        } else {
236            Ok(EitherReader::Database(
237                provider.tx_ref().cursor_read::<tables::TransactionSenders>()?,
238            ))
239        }
240    }
241}
242
243impl<CURSOR, N: NodePrimitives> EitherReader<CURSOR, N>
244where
245    CURSOR: DbCursorRO<tables::TransactionSenders>,
246{
247    /// Fetches the senders for a range of transactions.
248    pub fn senders_by_tx_range(
249        &mut self,
250        range: Range<TxNumber>,
251    ) -> ProviderResult<HashMap<TxNumber, Address>> {
252        match self {
253            Self::Database(cursor) => cursor
254                .walk_range(range)?
255                .map(|result| result.map_err(ProviderError::from))
256                .collect::<ProviderResult<HashMap<_, _>>>(),
257            Self::StaticFile(provider) => range
258                .clone()
259                .zip(provider.fetch_range_iter(
260                    StaticFileSegment::TransactionSenders,
261                    range,
262                    |cursor, number| cursor.get_one::<TransactionSenderMask>(number.into()),
263                )?)
264                .filter_map(|(tx_num, sender)| {
265                    let result = sender.transpose()?;
266                    Some(result.map(|sender| (tx_num, sender)))
267                })
268                .collect::<ProviderResult<HashMap<_, _>>>(),
269        }
270    }
271}
272
273/// Destination for writing data.
274#[derive(Debug, EnumIs)]
275pub enum EitherWriterDestination {
276    /// Write to database table
277    Database,
278    /// Write to static file
279    StaticFile,
280}
281
282impl EitherWriterDestination {
283    /// Returns the destination for writing senders based on storage settings.
284    pub fn senders<P>(provider: &P) -> Self
285    where
286        P: StorageSettingsCache,
287    {
288        // Write senders to static files only if they're explicitly enabled
289        if provider.cached_storage_settings().transaction_senders_in_static_files {
290            Self::StaticFile
291        } else {
292            Self::Database
293        }
294    }
295}
296
#[cfg(test)]
mod tests {
    use crate::test_utils::create_test_provider_factory;

    use super::*;
    use alloy_primitives::Address;
    use reth_storage_api::{DatabaseProviderFactory, StorageSettings};

    /// Writes senders through [`EitherWriter`] and reads them back through [`EitherReader`], once
    /// with senders stored in the database and once with senders stored in static files.
    #[test]
    fn test_reader_senders_by_tx_range() {
        let factory = create_test_provider_factory();

        // Senders exist only for transactions 1 through 4, while the query below covers 0
        // through 5.
        let senders: [(TxNumber, Address); 4] =
            [1, 2, 3, 4].map(|tx_num| (tx_num, Address::random()));

        for in_static_files in [false, true] {
            let settings =
                StorageSettings::legacy().with_transaction_senders_in_static_files(in_static_files);
            factory.set_storage_settings_cache(settings);

            // Write phase: the writer variant must follow the storage setting.
            let provider_rw = factory.database_provider_rw().unwrap();
            let mut writer = EitherWriter::new_senders(&provider_rw, 0).unwrap();
            assert_eq!(matches!(writer, EitherWriter::StaticFile(_)), in_static_files);
            assert_eq!(matches!(writer, EitherWriter::Database(_)), !in_static_files);

            writer.increment_block(0).unwrap();
            writer.append_senders(senders.iter().copied()).unwrap();
            drop(writer);
            provider_rw.commit().unwrap();

            // Read phase: the reader variant must follow the storage setting as well.
            let provider_ro = factory.database_provider_ro().unwrap();
            let mut reader = EitherReader::new_senders(&provider_ro).unwrap();
            assert_eq!(matches!(reader, EitherReader::StaticFile(_)), in_static_files);
            assert_eq!(matches!(reader, EitherReader::Database(_)), !in_static_files);

            let expected = senders.iter().copied().collect::<HashMap<_, _>>();
            assert_eq!(reader.senders_by_tx_range(0..6).unwrap(), expected, "{reader}");
        }
    }
}