1use std::{marker::PhantomData, ops::Range};
5
6#[cfg(all(unix, feature = "rocksdb"))]
7use crate::providers::rocksdb::RocksDBBatch;
8use crate::{
9 providers::{StaticFileProvider, StaticFileProviderRWRefMut},
10 StaticFileProviderFactory,
11};
12use alloy_primitives::{map::HashMap, Address, BlockNumber, TxHash, TxNumber};
13use reth_db::{
14 cursor::DbCursorRO,
15 static_file::TransactionSenderMask,
16 table::Value,
17 transaction::{CursorMutTy, CursorTy, DbTx, DbTxMut},
18};
19use reth_db_api::{
20 cursor::DbCursorRW,
21 models::{storage_sharded_key::StorageShardedKey, ShardedKey},
22 tables,
23 tables::BlockNumberList,
24};
25use reth_errors::ProviderError;
26use reth_node_types::NodePrimitives;
27use reth_primitives_traits::ReceiptTy;
28use reth_static_file_types::StaticFileSegment;
29use reth_storage_api::{DBProvider, NodePrimitivesProvider, StorageSettingsCache};
30use reth_storage_errors::provider::ProviderResult;
31use strum::{Display, EnumIs};
32
33type EitherReaderTy<'a, P, T> =
35 EitherReader<'a, CursorTy<<P as DBProvider>::Tx, T>, <P as NodePrimitivesProvider>::Primitives>;
36
37type EitherWriterTy<'a, P, T> = EitherWriter<
39 'a,
40 CursorMutTy<<P as DBProvider>::Tx, T>,
41 <P as NodePrimitivesProvider>::Primitives,
42>;
43
44#[cfg(all(unix, feature = "rocksdb"))]
47type RocksBatchArg<'a> = crate::providers::rocksdb::RocksDBBatch<'a>;
48#[cfg(not(all(unix, feature = "rocksdb")))]
49type RocksBatchArg<'a> = ();
50
51#[cfg(all(unix, feature = "rocksdb"))]
52type RocksTxRefArg<'a> = &'a crate::providers::rocksdb::RocksTx<'a>;
53#[cfg(not(all(unix, feature = "rocksdb")))]
54type RocksTxRefArg<'a> = ();
55
56#[derive(Debug, Display)]
58pub enum EitherWriter<'a, CURSOR, N> {
59 Database(CURSOR),
61 StaticFile(StaticFileProviderRWRefMut<'a, N>),
63 #[cfg(all(unix, feature = "rocksdb"))]
65 RocksDB(RocksDBBatch<'a>),
66}
67
68impl<'a> EitherWriter<'a, (), ()> {
69 pub fn new_receipts<P>(
71 provider: &'a P,
72 block_number: BlockNumber,
73 ) -> ProviderResult<EitherWriterTy<'a, P, tables::Receipts<ReceiptTy<P::Primitives>>>>
74 where
75 P: DBProvider + NodePrimitivesProvider + StorageSettingsCache + StaticFileProviderFactory,
76 P::Tx: DbTxMut,
77 ReceiptTy<P::Primitives>: Value,
78 {
79 if Self::receipts_destination(provider).is_static_file() {
80 Ok(EitherWriter::StaticFile(
81 provider.get_static_file_writer(block_number, StaticFileSegment::Receipts)?,
82 ))
83 } else {
84 Ok(EitherWriter::Database(
85 provider.tx_ref().cursor_write::<tables::Receipts<ReceiptTy<P::Primitives>>>()?,
86 ))
87 }
88 }
89
90 pub fn receipts_destination<P: DBProvider + StorageSettingsCache>(
99 provider: &P,
100 ) -> EitherWriterDestination {
101 let receipts_in_static_files = provider.cached_storage_settings().receipts_in_static_files;
102 let prune_modes = provider.prune_modes_ref();
103
104 if !receipts_in_static_files && prune_modes.has_receipts_pruning() ||
105 receipts_in_static_files && !prune_modes.receipts_log_filter.is_empty()
107 {
108 EitherWriterDestination::Database
109 } else {
110 EitherWriterDestination::StaticFile
111 }
112 }
113
114 pub fn new_senders<P>(
116 provider: &'a P,
117 block_number: BlockNumber,
118 ) -> ProviderResult<EitherWriterTy<'a, P, tables::TransactionSenders>>
119 where
120 P: DBProvider + NodePrimitivesProvider + StorageSettingsCache + StaticFileProviderFactory,
121 P::Tx: DbTxMut,
122 {
123 if EitherWriterDestination::senders(provider).is_static_file() {
124 Ok(EitherWriter::StaticFile(
125 provider
126 .get_static_file_writer(block_number, StaticFileSegment::TransactionSenders)?,
127 ))
128 } else {
129 Ok(EitherWriter::Database(
130 provider.tx_ref().cursor_write::<tables::TransactionSenders>()?,
131 ))
132 }
133 }
134
135 pub fn new_storages_history<P>(
137 provider: &P,
138 _rocksdb_batch: RocksBatchArg<'a>,
139 ) -> ProviderResult<EitherWriterTy<'a, P, tables::StoragesHistory>>
140 where
141 P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
142 P::Tx: DbTxMut,
143 {
144 #[cfg(all(unix, feature = "rocksdb"))]
145 if provider.cached_storage_settings().storages_history_in_rocksdb {
146 return Ok(EitherWriter::RocksDB(_rocksdb_batch));
147 }
148
149 Ok(EitherWriter::Database(provider.tx_ref().cursor_write::<tables::StoragesHistory>()?))
150 }
151
152 pub fn new_transaction_hash_numbers<P>(
154 provider: &P,
155 _rocksdb_batch: RocksBatchArg<'a>,
156 ) -> ProviderResult<EitherWriterTy<'a, P, tables::TransactionHashNumbers>>
157 where
158 P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
159 P::Tx: DbTxMut,
160 {
161 #[cfg(all(unix, feature = "rocksdb"))]
162 if provider.cached_storage_settings().transaction_hash_numbers_in_rocksdb {
163 return Ok(EitherWriter::RocksDB(_rocksdb_batch));
164 }
165
166 Ok(EitherWriter::Database(
167 provider.tx_ref().cursor_write::<tables::TransactionHashNumbers>()?,
168 ))
169 }
170
171 pub fn new_accounts_history<P>(
173 provider: &P,
174 _rocksdb_batch: RocksBatchArg<'a>,
175 ) -> ProviderResult<EitherWriterTy<'a, P, tables::AccountsHistory>>
176 where
177 P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
178 P::Tx: DbTxMut,
179 {
180 #[cfg(all(unix, feature = "rocksdb"))]
181 if provider.cached_storage_settings().account_history_in_rocksdb {
182 return Ok(EitherWriter::RocksDB(_rocksdb_batch));
183 }
184
185 Ok(EitherWriter::Database(provider.tx_ref().cursor_write::<tables::AccountsHistory>()?))
186 }
187}
188
189impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N> {
190 #[cfg(all(unix, feature = "rocksdb"))]
198 pub fn into_raw_rocksdb_batch(self) -> Option<rocksdb::WriteBatchWithTransaction<true>> {
199 match self {
200 Self::Database(_) | Self::StaticFile(_) => None,
201 Self::RocksDB(batch) => Some(batch.into_inner()),
202 }
203 }
204
205 pub fn increment_block(&mut self, expected_block_number: BlockNumber) -> ProviderResult<()> {
209 match self {
210 Self::Database(_) => Ok(()),
211 Self::StaticFile(writer) => writer.increment_block(expected_block_number),
212 #[cfg(all(unix, feature = "rocksdb"))]
213 Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
214 }
215 }
216
217 pub fn ensure_at_block(&mut self, block_number: BlockNumber) -> ProviderResult<()> {
224 match self {
225 Self::Database(_) => Ok(()),
226 Self::StaticFile(writer) => writer.ensure_at_block(block_number),
227 #[cfg(all(unix, feature = "rocksdb"))]
228 Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
229 }
230 }
231}
232
233impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>
234where
235 N::Receipt: Value,
236 CURSOR: DbCursorRW<tables::Receipts<N::Receipt>>,
237{
238 pub fn append_receipt(&mut self, tx_num: TxNumber, receipt: &N::Receipt) -> ProviderResult<()> {
240 match self {
241 Self::Database(cursor) => Ok(cursor.append(tx_num, receipt)?),
242 Self::StaticFile(writer) => writer.append_receipt(tx_num, receipt),
243 #[cfg(all(unix, feature = "rocksdb"))]
244 Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
245 }
246 }
247}
248
249impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>
250where
251 CURSOR: DbCursorRW<tables::TransactionSenders>,
252{
253 pub fn append_sender(&mut self, tx_num: TxNumber, sender: &Address) -> ProviderResult<()> {
255 match self {
256 Self::Database(cursor) => Ok(cursor.append(tx_num, sender)?),
257 Self::StaticFile(writer) => writer.append_transaction_sender(tx_num, sender),
258 #[cfg(all(unix, feature = "rocksdb"))]
259 Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
260 }
261 }
262
263 pub fn append_senders<I>(&mut self, senders: I) -> ProviderResult<()>
265 where
266 I: Iterator<Item = (TxNumber, Address)>,
267 {
268 match self {
269 Self::Database(cursor) => {
270 for (tx_num, sender) in senders {
271 cursor.append(tx_num, &sender)?;
272 }
273 Ok(())
274 }
275 Self::StaticFile(writer) => writer.append_transaction_senders(senders),
276 #[cfg(all(unix, feature = "rocksdb"))]
277 Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
278 }
279 }
280
281 pub fn prune_senders(
284 &mut self,
285 unwind_tx_from: TxNumber,
286 block: BlockNumber,
287 ) -> ProviderResult<()>
288 where
289 CURSOR: DbCursorRO<tables::TransactionSenders>,
290 {
291 match self {
292 Self::Database(cursor) => {
293 let mut walker = cursor.walk_range(unwind_tx_from..)?;
294 while walker.next().transpose()?.is_some() {
295 walker.delete_current()?;
296 }
297 }
298 Self::StaticFile(writer) => {
299 let static_file_transaction_sender_num = writer
300 .reader()
301 .get_highest_static_file_tx(StaticFileSegment::TransactionSenders);
302
303 let to_delete = static_file_transaction_sender_num
304 .map(|static_num| (static_num + 1).saturating_sub(unwind_tx_from))
305 .unwrap_or_default();
306
307 writer.prune_transaction_senders(to_delete, block)?;
308 }
309 #[cfg(all(unix, feature = "rocksdb"))]
310 Self::RocksDB(_) => return Err(ProviderError::UnsupportedProvider),
311 }
312
313 Ok(())
314 }
315}
316
317impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>
318where
319 CURSOR: DbCursorRW<tables::TransactionHashNumbers> + DbCursorRO<tables::TransactionHashNumbers>,
320{
321 pub fn put_transaction_hash_number(
327 &mut self,
328 hash: TxHash,
329 tx_num: TxNumber,
330 append_only: bool,
331 ) -> ProviderResult<()> {
332 match self {
333 Self::Database(cursor) => {
334 if append_only {
335 Ok(cursor.append(hash, &tx_num)?)
336 } else {
337 Ok(cursor.insert(hash, &tx_num)?)
338 }
339 }
340 Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
341 #[cfg(all(unix, feature = "rocksdb"))]
342 Self::RocksDB(batch) => batch.put::<tables::TransactionHashNumbers>(hash, &tx_num),
343 }
344 }
345
346 pub fn delete_transaction_hash_number(&mut self, hash: TxHash) -> ProviderResult<()> {
348 match self {
349 Self::Database(cursor) => {
350 if cursor.seek_exact(hash)?.is_some() {
351 cursor.delete_current()?;
352 }
353 Ok(())
354 }
355 Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
356 #[cfg(all(unix, feature = "rocksdb"))]
357 Self::RocksDB(batch) => batch.delete::<tables::TransactionHashNumbers>(hash),
358 }
359 }
360}
361
362impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>
363where
364 CURSOR: DbCursorRW<tables::StoragesHistory> + DbCursorRO<tables::StoragesHistory>,
365{
366 pub fn put_storage_history(
368 &mut self,
369 key: StorageShardedKey,
370 value: &BlockNumberList,
371 ) -> ProviderResult<()> {
372 match self {
373 Self::Database(cursor) => Ok(cursor.upsert(key, value)?),
374 Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
375 #[cfg(all(unix, feature = "rocksdb"))]
376 Self::RocksDB(batch) => batch.put::<tables::StoragesHistory>(key, value),
377 }
378 }
379
380 pub fn delete_storage_history(&mut self, key: StorageShardedKey) -> ProviderResult<()> {
382 match self {
383 Self::Database(cursor) => {
384 if cursor.seek_exact(key)?.is_some() {
385 cursor.delete_current()?;
386 }
387 Ok(())
388 }
389 Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
390 #[cfg(all(unix, feature = "rocksdb"))]
391 Self::RocksDB(batch) => batch.delete::<tables::StoragesHistory>(key),
392 }
393 }
394}
395
396impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>
397where
398 CURSOR: DbCursorRW<tables::AccountsHistory> + DbCursorRO<tables::AccountsHistory>,
399{
400 pub fn put_account_history(
402 &mut self,
403 key: ShardedKey<Address>,
404 value: &BlockNumberList,
405 ) -> ProviderResult<()> {
406 match self {
407 Self::Database(cursor) => Ok(cursor.upsert(key, value)?),
408 Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
409 #[cfg(all(unix, feature = "rocksdb"))]
410 Self::RocksDB(batch) => batch.put::<tables::AccountsHistory>(key, value),
411 }
412 }
413
414 pub fn delete_account_history(&mut self, key: ShardedKey<Address>) -> ProviderResult<()> {
416 match self {
417 Self::Database(cursor) => {
418 if cursor.seek_exact(key)?.is_some() {
419 cursor.delete_current()?;
420 }
421 Ok(())
422 }
423 Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
424 #[cfg(all(unix, feature = "rocksdb"))]
425 Self::RocksDB(batch) => batch.delete::<tables::AccountsHistory>(key),
426 }
427 }
428}
429
430#[derive(Debug, Display)]
432pub enum EitherReader<'a, CURSOR, N> {
433 Database(CURSOR, PhantomData<&'a ()>),
435 StaticFile(StaticFileProvider<N>, PhantomData<&'a ()>),
437 #[cfg(all(unix, feature = "rocksdb"))]
439 RocksDB(&'a crate::providers::rocksdb::RocksTx<'a>),
440}
441
442impl<'a> EitherReader<'a, (), ()> {
443 pub fn new_senders<P>(
445 provider: &P,
446 ) -> ProviderResult<EitherReaderTy<'a, P, tables::TransactionSenders>>
447 where
448 P: DBProvider + NodePrimitivesProvider + StorageSettingsCache + StaticFileProviderFactory,
449 P::Tx: DbTx,
450 {
451 if EitherWriterDestination::senders(provider).is_static_file() {
452 Ok(EitherReader::StaticFile(provider.static_file_provider(), PhantomData))
453 } else {
454 Ok(EitherReader::Database(
455 provider.tx_ref().cursor_read::<tables::TransactionSenders>()?,
456 PhantomData,
457 ))
458 }
459 }
460
461 pub fn new_storages_history<P>(
463 provider: &P,
464 _rocksdb_tx: RocksTxRefArg<'a>,
465 ) -> ProviderResult<EitherReaderTy<'a, P, tables::StoragesHistory>>
466 where
467 P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
468 P::Tx: DbTx,
469 {
470 #[cfg(all(unix, feature = "rocksdb"))]
471 if provider.cached_storage_settings().storages_history_in_rocksdb {
472 return Ok(EitherReader::RocksDB(_rocksdb_tx));
473 }
474
475 Ok(EitherReader::Database(
476 provider.tx_ref().cursor_read::<tables::StoragesHistory>()?,
477 PhantomData,
478 ))
479 }
480
481 pub fn new_transaction_hash_numbers<P>(
483 provider: &P,
484 _rocksdb_tx: RocksTxRefArg<'a>,
485 ) -> ProviderResult<EitherReaderTy<'a, P, tables::TransactionHashNumbers>>
486 where
487 P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
488 P::Tx: DbTx,
489 {
490 #[cfg(all(unix, feature = "rocksdb"))]
491 if provider.cached_storage_settings().transaction_hash_numbers_in_rocksdb {
492 return Ok(EitherReader::RocksDB(_rocksdb_tx));
493 }
494
495 Ok(EitherReader::Database(
496 provider.tx_ref().cursor_read::<tables::TransactionHashNumbers>()?,
497 PhantomData,
498 ))
499 }
500
501 pub fn new_accounts_history<P>(
503 provider: &P,
504 _rocksdb_tx: RocksTxRefArg<'a>,
505 ) -> ProviderResult<EitherReaderTy<'a, P, tables::AccountsHistory>>
506 where
507 P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
508 P::Tx: DbTx,
509 {
510 #[cfg(all(unix, feature = "rocksdb"))]
511 if provider.cached_storage_settings().account_history_in_rocksdb {
512 return Ok(EitherReader::RocksDB(_rocksdb_tx));
513 }
514
515 Ok(EitherReader::Database(
516 provider.tx_ref().cursor_read::<tables::AccountsHistory>()?,
517 PhantomData,
518 ))
519 }
520}
521
522impl<CURSOR, N: NodePrimitives> EitherReader<'_, CURSOR, N>
523where
524 CURSOR: DbCursorRO<tables::TransactionSenders>,
525{
526 pub fn senders_by_tx_range(
528 &mut self,
529 range: Range<TxNumber>,
530 ) -> ProviderResult<HashMap<TxNumber, Address>> {
531 match self {
532 Self::Database(cursor, _) => cursor
533 .walk_range(range)?
534 .map(|result| result.map_err(ProviderError::from))
535 .collect::<ProviderResult<HashMap<_, _>>>(),
536 Self::StaticFile(provider, _) => range
537 .clone()
538 .zip(provider.fetch_range_iter(
539 StaticFileSegment::TransactionSenders,
540 range,
541 |cursor, number| cursor.get_one::<TransactionSenderMask>(number.into()),
542 )?)
543 .filter_map(|(tx_num, sender)| {
544 let result = sender.transpose()?;
545 Some(result.map(|sender| (tx_num, sender)))
546 })
547 .collect::<ProviderResult<HashMap<_, _>>>(),
548 #[cfg(all(unix, feature = "rocksdb"))]
549 Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
550 }
551 }
552}
553
554impl<CURSOR, N: NodePrimitives> EitherReader<'_, CURSOR, N>
555where
556 CURSOR: DbCursorRO<tables::TransactionHashNumbers>,
557{
558 pub fn get_transaction_hash_number(
560 &mut self,
561 hash: TxHash,
562 ) -> ProviderResult<Option<TxNumber>> {
563 match self {
564 Self::Database(cursor, _) => Ok(cursor.seek_exact(hash)?.map(|(_, v)| v)),
565 Self::StaticFile(_, _) => Err(ProviderError::UnsupportedProvider),
566 #[cfg(all(unix, feature = "rocksdb"))]
567 Self::RocksDB(tx) => tx.get::<tables::TransactionHashNumbers>(hash),
568 }
569 }
570}
571
572impl<CURSOR, N: NodePrimitives> EitherReader<'_, CURSOR, N>
573where
574 CURSOR: DbCursorRO<tables::StoragesHistory>,
575{
576 pub fn get_storage_history(
578 &mut self,
579 key: StorageShardedKey,
580 ) -> ProviderResult<Option<BlockNumberList>> {
581 match self {
582 Self::Database(cursor, _) => Ok(cursor.seek_exact(key)?.map(|(_, v)| v)),
583 Self::StaticFile(_, _) => Err(ProviderError::UnsupportedProvider),
584 #[cfg(all(unix, feature = "rocksdb"))]
585 Self::RocksDB(tx) => tx.get::<tables::StoragesHistory>(key),
586 }
587 }
588}
589
590impl<CURSOR, N: NodePrimitives> EitherReader<'_, CURSOR, N>
591where
592 CURSOR: DbCursorRO<tables::AccountsHistory>,
593{
594 pub fn get_account_history(
596 &mut self,
597 key: ShardedKey<Address>,
598 ) -> ProviderResult<Option<BlockNumberList>> {
599 match self {
600 Self::Database(cursor, _) => Ok(cursor.seek_exact(key)?.map(|(_, v)| v)),
601 Self::StaticFile(_, _) => Err(ProviderError::UnsupportedProvider),
602 #[cfg(all(unix, feature = "rocksdb"))]
603 Self::RocksDB(tx) => tx.get::<tables::AccountsHistory>(key),
604 }
605 }
606}
607
608#[derive(Debug, EnumIs)]
610pub enum EitherWriterDestination {
611 Database,
613 StaticFile,
615 RocksDB,
617}
618
619impl EitherWriterDestination {
620 pub fn senders<P>(provider: &P) -> Self
622 where
623 P: StorageSettingsCache,
624 {
625 if provider.cached_storage_settings().transaction_senders_in_static_files {
627 Self::StaticFile
628 } else {
629 Self::Database
630 }
631 }
632}
633
634#[cfg(test)]
635mod tests {
636 use crate::test_utils::create_test_provider_factory;
637
638 use super::*;
639 use alloy_primitives::Address;
640 use reth_storage_api::{DatabaseProviderFactory, StorageSettings};
641
642 #[test]
643 fn test_reader_senders_by_tx_range() {
644 let factory = create_test_provider_factory();
645
646 let senders = [
648 (1, Address::random()),
649 (2, Address::random()),
650 (3, Address::random()),
651 (4, Address::random()),
652 ];
653
654 for transaction_senders_in_static_files in [false, true] {
655 factory.set_storage_settings_cache(
656 StorageSettings::legacy()
657 .with_transaction_senders_in_static_files(transaction_senders_in_static_files),
658 );
659
660 let provider = factory.database_provider_rw().unwrap();
661 let mut writer = EitherWriter::new_senders(&provider, 0).unwrap();
662 if transaction_senders_in_static_files {
663 assert!(matches!(writer, EitherWriter::StaticFile(_)));
664 } else {
665 assert!(matches!(writer, EitherWriter::Database(_)));
666 }
667
668 writer.increment_block(0).unwrap();
669 writer.append_senders(senders.iter().copied()).unwrap();
670 drop(writer);
671 provider.commit().unwrap();
672
673 let provider = factory.database_provider_ro().unwrap();
674 let mut reader = EitherReader::new_senders(&provider).unwrap();
675 if transaction_senders_in_static_files {
676 assert!(matches!(reader, EitherReader::StaticFile(_, _)));
677 } else {
678 assert!(matches!(reader, EitherReader::Database(_, _)));
679 }
680
681 assert_eq!(
682 reader.senders_by_tx_range(0..6).unwrap(),
683 senders.iter().copied().collect::<HashMap<_, _>>(),
684 "{reader}"
685 );
686 }
687 }
688}
689
690#[cfg(all(test, unix, feature = "rocksdb"))]
691mod rocksdb_tests {
692 use super::*;
693 use crate::{
694 providers::rocksdb::{RocksDBBuilder, RocksDBProvider},
695 test_utils::create_test_provider_factory,
696 RocksDBProviderFactory,
697 };
698 use alloy_primitives::{Address, B256};
699 use reth_db_api::{
700 models::{storage_sharded_key::StorageShardedKey, IntegerList, ShardedKey},
701 tables,
702 };
703 use reth_storage_api::{DatabaseProviderFactory, StorageSettings};
704 use tempfile::TempDir;
705
706 fn create_rocksdb_provider() -> (TempDir, RocksDBProvider) {
707 let temp_dir = TempDir::new().unwrap();
708 let provider = RocksDBBuilder::new(temp_dir.path())
709 .with_table::<tables::TransactionHashNumbers>()
710 .with_table::<tables::StoragesHistory>()
711 .with_table::<tables::AccountsHistory>()
712 .build()
713 .unwrap();
714 (temp_dir, provider)
715 }
716
717 #[test]
721 fn test_either_writer_transaction_hash_numbers_with_rocksdb() {
722 let factory = create_test_provider_factory();
723
724 factory.set_storage_settings_cache(
726 StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true),
727 );
728
729 let hash1 = B256::from([1u8; 32]);
730 let hash2 = B256::from([2u8; 32]);
731 let tx_num1 = 100u64;
732 let tx_num2 = 200u64;
733
734 let rocksdb = factory.rocksdb_provider();
736 let batch = rocksdb.batch();
737
738 let provider = factory.database_provider_rw().unwrap();
740 let mut writer = EitherWriter::new_transaction_hash_numbers(&provider, batch).unwrap();
741
742 assert!(matches!(writer, EitherWriter::RocksDB(_)));
744
745 writer.put_transaction_hash_number(hash1, tx_num1, false).unwrap();
747 writer.put_transaction_hash_number(hash2, tx_num2, false).unwrap();
748
749 if let Some(batch) = writer.into_raw_rocksdb_batch() {
751 provider.set_pending_rocksdb_batch(batch);
752 }
753
754 provider.commit().unwrap();
756
757 let rocksdb = factory.rocksdb_provider();
759 assert_eq!(rocksdb.get::<tables::TransactionHashNumbers>(hash1).unwrap(), Some(tx_num1));
760 assert_eq!(rocksdb.get::<tables::TransactionHashNumbers>(hash2).unwrap(), Some(tx_num2));
761 }
762
763 #[test]
765 fn test_either_writer_delete_transaction_hash_number_with_rocksdb() {
766 let factory = create_test_provider_factory();
767
768 factory.set_storage_settings_cache(
770 StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true),
771 );
772
773 let hash = B256::from([1u8; 32]);
774 let tx_num = 100u64;
775
776 let rocksdb = factory.rocksdb_provider();
778 rocksdb.put::<tables::TransactionHashNumbers>(hash, &tx_num).unwrap();
779 assert_eq!(rocksdb.get::<tables::TransactionHashNumbers>(hash).unwrap(), Some(tx_num));
780
781 let batch = rocksdb.batch();
783 let provider = factory.database_provider_rw().unwrap();
784 let mut writer = EitherWriter::new_transaction_hash_numbers(&provider, batch).unwrap();
785 writer.delete_transaction_hash_number(hash).unwrap();
786
787 if let Some(batch) = writer.into_raw_rocksdb_batch() {
789 provider.set_pending_rocksdb_batch(batch);
790 }
791 provider.commit().unwrap();
792
793 let rocksdb = factory.rocksdb_provider();
795 assert_eq!(rocksdb.get::<tables::TransactionHashNumbers>(hash).unwrap(), None);
796 }
797
798 #[test]
799 fn test_rocksdb_batch_transaction_hash_numbers() {
800 let (_temp_dir, provider) = create_rocksdb_provider();
801
802 let hash1 = B256::from([1u8; 32]);
803 let hash2 = B256::from([2u8; 32]);
804 let tx_num1 = 100u64;
805 let tx_num2 = 200u64;
806
807 let mut batch = provider.batch();
809 batch.put::<tables::TransactionHashNumbers>(hash1, &tx_num1).unwrap();
810 batch.put::<tables::TransactionHashNumbers>(hash2, &tx_num2).unwrap();
811 batch.commit().unwrap();
812
813 let tx = provider.tx();
815 assert_eq!(tx.get::<tables::TransactionHashNumbers>(hash1).unwrap(), Some(tx_num1));
816 assert_eq!(tx.get::<tables::TransactionHashNumbers>(hash2).unwrap(), Some(tx_num2));
817
818 let missing_hash = B256::from([99u8; 32]);
820 assert_eq!(tx.get::<tables::TransactionHashNumbers>(missing_hash).unwrap(), None);
821 }
822
823 #[test]
824 fn test_rocksdb_batch_storage_history() {
825 let (_temp_dir, provider) = create_rocksdb_provider();
826
827 let address = Address::random();
828 let storage_key = B256::from([1u8; 32]);
829 let key = StorageShardedKey::new(address, storage_key, 1000);
830 let value = IntegerList::new([1, 5, 10, 50]).unwrap();
831
832 let mut batch = provider.batch();
834 batch.put::<tables::StoragesHistory>(key.clone(), &value).unwrap();
835 batch.commit().unwrap();
836
837 let tx = provider.tx();
839 let result = tx.get::<tables::StoragesHistory>(key).unwrap();
840 assert_eq!(result, Some(value));
841
842 let missing_key = StorageShardedKey::new(Address::random(), B256::ZERO, 0);
844 assert_eq!(tx.get::<tables::StoragesHistory>(missing_key).unwrap(), None);
845 }
846
847 #[test]
848 fn test_rocksdb_batch_account_history() {
849 let (_temp_dir, provider) = create_rocksdb_provider();
850
851 let address = Address::random();
852 let key = ShardedKey::new(address, 1000);
853 let value = IntegerList::new([1, 10, 100, 500]).unwrap();
854
855 let mut batch = provider.batch();
857 batch.put::<tables::AccountsHistory>(key.clone(), &value).unwrap();
858 batch.commit().unwrap();
859
860 let tx = provider.tx();
862 let result = tx.get::<tables::AccountsHistory>(key).unwrap();
863 assert_eq!(result, Some(value));
864
865 let missing_key = ShardedKey::new(Address::random(), 0);
867 assert_eq!(tx.get::<tables::AccountsHistory>(missing_key).unwrap(), None);
868 }
869
870 #[test]
871 fn test_rocksdb_batch_delete_transaction_hash_number() {
872 let (_temp_dir, provider) = create_rocksdb_provider();
873
874 let hash = B256::from([1u8; 32]);
875 let tx_num = 100u64;
876
877 provider.put::<tables::TransactionHashNumbers>(hash, &tx_num).unwrap();
879 assert_eq!(provider.get::<tables::TransactionHashNumbers>(hash).unwrap(), Some(tx_num));
880
881 let mut batch = provider.batch();
883 batch.delete::<tables::TransactionHashNumbers>(hash).unwrap();
884 batch.commit().unwrap();
885
886 assert_eq!(provider.get::<tables::TransactionHashNumbers>(hash).unwrap(), None);
888 }
889
890 #[test]
891 fn test_rocksdb_batch_delete_storage_history() {
892 let (_temp_dir, provider) = create_rocksdb_provider();
893
894 let address = Address::random();
895 let storage_key = B256::from([1u8; 32]);
896 let key = StorageShardedKey::new(address, storage_key, 1000);
897 let value = IntegerList::new([1, 5, 10]).unwrap();
898
899 provider.put::<tables::StoragesHistory>(key.clone(), &value).unwrap();
901 assert!(provider.get::<tables::StoragesHistory>(key.clone()).unwrap().is_some());
902
903 let mut batch = provider.batch();
905 batch.delete::<tables::StoragesHistory>(key.clone()).unwrap();
906 batch.commit().unwrap();
907
908 assert_eq!(provider.get::<tables::StoragesHistory>(key).unwrap(), None);
910 }
911
912 #[test]
913 fn test_rocksdb_batch_delete_account_history() {
914 let (_temp_dir, provider) = create_rocksdb_provider();
915
916 let address = Address::random();
917 let key = ShardedKey::new(address, 1000);
918 let value = IntegerList::new([1, 10, 100]).unwrap();
919
920 provider.put::<tables::AccountsHistory>(key.clone(), &value).unwrap();
922 assert!(provider.get::<tables::AccountsHistory>(key.clone()).unwrap().is_some());
923
924 let mut batch = provider.batch();
926 batch.delete::<tables::AccountsHistory>(key.clone()).unwrap();
927 batch.commit().unwrap();
928
929 assert_eq!(provider.get::<tables::AccountsHistory>(key).unwrap(), None);
931 }
932
933 #[test]
938 fn test_rocksdb_commits_at_provider_level() {
939 let factory = create_test_provider_factory();
940
941 factory.set_storage_settings_cache(
943 StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true),
944 );
945
946 let hash1 = B256::from([1u8; 32]);
947 let hash2 = B256::from([2u8; 32]);
948 let tx_num1 = 100u64;
949 let tx_num2 = 200u64;
950
951 let rocksdb = factory.rocksdb_provider();
953 let batch = rocksdb.batch();
954
955 let provider = factory.database_provider_rw().unwrap();
957 let mut writer = EitherWriter::new_transaction_hash_numbers(&provider, batch).unwrap();
958
959 writer.put_transaction_hash_number(hash1, tx_num1, false).unwrap();
961 writer.put_transaction_hash_number(hash2, tx_num2, false).unwrap();
962
963 let raw_batch = writer.into_raw_rocksdb_batch();
965 if let Some(batch) = raw_batch {
966 provider.set_pending_rocksdb_batch(batch);
967 }
968
969 let rocksdb = factory.rocksdb_provider();
971 assert_eq!(
972 rocksdb.get::<tables::TransactionHashNumbers>(hash1).unwrap(),
973 None,
974 "Data should not be visible before provider.commit()"
975 );
976
977 provider.commit().unwrap();
979
980 let rocksdb = factory.rocksdb_provider();
982 assert_eq!(
983 rocksdb.get::<tables::TransactionHashNumbers>(hash1).unwrap(),
984 Some(tx_num1),
985 "Data should be visible after provider.commit()"
986 );
987 assert_eq!(
988 rocksdb.get::<tables::TransactionHashNumbers>(hash2).unwrap(),
989 Some(tx_num2),
990 "Data should be visible after provider.commit()"
991 );
992 }
993}