1use std::{
5 collections::BTreeSet,
6 marker::PhantomData,
7 ops::{Range, RangeInclusive},
8};
9
10use crate::{
11 providers::{
12 history_info, rocksdb::RocksDBBatch, HistoryInfo, StaticFileProvider,
13 StaticFileProviderRWRefMut,
14 },
15 StaticFileProviderFactory,
16};
17use alloy_primitives::{map::HashMap, Address, BlockNumber, TxHash, TxNumber, B256};
18use rayon::slice::ParallelSliceMut;
19use reth_db::{
20 cursor::{DbCursorRO, DbDupCursorRW},
21 models::{AccountBeforeTx, StorageBeforeTx},
22 static_file::TransactionSenderMask,
23 table::Value,
24 transaction::{CursorMutTy, CursorTy, DbTx, DbTxMut, DupCursorMutTy, DupCursorTy},
25};
26use reth_db_api::{
27 cursor::DbCursorRW,
28 models::{storage_sharded_key::StorageShardedKey, BlockNumberAddress, ShardedKey},
29 tables,
30 tables::BlockNumberList,
31};
32use reth_errors::ProviderError;
33use reth_node_types::NodePrimitives;
34use reth_primitives_traits::{ReceiptTy, StorageEntry};
35use reth_static_file_types::StaticFileSegment;
36use reth_storage_api::{ChangeSetReader, DBProvider, NodePrimitivesProvider, StorageSettingsCache};
37use reth_storage_errors::provider::ProviderResult;
38use strum::{Display, EnumIs};
39
/// [`EitherReader`] whose database variant holds provider `P`'s [`CursorTy`] cursor over table
/// `T`, and whose primitives are `P`'s node primitives.
type EitherReaderTy<'a, P, T> =
    EitherReader<'a, CursorTy<<P as DBProvider>::Tx, T>, <P as NodePrimitivesProvider>::Primitives>;
43
/// [`EitherReader`] whose database variant holds provider `P`'s [`DupCursorTy`] cursor over
/// table `T`, and whose primitives are `P`'s node primitives.
type DupEitherReaderTy<'a, P, T> = EitherReader<
    'a,
    DupCursorTy<<P as DBProvider>::Tx, T>,
    <P as NodePrimitivesProvider>::Primitives,
>;
50
/// [`EitherWriter`] whose database variant holds provider `P`'s [`DupCursorMutTy`] cursor over
/// table `T`, and whose primitives are `P`'s node primitives.
type DupEitherWriterTy<'a, P, T> = EitherWriter<
    'a,
    DupCursorMutTy<<P as DBProvider>::Tx, T>,
    <P as NodePrimitivesProvider>::Primitives,
>;
57
/// [`EitherWriter`] whose database variant holds provider `P`'s [`CursorMutTy`] cursor over
/// table `T`, and whose primitives are `P`'s node primitives.
type EitherWriterTy<'a, P, T> = EitherWriter<
    'a,
    CursorMutTy<<P as DBProvider>::Tx, T>,
    <P as NodePrimitivesProvider>::Primitives,
>;
64
65pub type RocksBatchArg<'a> = crate::providers::rocksdb::RocksDBBatch<'a>;
67
/// Raw `rocksdb` write batch type; this is the same type that
/// [`EitherWriter::into_raw_rocksdb_batch`] yields for the RocksDB variant.
pub type RawRocksDBBatch = rocksdb::WriteBatchWithTransaction<true>;
70
/// Optional RocksDB read snapshot accepted by the [`EitherReader`] constructors.
///
/// Those constructors unwrap it with `expect` when the cached storage settings have `storage_v2`
/// set, so it must be `Some` in that configuration.
pub type RocksDBRefArg<'a> = Option<crate::providers::rocksdb::RocksReadSnapshot<'a>>;
76
/// Writer over one of three storage backends, fixed when the writer is constructed.
///
/// Operations that a given backend does not support return
/// [`ProviderError::UnsupportedProvider`].
#[derive(Debug, Display)]
pub enum EitherWriter<'a, CURSOR, N> {
    /// Writes through a database cursor.
    Database(CURSOR),
    /// Writes through a static file writer.
    StaticFile(StaticFileProviderRWRefMut<'a, N>),
    /// Writes into a RocksDB batch.
    RocksDB(RocksDBBatch<'a>),
}
87
88impl<'a> EitherWriter<'a, (), ()> {
89 pub fn new_receipts<P>(
91 provider: &'a P,
92 block_number: BlockNumber,
93 ) -> ProviderResult<EitherWriterTy<'a, P, tables::Receipts<ReceiptTy<P::Primitives>>>>
94 where
95 P: DBProvider + NodePrimitivesProvider + StorageSettingsCache + StaticFileProviderFactory,
96 P::Tx: DbTxMut,
97 ReceiptTy<P::Primitives>: Value,
98 {
99 if Self::receipts_destination(provider).is_static_file() {
100 Ok(EitherWriter::StaticFile(
101 provider.get_static_file_writer(block_number, StaticFileSegment::Receipts)?,
102 ))
103 } else {
104 Ok(EitherWriter::Database(
105 provider.tx_ref().cursor_write::<tables::Receipts<ReceiptTy<P::Primitives>>>()?,
106 ))
107 }
108 }
109
110 pub fn new_senders<P>(
112 provider: &'a P,
113 block_number: BlockNumber,
114 ) -> ProviderResult<EitherWriterTy<'a, P, tables::TransactionSenders>>
115 where
116 P: DBProvider + NodePrimitivesProvider + StorageSettingsCache + StaticFileProviderFactory,
117 P::Tx: DbTxMut,
118 {
119 if EitherWriterDestination::senders(provider).is_static_file() {
120 Ok(EitherWriter::StaticFile(
121 provider
122 .get_static_file_writer(block_number, StaticFileSegment::TransactionSenders)?,
123 ))
124 } else {
125 Ok(EitherWriter::Database(
126 provider.tx_ref().cursor_write::<tables::TransactionSenders>()?,
127 ))
128 }
129 }
130
131 pub fn new_account_changesets<P>(
134 provider: &'a P,
135 block_number: BlockNumber,
136 ) -> ProviderResult<DupEitherWriterTy<'a, P, tables::AccountChangeSets>>
137 where
138 P: DBProvider + NodePrimitivesProvider + StorageSettingsCache + StaticFileProviderFactory,
139 P::Tx: DbTxMut,
140 {
141 if provider.cached_storage_settings().storage_v2 {
142 Ok(EitherWriter::StaticFile(
143 provider
144 .get_static_file_writer(block_number, StaticFileSegment::AccountChangeSets)?,
145 ))
146 } else {
147 Ok(EitherWriter::Database(
148 provider.tx_ref().cursor_dup_write::<tables::AccountChangeSets>()?,
149 ))
150 }
151 }
152
153 pub fn new_storage_changesets<P>(
155 provider: &'a P,
156 block_number: BlockNumber,
157 ) -> ProviderResult<DupEitherWriterTy<'a, P, tables::StorageChangeSets>>
158 where
159 P: DBProvider + NodePrimitivesProvider + StorageSettingsCache + StaticFileProviderFactory,
160 P::Tx: DbTxMut,
161 {
162 if provider.cached_storage_settings().storage_v2 {
163 Ok(EitherWriter::StaticFile(
164 provider
165 .get_static_file_writer(block_number, StaticFileSegment::StorageChangeSets)?,
166 ))
167 } else {
168 Ok(EitherWriter::Database(
169 provider.tx_ref().cursor_dup_write::<tables::StorageChangeSets>()?,
170 ))
171 }
172 }
173
174 pub fn receipts_destination<P: DBProvider + StorageSettingsCache>(
183 provider: &P,
184 ) -> EitherWriterDestination {
185 let receipts_in_static_files = provider.cached_storage_settings().storage_v2;
186 let prune_modes = provider.prune_modes_ref();
187
188 if !receipts_in_static_files && prune_modes.has_receipts_pruning() ||
189 receipts_in_static_files && !prune_modes.receipts_log_filter.is_empty()
191 {
192 EitherWriterDestination::Database
193 } else {
194 EitherWriterDestination::StaticFile
195 }
196 }
197
198 pub fn account_changesets_destination<P: DBProvider + StorageSettingsCache>(
202 provider: &P,
203 ) -> EitherWriterDestination {
204 if provider.cached_storage_settings().storage_v2 {
205 EitherWriterDestination::StaticFile
206 } else {
207 EitherWriterDestination::Database
208 }
209 }
210
211 pub fn storage_changesets_destination<P: DBProvider + StorageSettingsCache>(
215 provider: &P,
216 ) -> EitherWriterDestination {
217 if provider.cached_storage_settings().storage_v2 {
218 EitherWriterDestination::StaticFile
219 } else {
220 EitherWriterDestination::Database
221 }
222 }
223
224 pub fn new_storages_history<P>(
226 provider: &P,
227 _rocksdb_batch: RocksBatchArg<'a>,
228 ) -> ProviderResult<EitherWriterTy<'a, P, tables::StoragesHistory>>
229 where
230 P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
231 P::Tx: DbTxMut,
232 {
233 if provider.cached_storage_settings().storage_v2 {
234 return Ok(EitherWriter::RocksDB(_rocksdb_batch));
235 }
236
237 Ok(EitherWriter::Database(provider.tx_ref().cursor_write::<tables::StoragesHistory>()?))
238 }
239
240 pub fn new_transaction_hash_numbers<P>(
242 provider: &P,
243 _rocksdb_batch: RocksBatchArg<'a>,
244 ) -> ProviderResult<EitherWriterTy<'a, P, tables::TransactionHashNumbers>>
245 where
246 P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
247 P::Tx: DbTxMut,
248 {
249 if provider.cached_storage_settings().storage_v2 {
250 return Ok(EitherWriter::RocksDB(_rocksdb_batch));
251 }
252
253 Ok(EitherWriter::Database(
254 provider.tx_ref().cursor_write::<tables::TransactionHashNumbers>()?,
255 ))
256 }
257
258 pub fn new_accounts_history<P>(
260 provider: &P,
261 _rocksdb_batch: RocksBatchArg<'a>,
262 ) -> ProviderResult<EitherWriterTy<'a, P, tables::AccountsHistory>>
263 where
264 P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
265 P::Tx: DbTxMut,
266 {
267 if provider.cached_storage_settings().storage_v2 {
268 return Ok(EitherWriter::RocksDB(_rocksdb_batch));
269 }
270
271 Ok(EitherWriter::Database(provider.tx_ref().cursor_write::<tables::AccountsHistory>()?))
272 }
273}
274
275impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N> {
276 pub fn into_raw_rocksdb_batch(self) -> Option<rocksdb::WriteBatchWithTransaction<true>> {
284 match self {
285 Self::Database(_) | Self::StaticFile(_) => None,
286 Self::RocksDB(batch) => Some(batch.into_inner()),
287 }
288 }
289
290 pub fn increment_block(&mut self, expected_block_number: BlockNumber) -> ProviderResult<()> {
294 match self {
295 Self::Database(_) => Ok(()),
296 Self::StaticFile(writer) => writer.increment_block(expected_block_number),
297 Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
298 }
299 }
300
301 pub fn ensure_at_block(&mut self, block_number: BlockNumber) -> ProviderResult<()> {
308 match self {
309 Self::Database(_) => Ok(()),
310 Self::StaticFile(writer) => writer.ensure_at_block(block_number),
311 Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
312 }
313 }
314}
315
316impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>
317where
318 N::Receipt: Value,
319 CURSOR: DbCursorRW<tables::Receipts<N::Receipt>>,
320{
321 pub fn append_receipt(&mut self, tx_num: TxNumber, receipt: &N::Receipt) -> ProviderResult<()> {
323 match self {
324 Self::Database(cursor) => Ok(cursor.append(tx_num, receipt)?),
325 Self::StaticFile(writer) => writer.append_receipt(tx_num, receipt),
326 Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
327 }
328 }
329}
330
331impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>
332where
333 CURSOR: DbCursorRW<tables::TransactionSenders>,
334{
335 pub fn append_sender(&mut self, tx_num: TxNumber, sender: &Address) -> ProviderResult<()> {
337 match self {
338 Self::Database(cursor) => Ok(cursor.append(tx_num, sender)?),
339 Self::StaticFile(writer) => writer.append_transaction_sender(tx_num, sender),
340 Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
341 }
342 }
343
344 pub fn append_senders<I>(&mut self, senders: I) -> ProviderResult<()>
346 where
347 I: Iterator<Item = (TxNumber, Address)>,
348 {
349 match self {
350 Self::Database(cursor) => {
351 for (tx_num, sender) in senders {
352 cursor.append(tx_num, &sender)?;
353 }
354 Ok(())
355 }
356 Self::StaticFile(writer) => writer.append_transaction_senders(senders),
357 Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
358 }
359 }
360
361 pub fn prune_senders(
364 &mut self,
365 unwind_tx_from: TxNumber,
366 block: BlockNumber,
367 ) -> ProviderResult<()>
368 where
369 CURSOR: DbCursorRO<tables::TransactionSenders>,
370 {
371 match self {
372 Self::Database(cursor) => {
373 let mut walker = cursor.walk_range(unwind_tx_from..)?;
374 while walker.next().transpose()?.is_some() {
375 walker.delete_current()?;
376 }
377 }
378 Self::StaticFile(writer) => {
379 let static_file_transaction_sender_num = writer
380 .reader()
381 .get_highest_static_file_tx(StaticFileSegment::TransactionSenders);
382
383 let to_delete = static_file_transaction_sender_num
384 .map(|static_num| (static_num + 1).saturating_sub(unwind_tx_from))
385 .unwrap_or_default();
386
387 writer.prune_transaction_senders(to_delete, block)?;
388 }
389 Self::RocksDB(_) => return Err(ProviderError::UnsupportedProvider),
390 }
391
392 Ok(())
393 }
394}
395
396impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>
397where
398 CURSOR: DbCursorRW<tables::TransactionHashNumbers> + DbCursorRO<tables::TransactionHashNumbers>,
399{
400 pub fn put_transaction_hash_number(
406 &mut self,
407 hash: TxHash,
408 tx_num: TxNumber,
409 append_only: bool,
410 ) -> ProviderResult<()> {
411 match self {
412 Self::Database(cursor) => {
413 if append_only {
414 Ok(cursor.append(hash, &tx_num)?)
415 } else {
416 Ok(cursor.upsert(hash, &tx_num)?)
417 }
418 }
419 Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
420 Self::RocksDB(batch) => batch.put::<tables::TransactionHashNumbers>(hash, &tx_num),
421 }
422 }
423
424 pub fn put_transaction_hash_numbers_batch(
433 &mut self,
434 entries: Vec<(TxHash, TxNumber)>,
435 append_only: bool,
436 ) -> ProviderResult<()> {
437 match self {
438 Self::Database(cursor) => {
439 for (hash, tx_num) in entries {
440 if append_only {
441 cursor.append(hash, &tx_num)?;
442 } else {
443 cursor.upsert(hash, &tx_num)?;
444 }
445 }
446 Ok(())
447 }
448 Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
449 Self::RocksDB(batch) => {
450 for (hash, tx_num) in entries {
451 batch.put::<tables::TransactionHashNumbers>(hash, &tx_num)?;
452 }
453 Ok(())
454 }
455 }
456 }
457
458 pub fn delete_transaction_hash_number(&mut self, hash: TxHash) -> ProviderResult<()> {
460 match self {
461 Self::Database(cursor) => {
462 if cursor.seek_exact(hash)?.is_some() {
463 cursor.delete_current()?;
464 }
465 Ok(())
466 }
467 Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
468 Self::RocksDB(batch) => batch.delete::<tables::TransactionHashNumbers>(hash),
469 }
470 }
471}
472
473impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>
474where
475 CURSOR: DbCursorRW<tables::StoragesHistory> + DbCursorRO<tables::StoragesHistory>,
476{
477 pub fn put_storage_history(
479 &mut self,
480 key: StorageShardedKey,
481 value: &BlockNumberList,
482 ) -> ProviderResult<()> {
483 match self {
484 Self::Database(cursor) => Ok(cursor.upsert(key, value)?),
485 Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
486 Self::RocksDB(batch) => batch.put::<tables::StoragesHistory>(key, value),
487 }
488 }
489
490 pub fn delete_storage_history(&mut self, key: StorageShardedKey) -> ProviderResult<()> {
492 match self {
493 Self::Database(cursor) => {
494 if cursor.seek_exact(key)?.is_some() {
495 cursor.delete_current()?;
496 }
497 Ok(())
498 }
499 Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
500 Self::RocksDB(batch) => batch.delete::<tables::StoragesHistory>(key),
501 }
502 }
503
504 pub fn append_storage_history(
506 &mut self,
507 key: StorageShardedKey,
508 value: &BlockNumberList,
509 ) -> ProviderResult<()> {
510 match self {
511 Self::Database(cursor) => Ok(cursor.append(key, value)?),
512 Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
513 Self::RocksDB(batch) => batch.put::<tables::StoragesHistory>(key, value),
514 }
515 }
516
517 pub fn upsert_storage_history(
519 &mut self,
520 key: StorageShardedKey,
521 value: &BlockNumberList,
522 ) -> ProviderResult<()> {
523 match self {
524 Self::Database(cursor) => Ok(cursor.upsert(key, value)?),
525 Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
526 Self::RocksDB(batch) => batch.put::<tables::StoragesHistory>(key, value),
527 }
528 }
529
530 pub fn get_last_storage_history_shard(
532 &mut self,
533 address: Address,
534 storage_key: B256,
535 ) -> ProviderResult<Option<BlockNumberList>> {
536 let key = StorageShardedKey::last(address, storage_key);
537 match self {
538 Self::Database(cursor) => Ok(cursor.seek_exact(key)?.map(|(_, v)| v)),
539 Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
540 Self::RocksDB(batch) => batch.get::<tables::StoragesHistory>(key),
541 }
542 }
543}
544
545impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>
546where
547 CURSOR: DbCursorRW<tables::AccountsHistory> + DbCursorRO<tables::AccountsHistory>,
548{
549 pub fn append_account_history(
551 &mut self,
552 key: ShardedKey<Address>,
553 value: &BlockNumberList,
554 ) -> ProviderResult<()> {
555 match self {
556 Self::Database(cursor) => Ok(cursor.append(key, value)?),
557 Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
558 Self::RocksDB(batch) => batch.put::<tables::AccountsHistory>(key, value),
559 }
560 }
561
562 pub fn upsert_account_history(
564 &mut self,
565 key: ShardedKey<Address>,
566 value: &BlockNumberList,
567 ) -> ProviderResult<()> {
568 match self {
569 Self::Database(cursor) => Ok(cursor.upsert(key, value)?),
570 Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
571 Self::RocksDB(batch) => batch.put::<tables::AccountsHistory>(key, value),
572 }
573 }
574
575 pub fn get_last_account_history_shard(
577 &mut self,
578 address: Address,
579 ) -> ProviderResult<Option<BlockNumberList>> {
580 match self {
581 Self::Database(cursor) => {
582 Ok(cursor.seek_exact(ShardedKey::last(address))?.map(|(_, v)| v))
583 }
584 Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
585 Self::RocksDB(batch) => batch.get::<tables::AccountsHistory>(ShardedKey::last(address)),
586 }
587 }
588
589 pub fn delete_account_history(&mut self, key: ShardedKey<Address>) -> ProviderResult<()> {
591 match self {
592 Self::Database(cursor) => {
593 if cursor.seek_exact(key)?.is_some() {
594 cursor.delete_current()?;
595 }
596 Ok(())
597 }
598 Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
599 Self::RocksDB(batch) => batch.delete::<tables::AccountsHistory>(key),
600 }
601 }
602}
603
604impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>
605where
606 CURSOR: DbDupCursorRW<tables::AccountChangeSets>,
607{
608 pub fn append_account_changeset(
612 &mut self,
613 block_number: BlockNumber,
614 mut changeset: Vec<AccountBeforeTx>,
615 ) -> ProviderResult<()> {
616 changeset.par_sort_by_key(|a| a.address);
618 match self {
619 Self::Database(cursor) => {
620 for change in changeset {
621 cursor.append_dup(block_number, change)?;
622 }
623 }
624 Self::StaticFile(writer) => {
625 writer.append_account_changeset(changeset, block_number)?;
626 }
627 Self::RocksDB(_) => return Err(ProviderError::UnsupportedProvider),
628 }
629
630 Ok(())
631 }
632}
633
634impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>
635where
636 CURSOR: DbDupCursorRW<tables::StorageChangeSets>,
637{
638 pub fn append_storage_changeset(
642 &mut self,
643 block_number: BlockNumber,
644 mut changeset: Vec<StorageBeforeTx>,
645 ) -> ProviderResult<()> {
646 changeset.par_sort_by_key(|change| (change.address, change.key));
647
648 match self {
649 Self::Database(cursor) => {
650 for change in changeset {
651 let storage_id = BlockNumberAddress((block_number, change.address));
652 cursor.append_dup(
653 storage_id,
654 StorageEntry { key: change.key, value: change.value },
655 )?;
656 }
657 }
658 Self::StaticFile(writer) => {
659 writer.append_storage_changeset(changeset, block_number)?;
660 }
661 Self::RocksDB(_) => return Err(ProviderError::UnsupportedProvider),
662 }
663
664 Ok(())
665 }
666}
667
/// Reader over one of three storage backends, fixed when the reader is constructed.
///
/// Operations that a given backend does not support return
/// [`ProviderError::UnsupportedProvider`].
#[derive(Debug, Display)]
pub enum EitherReader<'a, CURSOR, N> {
    /// Reads through a database cursor; the `PhantomData` only carries the `'a` lifetime.
    Database(CURSOR, PhantomData<&'a ()>),
    /// Reads through the static file provider; the `PhantomData` only carries the `'a` lifetime.
    StaticFile(StaticFileProvider<N>, PhantomData<&'a ()>),
    /// Reads through a RocksDB read snapshot.
    RocksDB(crate::providers::rocksdb::RocksReadSnapshot<'a>),
}
678
679impl<'a> EitherReader<'a, (), ()> {
680 pub fn new_senders<P>(
682 provider: &P,
683 ) -> ProviderResult<EitherReaderTy<'a, P, tables::TransactionSenders>>
684 where
685 P: DBProvider + NodePrimitivesProvider + StorageSettingsCache + StaticFileProviderFactory,
686 P::Tx: DbTx,
687 {
688 if EitherWriterDestination::senders(provider).is_static_file() {
689 Ok(EitherReader::StaticFile(provider.static_file_provider(), PhantomData))
690 } else {
691 Ok(EitherReader::Database(
692 provider.tx_ref().cursor_read::<tables::TransactionSenders>()?,
693 PhantomData,
694 ))
695 }
696 }
697
698 pub fn new_storages_history<P>(
700 provider: &P,
701 rocksdb: RocksDBRefArg<'a>,
702 ) -> ProviderResult<EitherReaderTy<'a, P, tables::StoragesHistory>>
703 where
704 P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
705 P::Tx: DbTx,
706 {
707 if provider.cached_storage_settings().storage_v2 {
708 return Ok(EitherReader::RocksDB(
709 rocksdb.expect("storages_history_in_rocksdb requires rocksdb snapshot"),
710 ));
711 }
712
713 Ok(EitherReader::Database(
714 provider.tx_ref().cursor_read::<tables::StoragesHistory>()?,
715 PhantomData,
716 ))
717 }
718
719 pub fn new_transaction_hash_numbers<P>(
721 provider: &P,
722 rocksdb: RocksDBRefArg<'a>,
723 ) -> ProviderResult<EitherReaderTy<'a, P, tables::TransactionHashNumbers>>
724 where
725 P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
726 P::Tx: DbTx,
727 {
728 if provider.cached_storage_settings().storage_v2 {
729 return Ok(EitherReader::RocksDB(
730 rocksdb.expect("transaction_hash_numbers_in_rocksdb requires rocksdb snapshot"),
731 ));
732 }
733
734 Ok(EitherReader::Database(
735 provider.tx_ref().cursor_read::<tables::TransactionHashNumbers>()?,
736 PhantomData,
737 ))
738 }
739
740 pub fn new_accounts_history<P>(
742 provider: &P,
743 rocksdb: RocksDBRefArg<'a>,
744 ) -> ProviderResult<EitherReaderTy<'a, P, tables::AccountsHistory>>
745 where
746 P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
747 P::Tx: DbTx,
748 {
749 if provider.cached_storage_settings().storage_v2 {
750 return Ok(EitherReader::RocksDB(
751 rocksdb.expect("account_history_in_rocksdb requires rocksdb snapshot"),
752 ));
753 }
754
755 Ok(EitherReader::Database(
756 provider.tx_ref().cursor_read::<tables::AccountsHistory>()?,
757 PhantomData,
758 ))
759 }
760
761 pub fn new_account_changesets<P>(
763 provider: &P,
764 ) -> ProviderResult<DupEitherReaderTy<'a, P, tables::AccountChangeSets>>
765 where
766 P: DBProvider + NodePrimitivesProvider + StorageSettingsCache + StaticFileProviderFactory,
767 P::Tx: DbTx,
768 {
769 if EitherWriterDestination::account_changesets(provider).is_static_file() {
770 Ok(EitherReader::StaticFile(provider.static_file_provider(), PhantomData))
771 } else {
772 Ok(EitherReader::Database(
773 provider.tx_ref().cursor_dup_read::<tables::AccountChangeSets>()?,
774 PhantomData,
775 ))
776 }
777 }
778}
779
780impl<CURSOR, N: NodePrimitives> EitherReader<'_, CURSOR, N>
781where
782 CURSOR: DbCursorRO<tables::TransactionSenders>,
783{
784 pub fn senders_by_tx_range(
786 &mut self,
787 range: Range<TxNumber>,
788 ) -> ProviderResult<HashMap<TxNumber, Address>> {
789 match self {
790 Self::Database(cursor, _) => cursor
791 .walk_range(range)?
792 .map(|result| result.map_err(ProviderError::from))
793 .collect::<ProviderResult<HashMap<_, _>>>(),
794 Self::StaticFile(provider, _) => range
795 .clone()
796 .zip(provider.fetch_range_iter(
797 StaticFileSegment::TransactionSenders,
798 range,
799 |cursor, number| cursor.get_one::<TransactionSenderMask>(number.into()),
800 )?)
801 .filter_map(|(tx_num, sender)| {
802 let result = sender.transpose()?;
803 Some(result.map(|sender| (tx_num, sender)))
804 })
805 .collect::<ProviderResult<HashMap<_, _>>>(),
806 Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
807 }
808 }
809}
810
811impl<CURSOR, N: NodePrimitives> EitherReader<'_, CURSOR, N>
812where
813 CURSOR: DbCursorRO<tables::TransactionHashNumbers>,
814{
815 pub fn get_transaction_hash_number(
817 &mut self,
818 hash: TxHash,
819 ) -> ProviderResult<Option<TxNumber>> {
820 match self {
821 Self::Database(cursor, _) => Ok(cursor.seek_exact(hash)?.map(|(_, v)| v)),
822 Self::StaticFile(_, _) => Err(ProviderError::UnsupportedProvider),
823 Self::RocksDB(snapshot) => snapshot.get::<tables::TransactionHashNumbers>(hash),
824 }
825 }
826}
827
828impl<CURSOR, N: NodePrimitives> EitherReader<'_, CURSOR, N>
829where
830 CURSOR: DbCursorRO<tables::StoragesHistory>,
831{
832 pub fn get_storage_history(
834 &mut self,
835 key: StorageShardedKey,
836 ) -> ProviderResult<Option<BlockNumberList>> {
837 match self {
838 Self::Database(cursor, _) => Ok(cursor.seek_exact(key)?.map(|(_, v)| v)),
839 Self::StaticFile(_, _) => Err(ProviderError::UnsupportedProvider),
840 Self::RocksDB(snapshot) => snapshot.get::<tables::StoragesHistory>(key),
841 }
842 }
843
844 pub fn storage_history_info(
846 &mut self,
847 address: Address,
848 storage_key: alloy_primitives::B256,
849 block_number: BlockNumber,
850 lowest_available_block_number: Option<BlockNumber>,
851 ) -> ProviderResult<HistoryInfo> {
852 match self {
853 Self::Database(cursor, _) => {
854 let key = StorageShardedKey::new(address, storage_key, block_number);
855 history_info::<tables::StoragesHistory, _, _>(
856 cursor,
857 key,
858 block_number,
859 |k| k.address == address && k.sharded_key.key == storage_key,
860 lowest_available_block_number,
861 )
862 }
863 Self::StaticFile(_, _) => Err(ProviderError::UnsupportedProvider),
864 Self::RocksDB(snapshot) => snapshot.storage_history_info(
865 address,
866 storage_key,
867 block_number,
868 lowest_available_block_number,
869 ),
870 }
871 }
872}
873
874impl<CURSOR, N: NodePrimitives> EitherReader<'_, CURSOR, N>
875where
876 CURSOR: DbCursorRO<tables::AccountsHistory>,
877{
878 pub fn get_account_history(
880 &mut self,
881 key: ShardedKey<Address>,
882 ) -> ProviderResult<Option<BlockNumberList>> {
883 match self {
884 Self::Database(cursor, _) => Ok(cursor.seek_exact(key)?.map(|(_, v)| v)),
885 Self::StaticFile(_, _) => Err(ProviderError::UnsupportedProvider),
886 Self::RocksDB(snapshot) => snapshot.get::<tables::AccountsHistory>(key),
887 }
888 }
889
890 pub fn account_history_info(
892 &mut self,
893 address: Address,
894 block_number: BlockNumber,
895 lowest_available_block_number: Option<BlockNumber>,
896 ) -> ProviderResult<HistoryInfo> {
897 match self {
898 Self::Database(cursor, _) => {
899 let key = ShardedKey::new(address, block_number);
900 history_info::<tables::AccountsHistory, _, _>(
901 cursor,
902 key,
903 block_number,
904 |k| k.key == address,
905 lowest_available_block_number,
906 )
907 }
908 Self::StaticFile(_, _) => Err(ProviderError::UnsupportedProvider),
909 Self::RocksDB(snapshot) => {
910 snapshot.account_history_info(address, block_number, lowest_available_block_number)
911 }
912 }
913 }
914}
915
916impl<CURSOR, N: NodePrimitives> EitherReader<'_, CURSOR, N>
917where
918 CURSOR: DbCursorRO<tables::AccountChangeSets>,
919{
920 pub fn changed_accounts_with_range(
922 &mut self,
923 range: RangeInclusive<BlockNumber>,
924 ) -> ProviderResult<BTreeSet<Address>> {
925 match self {
926 Self::StaticFile(provider, _) => {
927 let highest_static_block =
928 provider.get_highest_static_file_block(StaticFileSegment::AccountChangeSets);
929
930 let Some(highest) = highest_static_block else {
931 return Err(ProviderError::MissingHighestStaticFileBlock(
932 StaticFileSegment::AccountChangeSets,
933 ))
934 };
935
936 let start = *range.start();
937 let static_end = (*range.end()).min(highest);
938
939 let mut changed_accounts = BTreeSet::default();
940 if start <= static_end {
941 for block in start..=static_end {
942 let block_changesets = provider.account_block_changeset(block)?;
943 for changeset in block_changesets {
944 changed_accounts.insert(changeset.address);
945 }
946 }
947 }
948
949 Ok(changed_accounts)
950 }
951 Self::Database(provider, _) => provider
952 .walk_range(range)?
953 .map(|entry| {
954 entry.map(|(_, account_before)| account_before.address).map_err(Into::into)
955 })
956 .collect(),
957 Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
958 }
959 }
960}
961
/// Storage backend selected for a given kind of data.
///
/// The `EnumIs` derive generates the `is_*` predicates (such as `is_static_file`) used by the
/// writer and reader constructors.
#[derive(Debug, EnumIs)]
pub enum EitherWriterDestination {
    /// Data goes to the database.
    Database,
    /// Data goes to static files.
    StaticFile,
    /// Data goes to RocksDB.
    RocksDB,
}
972
973impl EitherWriterDestination {
974 pub fn senders<P>(provider: &P) -> Self
976 where
977 P: StorageSettingsCache,
978 {
979 if provider.cached_storage_settings().storage_v2 {
981 Self::StaticFile
982 } else {
983 Self::Database
984 }
985 }
986
987 pub fn account_changesets<P>(provider: &P) -> Self
989 where
990 P: StorageSettingsCache,
991 {
992 if provider.cached_storage_settings().storage_v2 {
994 Self::StaticFile
995 } else {
996 Self::Database
997 }
998 }
999
1000 pub fn storage_changesets<P>(provider: &P) -> Self
1002 where
1003 P: StorageSettingsCache,
1004 {
1005 if provider.cached_storage_settings().storage_v2 {
1007 Self::StaticFile
1008 } else {
1009 Self::Database
1010 }
1011 }
1012}
1013
#[cfg(test)]
mod tests {
    use crate::{test_utils::create_test_provider_factory, StaticFileWriter};

    use super::*;
    use alloy_primitives::Address;
    use reth_db::models::AccountBeforeTx;
    use reth_static_file_types::StaticFileSegment;
    use reth_storage_api::{DatabaseProviderFactory, StorageSettings};

    /// Querying a block range that extends past the static file tip must return only the
    /// accounts changed up to the tip, without failing.
    #[test]
    fn test_changed_accounts_with_range_caps_at_static_file_tip() {
        let factory = create_test_provider_factory();
        let highest_block = 5u64;

        // One distinct address per block: only the first byte differs.
        let addresses: Vec<Address> = (0..=highest_block)
            .map(|block| {
                let mut address = Address::ZERO;
                address.0[0] = block as u8;
                address
            })
            .collect();

        // Write one single-entry changeset per block into the static file segment.
        {
            let sf_provider = factory.static_file_provider();
            let mut writer =
                sf_provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();

            for (block_num, address) in (0..=highest_block).zip(addresses.iter().copied()) {
                let changeset = vec![AccountBeforeTx { address, info: None }];
                writer.append_account_changeset(changeset, block_num).unwrap();
            }
            writer.commit().unwrap();
        }

        factory.set_storage_settings_cache(StorageSettings::v2());

        let provider = factory.database_provider_ro().unwrap();

        let sf_tip = provider
            .static_file_provider()
            .get_highest_static_file_block(StaticFileSegment::AccountChangeSets);
        assert_eq!(sf_tip, Some(highest_block));

        let mut reader = EitherReader::new_account_changesets(&provider).unwrap();
        assert!(matches!(reader, EitherReader::StaticFile(_, _)));

        // The requested range ends at block 10, beyond the tip at block 5.
        let changed = reader.changed_accounts_with_range(0..=10).unwrap();

        let expected: BTreeSet<Address> = addresses.into_iter().collect();
        assert_eq!(changed, expected);
    }

    /// Senders written through [`EitherWriter`] must be readable through [`EitherReader`] for
    /// both the database and the static file backend.
    #[test]
    fn test_reader_senders_by_tx_range() {
        let factory = create_test_provider_factory();

        // Senders exist for transactions 1 through 4 only; the query below covers 0 through 5.
        let senders = [
            (1, Address::random()),
            (2, Address::random()),
            (3, Address::random()),
            (4, Address::random()),
        ];

        for transaction_senders_in_static_files in [false, true] {
            let settings = if transaction_senders_in_static_files {
                StorageSettings::v2()
            } else {
                StorageSettings::v1()
            };
            factory.set_storage_settings_cache(settings);

            let provider = factory.database_provider_rw().unwrap();
            let mut writer = EitherWriter::new_senders(&provider, 0).unwrap();
            assert!(if transaction_senders_in_static_files {
                matches!(writer, EitherWriter::StaticFile(_))
            } else {
                matches!(writer, EitherWriter::Database(_))
            });

            writer.increment_block(0).unwrap();
            writer.append_senders(senders.iter().copied()).unwrap();
            drop(writer);
            provider.commit().unwrap();

            let provider = factory.database_provider_ro().unwrap();
            let mut reader = EitherReader::new_senders(&provider).unwrap();
            assert!(if transaction_senders_in_static_files {
                matches!(reader, EitherReader::StaticFile(_, _))
            } else {
                matches!(reader, EitherReader::Database(_, _))
            });

            assert_eq!(
                reader.senders_by_tx_range(0..6).unwrap(),
                senders.iter().copied().collect::<HashMap<_, _>>(),
                "{reader}"
            );
        }
    }
}
1128
1129#[cfg(test)]
1130mod rocksdb_tests {
1131 use super::*;
1132 use crate::{
1133 providers::rocksdb::{RocksDBBuilder, RocksDBProvider},
1134 test_utils::create_test_provider_factory,
1135 RocksDBProviderFactory,
1136 };
1137 use alloy_primitives::{Address, B256};
1138 use reth_db_api::{
1139 models::{storage_sharded_key::StorageShardedKey, IntegerList, ShardedKey},
1140 tables,
1141 transaction::DbTxMut,
1142 };
1143 use reth_ethereum_primitives::EthPrimitives;
1144 use reth_storage_api::{DatabaseProviderFactory, StorageSettings};
1145 use std::marker::PhantomData;
1146 use tempfile::TempDir;
1147
1148 fn create_rocksdb_provider() -> (TempDir, RocksDBProvider) {
1149 let temp_dir = TempDir::new().unwrap();
1150 let provider = RocksDBBuilder::new(temp_dir.path())
1151 .with_table::<tables::TransactionHashNumbers>()
1152 .with_table::<tables::StoragesHistory>()
1153 .with_table::<tables::AccountsHistory>()
1154 .build()
1155 .unwrap();
1156 (temp_dir, provider)
1157 }
1158
1159 #[test]
1163 fn test_either_writer_transaction_hash_numbers_with_rocksdb() {
1164 let factory = create_test_provider_factory();
1165
1166 factory.set_storage_settings_cache(StorageSettings::v2());
1168
1169 let hash1 = B256::from([1u8; 32]);
1170 let hash2 = B256::from([2u8; 32]);
1171 let tx_num1 = 100u64;
1172 let tx_num2 = 200u64;
1173
1174 let rocksdb = factory.rocksdb_provider();
1176 let batch = rocksdb.batch();
1177
1178 let provider = factory.database_provider_rw().unwrap();
1180 let mut writer = EitherWriter::new_transaction_hash_numbers(&provider, batch).unwrap();
1181
1182 assert!(matches!(writer, EitherWriter::RocksDB(_)));
1184
1185 writer.put_transaction_hash_number(hash1, tx_num1, false).unwrap();
1187 writer.put_transaction_hash_number(hash2, tx_num2, false).unwrap();
1188
1189 if let Some(batch) = writer.into_raw_rocksdb_batch() {
1191 provider.set_pending_rocksdb_batch(batch);
1192 }
1193
1194 provider.commit().unwrap();
1196
1197 let rocksdb = factory.rocksdb_provider();
1199 assert_eq!(rocksdb.get::<tables::TransactionHashNumbers>(hash1).unwrap(), Some(tx_num1));
1200 assert_eq!(rocksdb.get::<tables::TransactionHashNumbers>(hash2).unwrap(), Some(tx_num2));
1201 }
1202
1203 #[test]
1205 fn test_either_writer_delete_transaction_hash_number_with_rocksdb() {
1206 let factory = create_test_provider_factory();
1207
1208 factory.set_storage_settings_cache(StorageSettings::v2());
1210
1211 let hash = B256::from([1u8; 32]);
1212 let tx_num = 100u64;
1213
1214 let rocksdb = factory.rocksdb_provider();
1216 rocksdb.put::<tables::TransactionHashNumbers>(hash, &tx_num).unwrap();
1217 assert_eq!(rocksdb.get::<tables::TransactionHashNumbers>(hash).unwrap(), Some(tx_num));
1218
1219 let batch = rocksdb.batch();
1221 let provider = factory.database_provider_rw().unwrap();
1222 let mut writer = EitherWriter::new_transaction_hash_numbers(&provider, batch).unwrap();
1223 writer.delete_transaction_hash_number(hash).unwrap();
1224
1225 if let Some(batch) = writer.into_raw_rocksdb_batch() {
1227 provider.set_pending_rocksdb_batch(batch);
1228 }
1229 provider.commit().unwrap();
1230
1231 let rocksdb = factory.rocksdb_provider();
1233 assert_eq!(rocksdb.get::<tables::TransactionHashNumbers>(hash).unwrap(), None);
1234 }
1235
1236 #[test]
1237 fn test_rocksdb_batch_transaction_hash_numbers() {
1238 let (_temp_dir, provider) = create_rocksdb_provider();
1239
1240 let hash1 = B256::from([1u8; 32]);
1241 let hash2 = B256::from([2u8; 32]);
1242 let tx_num1 = 100u64;
1243 let tx_num2 = 200u64;
1244
1245 let mut batch = provider.batch();
1247 batch.put::<tables::TransactionHashNumbers>(hash1, &tx_num1).unwrap();
1248 batch.put::<tables::TransactionHashNumbers>(hash2, &tx_num2).unwrap();
1249 batch.commit().unwrap();
1250
1251 let tx = provider.tx();
1253 assert_eq!(tx.get::<tables::TransactionHashNumbers>(hash1).unwrap(), Some(tx_num1));
1254 assert_eq!(tx.get::<tables::TransactionHashNumbers>(hash2).unwrap(), Some(tx_num2));
1255
1256 let missing_hash = B256::from([99u8; 32]);
1258 assert_eq!(tx.get::<tables::TransactionHashNumbers>(missing_hash).unwrap(), None);
1259 }
1260
1261 #[test]
1262 fn test_rocksdb_batch_storage_history() {
1263 let (_temp_dir, provider) = create_rocksdb_provider();
1264
1265 let address = Address::random();
1266 let storage_key = B256::from([1u8; 32]);
1267 let key = StorageShardedKey::new(address, storage_key, 1000);
1268 let value = IntegerList::new([1, 5, 10, 50]).unwrap();
1269
1270 let mut batch = provider.batch();
1272 batch.put::<tables::StoragesHistory>(key.clone(), &value).unwrap();
1273 batch.commit().unwrap();
1274
1275 let tx = provider.tx();
1277 let result = tx.get::<tables::StoragesHistory>(key).unwrap();
1278 assert_eq!(result, Some(value));
1279
1280 let missing_key = StorageShardedKey::new(Address::random(), B256::ZERO, 0);
1282 assert_eq!(tx.get::<tables::StoragesHistory>(missing_key).unwrap(), None);
1283 }
1284
1285 #[test]
1286 fn test_rocksdb_batch_account_history() {
1287 let (_temp_dir, provider) = create_rocksdb_provider();
1288
1289 let address = Address::random();
1290 let key = ShardedKey::new(address, 1000);
1291 let value = IntegerList::new([1, 10, 100, 500]).unwrap();
1292
1293 let mut batch = provider.batch();
1295 batch.put::<tables::AccountsHistory>(key.clone(), &value).unwrap();
1296 batch.commit().unwrap();
1297
1298 let tx = provider.tx();
1300 let result = tx.get::<tables::AccountsHistory>(key).unwrap();
1301 assert_eq!(result, Some(value));
1302
1303 let missing_key = ShardedKey::new(Address::random(), 0);
1305 assert_eq!(tx.get::<tables::AccountsHistory>(missing_key).unwrap(), None);
1306 }
1307
1308 #[test]
1309 fn test_rocksdb_batch_delete_transaction_hash_number() {
1310 let (_temp_dir, provider) = create_rocksdb_provider();
1311
1312 let hash = B256::from([1u8; 32]);
1313 let tx_num = 100u64;
1314
1315 provider.put::<tables::TransactionHashNumbers>(hash, &tx_num).unwrap();
1317 assert_eq!(provider.get::<tables::TransactionHashNumbers>(hash).unwrap(), Some(tx_num));
1318
1319 let mut batch = provider.batch();
1321 batch.delete::<tables::TransactionHashNumbers>(hash).unwrap();
1322 batch.commit().unwrap();
1323
1324 assert_eq!(provider.get::<tables::TransactionHashNumbers>(hash).unwrap(), None);
1326 }
1327
1328 #[test]
1329 fn test_rocksdb_batch_delete_storage_history() {
1330 let (_temp_dir, provider) = create_rocksdb_provider();
1331
1332 let address = Address::random();
1333 let storage_key = B256::from([1u8; 32]);
1334 let key = StorageShardedKey::new(address, storage_key, 1000);
1335 let value = IntegerList::new([1, 5, 10]).unwrap();
1336
1337 provider.put::<tables::StoragesHistory>(key.clone(), &value).unwrap();
1339 assert!(provider.get::<tables::StoragesHistory>(key.clone()).unwrap().is_some());
1340
1341 let mut batch = provider.batch();
1343 batch.delete::<tables::StoragesHistory>(key.clone()).unwrap();
1344 batch.commit().unwrap();
1345
1346 assert_eq!(provider.get::<tables::StoragesHistory>(key).unwrap(), None);
1348 }
1349
1350 #[test]
1351 fn test_rocksdb_batch_delete_account_history() {
1352 let (_temp_dir, provider) = create_rocksdb_provider();
1353
1354 let address = Address::random();
1355 let key = ShardedKey::new(address, 1000);
1356 let value = IntegerList::new([1, 10, 100]).unwrap();
1357
1358 provider.put::<tables::AccountsHistory>(key.clone(), &value).unwrap();
1360 assert!(provider.get::<tables::AccountsHistory>(key.clone()).unwrap().is_some());
1361
1362 let mut batch = provider.batch();
1364 batch.delete::<tables::AccountsHistory>(key.clone()).unwrap();
1365 batch.commit().unwrap();
1366
1367 assert_eq!(provider.get::<tables::AccountsHistory>(key).unwrap(), None);
1369 }
1370
    /// One history lookup to run in a scenario, paired with the result it must produce.
    ///
    /// Consumed by `run_account_history_scenario` and `run_storage_history_scenario`.
    struct HistoryQuery {
        /// Block number passed to the history-info lookup.
        block_number: BlockNumber,
        /// Value passed as the lookup's lowest-available-block argument; `None` when unset.
        lowest_available: Option<BlockNumber>,
        /// Result the MDBX lookup is asserted to equal (the RocksDB lookup must match MDBX).
        expected: HistoryInfo,
    }
1382
    // Concrete MDBX cursor types used as the cursor parameter of `EitherWriter` / `EitherReader`
    // in the scenario runners below, where the enum variants are constructed directly.

    /// MDBX `RW` cursor over `tables::AccountsHistory`.
    type AccountsHistoryWriteCursor =
        reth_db::mdbx::cursor::Cursor<reth_db::mdbx::RW, tables::AccountsHistory>;
    /// MDBX `RW` cursor over `tables::StoragesHistory`.
    type StoragesHistoryWriteCursor =
        reth_db::mdbx::cursor::Cursor<reth_db::mdbx::RW, tables::StoragesHistory>;
    /// MDBX `RO` cursor over `tables::AccountsHistory`.
    type AccountsHistoryReadCursor =
        reth_db::mdbx::cursor::Cursor<reth_db::mdbx::RO, tables::AccountsHistory>;
    /// MDBX `RO` cursor over `tables::StoragesHistory`.
    type StoragesHistoryReadCursor =
        reth_db::mdbx::cursor::Cursor<reth_db::mdbx::RO, tables::StoragesHistory>;
1392
1393 fn run_account_history_scenario(
1396 scenario_name: &str,
1397 address: Address,
1398 shards: &[(BlockNumber, Vec<BlockNumber>)], queries: &[HistoryQuery],
1400 ) {
1401 let factory = create_test_provider_factory();
1403 let mdbx_provider = factory.database_provider_rw().unwrap();
1404 let (temp_dir, rocks_provider) = create_rocksdb_provider();
1405
1406 let mut mdbx_writer: EitherWriter<'_, AccountsHistoryWriteCursor, EthPrimitives> =
1408 EitherWriter::Database(
1409 mdbx_provider.tx_ref().cursor_write::<tables::AccountsHistory>().unwrap(),
1410 );
1411 let mut rocks_writer: EitherWriter<'_, AccountsHistoryWriteCursor, EthPrimitives> =
1412 EitherWriter::RocksDB(rocks_provider.batch());
1413
1414 for (highest_block, blocks) in shards {
1416 let key = ShardedKey::new(address, *highest_block);
1417 let value = IntegerList::new(blocks.clone()).unwrap();
1418 mdbx_writer.upsert_account_history(key.clone(), &value).unwrap();
1419 rocks_writer.upsert_account_history(key, &value).unwrap();
1420 }
1421
1422 drop(mdbx_writer);
1424 mdbx_provider.commit().unwrap();
1425 if let EitherWriter::RocksDB(batch) = rocks_writer {
1426 batch.commit().unwrap();
1427 }
1428
1429 let mdbx_ro = factory.database_provider_ro().unwrap();
1431 let rocks_snapshot = rocks_provider.snapshot();
1432
1433 for (i, query) in queries.iter().enumerate() {
1434 let mut mdbx_reader: EitherReader<'_, AccountsHistoryReadCursor, EthPrimitives> =
1436 EitherReader::Database(
1437 mdbx_ro.tx_ref().cursor_read::<tables::AccountsHistory>().unwrap(),
1438 PhantomData,
1439 );
1440 let mdbx_result = mdbx_reader
1441 .account_history_info(address, query.block_number, query.lowest_available)
1442 .unwrap();
1443
1444 let rocks_result = rocks_snapshot
1446 .account_history_info(address, query.block_number, query.lowest_available)
1447 .unwrap();
1448
1449 assert_eq!(
1451 mdbx_result,
1452 rocks_result,
1453 "Backend mismatch in scenario '{}' query {}: block={}, lowest={:?}\n\
1454 MDBX: {:?}, RocksDB: {:?}",
1455 scenario_name,
1456 i,
1457 query.block_number,
1458 query.lowest_available,
1459 mdbx_result,
1460 rocks_result
1461 );
1462
1463 assert_eq!(
1465 mdbx_result,
1466 query.expected,
1467 "Unexpected result in scenario '{}' query {}: block={}, lowest={:?}\n\
1468 Got: {:?}, Expected: {:?}",
1469 scenario_name,
1470 i,
1471 query.block_number,
1472 query.lowest_available,
1473 mdbx_result,
1474 query.expected
1475 );
1476 }
1477
1478 drop(temp_dir);
1479 }
1480
1481 fn run_storage_history_scenario(
1484 scenario_name: &str,
1485 address: Address,
1486 storage_key: B256,
1487 shards: &[(BlockNumber, Vec<BlockNumber>)], queries: &[HistoryQuery],
1489 ) {
1490 let factory = create_test_provider_factory();
1492 let mdbx_provider = factory.database_provider_rw().unwrap();
1493 let (temp_dir, rocks_provider) = create_rocksdb_provider();
1494
1495 let mut mdbx_writer: EitherWriter<'_, StoragesHistoryWriteCursor, EthPrimitives> =
1497 EitherWriter::Database(
1498 mdbx_provider.tx_ref().cursor_write::<tables::StoragesHistory>().unwrap(),
1499 );
1500 let mut rocks_writer: EitherWriter<'_, StoragesHistoryWriteCursor, EthPrimitives> =
1501 EitherWriter::RocksDB(rocks_provider.batch());
1502
1503 for (highest_block, blocks) in shards {
1505 let key = StorageShardedKey::new(address, storage_key, *highest_block);
1506 let value = IntegerList::new(blocks.clone()).unwrap();
1507 mdbx_writer.put_storage_history(key.clone(), &value).unwrap();
1508 rocks_writer.put_storage_history(key, &value).unwrap();
1509 }
1510
1511 drop(mdbx_writer);
1513 mdbx_provider.commit().unwrap();
1514 if let EitherWriter::RocksDB(batch) = rocks_writer {
1515 batch.commit().unwrap();
1516 }
1517
1518 let mdbx_ro = factory.database_provider_ro().unwrap();
1520 let rocks_snapshot = rocks_provider.snapshot();
1521
1522 for (i, query) in queries.iter().enumerate() {
1523 let mut mdbx_reader: EitherReader<'_, StoragesHistoryReadCursor, EthPrimitives> =
1525 EitherReader::Database(
1526 mdbx_ro.tx_ref().cursor_read::<tables::StoragesHistory>().unwrap(),
1527 PhantomData,
1528 );
1529 let mdbx_result = mdbx_reader
1530 .storage_history_info(
1531 address,
1532 storage_key,
1533 query.block_number,
1534 query.lowest_available,
1535 )
1536 .unwrap();
1537
1538 let rocks_result = rocks_snapshot
1540 .storage_history_info(
1541 address,
1542 storage_key,
1543 query.block_number,
1544 query.lowest_available,
1545 )
1546 .unwrap();
1547
1548 assert_eq!(
1550 mdbx_result,
1551 rocks_result,
1552 "Backend mismatch in scenario '{}' query {}: block={}, lowest={:?}\n\
1553 MDBX: {:?}, RocksDB: {:?}",
1554 scenario_name,
1555 i,
1556 query.block_number,
1557 query.lowest_available,
1558 mdbx_result,
1559 rocks_result
1560 );
1561
1562 assert_eq!(
1564 mdbx_result,
1565 query.expected,
1566 "Unexpected result in scenario '{}' query {}: block={}, lowest={:?}\n\
1567 Got: {:?}, Expected: {:?}",
1568 scenario_name,
1569 i,
1570 query.block_number,
1571 query.lowest_available,
1572 mdbx_result,
1573 query.expected
1574 );
1575 }
1576
1577 drop(temp_dir);
1578 }
1579
1580 #[test]
1588 fn test_account_history_info_both_backends() {
1589 let address = Address::from([0x42; 20]);
1590
1591 run_account_history_scenario(
1593 "single_shard",
1594 address,
1595 &[(u64::MAX, vec![100, 200, 300])],
1596 &[
1597 HistoryQuery {
1599 block_number: 50,
1600 lowest_available: None,
1601 expected: HistoryInfo::NotYetWritten,
1602 },
1603 HistoryQuery {
1605 block_number: 150,
1606 lowest_available: None,
1607 expected: HistoryInfo::InChangeset(200),
1608 },
1609 HistoryQuery {
1611 block_number: 300,
1612 lowest_available: None,
1613 expected: HistoryInfo::InChangeset(300),
1614 },
1615 HistoryQuery {
1617 block_number: 500,
1618 lowest_available: None,
1619 expected: HistoryInfo::InPlainState,
1620 },
1621 ],
1622 );
1623
1624 run_account_history_scenario(
1626 "multiple_shards",
1627 address,
1628 &[
1629 (500, vec![100, 200, 300, 400, 500]), (u64::MAX, vec![600, 700, 800]), ],
1632 &[
1633 HistoryQuery {
1635 block_number: 50,
1636 lowest_available: None,
1637 expected: HistoryInfo::NotYetWritten,
1638 },
1639 HistoryQuery {
1641 block_number: 150,
1642 lowest_available: None,
1643 expected: HistoryInfo::InChangeset(200),
1644 },
1645 HistoryQuery {
1647 block_number: 550,
1648 lowest_available: None,
1649 expected: HistoryInfo::InChangeset(600),
1650 },
1651 HistoryQuery {
1653 block_number: 900,
1654 lowest_available: None,
1655 expected: HistoryInfo::InPlainState,
1656 },
1657 ],
1658 );
1659
1660 let address_without_history = Address::from([0x43; 20]);
1662 run_account_history_scenario(
1663 "no_history",
1664 address_without_history,
1665 &[], &[HistoryQuery {
1667 block_number: 150,
1668 lowest_available: None,
1669 expected: HistoryInfo::NotYetWritten,
1670 }],
1671 );
1672
1673 run_account_history_scenario(
1679 "with_pruning_boundary",
1680 address,
1681 &[(u64::MAX, vec![100, 200, 300])],
1682 &[
1683 HistoryQuery {
1685 block_number: 100,
1686 lowest_available: Some(100),
1687 expected: HistoryInfo::InChangeset(100),
1688 },
1689 HistoryQuery {
1691 block_number: 150,
1692 lowest_available: Some(100),
1693 expected: HistoryInfo::InChangeset(200),
1694 },
1695 ],
1696 );
1697 }
1698
1699 #[test]
1701 fn test_storage_history_info_both_backends() {
1702 let address = Address::from([0x42; 20]);
1703 let storage_key = B256::from([0x01; 32]);
1704 let other_storage_key = B256::from([0x02; 32]);
1705
1706 run_storage_history_scenario(
1708 "storage_single_shard",
1709 address,
1710 storage_key,
1711 &[(u64::MAX, vec![100, 200, 300])],
1712 &[
1713 HistoryQuery {
1715 block_number: 50,
1716 lowest_available: None,
1717 expected: HistoryInfo::NotYetWritten,
1718 },
1719 HistoryQuery {
1721 block_number: 150,
1722 lowest_available: None,
1723 expected: HistoryInfo::InChangeset(200),
1724 },
1725 HistoryQuery {
1727 block_number: 500,
1728 lowest_available: None,
1729 expected: HistoryInfo::InPlainState,
1730 },
1731 ],
1732 );
1733
1734 run_storage_history_scenario(
1736 "storage_no_history",
1737 address,
1738 other_storage_key,
1739 &[], &[HistoryQuery {
1741 block_number: 150,
1742 lowest_available: None,
1743 expected: HistoryInfo::NotYetWritten,
1744 }],
1745 );
1746 }
1747
1748 #[test]
1751 fn test_rocksdb_commits_at_provider_level() {
1752 let factory = create_test_provider_factory();
1753
1754 factory.set_storage_settings_cache(StorageSettings::v2());
1756
1757 let hash1 = B256::from([1u8; 32]);
1758 let hash2 = B256::from([2u8; 32]);
1759 let tx_num1 = 100u64;
1760 let tx_num2 = 200u64;
1761
1762 let rocksdb = factory.rocksdb_provider();
1764 let batch = rocksdb.batch();
1765
1766 let provider = factory.database_provider_rw().unwrap();
1768 let mut writer = EitherWriter::new_transaction_hash_numbers(&provider, batch).unwrap();
1769
1770 writer.put_transaction_hash_number(hash1, tx_num1, false).unwrap();
1772 writer.put_transaction_hash_number(hash2, tx_num2, false).unwrap();
1773
1774 let raw_batch = writer.into_raw_rocksdb_batch();
1776 if let Some(batch) = raw_batch {
1777 provider.set_pending_rocksdb_batch(batch);
1778 }
1779
1780 let rocksdb = factory.rocksdb_provider();
1782 assert_eq!(
1783 rocksdb.get::<tables::TransactionHashNumbers>(hash1).unwrap(),
1784 None,
1785 "Data should not be visible before provider.commit()"
1786 );
1787
1788 provider.commit().unwrap();
1790
1791 let rocksdb = factory.rocksdb_provider();
1793 assert_eq!(
1794 rocksdb.get::<tables::TransactionHashNumbers>(hash1).unwrap(),
1795 Some(tx_num1),
1796 "Data should be visible after provider.commit()"
1797 );
1798 assert_eq!(
1799 rocksdb.get::<tables::TransactionHashNumbers>(hash2).unwrap(),
1800 Some(tx_num2),
1801 "Data should be visible after provider.commit()"
1802 );
1803 }
1804
1805 #[test]
1809 #[should_panic(expected = "account_history_in_rocksdb requires rocksdb snapshot")]
1810 fn test_settings_mismatch_panics() {
1811 let factory = create_test_provider_factory();
1812
1813 factory.set_storage_settings_cache(StorageSettings::v2());
1814
1815 let provider = factory.database_provider_ro().unwrap();
1816 let _ = EitherReader::<(), ()>::new_accounts_history(&provider, None);
1817 }
1818}