1use std::{marker::PhantomData, ops::Range};
5
6#[cfg(all(unix, feature = "rocksdb"))]
7use crate::providers::rocksdb::RocksTx;
8use crate::{
9 providers::{StaticFileProvider, StaticFileProviderRWRefMut},
10 StaticFileProviderFactory,
11};
12use alloy_primitives::{map::HashMap, Address, BlockNumber, TxNumber};
13use reth_db::{
14 cursor::DbCursorRO,
15 static_file::TransactionSenderMask,
16 table::Value,
17 transaction::{CursorMutTy, CursorTy, DbTx, DbTxMut},
18};
19use reth_db_api::{cursor::DbCursorRW, tables};
20use reth_errors::ProviderError;
21use reth_node_types::NodePrimitives;
22use reth_primitives_traits::ReceiptTy;
23use reth_static_file_types::StaticFileSegment;
24use reth_storage_api::{DBProvider, NodePrimitivesProvider, StorageSettingsCache};
25use reth_storage_errors::provider::ProviderResult;
26use strum::{Display, EnumIs};
27
28type EitherReaderTy<'a, P, T> =
30 EitherReader<'a, CursorTy<<P as DBProvider>::Tx, T>, <P as NodePrimitivesProvider>::Primitives>;
31
32type EitherWriterTy<'a, P, T> = EitherWriter<
34 'a,
35 CursorMutTy<<P as DBProvider>::Tx, T>,
36 <P as NodePrimitivesProvider>::Primitives,
37>;
38
39#[cfg(all(unix, feature = "rocksdb"))]
41type RocksTxArg<'a> = crate::providers::rocksdb::RocksTx<'a>;
42#[cfg(not(all(unix, feature = "rocksdb")))]
43type RocksTxArg<'a> = ();
44
45#[cfg(all(unix, feature = "rocksdb"))]
46type RocksTxRefArg<'a> = &'a crate::providers::rocksdb::RocksTx<'a>;
47#[cfg(not(all(unix, feature = "rocksdb")))]
48type RocksTxRefArg<'a> = ();
49
50#[derive(Debug, Display)]
52pub enum EitherWriter<'a, CURSOR, N> {
53 Database(CURSOR),
55 StaticFile(StaticFileProviderRWRefMut<'a, N>),
57 #[cfg(all(unix, feature = "rocksdb"))]
59 RocksDB(RocksTx<'a>),
60}
61
62impl<'a> EitherWriter<'a, (), ()> {
63 pub fn new_receipts<P>(
65 provider: &'a P,
66 block_number: BlockNumber,
67 ) -> ProviderResult<EitherWriterTy<'a, P, tables::Receipts<ReceiptTy<P::Primitives>>>>
68 where
69 P: DBProvider + NodePrimitivesProvider + StorageSettingsCache + StaticFileProviderFactory,
70 P::Tx: DbTxMut,
71 ReceiptTy<P::Primitives>: Value,
72 {
73 if Self::receipts_destination(provider).is_static_file() {
74 Ok(EitherWriter::StaticFile(
75 provider.get_static_file_writer(block_number, StaticFileSegment::Receipts)?,
76 ))
77 } else {
78 Ok(EitherWriter::Database(
79 provider.tx_ref().cursor_write::<tables::Receipts<ReceiptTy<P::Primitives>>>()?,
80 ))
81 }
82 }
83
84 pub fn receipts_destination<P: DBProvider + StorageSettingsCache>(
93 provider: &P,
94 ) -> EitherWriterDestination {
95 let receipts_in_static_files = provider.cached_storage_settings().receipts_in_static_files;
96 let prune_modes = provider.prune_modes_ref();
97
98 if !receipts_in_static_files && prune_modes.has_receipts_pruning() ||
99 receipts_in_static_files && !prune_modes.receipts_log_filter.is_empty()
101 {
102 EitherWriterDestination::Database
103 } else {
104 EitherWriterDestination::StaticFile
105 }
106 }
107
108 pub fn new_senders<P>(
110 provider: &'a P,
111 block_number: BlockNumber,
112 ) -> ProviderResult<EitherWriterTy<'a, P, tables::TransactionSenders>>
113 where
114 P: DBProvider + NodePrimitivesProvider + StorageSettingsCache + StaticFileProviderFactory,
115 P::Tx: DbTxMut,
116 {
117 if EitherWriterDestination::senders(provider).is_static_file() {
118 Ok(EitherWriter::StaticFile(
119 provider
120 .get_static_file_writer(block_number, StaticFileSegment::TransactionSenders)?,
121 ))
122 } else {
123 Ok(EitherWriter::Database(
124 provider.tx_ref().cursor_write::<tables::TransactionSenders>()?,
125 ))
126 }
127 }
128
129 pub fn new_storages_history<P>(
131 provider: &P,
132 _rocksdb_tx: RocksTxArg<'a>,
133 ) -> ProviderResult<EitherWriterTy<'a, P, tables::StoragesHistory>>
134 where
135 P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
136 P::Tx: DbTxMut,
137 {
138 #[cfg(all(unix, feature = "rocksdb"))]
139 if provider.cached_storage_settings().storages_history_in_rocksdb {
140 return Ok(EitherWriter::RocksDB(_rocksdb_tx));
141 }
142
143 Ok(EitherWriter::Database(provider.tx_ref().cursor_write::<tables::StoragesHistory>()?))
144 }
145
146 pub fn new_transaction_hash_numbers<P>(
148 provider: &P,
149 _rocksdb_tx: RocksTxArg<'a>,
150 ) -> ProviderResult<EitherWriterTy<'a, P, tables::TransactionHashNumbers>>
151 where
152 P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
153 P::Tx: DbTxMut,
154 {
155 #[cfg(all(unix, feature = "rocksdb"))]
156 if provider.cached_storage_settings().transaction_hash_numbers_in_rocksdb {
157 return Ok(EitherWriter::RocksDB(_rocksdb_tx));
158 }
159
160 Ok(EitherWriter::Database(
161 provider.tx_ref().cursor_write::<tables::TransactionHashNumbers>()?,
162 ))
163 }
164}
165
166impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N> {
167 pub fn increment_block(&mut self, expected_block_number: BlockNumber) -> ProviderResult<()> {
171 match self {
172 Self::Database(_) => Ok(()),
173 Self::StaticFile(writer) => writer.increment_block(expected_block_number),
174 #[cfg(all(unix, feature = "rocksdb"))]
175 Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
176 }
177 }
178
179 pub fn ensure_at_block(&mut self, block_number: BlockNumber) -> ProviderResult<()> {
186 match self {
187 Self::Database(_) => Ok(()),
188 Self::StaticFile(writer) => writer.ensure_at_block(block_number),
189 #[cfg(all(unix, feature = "rocksdb"))]
190 Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
191 }
192 }
193
194 #[cfg(all(unix, feature = "rocksdb"))]
199 pub fn commit(self) -> ProviderResult<()> {
200 match self {
201 Self::Database(_) | Self::StaticFile(_) => Ok(()),
202 Self::RocksDB(tx) => tx.commit(),
203 }
204 }
205}
206
207impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>
208where
209 N::Receipt: Value,
210 CURSOR: DbCursorRW<tables::Receipts<N::Receipt>>,
211{
212 pub fn append_receipt(&mut self, tx_num: TxNumber, receipt: &N::Receipt) -> ProviderResult<()> {
214 match self {
215 Self::Database(cursor) => Ok(cursor.append(tx_num, receipt)?),
216 Self::StaticFile(writer) => writer.append_receipt(tx_num, receipt),
217 #[cfg(all(unix, feature = "rocksdb"))]
218 Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
219 }
220 }
221}
222
223impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>
224where
225 CURSOR: DbCursorRW<tables::TransactionSenders>,
226{
227 pub fn append_sender(&mut self, tx_num: TxNumber, sender: &Address) -> ProviderResult<()> {
229 match self {
230 Self::Database(cursor) => Ok(cursor.append(tx_num, sender)?),
231 Self::StaticFile(writer) => writer.append_transaction_sender(tx_num, sender),
232 #[cfg(all(unix, feature = "rocksdb"))]
233 Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
234 }
235 }
236
237 pub fn append_senders<I>(&mut self, senders: I) -> ProviderResult<()>
239 where
240 I: Iterator<Item = (TxNumber, Address)>,
241 {
242 match self {
243 Self::Database(cursor) => {
244 for (tx_num, sender) in senders {
245 cursor.append(tx_num, &sender)?;
246 }
247 Ok(())
248 }
249 Self::StaticFile(writer) => writer.append_transaction_senders(senders),
250 #[cfg(all(unix, feature = "rocksdb"))]
251 Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
252 }
253 }
254
255 pub fn prune_senders(
258 &mut self,
259 unwind_tx_from: TxNumber,
260 block: BlockNumber,
261 ) -> ProviderResult<()>
262 where
263 CURSOR: DbCursorRO<tables::TransactionSenders>,
264 {
265 match self {
266 Self::Database(cursor) => {
267 let mut walker = cursor.walk_range(unwind_tx_from..)?;
268 while walker.next().transpose()?.is_some() {
269 walker.delete_current()?;
270 }
271 }
272 Self::StaticFile(writer) => {
273 let static_file_transaction_sender_num = writer
274 .reader()
275 .get_highest_static_file_tx(StaticFileSegment::TransactionSenders);
276
277 let to_delete = static_file_transaction_sender_num
278 .map(|static_num| (static_num + 1).saturating_sub(unwind_tx_from))
279 .unwrap_or_default();
280
281 writer.prune_transaction_senders(to_delete, block)?;
282 }
283 #[cfg(all(unix, feature = "rocksdb"))]
284 Self::RocksDB(_) => return Err(ProviderError::UnsupportedProvider),
285 }
286
287 Ok(())
288 }
289}
290
291#[derive(Debug, Display)]
293pub enum EitherReader<'a, CURSOR, N> {
294 Database(CURSOR, PhantomData<&'a ()>),
296 StaticFile(StaticFileProvider<N>, PhantomData<&'a ()>),
298 #[cfg(all(unix, feature = "rocksdb"))]
300 RocksDB(&'a crate::providers::rocksdb::RocksTx<'a>),
301}
302
303impl<'a> EitherReader<'a, (), ()> {
304 pub fn new_senders<P>(
306 provider: &P,
307 ) -> ProviderResult<EitherReaderTy<'a, P, tables::TransactionSenders>>
308 where
309 P: DBProvider + NodePrimitivesProvider + StorageSettingsCache + StaticFileProviderFactory,
310 P::Tx: DbTx,
311 {
312 if EitherWriterDestination::senders(provider).is_static_file() {
313 Ok(EitherReader::StaticFile(provider.static_file_provider(), PhantomData))
314 } else {
315 Ok(EitherReader::Database(
316 provider.tx_ref().cursor_read::<tables::TransactionSenders>()?,
317 PhantomData,
318 ))
319 }
320 }
321
322 pub fn new_storages_history<P>(
324 provider: &P,
325 _rocksdb_tx: RocksTxRefArg<'a>,
326 ) -> ProviderResult<EitherReaderTy<'a, P, tables::StoragesHistory>>
327 where
328 P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
329 P::Tx: DbTx,
330 {
331 #[cfg(all(unix, feature = "rocksdb"))]
332 if provider.cached_storage_settings().storages_history_in_rocksdb {
333 return Ok(EitherReader::RocksDB(_rocksdb_tx));
334 }
335
336 Ok(EitherReader::Database(
337 provider.tx_ref().cursor_read::<tables::StoragesHistory>()?,
338 PhantomData,
339 ))
340 }
341
342 pub fn new_transaction_hash_numbers<P>(
344 provider: &P,
345 _rocksdb_tx: RocksTxRefArg<'a>,
346 ) -> ProviderResult<EitherReaderTy<'a, P, tables::TransactionHashNumbers>>
347 where
348 P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
349 P::Tx: DbTx,
350 {
351 #[cfg(all(unix, feature = "rocksdb"))]
352 if provider.cached_storage_settings().transaction_hash_numbers_in_rocksdb {
353 return Ok(EitherReader::RocksDB(_rocksdb_tx));
354 }
355
356 Ok(EitherReader::Database(
357 provider.tx_ref().cursor_read::<tables::TransactionHashNumbers>()?,
358 PhantomData,
359 ))
360 }
361}
362
363impl<CURSOR, N: NodePrimitives> EitherReader<'_, CURSOR, N>
364where
365 CURSOR: DbCursorRO<tables::TransactionSenders>,
366{
367 pub fn senders_by_tx_range(
369 &mut self,
370 range: Range<TxNumber>,
371 ) -> ProviderResult<HashMap<TxNumber, Address>> {
372 match self {
373 Self::Database(cursor, _) => cursor
374 .walk_range(range)?
375 .map(|result| result.map_err(ProviderError::from))
376 .collect::<ProviderResult<HashMap<_, _>>>(),
377 Self::StaticFile(provider, _) => range
378 .clone()
379 .zip(provider.fetch_range_iter(
380 StaticFileSegment::TransactionSenders,
381 range,
382 |cursor, number| cursor.get_one::<TransactionSenderMask>(number.into()),
383 )?)
384 .filter_map(|(tx_num, sender)| {
385 let result = sender.transpose()?;
386 Some(result.map(|sender| (tx_num, sender)))
387 })
388 .collect::<ProviderResult<HashMap<_, _>>>(),
389 #[cfg(all(unix, feature = "rocksdb"))]
390 Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
391 }
392 }
393}
394
395#[derive(Debug, EnumIs)]
397pub enum EitherWriterDestination {
398 Database,
400 StaticFile,
402 RocksDB,
404}
405
406impl EitherWriterDestination {
407 pub fn senders<P>(provider: &P) -> Self
409 where
410 P: StorageSettingsCache,
411 {
412 if provider.cached_storage_settings().transaction_senders_in_static_files {
414 Self::StaticFile
415 } else {
416 Self::Database
417 }
418 }
419}
420
421#[cfg(test)]
422mod tests {
423 use crate::test_utils::create_test_provider_factory;
424
425 use super::*;
426 use alloy_primitives::Address;
427 use reth_storage_api::{DatabaseProviderFactory, StorageSettings};
428
429 #[test]
430 fn test_reader_senders_by_tx_range() {
431 let factory = create_test_provider_factory();
432
433 let senders = [
435 (1, Address::random()),
436 (2, Address::random()),
437 (3, Address::random()),
438 (4, Address::random()),
439 ];
440
441 for transaction_senders_in_static_files in [false, true] {
442 factory.set_storage_settings_cache(
443 StorageSettings::legacy()
444 .with_transaction_senders_in_static_files(transaction_senders_in_static_files),
445 );
446
447 let provider = factory.database_provider_rw().unwrap();
448 let mut writer = EitherWriter::new_senders(&provider, 0).unwrap();
449 if transaction_senders_in_static_files {
450 assert!(matches!(writer, EitherWriter::StaticFile(_)));
451 } else {
452 assert!(matches!(writer, EitherWriter::Database(_)));
453 }
454
455 writer.increment_block(0).unwrap();
456 writer.append_senders(senders.iter().copied()).unwrap();
457 drop(writer);
458 provider.commit().unwrap();
459
460 let provider = factory.database_provider_ro().unwrap();
461 let mut reader = EitherReader::new_senders(&provider).unwrap();
462 if transaction_senders_in_static_files {
463 assert!(matches!(reader, EitherReader::StaticFile(_, _)));
464 } else {
465 assert!(matches!(reader, EitherReader::Database(_, _)));
466 }
467
468 assert_eq!(
469 reader.senders_by_tx_range(0..6).unwrap(),
470 senders.iter().copied().collect::<HashMap<_, _>>(),
471 "{reader}"
472 );
473 }
474 }
475}