1use std::ops::Range;
5
6use crate::{
7 providers::{StaticFileProvider, StaticFileProviderRWRefMut},
8 StaticFileProviderFactory,
9};
10use alloy_primitives::{map::HashMap, Address, BlockNumber, TxNumber};
11use reth_db::{
12 cursor::DbCursorRO,
13 static_file::TransactionSenderMask,
14 table::Value,
15 transaction::{CursorMutTy, CursorTy, DbTx, DbTxMut},
16};
17use reth_db_api::{cursor::DbCursorRW, tables};
18use reth_errors::ProviderError;
19use reth_node_types::NodePrimitives;
20use reth_primitives_traits::ReceiptTy;
21use reth_static_file_types::StaticFileSegment;
22use reth_storage_api::{DBProvider, NodePrimitivesProvider, StorageSettingsCache};
23use reth_storage_errors::provider::ProviderResult;
24use strum::{Display, EnumIs};
25
26type EitherReaderTy<P, T> =
28 EitherReader<CursorTy<<P as DBProvider>::Tx, T>, <P as NodePrimitivesProvider>::Primitives>;
29
30type EitherWriterTy<'a, P, T> = EitherWriter<
32 'a,
33 CursorMutTy<<P as DBProvider>::Tx, T>,
34 <P as NodePrimitivesProvider>::Primitives,
35>;
36
37#[derive(Debug, Display)]
39pub enum EitherWriter<'a, CURSOR, N> {
40 Database(CURSOR),
42 StaticFile(StaticFileProviderRWRefMut<'a, N>),
44}
45
46impl<'a> EitherWriter<'a, (), ()> {
47 pub fn new_receipts<P>(
49 provider: &'a P,
50 block_number: BlockNumber,
51 ) -> ProviderResult<EitherWriterTy<'a, P, tables::Receipts<ReceiptTy<P::Primitives>>>>
52 where
53 P: DBProvider + NodePrimitivesProvider + StorageSettingsCache + StaticFileProviderFactory,
54 P::Tx: DbTxMut,
55 ReceiptTy<P::Primitives>: Value,
56 {
57 if Self::receipts_destination(provider).is_static_file() {
58 Ok(EitherWriter::StaticFile(
59 provider.get_static_file_writer(block_number, StaticFileSegment::Receipts)?,
60 ))
61 } else {
62 Ok(EitherWriter::Database(
63 provider.tx_ref().cursor_write::<tables::Receipts<ReceiptTy<P::Primitives>>>()?,
64 ))
65 }
66 }
67
68 pub fn receipts_destination<P: DBProvider + StorageSettingsCache>(
77 provider: &P,
78 ) -> EitherWriterDestination {
79 let receipts_in_static_files = provider.cached_storage_settings().receipts_in_static_files;
80 let prune_modes = provider.prune_modes_ref();
81
82 if !receipts_in_static_files && prune_modes.has_receipts_pruning() ||
83 receipts_in_static_files && !prune_modes.receipts_log_filter.is_empty()
85 {
86 EitherWriterDestination::Database
87 } else {
88 EitherWriterDestination::StaticFile
89 }
90 }
91
92 pub fn new_senders<P>(
94 provider: &'a P,
95 block_number: BlockNumber,
96 ) -> ProviderResult<EitherWriterTy<'a, P, tables::TransactionSenders>>
97 where
98 P: DBProvider + NodePrimitivesProvider + StorageSettingsCache + StaticFileProviderFactory,
99 P::Tx: DbTxMut,
100 {
101 if EitherWriterDestination::senders(provider).is_static_file() {
102 Ok(EitherWriter::StaticFile(
103 provider
104 .get_static_file_writer(block_number, StaticFileSegment::TransactionSenders)?,
105 ))
106 } else {
107 Ok(EitherWriter::Database(
108 provider.tx_ref().cursor_write::<tables::TransactionSenders>()?,
109 ))
110 }
111 }
112}
113
114impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N> {
115 pub fn increment_block(&mut self, expected_block_number: BlockNumber) -> ProviderResult<()> {
119 match self {
120 Self::Database(_) => Ok(()),
121 Self::StaticFile(writer) => writer.increment_block(expected_block_number),
122 }
123 }
124
125 pub fn ensure_at_block(&mut self, block_number: BlockNumber) -> ProviderResult<()> {
132 match self {
133 Self::Database(_) => Ok(()),
134 Self::StaticFile(writer) => writer.ensure_at_block(block_number),
135 }
136 }
137}
138
139impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>
140where
141 N::Receipt: Value,
142 CURSOR: DbCursorRW<tables::Receipts<N::Receipt>>,
143{
144 pub fn append_receipt(&mut self, tx_num: TxNumber, receipt: &N::Receipt) -> ProviderResult<()> {
146 match self {
147 Self::Database(cursor) => Ok(cursor.append(tx_num, receipt)?),
148 Self::StaticFile(writer) => writer.append_receipt(tx_num, receipt),
149 }
150 }
151}
152
153impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>
154where
155 CURSOR: DbCursorRW<tables::TransactionSenders>,
156{
157 pub fn append_sender(&mut self, tx_num: TxNumber, sender: &Address) -> ProviderResult<()> {
159 match self {
160 Self::Database(cursor) => Ok(cursor.append(tx_num, sender)?),
161 Self::StaticFile(writer) => writer.append_transaction_sender(tx_num, sender),
162 }
163 }
164
165 pub fn append_senders<I>(&mut self, senders: I) -> ProviderResult<()>
167 where
168 I: Iterator<Item = (TxNumber, Address)>,
169 {
170 match self {
171 Self::Database(cursor) => {
172 for (tx_num, sender) in senders {
173 cursor.append(tx_num, &sender)?;
174 }
175 Ok(())
176 }
177 Self::StaticFile(writer) => writer.append_transaction_senders(senders),
178 }
179 }
180
181 pub fn prune_senders(
184 &mut self,
185 unwind_tx_from: TxNumber,
186 block: BlockNumber,
187 ) -> ProviderResult<()>
188 where
189 CURSOR: DbCursorRO<tables::TransactionSenders>,
190 {
191 match self {
192 Self::Database(cursor) => {
193 let mut walker = cursor.walk_range(unwind_tx_from..)?;
194 while walker.next().transpose()?.is_some() {
195 walker.delete_current()?;
196 }
197 }
198 Self::StaticFile(writer) => {
199 let static_file_transaction_sender_num = writer
200 .reader()
201 .get_highest_static_file_tx(StaticFileSegment::TransactionSenders);
202
203 let to_delete = static_file_transaction_sender_num
204 .map(|static_num| (static_num + 1).saturating_sub(unwind_tx_from))
205 .unwrap_or_default();
206
207 writer.prune_transaction_senders(to_delete, block)?;
208 }
209 }
210
211 Ok(())
212 }
213}
214
215#[derive(Debug, Display)]
217pub enum EitherReader<CURSOR, N> {
218 Database(CURSOR),
220 StaticFile(StaticFileProvider<N>),
222}
223
224impl EitherReader<(), ()> {
225 pub fn new_senders<P>(
227 provider: &P,
228 ) -> ProviderResult<EitherReaderTy<P, tables::TransactionSenders>>
229 where
230 P: DBProvider + NodePrimitivesProvider + StorageSettingsCache + StaticFileProviderFactory,
231 P::Tx: DbTx,
232 {
233 if EitherWriterDestination::senders(provider).is_static_file() {
234 Ok(EitherReader::StaticFile(provider.static_file_provider()))
235 } else {
236 Ok(EitherReader::Database(
237 provider.tx_ref().cursor_read::<tables::TransactionSenders>()?,
238 ))
239 }
240 }
241}
242
243impl<CURSOR, N: NodePrimitives> EitherReader<CURSOR, N>
244where
245 CURSOR: DbCursorRO<tables::TransactionSenders>,
246{
247 pub fn senders_by_tx_range(
249 &mut self,
250 range: Range<TxNumber>,
251 ) -> ProviderResult<HashMap<TxNumber, Address>> {
252 match self {
253 Self::Database(cursor) => cursor
254 .walk_range(range)?
255 .map(|result| result.map_err(ProviderError::from))
256 .collect::<ProviderResult<HashMap<_, _>>>(),
257 Self::StaticFile(provider) => range
258 .clone()
259 .zip(provider.fetch_range_iter(
260 StaticFileSegment::TransactionSenders,
261 range,
262 |cursor, number| cursor.get_one::<TransactionSenderMask>(number.into()),
263 )?)
264 .filter_map(|(tx_num, sender)| {
265 let result = sender.transpose()?;
266 Some(result.map(|sender| (tx_num, sender)))
267 })
268 .collect::<ProviderResult<HashMap<_, _>>>(),
269 }
270 }
271}
272
273#[derive(Debug, EnumIs)]
275pub enum EitherWriterDestination {
276 Database,
278 StaticFile,
280}
281
282impl EitherWriterDestination {
283 pub fn senders<P>(provider: &P) -> Self
285 where
286 P: StorageSettingsCache,
287 {
288 if provider.cached_storage_settings().transaction_senders_in_static_files {
290 Self::StaticFile
291 } else {
292 Self::Database
293 }
294 }
295}
296
#[cfg(test)]
mod tests {
    use crate::test_utils::create_test_provider_factory;

    use super::*;
    use alloy_primitives::Address;
    use reth_storage_api::{DatabaseProviderFactory, StorageSettings};

    /// Writes four senders through [`EitherWriter`] and reads them back through
    /// [`EitherReader`], once per backend (database first, then static files).
    #[test]
    fn test_reader_senders_by_tx_range() {
        let factory = create_test_provider_factory();

        // Senders exist only for transactions 1 through 4, while the query below spans 0..6.
        let senders = [1, 2, 3, 4].map(|tx_num| (tx_num, Address::random()));
        let expected = senders.iter().copied().collect::<HashMap<_, _>>();

        for in_static_files in [false, true] {
            factory.set_storage_settings_cache(
                StorageSettings::legacy().with_transaction_senders_in_static_files(in_static_files),
            );

            // Write phase: the writer variant must follow the storage setting.
            let provider = factory.database_provider_rw().unwrap();
            let mut writer = EitherWriter::new_senders(&provider, 0).unwrap();
            assert_eq!(matches!(writer, EitherWriter::StaticFile(_)), in_static_files);
            assert_eq!(matches!(writer, EitherWriter::Database(_)), !in_static_files);

            writer.increment_block(0).unwrap();
            writer.append_senders(senders.iter().copied()).unwrap();
            // The writer borrows the provider, so release it before committing.
            drop(writer);
            provider.commit().unwrap();

            // Read phase: the reader variant must follow the same storage setting.
            let provider = factory.database_provider_ro().unwrap();
            let mut reader = EitherReader::new_senders(&provider).unwrap();
            assert_eq!(matches!(reader, EitherReader::StaticFile(_)), in_static_files);
            assert_eq!(matches!(reader, EitherReader::Database(_)), !in_static_files);

            assert_eq!(reader.senders_by_tx_range(0..6).unwrap(), expected, "{reader}");
        }
    }
}