1use std::{
5 collections::BTreeSet,
6 marker::PhantomData,
7 ops::{Range, RangeInclusive},
8};
9
10#[cfg(all(unix, feature = "rocksdb"))]
11use crate::providers::rocksdb::RocksDBBatch;
12use crate::{
13 providers::{history_info, HistoryInfo, StaticFileProvider, StaticFileProviderRWRefMut},
14 StaticFileProviderFactory,
15};
16use alloy_primitives::{map::HashMap, Address, BlockNumber, TxHash, TxNumber, B256};
17use rayon::slice::ParallelSliceMut;
18use reth_db::{
19 cursor::{DbCursorRO, DbDupCursorRW},
20 models::{AccountBeforeTx, StorageBeforeTx},
21 static_file::TransactionSenderMask,
22 table::Value,
23 transaction::{CursorMutTy, CursorTy, DbTx, DbTxMut, DupCursorMutTy, DupCursorTy},
24};
25use reth_db_api::{
26 cursor::DbCursorRW,
27 models::{storage_sharded_key::StorageShardedKey, BlockNumberAddress, ShardedKey},
28 tables,
29 tables::BlockNumberList,
30};
31use reth_errors::ProviderError;
32use reth_node_types::NodePrimitives;
33use reth_primitives_traits::{ReceiptTy, StorageEntry};
34use reth_static_file_types::StaticFileSegment;
35use reth_storage_api::{ChangeSetReader, DBProvider, NodePrimitivesProvider, StorageSettingsCache};
36use reth_storage_errors::provider::ProviderResult;
37use strum::{Display, EnumIs};
38
/// [`EitherReader`] instantiated for provider `P` and table `T`.
///
/// The cursor parameter is the [`CursorTy`] of `P`'s transaction for `T` (what the constructors in
/// this file obtain through `cursor_read`), and the primitives parameter is `P`'s
/// [`NodePrimitivesProvider::Primitives`].
type EitherReaderTy<'a, P, T> =
    EitherReader<'a, CursorTy<<P as DBProvider>::Tx, T>, <P as NodePrimitivesProvider>::Primitives>;
42
/// [`EitherReader`] instantiated for provider `P` and dup-sorted table `T`.
///
/// Same as `EitherReaderTy`, except that the cursor parameter is the [`DupCursorTy`] of `P`'s
/// transaction (what the constructors in this file obtain through `cursor_dup_read`).
type DupEitherReaderTy<'a, P, T> = EitherReader<
    'a,
    DupCursorTy<<P as DBProvider>::Tx, T>,
    <P as NodePrimitivesProvider>::Primitives,
>;
49
/// [`EitherWriter`] instantiated for provider `P` and dup-sorted table `T`.
///
/// The cursor parameter is the [`DupCursorMutTy`] of `P`'s transaction for `T` (what the
/// constructors in this file obtain through `cursor_dup_write`), and the primitives parameter is
/// `P`'s [`NodePrimitivesProvider::Primitives`].
type DupEitherWriterTy<'a, P, T> = EitherWriter<
    'a,
    DupCursorMutTy<<P as DBProvider>::Tx, T>,
    <P as NodePrimitivesProvider>::Primitives,
>;
56
/// [`EitherWriter`] instantiated for provider `P` and table `T`.
///
/// The cursor parameter is the [`CursorMutTy`] of `P`'s transaction for `T` (what the constructors
/// in this file obtain through `cursor_write`), and the primitives parameter is `P`'s
/// [`NodePrimitivesProvider::Primitives`].
type EitherWriterTy<'a, P, T> = EitherWriter<
    'a,
    CursorMutTy<<P as DBProvider>::Tx, T>,
    <P as NodePrimitivesProvider>::Primitives,
>;
63
/// Batch argument taken by the [`EitherWriter`] constructors that may target `RocksDB`.
///
/// On unix with the `rocksdb` feature enabled this is a `RocksDBBatch`.
#[cfg(all(unix, feature = "rocksdb"))]
pub type RocksBatchArg<'a> = crate::providers::rocksdb::RocksDBBatch<'a>;
/// Batch argument taken by the [`EitherWriter`] constructors that may target `RocksDB`.
///
/// Without `RocksDB` support (non-unix target or `rocksdb` feature disabled) this is the unit
/// type, so the constructors keep a single signature across both configurations.
#[cfg(not(all(unix, feature = "rocksdb")))]
pub type RocksBatchArg<'a> = ();
76
/// Raw `RocksDB` write batch type.
///
/// On unix with the `rocksdb` feature enabled this is `rocksdb::WriteBatchWithTransaction<true>`,
/// the same type `EitherWriter::into_raw_rocksdb_batch` returns in that configuration.
#[cfg(all(unix, feature = "rocksdb"))]
pub type RawRocksDBBatch = rocksdb::WriteBatchWithTransaction<true>;
/// Raw `RocksDB` write batch type.
///
/// Without `RocksDB` support (non-unix target or `rocksdb` feature disabled) this is the unit
/// type.
#[cfg(not(all(unix, feature = "rocksdb")))]
pub type RawRocksDBBatch = ();
83
/// Transaction argument taken by the [`EitherReader`] constructors that may read from `RocksDB`.
///
/// On unix with the `rocksdb` feature enabled this is an optional reference to a `RocksTx`. The
/// constructors in this file `expect` it to be `Some` when `storage_v2` is enabled.
#[cfg(all(unix, feature = "rocksdb"))]
pub type RocksTxRefArg<'a> = Option<&'a crate::providers::rocksdb::RocksTx<'a>>;
/// Transaction argument taken by the [`EitherReader`] constructors that may read from `RocksDB`.
///
/// Without `RocksDB` support (non-unix target or `rocksdb` feature disabled) this is the unit
/// type, so the constructors keep a single signature across both configurations.
#[cfg(not(all(unix, feature = "rocksdb")))]
pub type RocksTxRefArg<'a> = ();
99
/// A writer that targets exactly one of several storage backends.
///
/// The methods on this type dispatch on the active variant. Operations that a backend does not
/// implement return [`ProviderError::UnsupportedProvider`].
#[derive(Debug, Display)]
pub enum EitherWriter<'a, CURSOR, N> {
    /// Writes go through a database cursor.
    Database(CURSOR),
    /// Writes go through a static-file writer.
    StaticFile(StaticFileProviderRWRefMut<'a, N>),
    /// Writes are put into a `RocksDB` batch (only on unix with the `rocksdb` feature).
    #[cfg(all(unix, feature = "rocksdb"))]
    RocksDB(RocksDBBatch<'a>),
}
111
112impl<'a> EitherWriter<'a, (), ()> {
113 pub fn new_receipts<P>(
115 provider: &'a P,
116 block_number: BlockNumber,
117 ) -> ProviderResult<EitherWriterTy<'a, P, tables::Receipts<ReceiptTy<P::Primitives>>>>
118 where
119 P: DBProvider + NodePrimitivesProvider + StorageSettingsCache + StaticFileProviderFactory,
120 P::Tx: DbTxMut,
121 ReceiptTy<P::Primitives>: Value,
122 {
123 if Self::receipts_destination(provider).is_static_file() {
124 Ok(EitherWriter::StaticFile(
125 provider.get_static_file_writer(block_number, StaticFileSegment::Receipts)?,
126 ))
127 } else {
128 Ok(EitherWriter::Database(
129 provider.tx_ref().cursor_write::<tables::Receipts<ReceiptTy<P::Primitives>>>()?,
130 ))
131 }
132 }
133
134 pub fn new_senders<P>(
136 provider: &'a P,
137 block_number: BlockNumber,
138 ) -> ProviderResult<EitherWriterTy<'a, P, tables::TransactionSenders>>
139 where
140 P: DBProvider + NodePrimitivesProvider + StorageSettingsCache + StaticFileProviderFactory,
141 P::Tx: DbTxMut,
142 {
143 if EitherWriterDestination::senders(provider).is_static_file() {
144 Ok(EitherWriter::StaticFile(
145 provider
146 .get_static_file_writer(block_number, StaticFileSegment::TransactionSenders)?,
147 ))
148 } else {
149 Ok(EitherWriter::Database(
150 provider.tx_ref().cursor_write::<tables::TransactionSenders>()?,
151 ))
152 }
153 }
154
155 pub fn new_account_changesets<P>(
158 provider: &'a P,
159 block_number: BlockNumber,
160 ) -> ProviderResult<DupEitherWriterTy<'a, P, tables::AccountChangeSets>>
161 where
162 P: DBProvider + NodePrimitivesProvider + StorageSettingsCache + StaticFileProviderFactory,
163 P::Tx: DbTxMut,
164 {
165 if provider.cached_storage_settings().storage_v2 {
166 Ok(EitherWriter::StaticFile(
167 provider
168 .get_static_file_writer(block_number, StaticFileSegment::AccountChangeSets)?,
169 ))
170 } else {
171 Ok(EitherWriter::Database(
172 provider.tx_ref().cursor_dup_write::<tables::AccountChangeSets>()?,
173 ))
174 }
175 }
176
177 pub fn new_storage_changesets<P>(
179 provider: &'a P,
180 block_number: BlockNumber,
181 ) -> ProviderResult<DupEitherWriterTy<'a, P, tables::StorageChangeSets>>
182 where
183 P: DBProvider + NodePrimitivesProvider + StorageSettingsCache + StaticFileProviderFactory,
184 P::Tx: DbTxMut,
185 {
186 if provider.cached_storage_settings().storage_v2 {
187 Ok(EitherWriter::StaticFile(
188 provider
189 .get_static_file_writer(block_number, StaticFileSegment::StorageChangeSets)?,
190 ))
191 } else {
192 Ok(EitherWriter::Database(
193 provider.tx_ref().cursor_dup_write::<tables::StorageChangeSets>()?,
194 ))
195 }
196 }
197
198 pub fn receipts_destination<P: DBProvider + StorageSettingsCache>(
207 provider: &P,
208 ) -> EitherWriterDestination {
209 let receipts_in_static_files = provider.cached_storage_settings().storage_v2;
210 let prune_modes = provider.prune_modes_ref();
211
212 if !receipts_in_static_files && prune_modes.has_receipts_pruning() ||
213 receipts_in_static_files && !prune_modes.receipts_log_filter.is_empty()
215 {
216 EitherWriterDestination::Database
217 } else {
218 EitherWriterDestination::StaticFile
219 }
220 }
221
222 pub fn account_changesets_destination<P: DBProvider + StorageSettingsCache>(
226 provider: &P,
227 ) -> EitherWriterDestination {
228 if provider.cached_storage_settings().storage_v2 {
229 EitherWriterDestination::StaticFile
230 } else {
231 EitherWriterDestination::Database
232 }
233 }
234
235 pub fn storage_changesets_destination<P: DBProvider + StorageSettingsCache>(
239 provider: &P,
240 ) -> EitherWriterDestination {
241 if provider.cached_storage_settings().storage_v2 {
242 EitherWriterDestination::StaticFile
243 } else {
244 EitherWriterDestination::Database
245 }
246 }
247
248 pub fn new_storages_history<P>(
250 provider: &P,
251 _rocksdb_batch: RocksBatchArg<'a>,
252 ) -> ProviderResult<EitherWriterTy<'a, P, tables::StoragesHistory>>
253 where
254 P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
255 P::Tx: DbTxMut,
256 {
257 #[cfg(all(unix, feature = "rocksdb"))]
258 if provider.cached_storage_settings().storage_v2 {
259 return Ok(EitherWriter::RocksDB(_rocksdb_batch));
260 }
261
262 Ok(EitherWriter::Database(provider.tx_ref().cursor_write::<tables::StoragesHistory>()?))
263 }
264
265 pub fn new_transaction_hash_numbers<P>(
267 provider: &P,
268 _rocksdb_batch: RocksBatchArg<'a>,
269 ) -> ProviderResult<EitherWriterTy<'a, P, tables::TransactionHashNumbers>>
270 where
271 P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
272 P::Tx: DbTxMut,
273 {
274 #[cfg(all(unix, feature = "rocksdb"))]
275 if provider.cached_storage_settings().storage_v2 {
276 return Ok(EitherWriter::RocksDB(_rocksdb_batch));
277 }
278
279 Ok(EitherWriter::Database(
280 provider.tx_ref().cursor_write::<tables::TransactionHashNumbers>()?,
281 ))
282 }
283
284 pub fn new_accounts_history<P>(
286 provider: &P,
287 _rocksdb_batch: RocksBatchArg<'a>,
288 ) -> ProviderResult<EitherWriterTy<'a, P, tables::AccountsHistory>>
289 where
290 P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
291 P::Tx: DbTxMut,
292 {
293 #[cfg(all(unix, feature = "rocksdb"))]
294 if provider.cached_storage_settings().storage_v2 {
295 return Ok(EitherWriter::RocksDB(_rocksdb_batch));
296 }
297
298 Ok(EitherWriter::Database(provider.tx_ref().cursor_write::<tables::AccountsHistory>()?))
299 }
300}
301
302impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N> {
303 #[cfg(all(unix, feature = "rocksdb"))]
311 pub fn into_raw_rocksdb_batch(self) -> Option<rocksdb::WriteBatchWithTransaction<true>> {
312 match self {
313 Self::Database(_) | Self::StaticFile(_) => None,
314 Self::RocksDB(batch) => Some(batch.into_inner()),
315 }
316 }
317
318 #[cfg(not(all(unix, feature = "rocksdb")))]
322 pub fn into_raw_rocksdb_batch(self) -> Option<RawRocksDBBatch> {
323 match self {
324 Self::Database(_) | Self::StaticFile(_) => None,
325 }
326 }
327
328 pub fn increment_block(&mut self, expected_block_number: BlockNumber) -> ProviderResult<()> {
332 match self {
333 Self::Database(_) => Ok(()),
334 Self::StaticFile(writer) => writer.increment_block(expected_block_number),
335 #[cfg(all(unix, feature = "rocksdb"))]
336 Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
337 }
338 }
339
340 pub fn ensure_at_block(&mut self, block_number: BlockNumber) -> ProviderResult<()> {
347 match self {
348 Self::Database(_) => Ok(()),
349 Self::StaticFile(writer) => writer.ensure_at_block(block_number),
350 #[cfg(all(unix, feature = "rocksdb"))]
351 Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
352 }
353 }
354}
355
356impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>
357where
358 N::Receipt: Value,
359 CURSOR: DbCursorRW<tables::Receipts<N::Receipt>>,
360{
361 pub fn append_receipt(&mut self, tx_num: TxNumber, receipt: &N::Receipt) -> ProviderResult<()> {
363 match self {
364 Self::Database(cursor) => Ok(cursor.append(tx_num, receipt)?),
365 Self::StaticFile(writer) => writer.append_receipt(tx_num, receipt),
366 #[cfg(all(unix, feature = "rocksdb"))]
367 Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
368 }
369 }
370}
371
372impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>
373where
374 CURSOR: DbCursorRW<tables::TransactionSenders>,
375{
376 pub fn append_sender(&mut self, tx_num: TxNumber, sender: &Address) -> ProviderResult<()> {
378 match self {
379 Self::Database(cursor) => Ok(cursor.append(tx_num, sender)?),
380 Self::StaticFile(writer) => writer.append_transaction_sender(tx_num, sender),
381 #[cfg(all(unix, feature = "rocksdb"))]
382 Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
383 }
384 }
385
386 pub fn append_senders<I>(&mut self, senders: I) -> ProviderResult<()>
388 where
389 I: Iterator<Item = (TxNumber, Address)>,
390 {
391 match self {
392 Self::Database(cursor) => {
393 for (tx_num, sender) in senders {
394 cursor.append(tx_num, &sender)?;
395 }
396 Ok(())
397 }
398 Self::StaticFile(writer) => writer.append_transaction_senders(senders),
399 #[cfg(all(unix, feature = "rocksdb"))]
400 Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
401 }
402 }
403
404 pub fn prune_senders(
407 &mut self,
408 unwind_tx_from: TxNumber,
409 block: BlockNumber,
410 ) -> ProviderResult<()>
411 where
412 CURSOR: DbCursorRO<tables::TransactionSenders>,
413 {
414 match self {
415 Self::Database(cursor) => {
416 let mut walker = cursor.walk_range(unwind_tx_from..)?;
417 while walker.next().transpose()?.is_some() {
418 walker.delete_current()?;
419 }
420 }
421 Self::StaticFile(writer) => {
422 let static_file_transaction_sender_num = writer
423 .reader()
424 .get_highest_static_file_tx(StaticFileSegment::TransactionSenders);
425
426 let to_delete = static_file_transaction_sender_num
427 .map(|static_num| (static_num + 1).saturating_sub(unwind_tx_from))
428 .unwrap_or_default();
429
430 writer.prune_transaction_senders(to_delete, block)?;
431 }
432 #[cfg(all(unix, feature = "rocksdb"))]
433 Self::RocksDB(_) => return Err(ProviderError::UnsupportedProvider),
434 }
435
436 Ok(())
437 }
438}
439
440impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>
441where
442 CURSOR: DbCursorRW<tables::TransactionHashNumbers> + DbCursorRO<tables::TransactionHashNumbers>,
443{
444 pub fn put_transaction_hash_number(
450 &mut self,
451 hash: TxHash,
452 tx_num: TxNumber,
453 append_only: bool,
454 ) -> ProviderResult<()> {
455 match self {
456 Self::Database(cursor) => {
457 if append_only {
458 Ok(cursor.append(hash, &tx_num)?)
459 } else {
460 Ok(cursor.upsert(hash, &tx_num)?)
461 }
462 }
463 Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
464 #[cfg(all(unix, feature = "rocksdb"))]
465 Self::RocksDB(batch) => batch.put::<tables::TransactionHashNumbers>(hash, &tx_num),
466 }
467 }
468
469 pub fn put_transaction_hash_numbers_batch(
478 &mut self,
479 entries: Vec<(TxHash, TxNumber)>,
480 append_only: bool,
481 ) -> ProviderResult<()> {
482 match self {
483 Self::Database(cursor) => {
484 for (hash, tx_num) in entries {
485 if append_only {
486 cursor.append(hash, &tx_num)?;
487 } else {
488 cursor.upsert(hash, &tx_num)?;
489 }
490 }
491 Ok(())
492 }
493 Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
494 #[cfg(all(unix, feature = "rocksdb"))]
495 Self::RocksDB(batch) => {
496 for (hash, tx_num) in entries {
497 batch.put::<tables::TransactionHashNumbers>(hash, &tx_num)?;
498 }
499 Ok(())
500 }
501 }
502 }
503
504 pub fn delete_transaction_hash_number(&mut self, hash: TxHash) -> ProviderResult<()> {
506 match self {
507 Self::Database(cursor) => {
508 if cursor.seek_exact(hash)?.is_some() {
509 cursor.delete_current()?;
510 }
511 Ok(())
512 }
513 Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
514 #[cfg(all(unix, feature = "rocksdb"))]
515 Self::RocksDB(batch) => batch.delete::<tables::TransactionHashNumbers>(hash),
516 }
517 }
518}
519
520impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>
521where
522 CURSOR: DbCursorRW<tables::StoragesHistory> + DbCursorRO<tables::StoragesHistory>,
523{
524 pub fn put_storage_history(
526 &mut self,
527 key: StorageShardedKey,
528 value: &BlockNumberList,
529 ) -> ProviderResult<()> {
530 match self {
531 Self::Database(cursor) => Ok(cursor.upsert(key, value)?),
532 Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
533 #[cfg(all(unix, feature = "rocksdb"))]
534 Self::RocksDB(batch) => batch.put::<tables::StoragesHistory>(key, value),
535 }
536 }
537
538 pub fn delete_storage_history(&mut self, key: StorageShardedKey) -> ProviderResult<()> {
540 match self {
541 Self::Database(cursor) => {
542 if cursor.seek_exact(key)?.is_some() {
543 cursor.delete_current()?;
544 }
545 Ok(())
546 }
547 Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
548 #[cfg(all(unix, feature = "rocksdb"))]
549 Self::RocksDB(batch) => batch.delete::<tables::StoragesHistory>(key),
550 }
551 }
552
553 pub fn append_storage_history(
555 &mut self,
556 key: StorageShardedKey,
557 value: &BlockNumberList,
558 ) -> ProviderResult<()> {
559 match self {
560 Self::Database(cursor) => Ok(cursor.append(key, value)?),
561 Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
562 #[cfg(all(unix, feature = "rocksdb"))]
563 Self::RocksDB(batch) => batch.put::<tables::StoragesHistory>(key, value),
564 }
565 }
566
567 pub fn upsert_storage_history(
569 &mut self,
570 key: StorageShardedKey,
571 value: &BlockNumberList,
572 ) -> ProviderResult<()> {
573 match self {
574 Self::Database(cursor) => Ok(cursor.upsert(key, value)?),
575 Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
576 #[cfg(all(unix, feature = "rocksdb"))]
577 Self::RocksDB(batch) => batch.put::<tables::StoragesHistory>(key, value),
578 }
579 }
580
581 pub fn get_last_storage_history_shard(
583 &mut self,
584 address: Address,
585 storage_key: B256,
586 ) -> ProviderResult<Option<BlockNumberList>> {
587 let key = StorageShardedKey::last(address, storage_key);
588 match self {
589 Self::Database(cursor) => Ok(cursor.seek_exact(key)?.map(|(_, v)| v)),
590 Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
591 #[cfg(all(unix, feature = "rocksdb"))]
592 Self::RocksDB(batch) => batch.get::<tables::StoragesHistory>(key),
593 }
594 }
595}
596
597impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>
598where
599 CURSOR: DbCursorRW<tables::AccountsHistory> + DbCursorRO<tables::AccountsHistory>,
600{
601 pub fn append_account_history(
603 &mut self,
604 key: ShardedKey<Address>,
605 value: &BlockNumberList,
606 ) -> ProviderResult<()> {
607 match self {
608 Self::Database(cursor) => Ok(cursor.append(key, value)?),
609 Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
610 #[cfg(all(unix, feature = "rocksdb"))]
611 Self::RocksDB(batch) => batch.put::<tables::AccountsHistory>(key, value),
612 }
613 }
614
615 pub fn upsert_account_history(
617 &mut self,
618 key: ShardedKey<Address>,
619 value: &BlockNumberList,
620 ) -> ProviderResult<()> {
621 match self {
622 Self::Database(cursor) => Ok(cursor.upsert(key, value)?),
623 Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
624 #[cfg(all(unix, feature = "rocksdb"))]
625 Self::RocksDB(batch) => batch.put::<tables::AccountsHistory>(key, value),
626 }
627 }
628
629 pub fn get_last_account_history_shard(
631 &mut self,
632 address: Address,
633 ) -> ProviderResult<Option<BlockNumberList>> {
634 match self {
635 Self::Database(cursor) => {
636 Ok(cursor.seek_exact(ShardedKey::last(address))?.map(|(_, v)| v))
637 }
638 Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
639 #[cfg(all(unix, feature = "rocksdb"))]
640 Self::RocksDB(batch) => batch.get::<tables::AccountsHistory>(ShardedKey::last(address)),
641 }
642 }
643
644 pub fn delete_account_history(&mut self, key: ShardedKey<Address>) -> ProviderResult<()> {
646 match self {
647 Self::Database(cursor) => {
648 if cursor.seek_exact(key)?.is_some() {
649 cursor.delete_current()?;
650 }
651 Ok(())
652 }
653 Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
654 #[cfg(all(unix, feature = "rocksdb"))]
655 Self::RocksDB(batch) => batch.delete::<tables::AccountsHistory>(key),
656 }
657 }
658}
659
660impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>
661where
662 CURSOR: DbDupCursorRW<tables::AccountChangeSets>,
663{
664 pub fn append_account_changeset(
668 &mut self,
669 block_number: BlockNumber,
670 mut changeset: Vec<AccountBeforeTx>,
671 ) -> ProviderResult<()> {
672 changeset.par_sort_by_key(|a| a.address);
674 match self {
675 Self::Database(cursor) => {
676 for change in changeset {
677 cursor.append_dup(block_number, change)?;
678 }
679 }
680 Self::StaticFile(writer) => {
681 writer.append_account_changeset(changeset, block_number)?;
682 }
683 #[cfg(all(unix, feature = "rocksdb"))]
684 Self::RocksDB(_) => return Err(ProviderError::UnsupportedProvider),
685 }
686
687 Ok(())
688 }
689}
690
691impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>
692where
693 CURSOR: DbDupCursorRW<tables::StorageChangeSets>,
694{
695 pub fn append_storage_changeset(
699 &mut self,
700 block_number: BlockNumber,
701 mut changeset: Vec<StorageBeforeTx>,
702 ) -> ProviderResult<()> {
703 changeset.par_sort_by_key(|change| (change.address, change.key));
704
705 match self {
706 Self::Database(cursor) => {
707 for change in changeset {
708 let storage_id = BlockNumberAddress((block_number, change.address));
709 cursor.append_dup(
710 storage_id,
711 StorageEntry { key: change.key, value: change.value },
712 )?;
713 }
714 }
715 Self::StaticFile(writer) => {
716 writer.append_storage_changeset(changeset, block_number)?;
717 }
718 #[cfg(all(unix, feature = "rocksdb"))]
719 Self::RocksDB(_) => return Err(ProviderError::UnsupportedProvider),
720 }
721
722 Ok(())
723 }
724}
725
/// A reader that targets exactly one of several storage backends.
///
/// The methods on this type dispatch on the active variant. Operations that a backend does not
/// implement return [`ProviderError::UnsupportedProvider`].
#[derive(Debug, Display)]
pub enum EitherReader<'a, CURSOR, N> {
    /// Reads go through a database cursor. The [`PhantomData`] only carries the `'a` lifetime.
    Database(CURSOR, PhantomData<&'a ()>),
    /// Reads go through a static-file provider. The [`PhantomData`] only carries the `'a`
    /// lifetime.
    StaticFile(StaticFileProvider<N>, PhantomData<&'a ()>),
    /// Reads go through a borrowed `RocksDB` transaction (only on unix with the `rocksdb`
    /// feature).
    #[cfg(all(unix, feature = "rocksdb"))]
    RocksDB(&'a crate::providers::rocksdb::RocksTx<'a>),
}
737
738impl<'a> EitherReader<'a, (), ()> {
739 pub fn new_senders<P>(
741 provider: &P,
742 ) -> ProviderResult<EitherReaderTy<'a, P, tables::TransactionSenders>>
743 where
744 P: DBProvider + NodePrimitivesProvider + StorageSettingsCache + StaticFileProviderFactory,
745 P::Tx: DbTx,
746 {
747 if EitherWriterDestination::senders(provider).is_static_file() {
748 Ok(EitherReader::StaticFile(provider.static_file_provider(), PhantomData))
749 } else {
750 Ok(EitherReader::Database(
751 provider.tx_ref().cursor_read::<tables::TransactionSenders>()?,
752 PhantomData,
753 ))
754 }
755 }
756
757 pub fn new_storages_history<P>(
759 provider: &P,
760 _rocksdb_tx: RocksTxRefArg<'a>,
761 ) -> ProviderResult<EitherReaderTy<'a, P, tables::StoragesHistory>>
762 where
763 P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
764 P::Tx: DbTx,
765 {
766 #[cfg(all(unix, feature = "rocksdb"))]
767 if provider.cached_storage_settings().storage_v2 {
768 return Ok(EitherReader::RocksDB(
769 _rocksdb_tx.expect("storages_history_in_rocksdb requires rocksdb tx"),
770 ));
771 }
772
773 Ok(EitherReader::Database(
774 provider.tx_ref().cursor_read::<tables::StoragesHistory>()?,
775 PhantomData,
776 ))
777 }
778
779 pub fn new_transaction_hash_numbers<P>(
781 provider: &P,
782 _rocksdb_tx: RocksTxRefArg<'a>,
783 ) -> ProviderResult<EitherReaderTy<'a, P, tables::TransactionHashNumbers>>
784 where
785 P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
786 P::Tx: DbTx,
787 {
788 #[cfg(all(unix, feature = "rocksdb"))]
789 if provider.cached_storage_settings().storage_v2 {
790 return Ok(EitherReader::RocksDB(
791 _rocksdb_tx.expect("transaction_hash_numbers_in_rocksdb requires rocksdb tx"),
792 ));
793 }
794
795 Ok(EitherReader::Database(
796 provider.tx_ref().cursor_read::<tables::TransactionHashNumbers>()?,
797 PhantomData,
798 ))
799 }
800
801 pub fn new_accounts_history<P>(
803 provider: &P,
804 _rocksdb_tx: RocksTxRefArg<'a>,
805 ) -> ProviderResult<EitherReaderTy<'a, P, tables::AccountsHistory>>
806 where
807 P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
808 P::Tx: DbTx,
809 {
810 #[cfg(all(unix, feature = "rocksdb"))]
811 if provider.cached_storage_settings().storage_v2 {
812 return Ok(EitherReader::RocksDB(
813 _rocksdb_tx.expect("account_history_in_rocksdb requires rocksdb tx"),
814 ));
815 }
816
817 Ok(EitherReader::Database(
818 provider.tx_ref().cursor_read::<tables::AccountsHistory>()?,
819 PhantomData,
820 ))
821 }
822
823 pub fn new_account_changesets<P>(
825 provider: &P,
826 ) -> ProviderResult<DupEitherReaderTy<'a, P, tables::AccountChangeSets>>
827 where
828 P: DBProvider + NodePrimitivesProvider + StorageSettingsCache + StaticFileProviderFactory,
829 P::Tx: DbTx,
830 {
831 if EitherWriterDestination::account_changesets(provider).is_static_file() {
832 Ok(EitherReader::StaticFile(provider.static_file_provider(), PhantomData))
833 } else {
834 Ok(EitherReader::Database(
835 provider.tx_ref().cursor_dup_read::<tables::AccountChangeSets>()?,
836 PhantomData,
837 ))
838 }
839 }
840}
841
842impl<CURSOR, N: NodePrimitives> EitherReader<'_, CURSOR, N>
843where
844 CURSOR: DbCursorRO<tables::TransactionSenders>,
845{
846 pub fn senders_by_tx_range(
848 &mut self,
849 range: Range<TxNumber>,
850 ) -> ProviderResult<HashMap<TxNumber, Address>> {
851 match self {
852 Self::Database(cursor, _) => cursor
853 .walk_range(range)?
854 .map(|result| result.map_err(ProviderError::from))
855 .collect::<ProviderResult<HashMap<_, _>>>(),
856 Self::StaticFile(provider, _) => range
857 .clone()
858 .zip(provider.fetch_range_iter(
859 StaticFileSegment::TransactionSenders,
860 range,
861 |cursor, number| cursor.get_one::<TransactionSenderMask>(number.into()),
862 )?)
863 .filter_map(|(tx_num, sender)| {
864 let result = sender.transpose()?;
865 Some(result.map(|sender| (tx_num, sender)))
866 })
867 .collect::<ProviderResult<HashMap<_, _>>>(),
868 #[cfg(all(unix, feature = "rocksdb"))]
869 Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
870 }
871 }
872}
873
874impl<CURSOR, N: NodePrimitives> EitherReader<'_, CURSOR, N>
875where
876 CURSOR: DbCursorRO<tables::TransactionHashNumbers>,
877{
878 pub fn get_transaction_hash_number(
880 &mut self,
881 hash: TxHash,
882 ) -> ProviderResult<Option<TxNumber>> {
883 match self {
884 Self::Database(cursor, _) => Ok(cursor.seek_exact(hash)?.map(|(_, v)| v)),
885 Self::StaticFile(_, _) => Err(ProviderError::UnsupportedProvider),
886 #[cfg(all(unix, feature = "rocksdb"))]
887 Self::RocksDB(tx) => tx.get::<tables::TransactionHashNumbers>(hash),
888 }
889 }
890}
891
892impl<CURSOR, N: NodePrimitives> EitherReader<'_, CURSOR, N>
893where
894 CURSOR: DbCursorRO<tables::StoragesHistory>,
895{
896 pub fn get_storage_history(
898 &mut self,
899 key: StorageShardedKey,
900 ) -> ProviderResult<Option<BlockNumberList>> {
901 match self {
902 Self::Database(cursor, _) => Ok(cursor.seek_exact(key)?.map(|(_, v)| v)),
903 Self::StaticFile(_, _) => Err(ProviderError::UnsupportedProvider),
904 #[cfg(all(unix, feature = "rocksdb"))]
905 Self::RocksDB(tx) => tx.get::<tables::StoragesHistory>(key),
906 }
907 }
908
909 pub fn storage_history_info(
911 &mut self,
912 address: Address,
913 storage_key: alloy_primitives::B256,
914 block_number: BlockNumber,
915 lowest_available_block_number: Option<BlockNumber>,
916 ) -> ProviderResult<HistoryInfo> {
917 match self {
918 Self::Database(cursor, _) => {
919 let key = StorageShardedKey::new(address, storage_key, block_number);
920 history_info::<tables::StoragesHistory, _, _>(
921 cursor,
922 key,
923 block_number,
924 |k| k.address == address && k.sharded_key.key == storage_key,
925 lowest_available_block_number,
926 )
927 }
928 Self::StaticFile(_, _) => Err(ProviderError::UnsupportedProvider),
929 #[cfg(all(unix, feature = "rocksdb"))]
930 Self::RocksDB(tx) => tx.storage_history_info(
931 address,
932 storage_key,
933 block_number,
934 lowest_available_block_number,
935 ),
936 }
937 }
938}
939
940impl<CURSOR, N: NodePrimitives> EitherReader<'_, CURSOR, N>
941where
942 CURSOR: DbCursorRO<tables::AccountsHistory>,
943{
944 pub fn get_account_history(
946 &mut self,
947 key: ShardedKey<Address>,
948 ) -> ProviderResult<Option<BlockNumberList>> {
949 match self {
950 Self::Database(cursor, _) => Ok(cursor.seek_exact(key)?.map(|(_, v)| v)),
951 Self::StaticFile(_, _) => Err(ProviderError::UnsupportedProvider),
952 #[cfg(all(unix, feature = "rocksdb"))]
953 Self::RocksDB(tx) => tx.get::<tables::AccountsHistory>(key),
954 }
955 }
956
957 pub fn account_history_info(
959 &mut self,
960 address: Address,
961 block_number: BlockNumber,
962 lowest_available_block_number: Option<BlockNumber>,
963 ) -> ProviderResult<HistoryInfo> {
964 match self {
965 Self::Database(cursor, _) => {
966 let key = ShardedKey::new(address, block_number);
967 history_info::<tables::AccountsHistory, _, _>(
968 cursor,
969 key,
970 block_number,
971 |k| k.key == address,
972 lowest_available_block_number,
973 )
974 }
975 Self::StaticFile(_, _) => Err(ProviderError::UnsupportedProvider),
976 #[cfg(all(unix, feature = "rocksdb"))]
977 Self::RocksDB(tx) => {
978 tx.account_history_info(address, block_number, lowest_available_block_number)
979 }
980 }
981 }
982}
983
984impl<CURSOR, N: NodePrimitives> EitherReader<'_, CURSOR, N>
985where
986 CURSOR: DbCursorRO<tables::AccountChangeSets>,
987{
988 pub fn changed_accounts_with_range(
990 &mut self,
991 range: RangeInclusive<BlockNumber>,
992 ) -> ProviderResult<BTreeSet<Address>> {
993 match self {
994 Self::StaticFile(provider, _) => {
995 let highest_static_block =
996 provider.get_highest_static_file_block(StaticFileSegment::AccountChangeSets);
997
998 let Some(highest) = highest_static_block else {
999 return Err(ProviderError::MissingHighestStaticFileBlock(
1000 StaticFileSegment::AccountChangeSets,
1001 ))
1002 };
1003
1004 let start = *range.start();
1005 let static_end = (*range.end()).min(highest);
1006
1007 let mut changed_accounts = BTreeSet::default();
1008 if start <= static_end {
1009 for block in start..=static_end {
1010 let block_changesets = provider.account_block_changeset(block)?;
1011 for changeset in block_changesets {
1012 changed_accounts.insert(changeset.address);
1013 }
1014 }
1015 }
1016
1017 Ok(changed_accounts)
1018 }
1019 Self::Database(provider, _) => provider
1020 .walk_range(range)?
1021 .map(|entry| {
1022 entry.map(|(_, account_before)| account_before.address).map_err(Into::into)
1023 })
1024 .collect(),
1025 #[cfg(all(unix, feature = "rocksdb"))]
1026 Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
1027 }
1028 }
1029}
1030
/// The storage backend selected for a given kind of data.
///
/// The [`EnumIs`] derive supplies the `is_*` predicates (such as `is_static_file`) that the
/// constructors in this file use.
#[derive(Debug, EnumIs)]
pub enum EitherWriterDestination {
    /// The data goes to the database.
    Database,
    /// The data goes to static files.
    StaticFile,
    /// The data goes to `RocksDB`.
    RocksDB,
}
1041
1042impl EitherWriterDestination {
1043 pub fn senders<P>(provider: &P) -> Self
1045 where
1046 P: StorageSettingsCache,
1047 {
1048 if provider.cached_storage_settings().storage_v2 {
1050 Self::StaticFile
1051 } else {
1052 Self::Database
1053 }
1054 }
1055
1056 pub fn account_changesets<P>(provider: &P) -> Self
1058 where
1059 P: StorageSettingsCache,
1060 {
1061 if provider.cached_storage_settings().storage_v2 {
1063 Self::StaticFile
1064 } else {
1065 Self::Database
1066 }
1067 }
1068
1069 pub fn storage_changesets<P>(provider: &P) -> Self
1071 where
1072 P: StorageSettingsCache,
1073 {
1074 if provider.cached_storage_settings().storage_v2 {
1076 Self::StaticFile
1077 } else {
1078 Self::Database
1079 }
1080 }
1081}
1082
#[cfg(test)]
mod tests {
    use crate::{test_utils::create_test_provider_factory, StaticFileWriter};

    use super::*;
    use alloy_primitives::Address;
    use reth_db::models::AccountBeforeTx;
    use reth_static_file_types::StaticFileSegment;
    use reth_storage_api::{DatabaseProviderFactory, StorageSettings};

    /// Querying a block range that extends past the highest account-changeset block in static
    /// files must return the accounts up to that block instead of failing.
    #[test]
    fn test_changed_accounts_with_range_caps_at_static_file_tip() {
        let factory = create_test_provider_factory();
        let highest_block = 5u64;

        // One distinct address per block: the first byte of the address is the block number.
        let addresses: Vec<Address> = (0..=highest_block)
            .map(|i| {
                let mut addr = Address::ZERO;
                addr.0[0] = i as u8;
                addr
            })
            .collect();

        // Write one single-entry changeset per block (0..=5) straight into static files.
        {
            let sf_provider = factory.static_file_provider();
            let mut writer =
                sf_provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();

            for block_num in 0..=highest_block {
                let changeset =
                    vec![AccountBeforeTx { address: addresses[block_num as usize], info: None }];
                writer.append_account_changeset(changeset, block_num).unwrap();
            }
            writer.commit().unwrap();
        }

        // With v2 storage settings the reader is expected to use static files (asserted below).
        factory.set_storage_settings_cache(StorageSettings::v2());

        let provider = factory.database_provider_ro().unwrap();

        // Sanity check: the static-file tip is the last block written above.
        let sf_tip = provider
            .static_file_provider()
            .get_highest_static_file_block(StaticFileSegment::AccountChangeSets);
        assert_eq!(sf_tip, Some(highest_block));

        let mut reader = EitherReader::new_account_changesets(&provider).unwrap();
        assert!(matches!(reader, EitherReader::StaticFile(_, _)));

        // The requested range ends at block 10, beyond the static-file tip at block 5.
        let result = reader.changed_accounts_with_range(0..=10).unwrap();

        // Every address written for blocks 0..=5 is returned, and nothing else.
        let expected: BTreeSet<Address> = addresses.into_iter().collect();
        assert_eq!(result, expected);
    }

    /// Senders written through `EitherWriter` must be readable through `EitherReader`, for both
    /// the database and the static-file backend.
    #[test]
    fn test_reader_senders_by_tx_range() {
        let factory = create_test_provider_factory();

        // Senders exist only for transactions 1..=4; the query below asks for 0..6.
        let senders = [
            (1, Address::random()),
            (2, Address::random()),
            (3, Address::random()),
            (4, Address::random()),
        ];

        // Run once with v1 settings (database) and once with v2 settings (static files).
        for transaction_senders_in_static_files in [false, true] {
            factory.set_storage_settings_cache(if transaction_senders_in_static_files {
                StorageSettings::v2()
            } else {
                StorageSettings::v1()
            });

            let provider = factory.database_provider_rw().unwrap();
            let mut writer = EitherWriter::new_senders(&provider, 0).unwrap();
            if transaction_senders_in_static_files {
                assert!(matches!(writer, EitherWriter::StaticFile(_)));
            } else {
                assert!(matches!(writer, EitherWriter::Database(_)));
            }

            writer.increment_block(0).unwrap();
            writer.append_senders(senders.iter().copied()).unwrap();
            // The writer is dropped before the provider is committed.
            drop(writer);
            provider.commit().unwrap();

            let provider = factory.database_provider_ro().unwrap();
            let mut reader = EitherReader::new_senders(&provider).unwrap();
            if transaction_senders_in_static_files {
                assert!(matches!(reader, EitherReader::StaticFile(_, _)));
            } else {
                assert!(matches!(reader, EitherReader::Database(_, _)));
            }

            // Only the four written senders come back, although the range also covers 0 and 5.
            assert_eq!(
                reader.senders_by_tx_range(0..6).unwrap(),
                senders.iter().copied().collect::<HashMap<_, _>>(),
                "{reader}"
            );
        }
    }
}
1197
1198#[cfg(all(test, unix, feature = "rocksdb"))]
1199mod rocksdb_tests {
1200 use super::*;
1201 use crate::{
1202 providers::rocksdb::{RocksDBBuilder, RocksDBProvider},
1203 test_utils::create_test_provider_factory,
1204 RocksDBProviderFactory,
1205 };
1206 use alloy_primitives::{Address, B256};
1207 use reth_db_api::{
1208 models::{storage_sharded_key::StorageShardedKey, IntegerList, ShardedKey},
1209 tables,
1210 transaction::DbTxMut,
1211 };
1212 use reth_ethereum_primitives::EthPrimitives;
1213 use reth_storage_api::{DatabaseProviderFactory, StorageSettings};
1214 use std::marker::PhantomData;
1215 use tempfile::TempDir;
1216
1217 fn create_rocksdb_provider() -> (TempDir, RocksDBProvider) {
1218 let temp_dir = TempDir::new().unwrap();
1219 let provider = RocksDBBuilder::new(temp_dir.path())
1220 .with_table::<tables::TransactionHashNumbers>()
1221 .with_table::<tables::StoragesHistory>()
1222 .with_table::<tables::AccountsHistory>()
1223 .build()
1224 .unwrap();
1225 (temp_dir, provider)
1226 }
1227
1228 #[test]
1232 fn test_either_writer_transaction_hash_numbers_with_rocksdb() {
1233 let factory = create_test_provider_factory();
1234
1235 factory.set_storage_settings_cache(StorageSettings::v2());
1237
1238 let hash1 = B256::from([1u8; 32]);
1239 let hash2 = B256::from([2u8; 32]);
1240 let tx_num1 = 100u64;
1241 let tx_num2 = 200u64;
1242
1243 let rocksdb = factory.rocksdb_provider();
1245 let batch = rocksdb.batch();
1246
1247 let provider = factory.database_provider_rw().unwrap();
1249 let mut writer = EitherWriter::new_transaction_hash_numbers(&provider, batch).unwrap();
1250
1251 assert!(matches!(writer, EitherWriter::RocksDB(_)));
1253
1254 writer.put_transaction_hash_number(hash1, tx_num1, false).unwrap();
1256 writer.put_transaction_hash_number(hash2, tx_num2, false).unwrap();
1257
1258 if let Some(batch) = writer.into_raw_rocksdb_batch() {
1260 provider.set_pending_rocksdb_batch(batch);
1261 }
1262
1263 provider.commit().unwrap();
1265
1266 let rocksdb = factory.rocksdb_provider();
1268 assert_eq!(rocksdb.get::<tables::TransactionHashNumbers>(hash1).unwrap(), Some(tx_num1));
1269 assert_eq!(rocksdb.get::<tables::TransactionHashNumbers>(hash2).unwrap(), Some(tx_num2));
1270 }
1271
1272 #[test]
1274 fn test_either_writer_delete_transaction_hash_number_with_rocksdb() {
1275 let factory = create_test_provider_factory();
1276
1277 factory.set_storage_settings_cache(StorageSettings::v2());
1279
1280 let hash = B256::from([1u8; 32]);
1281 let tx_num = 100u64;
1282
1283 let rocksdb = factory.rocksdb_provider();
1285 rocksdb.put::<tables::TransactionHashNumbers>(hash, &tx_num).unwrap();
1286 assert_eq!(rocksdb.get::<tables::TransactionHashNumbers>(hash).unwrap(), Some(tx_num));
1287
1288 let batch = rocksdb.batch();
1290 let provider = factory.database_provider_rw().unwrap();
1291 let mut writer = EitherWriter::new_transaction_hash_numbers(&provider, batch).unwrap();
1292 writer.delete_transaction_hash_number(hash).unwrap();
1293
1294 if let Some(batch) = writer.into_raw_rocksdb_batch() {
1296 provider.set_pending_rocksdb_batch(batch);
1297 }
1298 provider.commit().unwrap();
1299
1300 let rocksdb = factory.rocksdb_provider();
1302 assert_eq!(rocksdb.get::<tables::TransactionHashNumbers>(hash).unwrap(), None);
1303 }
1304
1305 #[test]
1306 fn test_rocksdb_batch_transaction_hash_numbers() {
1307 let (_temp_dir, provider) = create_rocksdb_provider();
1308
1309 let hash1 = B256::from([1u8; 32]);
1310 let hash2 = B256::from([2u8; 32]);
1311 let tx_num1 = 100u64;
1312 let tx_num2 = 200u64;
1313
1314 let mut batch = provider.batch();
1316 batch.put::<tables::TransactionHashNumbers>(hash1, &tx_num1).unwrap();
1317 batch.put::<tables::TransactionHashNumbers>(hash2, &tx_num2).unwrap();
1318 batch.commit().unwrap();
1319
1320 let tx = provider.tx();
1322 assert_eq!(tx.get::<tables::TransactionHashNumbers>(hash1).unwrap(), Some(tx_num1));
1323 assert_eq!(tx.get::<tables::TransactionHashNumbers>(hash2).unwrap(), Some(tx_num2));
1324
1325 let missing_hash = B256::from([99u8; 32]);
1327 assert_eq!(tx.get::<tables::TransactionHashNumbers>(missing_hash).unwrap(), None);
1328 }
1329
1330 #[test]
1331 fn test_rocksdb_batch_storage_history() {
1332 let (_temp_dir, provider) = create_rocksdb_provider();
1333
1334 let address = Address::random();
1335 let storage_key = B256::from([1u8; 32]);
1336 let key = StorageShardedKey::new(address, storage_key, 1000);
1337 let value = IntegerList::new([1, 5, 10, 50]).unwrap();
1338
1339 let mut batch = provider.batch();
1341 batch.put::<tables::StoragesHistory>(key.clone(), &value).unwrap();
1342 batch.commit().unwrap();
1343
1344 let tx = provider.tx();
1346 let result = tx.get::<tables::StoragesHistory>(key).unwrap();
1347 assert_eq!(result, Some(value));
1348
1349 let missing_key = StorageShardedKey::new(Address::random(), B256::ZERO, 0);
1351 assert_eq!(tx.get::<tables::StoragesHistory>(missing_key).unwrap(), None);
1352 }
1353
1354 #[test]
1355 fn test_rocksdb_batch_account_history() {
1356 let (_temp_dir, provider) = create_rocksdb_provider();
1357
1358 let address = Address::random();
1359 let key = ShardedKey::new(address, 1000);
1360 let value = IntegerList::new([1, 10, 100, 500]).unwrap();
1361
1362 let mut batch = provider.batch();
1364 batch.put::<tables::AccountsHistory>(key.clone(), &value).unwrap();
1365 batch.commit().unwrap();
1366
1367 let tx = provider.tx();
1369 let result = tx.get::<tables::AccountsHistory>(key).unwrap();
1370 assert_eq!(result, Some(value));
1371
1372 let missing_key = ShardedKey::new(Address::random(), 0);
1374 assert_eq!(tx.get::<tables::AccountsHistory>(missing_key).unwrap(), None);
1375 }
1376
1377 #[test]
1378 fn test_rocksdb_batch_delete_transaction_hash_number() {
1379 let (_temp_dir, provider) = create_rocksdb_provider();
1380
1381 let hash = B256::from([1u8; 32]);
1382 let tx_num = 100u64;
1383
1384 provider.put::<tables::TransactionHashNumbers>(hash, &tx_num).unwrap();
1386 assert_eq!(provider.get::<tables::TransactionHashNumbers>(hash).unwrap(), Some(tx_num));
1387
1388 let mut batch = provider.batch();
1390 batch.delete::<tables::TransactionHashNumbers>(hash).unwrap();
1391 batch.commit().unwrap();
1392
1393 assert_eq!(provider.get::<tables::TransactionHashNumbers>(hash).unwrap(), None);
1395 }
1396
1397 #[test]
1398 fn test_rocksdb_batch_delete_storage_history() {
1399 let (_temp_dir, provider) = create_rocksdb_provider();
1400
1401 let address = Address::random();
1402 let storage_key = B256::from([1u8; 32]);
1403 let key = StorageShardedKey::new(address, storage_key, 1000);
1404 let value = IntegerList::new([1, 5, 10]).unwrap();
1405
1406 provider.put::<tables::StoragesHistory>(key.clone(), &value).unwrap();
1408 assert!(provider.get::<tables::StoragesHistory>(key.clone()).unwrap().is_some());
1409
1410 let mut batch = provider.batch();
1412 batch.delete::<tables::StoragesHistory>(key.clone()).unwrap();
1413 batch.commit().unwrap();
1414
1415 assert_eq!(provider.get::<tables::StoragesHistory>(key).unwrap(), None);
1417 }
1418
1419 #[test]
1420 fn test_rocksdb_batch_delete_account_history() {
1421 let (_temp_dir, provider) = create_rocksdb_provider();
1422
1423 let address = Address::random();
1424 let key = ShardedKey::new(address, 1000);
1425 let value = IntegerList::new([1, 10, 100]).unwrap();
1426
1427 provider.put::<tables::AccountsHistory>(key.clone(), &value).unwrap();
1429 assert!(provider.get::<tables::AccountsHistory>(key.clone()).unwrap().is_some());
1430
1431 let mut batch = provider.batch();
1433 batch.delete::<tables::AccountsHistory>(key.clone()).unwrap();
1434 batch.commit().unwrap();
1435
1436 assert_eq!(provider.get::<tables::AccountsHistory>(key).unwrap(), None);
1438 }
1439
1440 struct HistoryQuery {
1447 block_number: BlockNumber,
1448 lowest_available: Option<BlockNumber>,
1449 expected: HistoryInfo,
1450 }
1451
1452 type AccountsHistoryWriteCursor =
1454 reth_db::mdbx::cursor::Cursor<reth_db::mdbx::RW, tables::AccountsHistory>;
1455 type StoragesHistoryWriteCursor =
1456 reth_db::mdbx::cursor::Cursor<reth_db::mdbx::RW, tables::StoragesHistory>;
1457 type AccountsHistoryReadCursor =
1458 reth_db::mdbx::cursor::Cursor<reth_db::mdbx::RO, tables::AccountsHistory>;
1459 type StoragesHistoryReadCursor =
1460 reth_db::mdbx::cursor::Cursor<reth_db::mdbx::RO, tables::StoragesHistory>;
1461
1462 fn run_account_history_scenario(
1465 scenario_name: &str,
1466 address: Address,
1467 shards: &[(BlockNumber, Vec<BlockNumber>)], queries: &[HistoryQuery],
1469 ) {
1470 let factory = create_test_provider_factory();
1472 let mdbx_provider = factory.database_provider_rw().unwrap();
1473 let (temp_dir, rocks_provider) = create_rocksdb_provider();
1474
1475 let mut mdbx_writer: EitherWriter<'_, AccountsHistoryWriteCursor, EthPrimitives> =
1477 EitherWriter::Database(
1478 mdbx_provider.tx_ref().cursor_write::<tables::AccountsHistory>().unwrap(),
1479 );
1480 let mut rocks_writer: EitherWriter<'_, AccountsHistoryWriteCursor, EthPrimitives> =
1481 EitherWriter::RocksDB(rocks_provider.batch());
1482
1483 for (highest_block, blocks) in shards {
1485 let key = ShardedKey::new(address, *highest_block);
1486 let value = IntegerList::new(blocks.clone()).unwrap();
1487 mdbx_writer.upsert_account_history(key.clone(), &value).unwrap();
1488 rocks_writer.upsert_account_history(key, &value).unwrap();
1489 }
1490
1491 drop(mdbx_writer);
1493 mdbx_provider.commit().unwrap();
1494 if let EitherWriter::RocksDB(batch) = rocks_writer {
1495 batch.commit().unwrap();
1496 }
1497
1498 let mdbx_ro = factory.database_provider_ro().unwrap();
1500 let rocks_tx = rocks_provider.tx();
1501
1502 for (i, query) in queries.iter().enumerate() {
1503 let mut mdbx_reader: EitherReader<'_, AccountsHistoryReadCursor, EthPrimitives> =
1505 EitherReader::Database(
1506 mdbx_ro.tx_ref().cursor_read::<tables::AccountsHistory>().unwrap(),
1507 PhantomData,
1508 );
1509 let mdbx_result = mdbx_reader
1510 .account_history_info(address, query.block_number, query.lowest_available)
1511 .unwrap();
1512
1513 let mut rocks_reader: EitherReader<'_, AccountsHistoryReadCursor, EthPrimitives> =
1515 EitherReader::RocksDB(&rocks_tx);
1516 let rocks_result = rocks_reader
1517 .account_history_info(address, query.block_number, query.lowest_available)
1518 .unwrap();
1519
1520 assert_eq!(
1522 mdbx_result,
1523 rocks_result,
1524 "Backend mismatch in scenario '{}' query {}: block={}, lowest={:?}\n\
1525 MDBX: {:?}, RocksDB: {:?}",
1526 scenario_name,
1527 i,
1528 query.block_number,
1529 query.lowest_available,
1530 mdbx_result,
1531 rocks_result
1532 );
1533
1534 assert_eq!(
1536 mdbx_result,
1537 query.expected,
1538 "Unexpected result in scenario '{}' query {}: block={}, lowest={:?}\n\
1539 Got: {:?}, Expected: {:?}",
1540 scenario_name,
1541 i,
1542 query.block_number,
1543 query.lowest_available,
1544 mdbx_result,
1545 query.expected
1546 );
1547 }
1548
1549 rocks_tx.rollback().unwrap();
1550 drop(temp_dir);
1551 }
1552
1553 fn run_storage_history_scenario(
1556 scenario_name: &str,
1557 address: Address,
1558 storage_key: B256,
1559 shards: &[(BlockNumber, Vec<BlockNumber>)], queries: &[HistoryQuery],
1561 ) {
1562 let factory = create_test_provider_factory();
1564 let mdbx_provider = factory.database_provider_rw().unwrap();
1565 let (temp_dir, rocks_provider) = create_rocksdb_provider();
1566
1567 let mut mdbx_writer: EitherWriter<'_, StoragesHistoryWriteCursor, EthPrimitives> =
1569 EitherWriter::Database(
1570 mdbx_provider.tx_ref().cursor_write::<tables::StoragesHistory>().unwrap(),
1571 );
1572 let mut rocks_writer: EitherWriter<'_, StoragesHistoryWriteCursor, EthPrimitives> =
1573 EitherWriter::RocksDB(rocks_provider.batch());
1574
1575 for (highest_block, blocks) in shards {
1577 let key = StorageShardedKey::new(address, storage_key, *highest_block);
1578 let value = IntegerList::new(blocks.clone()).unwrap();
1579 mdbx_writer.put_storage_history(key.clone(), &value).unwrap();
1580 rocks_writer.put_storage_history(key, &value).unwrap();
1581 }
1582
1583 drop(mdbx_writer);
1585 mdbx_provider.commit().unwrap();
1586 if let EitherWriter::RocksDB(batch) = rocks_writer {
1587 batch.commit().unwrap();
1588 }
1589
1590 let mdbx_ro = factory.database_provider_ro().unwrap();
1592 let rocks_tx = rocks_provider.tx();
1593
1594 for (i, query) in queries.iter().enumerate() {
1595 let mut mdbx_reader: EitherReader<'_, StoragesHistoryReadCursor, EthPrimitives> =
1597 EitherReader::Database(
1598 mdbx_ro.tx_ref().cursor_read::<tables::StoragesHistory>().unwrap(),
1599 PhantomData,
1600 );
1601 let mdbx_result = mdbx_reader
1602 .storage_history_info(
1603 address,
1604 storage_key,
1605 query.block_number,
1606 query.lowest_available,
1607 )
1608 .unwrap();
1609
1610 let mut rocks_reader: EitherReader<'_, StoragesHistoryReadCursor, EthPrimitives> =
1612 EitherReader::RocksDB(&rocks_tx);
1613 let rocks_result = rocks_reader
1614 .storage_history_info(
1615 address,
1616 storage_key,
1617 query.block_number,
1618 query.lowest_available,
1619 )
1620 .unwrap();
1621
1622 assert_eq!(
1624 mdbx_result,
1625 rocks_result,
1626 "Backend mismatch in scenario '{}' query {}: block={}, lowest={:?}\n\
1627 MDBX: {:?}, RocksDB: {:?}",
1628 scenario_name,
1629 i,
1630 query.block_number,
1631 query.lowest_available,
1632 mdbx_result,
1633 rocks_result
1634 );
1635
1636 assert_eq!(
1638 mdbx_result,
1639 query.expected,
1640 "Unexpected result in scenario '{}' query {}: block={}, lowest={:?}\n\
1641 Got: {:?}, Expected: {:?}",
1642 scenario_name,
1643 i,
1644 query.block_number,
1645 query.lowest_available,
1646 mdbx_result,
1647 query.expected
1648 );
1649 }
1650
1651 rocks_tx.rollback().unwrap();
1652 drop(temp_dir);
1653 }
1654
1655 #[test]
1663 fn test_account_history_info_both_backends() {
1664 let address = Address::from([0x42; 20]);
1665
1666 run_account_history_scenario(
1668 "single_shard",
1669 address,
1670 &[(u64::MAX, vec![100, 200, 300])],
1671 &[
1672 HistoryQuery {
1674 block_number: 50,
1675 lowest_available: None,
1676 expected: HistoryInfo::NotYetWritten,
1677 },
1678 HistoryQuery {
1680 block_number: 150,
1681 lowest_available: None,
1682 expected: HistoryInfo::InChangeset(200),
1683 },
1684 HistoryQuery {
1686 block_number: 300,
1687 lowest_available: None,
1688 expected: HistoryInfo::InChangeset(300),
1689 },
1690 HistoryQuery {
1692 block_number: 500,
1693 lowest_available: None,
1694 expected: HistoryInfo::InPlainState,
1695 },
1696 ],
1697 );
1698
1699 run_account_history_scenario(
1701 "multiple_shards",
1702 address,
1703 &[
1704 (500, vec![100, 200, 300, 400, 500]), (u64::MAX, vec![600, 700, 800]), ],
1707 &[
1708 HistoryQuery {
1710 block_number: 50,
1711 lowest_available: None,
1712 expected: HistoryInfo::NotYetWritten,
1713 },
1714 HistoryQuery {
1716 block_number: 150,
1717 lowest_available: None,
1718 expected: HistoryInfo::InChangeset(200),
1719 },
1720 HistoryQuery {
1722 block_number: 550,
1723 lowest_available: None,
1724 expected: HistoryInfo::InChangeset(600),
1725 },
1726 HistoryQuery {
1728 block_number: 900,
1729 lowest_available: None,
1730 expected: HistoryInfo::InPlainState,
1731 },
1732 ],
1733 );
1734
1735 let address_without_history = Address::from([0x43; 20]);
1737 run_account_history_scenario(
1738 "no_history",
1739 address_without_history,
1740 &[], &[HistoryQuery {
1742 block_number: 150,
1743 lowest_available: None,
1744 expected: HistoryInfo::NotYetWritten,
1745 }],
1746 );
1747
1748 run_account_history_scenario(
1754 "with_pruning_boundary",
1755 address,
1756 &[(u64::MAX, vec![100, 200, 300])],
1757 &[
1758 HistoryQuery {
1760 block_number: 100,
1761 lowest_available: Some(100),
1762 expected: HistoryInfo::InChangeset(100),
1763 },
1764 HistoryQuery {
1766 block_number: 150,
1767 lowest_available: Some(100),
1768 expected: HistoryInfo::InChangeset(200),
1769 },
1770 ],
1771 );
1772 }
1773
1774 #[test]
1776 fn test_storage_history_info_both_backends() {
1777 let address = Address::from([0x42; 20]);
1778 let storage_key = B256::from([0x01; 32]);
1779 let other_storage_key = B256::from([0x02; 32]);
1780
1781 run_storage_history_scenario(
1783 "storage_single_shard",
1784 address,
1785 storage_key,
1786 &[(u64::MAX, vec![100, 200, 300])],
1787 &[
1788 HistoryQuery {
1790 block_number: 50,
1791 lowest_available: None,
1792 expected: HistoryInfo::NotYetWritten,
1793 },
1794 HistoryQuery {
1796 block_number: 150,
1797 lowest_available: None,
1798 expected: HistoryInfo::InChangeset(200),
1799 },
1800 HistoryQuery {
1802 block_number: 500,
1803 lowest_available: None,
1804 expected: HistoryInfo::InPlainState,
1805 },
1806 ],
1807 );
1808
1809 run_storage_history_scenario(
1811 "storage_no_history",
1812 address,
1813 other_storage_key,
1814 &[], &[HistoryQuery {
1816 block_number: 150,
1817 lowest_available: None,
1818 expected: HistoryInfo::NotYetWritten,
1819 }],
1820 );
1821 }
1822
1823 #[test]
1826 fn test_rocksdb_commits_at_provider_level() {
1827 let factory = create_test_provider_factory();
1828
1829 factory.set_storage_settings_cache(StorageSettings::v2());
1831
1832 let hash1 = B256::from([1u8; 32]);
1833 let hash2 = B256::from([2u8; 32]);
1834 let tx_num1 = 100u64;
1835 let tx_num2 = 200u64;
1836
1837 let rocksdb = factory.rocksdb_provider();
1839 let batch = rocksdb.batch();
1840
1841 let provider = factory.database_provider_rw().unwrap();
1843 let mut writer = EitherWriter::new_transaction_hash_numbers(&provider, batch).unwrap();
1844
1845 writer.put_transaction_hash_number(hash1, tx_num1, false).unwrap();
1847 writer.put_transaction_hash_number(hash2, tx_num2, false).unwrap();
1848
1849 let raw_batch = writer.into_raw_rocksdb_batch();
1851 if let Some(batch) = raw_batch {
1852 provider.set_pending_rocksdb_batch(batch);
1853 }
1854
1855 let rocksdb = factory.rocksdb_provider();
1857 assert_eq!(
1858 rocksdb.get::<tables::TransactionHashNumbers>(hash1).unwrap(),
1859 None,
1860 "Data should not be visible before provider.commit()"
1861 );
1862
1863 provider.commit().unwrap();
1865
1866 let rocksdb = factory.rocksdb_provider();
1868 assert_eq!(
1869 rocksdb.get::<tables::TransactionHashNumbers>(hash1).unwrap(),
1870 Some(tx_num1),
1871 "Data should be visible after provider.commit()"
1872 );
1873 assert_eq!(
1874 rocksdb.get::<tables::TransactionHashNumbers>(hash2).unwrap(),
1875 Some(tx_num2),
1876 "Data should be visible after provider.commit()"
1877 );
1878 }
1879
1880 #[test]
1884 #[should_panic(expected = "account_history_in_rocksdb requires rocksdb tx")]
1885 fn test_settings_mismatch_panics() {
1886 let factory = create_test_provider_factory();
1887
1888 factory.set_storage_settings_cache(StorageSettings::v2());
1889
1890 let provider = factory.database_provider_ro().unwrap();
1891 let _ = EitherReader::<(), ()>::new_accounts_history(&provider, None);
1892 }
1893}