1use std::{
5 collections::BTreeSet,
6 marker::PhantomData,
7 ops::{Range, RangeInclusive},
8};
9
10use crate::{
11 providers::{
12 history_info, rocksdb::RocksDBBatch, HistoryInfo, StaticFileProvider,
13 StaticFileProviderRWRefMut,
14 },
15 StaticFileProviderFactory,
16};
17use alloy_primitives::{map::HashMap, Address, BlockNumber, TxHash, TxNumber, B256};
18use rayon::slice::ParallelSliceMut;
19use reth_db::{
20 cursor::{DbCursorRO, DbDupCursorRW},
21 models::{AccountBeforeTx, StorageBeforeTx},
22 static_file::TransactionSenderMask,
23 table::Value,
24 transaction::{CursorMutTy, CursorTy, DbTx, DbTxMut, DupCursorMutTy, DupCursorTy},
25};
26use reth_db_api::{
27 cursor::DbCursorRW,
28 models::{storage_sharded_key::StorageShardedKey, BlockNumberAddress, ShardedKey},
29 tables,
30 tables::BlockNumberList,
31};
32use reth_errors::ProviderError;
33use reth_node_types::NodePrimitives;
34use reth_primitives_traits::{ReceiptTy, StorageEntry};
35use reth_static_file_types::StaticFileSegment;
36use reth_storage_api::{ChangeSetReader, DBProvider, NodePrimitivesProvider, StorageSettingsCache};
37use reth_storage_errors::provider::ProviderResult;
38use strum::{Display, EnumIs};
39
/// Shorthand for an [`EitherReader`] whose cursor is the `CursorTy` of table `T` on `P`'s
/// transaction type and whose primitives are `P`'s node primitives.
type EitherReaderTy<'a, P, T> =
    EitherReader<'a, CursorTy<<P as DBProvider>::Tx, T>, <P as NodePrimitivesProvider>::Primitives>;
43
/// Shorthand for an [`EitherReader`] whose cursor is the `DupCursorTy` of table `T` on `P`'s
/// transaction type and whose primitives are `P`'s node primitives.
type DupEitherReaderTy<'a, P, T> = EitherReader<
    'a,
    DupCursorTy<<P as DBProvider>::Tx, T>,
    <P as NodePrimitivesProvider>::Primitives,
>;
50
/// Shorthand for an [`EitherWriter`] whose cursor is the `DupCursorMutTy` of table `T` on `P`'s
/// transaction type and whose primitives are `P`'s node primitives.
type DupEitherWriterTy<'a, P, T> = EitherWriter<
    'a,
    DupCursorMutTy<<P as DBProvider>::Tx, T>,
    <P as NodePrimitivesProvider>::Primitives,
>;
57
/// Shorthand for an [`EitherWriter`] whose cursor is the `CursorMutTy` of table `T` on `P`'s
/// transaction type and whose primitives are `P`'s node primitives.
type EitherWriterTy<'a, P, T> = EitherWriter<
    'a,
    CursorMutTy<<P as DBProvider>::Tx, T>,
    <P as NodePrimitivesProvider>::Primitives,
>;
64
/// The RocksDB batch argument accepted by the [`EitherWriter`] constructors for the history and
/// transaction-hash-number tables.
pub type RocksBatchArg<'a> = crate::providers::rocksdb::RocksDBBatch<'a>;
67
/// Alias for `rocksdb::WriteBatchWithTransaction<true>`, the same type that
/// [`EitherWriter::into_raw_rocksdb_batch`] yields.
pub type RawRocksDBBatch = rocksdb::WriteBatchWithTransaction<true>;
70
/// The optional RocksDB read snapshot accepted by the [`EitherReader`] constructors for the
/// history and transaction-hash-number tables.
pub type RocksDBRefArg<'a> = Option<crate::providers::rocksdb::RocksReadSnapshot<'a>>;
76
/// A writer backed by one of three storage targets: a database cursor, a static file writer, or
/// a RocksDB batch.
///
/// Methods that are not meaningful for the active variant return
/// `ProviderError::UnsupportedProvider`.
#[derive(Debug, Display)]
pub enum EitherWriter<'a, CURSOR, N> {
    /// Writes go through a database cursor.
    Database(CURSOR),
    /// Writes go through a static file writer.
    StaticFile(StaticFileProviderRWRefMut<'a, N>),
    /// Writes are staged in a RocksDB batch.
    RocksDB(RocksDBBatch<'a>),
}
87
88impl<'a> EitherWriter<'a, (), ()> {
89 pub fn new_receipts<P>(
91 provider: &'a P,
92 block_number: BlockNumber,
93 ) -> ProviderResult<EitherWriterTy<'a, P, tables::Receipts<ReceiptTy<P::Primitives>>>>
94 where
95 P: DBProvider + NodePrimitivesProvider + StorageSettingsCache + StaticFileProviderFactory,
96 P::Tx: DbTxMut,
97 ReceiptTy<P::Primitives>: Value,
98 {
99 if Self::receipts_destination(provider).is_static_file() {
100 Ok(EitherWriter::StaticFile(
101 provider.get_static_file_writer(block_number, StaticFileSegment::Receipts)?,
102 ))
103 } else {
104 Ok(EitherWriter::Database(
105 provider.tx_ref().cursor_write::<tables::Receipts<ReceiptTy<P::Primitives>>>()?,
106 ))
107 }
108 }
109
110 pub fn new_senders<P>(
112 provider: &'a P,
113 block_number: BlockNumber,
114 ) -> ProviderResult<EitherWriterTy<'a, P, tables::TransactionSenders>>
115 where
116 P: DBProvider + NodePrimitivesProvider + StorageSettingsCache + StaticFileProviderFactory,
117 P::Tx: DbTxMut,
118 {
119 if EitherWriterDestination::senders(provider).is_static_file() {
120 Ok(EitherWriter::StaticFile(
121 provider
122 .get_static_file_writer(block_number, StaticFileSegment::TransactionSenders)?,
123 ))
124 } else {
125 Ok(EitherWriter::Database(
126 provider.tx_ref().cursor_write::<tables::TransactionSenders>()?,
127 ))
128 }
129 }
130
131 pub fn new_account_changesets<P>(
134 provider: &'a P,
135 block_number: BlockNumber,
136 ) -> ProviderResult<DupEitherWriterTy<'a, P, tables::AccountChangeSets>>
137 where
138 P: DBProvider + NodePrimitivesProvider + StorageSettingsCache + StaticFileProviderFactory,
139 P::Tx: DbTxMut,
140 {
141 if provider.cached_storage_settings().storage_v2 {
142 Ok(EitherWriter::StaticFile(
143 provider
144 .get_static_file_writer(block_number, StaticFileSegment::AccountChangeSets)?,
145 ))
146 } else {
147 Ok(EitherWriter::Database(
148 provider.tx_ref().cursor_dup_write::<tables::AccountChangeSets>()?,
149 ))
150 }
151 }
152
153 pub fn new_storage_changesets<P>(
155 provider: &'a P,
156 block_number: BlockNumber,
157 ) -> ProviderResult<DupEitherWriterTy<'a, P, tables::StorageChangeSets>>
158 where
159 P: DBProvider + NodePrimitivesProvider + StorageSettingsCache + StaticFileProviderFactory,
160 P::Tx: DbTxMut,
161 {
162 if provider.cached_storage_settings().storage_v2 {
163 Ok(EitherWriter::StaticFile(
164 provider
165 .get_static_file_writer(block_number, StaticFileSegment::StorageChangeSets)?,
166 ))
167 } else {
168 Ok(EitherWriter::Database(
169 provider.tx_ref().cursor_dup_write::<tables::StorageChangeSets>()?,
170 ))
171 }
172 }
173
174 pub fn receipts_destination<P: DBProvider + StorageSettingsCache>(
183 provider: &P,
184 ) -> EitherWriterDestination {
185 let receipts_in_static_files = provider.cached_storage_settings().storage_v2;
186 let prune_modes = provider.prune_modes_ref();
187
188 if !receipts_in_static_files && prune_modes.has_receipts_pruning() ||
189 receipts_in_static_files && !prune_modes.receipts_log_filter.is_empty()
191 {
192 EitherWriterDestination::Database
193 } else {
194 EitherWriterDestination::StaticFile
195 }
196 }
197
198 pub fn account_changesets_destination<P: DBProvider + StorageSettingsCache>(
202 provider: &P,
203 ) -> EitherWriterDestination {
204 if provider.cached_storage_settings().storage_v2 {
205 EitherWriterDestination::StaticFile
206 } else {
207 EitherWriterDestination::Database
208 }
209 }
210
211 pub fn storage_changesets_destination<P: DBProvider + StorageSettingsCache>(
215 provider: &P,
216 ) -> EitherWriterDestination {
217 if provider.cached_storage_settings().storage_v2 {
218 EitherWriterDestination::StaticFile
219 } else {
220 EitherWriterDestination::Database
221 }
222 }
223
224 pub fn new_storages_history<P>(
226 provider: &P,
227 _rocksdb_batch: RocksBatchArg<'a>,
228 ) -> ProviderResult<EitherWriterTy<'a, P, tables::StoragesHistory>>
229 where
230 P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
231 P::Tx: DbTxMut,
232 {
233 if provider.cached_storage_settings().storage_v2 {
234 return Ok(EitherWriter::RocksDB(_rocksdb_batch));
235 }
236
237 Ok(EitherWriter::Database(provider.tx_ref().cursor_write::<tables::StoragesHistory>()?))
238 }
239
240 pub fn new_transaction_hash_numbers<P>(
242 provider: &P,
243 _rocksdb_batch: RocksBatchArg<'a>,
244 ) -> ProviderResult<EitherWriterTy<'a, P, tables::TransactionHashNumbers>>
245 where
246 P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
247 P::Tx: DbTxMut,
248 {
249 if provider.cached_storage_settings().storage_v2 {
250 return Ok(EitherWriter::RocksDB(_rocksdb_batch));
251 }
252
253 Ok(EitherWriter::Database(
254 provider.tx_ref().cursor_write::<tables::TransactionHashNumbers>()?,
255 ))
256 }
257
258 pub fn new_accounts_history<P>(
260 provider: &P,
261 _rocksdb_batch: RocksBatchArg<'a>,
262 ) -> ProviderResult<EitherWriterTy<'a, P, tables::AccountsHistory>>
263 where
264 P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
265 P::Tx: DbTxMut,
266 {
267 if provider.cached_storage_settings().storage_v2 {
268 return Ok(EitherWriter::RocksDB(_rocksdb_batch));
269 }
270
271 Ok(EitherWriter::Database(provider.tx_ref().cursor_write::<tables::AccountsHistory>()?))
272 }
273}
274
275impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N> {
276 pub fn into_raw_rocksdb_batch(self) -> Option<rocksdb::WriteBatchWithTransaction<true>> {
284 match self {
285 Self::Database(_) | Self::StaticFile(_) => None,
286 Self::RocksDB(batch) => Some(batch.into_inner()),
287 }
288 }
289
290 pub fn increment_block(&mut self, expected_block_number: BlockNumber) -> ProviderResult<()> {
294 match self {
295 Self::Database(_) => Ok(()),
296 Self::StaticFile(writer) => writer.increment_block(expected_block_number),
297 Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
298 }
299 }
300
301 pub fn ensure_at_block(&mut self, block_number: BlockNumber) -> ProviderResult<()> {
308 match self {
309 Self::Database(_) => Ok(()),
310 Self::StaticFile(writer) => writer.ensure_at_block(block_number),
311 Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
312 }
313 }
314}
315
316impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>
317where
318 N::Receipt: Value,
319 CURSOR: DbCursorRW<tables::Receipts<N::Receipt>>,
320{
321 pub fn append_receipt(&mut self, tx_num: TxNumber, receipt: &N::Receipt) -> ProviderResult<()> {
323 match self {
324 Self::Database(cursor) => Ok(cursor.append(tx_num, receipt)?),
325 Self::StaticFile(writer) => writer.append_receipt(tx_num, receipt),
326 Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
327 }
328 }
329}
330
331impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>
332where
333 CURSOR: DbCursorRW<tables::TransactionSenders>,
334{
335 pub fn append_sender(&mut self, tx_num: TxNumber, sender: &Address) -> ProviderResult<()> {
337 match self {
338 Self::Database(cursor) => Ok(cursor.append(tx_num, sender)?),
339 Self::StaticFile(writer) => writer.append_transaction_sender(tx_num, sender),
340 Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
341 }
342 }
343
344 pub fn append_senders<I>(&mut self, senders: I) -> ProviderResult<()>
346 where
347 I: Iterator<Item = (TxNumber, Address)>,
348 {
349 match self {
350 Self::Database(cursor) => {
351 for (tx_num, sender) in senders {
352 cursor.append(tx_num, &sender)?;
353 }
354 Ok(())
355 }
356 Self::StaticFile(writer) => writer.append_transaction_senders(senders),
357 Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
358 }
359 }
360
361 pub fn prune_senders(
364 &mut self,
365 unwind_tx_from: TxNumber,
366 block: BlockNumber,
367 ) -> ProviderResult<()>
368 where
369 CURSOR: DbCursorRO<tables::TransactionSenders>,
370 {
371 match self {
372 Self::Database(cursor) => {
373 let mut walker = cursor.walk_range(unwind_tx_from..)?;
374 while walker.next().transpose()?.is_some() {
375 walker.delete_current()?;
376 }
377 }
378 Self::StaticFile(writer) => {
379 let static_file_transaction_sender_num = writer
380 .reader()
381 .get_highest_static_file_tx(StaticFileSegment::TransactionSenders);
382
383 let to_delete = static_file_transaction_sender_num
384 .map(|static_num| (static_num + 1).saturating_sub(unwind_tx_from))
385 .unwrap_or_default();
386
387 writer.prune_transaction_senders(to_delete, block)?;
388 }
389 Self::RocksDB(_) => return Err(ProviderError::UnsupportedProvider),
390 }
391
392 Ok(())
393 }
394}
395
396impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>
397where
398 CURSOR: DbCursorRW<tables::TransactionHashNumbers> + DbCursorRO<tables::TransactionHashNumbers>,
399{
400 pub fn put_transaction_hash_number(
406 &mut self,
407 hash: TxHash,
408 tx_num: TxNumber,
409 append_only: bool,
410 ) -> ProviderResult<()> {
411 match self {
412 Self::Database(cursor) => {
413 if append_only {
414 Ok(cursor.append(hash, &tx_num)?)
415 } else {
416 Ok(cursor.upsert(hash, &tx_num)?)
417 }
418 }
419 Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
420 Self::RocksDB(batch) => batch.put::<tables::TransactionHashNumbers>(hash, &tx_num),
421 }
422 }
423
424 pub fn put_transaction_hash_numbers_batch(
433 &mut self,
434 entries: Vec<(TxHash, TxNumber)>,
435 append_only: bool,
436 ) -> ProviderResult<()> {
437 match self {
438 Self::Database(cursor) => {
439 for (hash, tx_num) in entries {
440 if append_only {
441 cursor.append(hash, &tx_num)?;
442 } else {
443 cursor.upsert(hash, &tx_num)?;
444 }
445 }
446 Ok(())
447 }
448 Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
449 Self::RocksDB(batch) => {
450 for (hash, tx_num) in entries {
451 batch.put::<tables::TransactionHashNumbers>(hash, &tx_num)?;
452 }
453 Ok(())
454 }
455 }
456 }
457
458 pub fn delete_transaction_hash_number(&mut self, hash: TxHash) -> ProviderResult<()> {
460 match self {
461 Self::Database(cursor) => {
462 if cursor.seek_exact(hash)?.is_some() {
463 cursor.delete_current()?;
464 }
465 Ok(())
466 }
467 Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
468 Self::RocksDB(batch) => batch.delete::<tables::TransactionHashNumbers>(hash),
469 }
470 }
471}
472
473impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>
474where
475 CURSOR: DbCursorRW<tables::StoragesHistory> + DbCursorRO<tables::StoragesHistory>,
476{
477 pub fn put_storage_history(
479 &mut self,
480 key: StorageShardedKey,
481 value: &BlockNumberList,
482 ) -> ProviderResult<()> {
483 match self {
484 Self::Database(cursor) => Ok(cursor.upsert(key, value)?),
485 Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
486 Self::RocksDB(batch) => batch.put::<tables::StoragesHistory>(key, value),
487 }
488 }
489
490 pub fn delete_storage_history(&mut self, key: StorageShardedKey) -> ProviderResult<()> {
492 match self {
493 Self::Database(cursor) => {
494 if cursor.seek_exact(key)?.is_some() {
495 cursor.delete_current()?;
496 }
497 Ok(())
498 }
499 Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
500 Self::RocksDB(batch) => batch.delete::<tables::StoragesHistory>(key),
501 }
502 }
503
504 pub fn append_storage_history(
506 &mut self,
507 key: StorageShardedKey,
508 value: &BlockNumberList,
509 ) -> ProviderResult<()> {
510 match self {
511 Self::Database(cursor) => Ok(cursor.append(key, value)?),
512 Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
513 Self::RocksDB(batch) => batch.put::<tables::StoragesHistory>(key, value),
514 }
515 }
516
517 pub fn upsert_storage_history(
519 &mut self,
520 key: StorageShardedKey,
521 value: &BlockNumberList,
522 ) -> ProviderResult<()> {
523 match self {
524 Self::Database(cursor) => Ok(cursor.upsert(key, value)?),
525 Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
526 Self::RocksDB(batch) => batch.put::<tables::StoragesHistory>(key, value),
527 }
528 }
529
530 pub fn get_last_storage_history_shard(
532 &mut self,
533 address: Address,
534 storage_key: B256,
535 ) -> ProviderResult<Option<BlockNumberList>> {
536 let key = StorageShardedKey::last(address, storage_key);
537 match self {
538 Self::Database(cursor) => Ok(cursor.seek_exact(key)?.map(|(_, v)| v)),
539 Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
540 Self::RocksDB(batch) => batch.get::<tables::StoragesHistory>(key),
541 }
542 }
543}
544
545impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>
546where
547 CURSOR: DbCursorRW<tables::AccountsHistory> + DbCursorRO<tables::AccountsHistory>,
548{
549 pub fn append_account_history(
551 &mut self,
552 key: ShardedKey<Address>,
553 value: &BlockNumberList,
554 ) -> ProviderResult<()> {
555 match self {
556 Self::Database(cursor) => Ok(cursor.append(key, value)?),
557 Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
558 Self::RocksDB(batch) => batch.put::<tables::AccountsHistory>(key, value),
559 }
560 }
561
562 pub fn upsert_account_history(
564 &mut self,
565 key: ShardedKey<Address>,
566 value: &BlockNumberList,
567 ) -> ProviderResult<()> {
568 match self {
569 Self::Database(cursor) => Ok(cursor.upsert(key, value)?),
570 Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
571 Self::RocksDB(batch) => batch.put::<tables::AccountsHistory>(key, value),
572 }
573 }
574
575 pub fn get_last_account_history_shard(
577 &mut self,
578 address: Address,
579 ) -> ProviderResult<Option<BlockNumberList>> {
580 match self {
581 Self::Database(cursor) => {
582 Ok(cursor.seek_exact(ShardedKey::last(address))?.map(|(_, v)| v))
583 }
584 Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
585 Self::RocksDB(batch) => batch.get::<tables::AccountsHistory>(ShardedKey::last(address)),
586 }
587 }
588
589 pub fn delete_account_history(&mut self, key: ShardedKey<Address>) -> ProviderResult<()> {
591 match self {
592 Self::Database(cursor) => {
593 if cursor.seek_exact(key)?.is_some() {
594 cursor.delete_current()?;
595 }
596 Ok(())
597 }
598 Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
599 Self::RocksDB(batch) => batch.delete::<tables::AccountsHistory>(key),
600 }
601 }
602}
603
604impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>
605where
606 CURSOR: DbDupCursorRW<tables::AccountChangeSets>,
607{
608 pub fn append_account_changeset(
612 &mut self,
613 block_number: BlockNumber,
614 mut changeset: Vec<AccountBeforeTx>,
615 ) -> ProviderResult<()> {
616 changeset.par_sort_by_key(|a| a.address);
618 match self {
619 Self::Database(cursor) => {
620 for change in changeset {
621 cursor.append_dup(block_number, change)?;
622 }
623 }
624 Self::StaticFile(writer) => {
625 writer.append_account_changeset(changeset, block_number)?;
626 }
627 Self::RocksDB(_) => return Err(ProviderError::UnsupportedProvider),
628 }
629
630 Ok(())
631 }
632}
633
634impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>
635where
636 CURSOR: DbDupCursorRW<tables::StorageChangeSets>,
637{
638 pub fn append_storage_changeset(
642 &mut self,
643 block_number: BlockNumber,
644 mut changeset: Vec<StorageBeforeTx>,
645 ) -> ProviderResult<()> {
646 changeset.par_sort_by_key(|change| (change.address, change.key));
647
648 match self {
649 Self::Database(cursor) => {
650 for change in changeset {
651 let storage_id = BlockNumberAddress((block_number, change.address));
652 cursor.append_dup(
653 storage_id,
654 StorageEntry { key: change.key, value: change.value },
655 )?;
656 }
657 }
658 Self::StaticFile(writer) => {
659 writer.append_storage_changeset(changeset, block_number)?;
660 }
661 Self::RocksDB(_) => return Err(ProviderError::UnsupportedProvider),
662 }
663
664 Ok(())
665 }
666}
667
/// A reader backed by one of three storage sources: a database cursor, a static file provider,
/// or a RocksDB read snapshot.
///
/// Methods that are not meaningful for the active variant return
/// `ProviderError::UnsupportedProvider`.
#[derive(Debug, Display)]
pub enum EitherReader<'a, CURSOR, N> {
    /// Reads go through a database cursor. The `PhantomData` only carries the `'a` lifetime.
    Database(CURSOR, PhantomData<&'a ()>),
    /// Reads go through a static file provider. The `PhantomData` only carries the `'a`
    /// lifetime.
    StaticFile(StaticFileProvider<N>, PhantomData<&'a ()>),
    /// Reads go through a RocksDB read snapshot.
    RocksDB(crate::providers::rocksdb::RocksReadSnapshot<'a>),
}
678
679impl<'a> EitherReader<'a, (), ()> {
680 pub fn new_senders<P>(
682 provider: &P,
683 ) -> ProviderResult<EitherReaderTy<'a, P, tables::TransactionSenders>>
684 where
685 P: DBProvider + NodePrimitivesProvider + StorageSettingsCache + StaticFileProviderFactory,
686 P::Tx: DbTx,
687 {
688 if EitherWriterDestination::senders(provider).is_static_file() {
689 Ok(EitherReader::StaticFile(provider.static_file_provider(), PhantomData))
690 } else {
691 Ok(EitherReader::Database(
692 provider.tx_ref().cursor_read::<tables::TransactionSenders>()?,
693 PhantomData,
694 ))
695 }
696 }
697
698 pub fn new_storages_history<P>(
700 provider: &P,
701 rocksdb: RocksDBRefArg<'a>,
702 ) -> ProviderResult<EitherReaderTy<'a, P, tables::StoragesHistory>>
703 where
704 P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
705 P::Tx: DbTx,
706 {
707 if provider.cached_storage_settings().storage_v2 {
708 return Ok(EitherReader::RocksDB(
709 rocksdb.expect("storages_history_in_rocksdb requires rocksdb snapshot"),
710 ));
711 }
712
713 Ok(EitherReader::Database(
714 provider.tx_ref().cursor_read::<tables::StoragesHistory>()?,
715 PhantomData,
716 ))
717 }
718
719 pub fn new_transaction_hash_numbers<P>(
721 provider: &P,
722 rocksdb: RocksDBRefArg<'a>,
723 ) -> ProviderResult<EitherReaderTy<'a, P, tables::TransactionHashNumbers>>
724 where
725 P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
726 P::Tx: DbTx,
727 {
728 if provider.cached_storage_settings().storage_v2 {
729 return Ok(EitherReader::RocksDB(
730 rocksdb.expect("transaction_hash_numbers_in_rocksdb requires rocksdb snapshot"),
731 ));
732 }
733
734 Ok(EitherReader::Database(
735 provider.tx_ref().cursor_read::<tables::TransactionHashNumbers>()?,
736 PhantomData,
737 ))
738 }
739
740 pub fn new_accounts_history<P>(
742 provider: &P,
743 rocksdb: RocksDBRefArg<'a>,
744 ) -> ProviderResult<EitherReaderTy<'a, P, tables::AccountsHistory>>
745 where
746 P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
747 P::Tx: DbTx,
748 {
749 if provider.cached_storage_settings().storage_v2 {
750 return Ok(EitherReader::RocksDB(
751 rocksdb.expect("account_history_in_rocksdb requires rocksdb snapshot"),
752 ));
753 }
754
755 Ok(EitherReader::Database(
756 provider.tx_ref().cursor_read::<tables::AccountsHistory>()?,
757 PhantomData,
758 ))
759 }
760
761 pub fn new_account_changesets<P>(
763 provider: &P,
764 ) -> ProviderResult<DupEitherReaderTy<'a, P, tables::AccountChangeSets>>
765 where
766 P: DBProvider + NodePrimitivesProvider + StorageSettingsCache + StaticFileProviderFactory,
767 P::Tx: DbTx,
768 {
769 if EitherWriterDestination::account_changesets(provider).is_static_file() {
770 Ok(EitherReader::StaticFile(provider.static_file_provider(), PhantomData))
771 } else {
772 Ok(EitherReader::Database(
773 provider.tx_ref().cursor_dup_read::<tables::AccountChangeSets>()?,
774 PhantomData,
775 ))
776 }
777 }
778}
779
780impl<CURSOR, N: NodePrimitives> EitherReader<'_, CURSOR, N>
781where
782 CURSOR: DbCursorRO<tables::TransactionSenders>,
783{
784 pub fn senders_by_tx_range(
786 &mut self,
787 range: Range<TxNumber>,
788 ) -> ProviderResult<HashMap<TxNumber, Address>> {
789 match self {
790 Self::Database(cursor, _) => cursor
791 .walk_range(range)?
792 .map(|result| result.map_err(ProviderError::from))
793 .collect::<ProviderResult<HashMap<_, _>>>(),
794 Self::StaticFile(provider, _) => range
795 .clone()
796 .zip(provider.fetch_range_iter(
797 StaticFileSegment::TransactionSenders,
798 range,
799 |cursor, number| cursor.get_one::<TransactionSenderMask>(number.into()),
800 )?)
801 .filter_map(|(tx_num, sender)| {
802 let result = sender.transpose()?;
803 Some(result.map(|sender| (tx_num, sender)))
804 })
805 .collect::<ProviderResult<HashMap<_, _>>>(),
806 Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
807 }
808 }
809}
810
811impl<CURSOR, N: NodePrimitives> EitherReader<'_, CURSOR, N>
812where
813 CURSOR: DbCursorRO<tables::TransactionHashNumbers>,
814{
815 pub fn get_transaction_hash_number(
817 &mut self,
818 hash: TxHash,
819 ) -> ProviderResult<Option<TxNumber>> {
820 match self {
821 Self::Database(cursor, _) => Ok(cursor.seek_exact(hash)?.map(|(_, v)| v)),
822 Self::StaticFile(_, _) => Err(ProviderError::UnsupportedProvider),
823 Self::RocksDB(snapshot) => snapshot.get::<tables::TransactionHashNumbers>(hash),
824 }
825 }
826}
827
828impl<CURSOR, N: NodePrimitives> EitherReader<'_, CURSOR, N>
829where
830 CURSOR: DbCursorRO<tables::StoragesHistory>,
831{
832 pub fn get_storage_history(
834 &mut self,
835 key: StorageShardedKey,
836 ) -> ProviderResult<Option<BlockNumberList>> {
837 match self {
838 Self::Database(cursor, _) => Ok(cursor.seek_exact(key)?.map(|(_, v)| v)),
839 Self::StaticFile(_, _) => Err(ProviderError::UnsupportedProvider),
840 Self::RocksDB(snapshot) => snapshot.get::<tables::StoragesHistory>(key),
841 }
842 }
843
844 pub fn storage_history_info(
846 &mut self,
847 address: Address,
848 storage_key: alloy_primitives::B256,
849 block_number: BlockNumber,
850 lowest_available_block_number: Option<BlockNumber>,
851 visible_tip: BlockNumber,
852 ) -> ProviderResult<HistoryInfo> {
853 match self {
854 Self::Database(cursor, _) => {
855 let key = StorageShardedKey::new(address, storage_key, block_number);
856 history_info::<tables::StoragesHistory, _, _>(
857 cursor,
858 key,
859 block_number,
860 |k| k.address == address && k.sharded_key.key == storage_key,
861 lowest_available_block_number,
862 )
863 }
864 Self::StaticFile(_, _) => Err(ProviderError::UnsupportedProvider),
865 Self::RocksDB(snapshot) => snapshot.storage_history_info(
866 address,
867 storage_key,
868 block_number,
869 lowest_available_block_number,
870 visible_tip,
871 ),
872 }
873 }
874}
875
876impl<CURSOR, N: NodePrimitives> EitherReader<'_, CURSOR, N>
877where
878 CURSOR: DbCursorRO<tables::AccountsHistory>,
879{
880 pub fn get_account_history(
882 &mut self,
883 key: ShardedKey<Address>,
884 ) -> ProviderResult<Option<BlockNumberList>> {
885 match self {
886 Self::Database(cursor, _) => Ok(cursor.seek_exact(key)?.map(|(_, v)| v)),
887 Self::StaticFile(_, _) => Err(ProviderError::UnsupportedProvider),
888 Self::RocksDB(snapshot) => snapshot.get::<tables::AccountsHistory>(key),
889 }
890 }
891
892 pub fn account_history_info(
894 &mut self,
895 address: Address,
896 block_number: BlockNumber,
897 lowest_available_block_number: Option<BlockNumber>,
898 visible_tip: BlockNumber,
899 ) -> ProviderResult<HistoryInfo> {
900 match self {
901 Self::Database(cursor, _) => {
902 let key = ShardedKey::new(address, block_number);
903 history_info::<tables::AccountsHistory, _, _>(
904 cursor,
905 key,
906 block_number,
907 |k| k.key == address,
908 lowest_available_block_number,
909 )
910 }
911 Self::StaticFile(_, _) => Err(ProviderError::UnsupportedProvider),
912 Self::RocksDB(snapshot) => snapshot.account_history_info(
913 address,
914 block_number,
915 lowest_available_block_number,
916 visible_tip,
917 ),
918 }
919 }
920}
921
922impl<CURSOR, N: NodePrimitives> EitherReader<'_, CURSOR, N>
923where
924 CURSOR: DbCursorRO<tables::AccountChangeSets>,
925{
926 pub fn changed_accounts_with_range(
928 &mut self,
929 range: RangeInclusive<BlockNumber>,
930 ) -> ProviderResult<BTreeSet<Address>> {
931 match self {
932 Self::StaticFile(provider, _) => {
933 let highest_static_block =
934 provider.get_highest_static_file_block(StaticFileSegment::AccountChangeSets);
935
936 let Some(highest) = highest_static_block else {
937 return Err(ProviderError::MissingHighestStaticFileBlock(
938 StaticFileSegment::AccountChangeSets,
939 ))
940 };
941
942 let start = *range.start();
943 let static_end = (*range.end()).min(highest);
944
945 let mut changed_accounts = BTreeSet::default();
946 if start <= static_end {
947 for block in start..=static_end {
948 let block_changesets = provider.account_block_changeset(block)?;
949 for changeset in block_changesets {
950 changed_accounts.insert(changeset.address);
951 }
952 }
953 }
954
955 Ok(changed_accounts)
956 }
957 Self::Database(provider, _) => provider
958 .walk_range(range)?
959 .map(|entry| {
960 entry.map(|(_, account_before)| account_before.address).map_err(Into::into)
961 })
962 .collect(),
963 Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
964 }
965 }
966}
967
/// The storage target that a writer or reader is routed to.
///
/// The `EnumIs` derive supplies the `is_*` predicates (for example `is_static_file`) used when
/// constructing [`EitherWriter`] and [`EitherReader`].
#[derive(Debug, EnumIs)]
pub enum EitherWriterDestination {
    /// Data goes to the database.
    Database,
    /// Data goes to static files.
    StaticFile,
    /// Data goes to RocksDB.
    RocksDB,
}
978
979impl EitherWriterDestination {
980 pub fn senders<P>(provider: &P) -> Self
982 where
983 P: StorageSettingsCache,
984 {
985 if provider.cached_storage_settings().storage_v2 {
987 Self::StaticFile
988 } else {
989 Self::Database
990 }
991 }
992
993 pub fn account_changesets<P>(provider: &P) -> Self
995 where
996 P: StorageSettingsCache,
997 {
998 if provider.cached_storage_settings().storage_v2 {
1000 Self::StaticFile
1001 } else {
1002 Self::Database
1003 }
1004 }
1005
1006 pub fn storage_changesets<P>(provider: &P) -> Self
1008 where
1009 P: StorageSettingsCache,
1010 {
1011 if provider.cached_storage_settings().storage_v2 {
1013 Self::StaticFile
1014 } else {
1015 Self::Database
1016 }
1017 }
1018}
1019
#[cfg(test)]
mod tests {
    use crate::{test_utils::create_test_provider_factory, StaticFileWriter};

    use super::*;
    use alloy_primitives::Address;
    use reth_db::models::AccountBeforeTx;
    use reth_static_file_types::StaticFileSegment;
    use reth_storage_api::{DatabaseProviderFactory, StorageSettings};

    /// Writes account changesets for blocks `0..=5` into static files, then queries the range
    /// `0..=10` and expects exactly the addresses of the written blocks, i.e. the query is
    /// capped at the static file tip instead of failing on the missing blocks.
    #[test]
    fn test_changed_accounts_with_range_caps_at_static_file_tip() {
        let factory = create_test_provider_factory();
        let highest_block = 5u64;

        // One distinct address per block: the block number is stored in the first byte.
        let addresses: Vec<Address> = (0..=highest_block)
            .map(|block| {
                let mut address = Address::ZERO;
                address.0[0] = block as u8;
                address
            })
            .collect();

        {
            let sf_provider = factory.static_file_provider();
            let mut writer =
                sf_provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();

            for (block_num, address) in (0..=highest_block).zip(addresses.iter().copied()) {
                let changeset = vec![AccountBeforeTx { address, info: None }];
                writer.append_account_changeset(changeset, block_num).unwrap();
            }
            writer.commit().unwrap();
        }

        factory.set_storage_settings_cache(StorageSettings::v2());

        let provider = factory.database_provider_ro().unwrap();

        let sf_tip = provider
            .static_file_provider()
            .get_highest_static_file_block(StaticFileSegment::AccountChangeSets);
        assert_eq!(sf_tip, Some(highest_block));

        let mut reader = EitherReader::new_account_changesets(&provider).unwrap();
        assert!(matches!(reader, EitherReader::StaticFile(_, _)));

        // The requested range extends past the static file tip.
        let changed = reader.changed_accounts_with_range(0..=10).unwrap();

        let expected: BTreeSet<Address> = addresses.into_iter().collect();
        assert_eq!(changed, expected);
    }

    /// Writes senders for transactions `1..=4` through [`EitherWriter`] and reads the range
    /// `0..6` back through [`EitherReader`], once with v1 and once with v2 storage settings.
    #[test]
    fn test_reader_senders_by_tx_range() {
        let factory = create_test_provider_factory();

        // Senders exist only for transactions 1 to 4, while the query below covers 0 to 5.
        let senders = [1, 2, 3, 4].map(|tx_num| (tx_num, Address::random()));

        for transaction_senders_in_static_files in [false, true] {
            let settings = if transaction_senders_in_static_files {
                StorageSettings::v2()
            } else {
                StorageSettings::v1()
            };
            factory.set_storage_settings_cache(settings);

            let provider = factory.database_provider_rw().unwrap();
            let mut writer = EitherWriter::new_senders(&provider, 0).unwrap();
            if !transaction_senders_in_static_files {
                assert!(matches!(writer, EitherWriter::Database(_)));
            } else {
                assert!(matches!(writer, EitherWriter::StaticFile(_)));
            }

            writer.increment_block(0).unwrap();
            writer.append_senders(senders.iter().copied()).unwrap();
            drop(writer);
            provider.commit().unwrap();

            let provider = factory.database_provider_ro().unwrap();
            let mut reader = EitherReader::new_senders(&provider).unwrap();
            if !transaction_senders_in_static_files {
                assert!(matches!(reader, EitherReader::Database(_, _)));
            } else {
                assert!(matches!(reader, EitherReader::StaticFile(_, _)));
            }

            let expected = senders.iter().copied().collect::<HashMap<_, _>>();
            assert_eq!(reader.senders_by_tx_range(0..6).unwrap(), expected, "{reader}");
        }
    }
}
1134
1135#[cfg(test)]
1136mod rocksdb_tests {
1137 use super::*;
1138 use crate::{
1139 providers::rocksdb::{RocksDBBuilder, RocksDBProvider},
1140 test_utils::create_test_provider_factory,
1141 RocksDBProviderFactory,
1142 };
1143 use alloy_primitives::{Address, B256};
1144 use reth_db_api::{
1145 models::{storage_sharded_key::StorageShardedKey, IntegerList, ShardedKey},
1146 tables,
1147 transaction::DbTxMut,
1148 };
1149 use reth_ethereum_primitives::EthPrimitives;
1150 use reth_storage_api::{DatabaseProviderFactory, StorageSettings};
1151 use std::marker::PhantomData;
1152 use tempfile::TempDir;
1153
1154 fn create_rocksdb_provider() -> (TempDir, RocksDBProvider) {
1155 let temp_dir = TempDir::new().unwrap();
1156 let provider = RocksDBBuilder::new(temp_dir.path())
1157 .with_table::<tables::TransactionHashNumbers>()
1158 .with_table::<tables::StoragesHistory>()
1159 .with_table::<tables::AccountsHistory>()
1160 .build()
1161 .unwrap();
1162 (temp_dir, provider)
1163 }
1164
/// Writes two hash -> tx-number entries through an [`EitherWriter`] while
/// `StorageSettings::v2()` is cached on the factory.
///
/// Checks that the writer is the `RocksDB` variant and that both entries can be read back
/// from the factory's RocksDB provider after `provider.commit()`.
#[test]
fn test_either_writer_transaction_hash_numbers_with_rocksdb() {
    let factory = create_test_provider_factory();
    factory.set_storage_settings_cache(StorageSettings::v2());

    // (hash, tx number) pairs written and later read back.
    let entries = [(B256::from([1u8; 32]), 100u64), (B256::from([2u8; 32]), 200u64)];

    let rocksdb_handle = factory.rocksdb_provider();
    let rocks_batch = rocksdb_handle.batch();

    let provider_rw = factory.database_provider_rw().unwrap();
    let mut hash_writer =
        EitherWriter::new_transaction_hash_numbers(&provider_rw, rocks_batch).unwrap();

    assert!(matches!(hash_writer, EitherWriter::RocksDB(_)));

    for (hash, tx_num) in entries {
        hash_writer.put_transaction_hash_number(hash, tx_num, false).unwrap();
    }

    // Hand the raw batch (if the writer produced one) to the provider before committing.
    if let Some(raw_batch) = hash_writer.into_raw_rocksdb_batch() {
        provider_rw.set_pending_rocksdb_batch(raw_batch);
    }

    provider_rw.commit().unwrap();

    let committed = factory.rocksdb_provider();
    for (hash, tx_num) in entries {
        assert_eq!(committed.get::<tables::TransactionHashNumbers>(hash).unwrap(), Some(tx_num));
    }
}
1208
/// Seeds one hash -> tx-number entry directly in RocksDB, deletes it through an
/// [`EitherWriter`] (with `StorageSettings::v2()` cached on the factory), and checks that the
/// entry is gone from RocksDB after `provider.commit()`.
#[test]
fn test_either_writer_delete_transaction_hash_number_with_rocksdb() {
    let factory = create_test_provider_factory();
    factory.set_storage_settings_cache(StorageSettings::v2());

    let (hash, tx_num) = (B256::from([1u8; 32]), 100u64);

    // Seed the entry and confirm it is readable before the delete.
    let rocksdb_handle = factory.rocksdb_provider();
    rocksdb_handle.put::<tables::TransactionHashNumbers>(hash, &tx_num).unwrap();
    let seeded = rocksdb_handle.get::<tables::TransactionHashNumbers>(hash).unwrap();
    assert_eq!(seeded, Some(tx_num));

    let rocks_batch = rocksdb_handle.batch();
    let provider_rw = factory.database_provider_rw().unwrap();
    let mut hash_writer =
        EitherWriter::new_transaction_hash_numbers(&provider_rw, rocks_batch).unwrap();
    hash_writer.delete_transaction_hash_number(hash).unwrap();

    // Hand the raw batch (if the writer produced one) to the provider before committing.
    if let Some(raw_batch) = hash_writer.into_raw_rocksdb_batch() {
        provider_rw.set_pending_rocksdb_batch(raw_batch);
    }
    provider_rw.commit().unwrap();

    let remaining =
        factory.rocksdb_provider().get::<tables::TransactionHashNumbers>(hash).unwrap();
    assert_eq!(remaining, None);
}
1241
/// Puts two `TransactionHashNumbers` entries through a RocksDB batch, commits it, and reads
/// them back through `provider.tx()`; a hash that was never written must read as `None`.
#[test]
fn test_rocksdb_batch_transaction_hash_numbers() {
    let (_rocks_dir, provider) = create_rocksdb_provider();

    // (hash, tx number) pairs written through the batch.
    let stored = [(B256::from([1u8; 32]), 100u64), (B256::from([2u8; 32]), 200u64)];

    let mut batch = provider.batch();
    for (hash, tx_num) in &stored {
        batch.put::<tables::TransactionHashNumbers>(*hash, tx_num).unwrap();
    }
    batch.commit().unwrap();

    let tx = provider.tx();
    for (hash, tx_num) in stored {
        assert_eq!(tx.get::<tables::TransactionHashNumbers>(hash).unwrap(), Some(tx_num));
    }

    let absent_hash = B256::from([99u8; 32]);
    assert_eq!(tx.get::<tables::TransactionHashNumbers>(absent_hash).unwrap(), None);
}
1266
/// Puts one `StoragesHistory` shard through a RocksDB batch, commits it, and reads it back
/// through `provider.tx()`; a key that was never written must read as `None`.
#[test]
fn test_rocksdb_batch_storage_history() {
    let (_rocks_dir, provider) = create_rocksdb_provider();

    let shard_key = StorageShardedKey::new(Address::random(), B256::from([1u8; 32]), 1000);
    let block_list = IntegerList::new([1, 5, 10, 50]).unwrap();

    {
        let mut batch = provider.batch();
        batch.put::<tables::StoragesHistory>(shard_key.clone(), &block_list).unwrap();
        batch.commit().unwrap();
    }

    let tx = provider.tx();
    let stored = tx.get::<tables::StoragesHistory>(shard_key).unwrap();
    assert_eq!(stored, Some(block_list));

    let absent_key = StorageShardedKey::new(Address::random(), B256::ZERO, 0);
    assert_eq!(tx.get::<tables::StoragesHistory>(absent_key).unwrap(), None);
}
1290
/// Puts one `AccountsHistory` shard through a RocksDB batch, commits it, and reads it back
/// through `provider.tx()`; a key that was never written must read as `None`.
#[test]
fn test_rocksdb_batch_account_history() {
    let (_rocks_dir, provider) = create_rocksdb_provider();

    let shard_key = ShardedKey::new(Address::random(), 1000);
    let block_list = IntegerList::new([1, 10, 100, 500]).unwrap();

    {
        let mut batch = provider.batch();
        batch.put::<tables::AccountsHistory>(shard_key.clone(), &block_list).unwrap();
        batch.commit().unwrap();
    }

    let tx = provider.tx();
    let stored = tx.get::<tables::AccountsHistory>(shard_key).unwrap();
    assert_eq!(stored, Some(block_list));

    let absent_key = ShardedKey::new(Address::random(), 0);
    assert_eq!(tx.get::<tables::AccountsHistory>(absent_key).unwrap(), None);
}
1313
/// Seeds one `TransactionHashNumbers` entry, deletes it through a committed RocksDB batch,
/// and checks that the provider no longer returns it.
#[test]
fn test_rocksdb_batch_delete_transaction_hash_number() {
    let (_rocks_dir, provider) = create_rocksdb_provider();

    let (hash, tx_num) = (B256::from([1u8; 32]), 100u64);

    // Seed the entry and confirm it is readable before the delete.
    provider.put::<tables::TransactionHashNumbers>(hash, &tx_num).unwrap();
    let seeded = provider.get::<tables::TransactionHashNumbers>(hash).unwrap();
    assert_eq!(seeded, Some(tx_num));

    {
        let mut batch = provider.batch();
        batch.delete::<tables::TransactionHashNumbers>(hash).unwrap();
        batch.commit().unwrap();
    }

    let remaining = provider.get::<tables::TransactionHashNumbers>(hash).unwrap();
    assert_eq!(remaining, None);
}
1333
/// Seeds one `StoragesHistory` shard, deletes it through a committed RocksDB batch, and
/// checks that the provider no longer returns it.
#[test]
fn test_rocksdb_batch_delete_storage_history() {
    let (_rocks_dir, provider) = create_rocksdb_provider();

    let shard_key = StorageShardedKey::new(Address::random(), B256::from([1u8; 32]), 1000);
    let block_list = IntegerList::new([1, 5, 10]).unwrap();

    // Seed the shard and confirm it is present before the delete.
    provider.put::<tables::StoragesHistory>(shard_key.clone(), &block_list).unwrap();
    let seeded = provider.get::<tables::StoragesHistory>(shard_key.clone()).unwrap();
    assert!(seeded.is_some());

    {
        let mut batch = provider.batch();
        batch.delete::<tables::StoragesHistory>(shard_key.clone()).unwrap();
        batch.commit().unwrap();
    }

    let remaining = provider.get::<tables::StoragesHistory>(shard_key).unwrap();
    assert_eq!(remaining, None);
}
1355
/// Seeds one `AccountsHistory` shard, deletes it through a committed RocksDB batch, and
/// checks that the provider no longer returns it.
#[test]
fn test_rocksdb_batch_delete_account_history() {
    let (_rocks_dir, provider) = create_rocksdb_provider();

    let shard_key = ShardedKey::new(Address::random(), 1000);
    let block_list = IntegerList::new([1, 10, 100]).unwrap();

    // Seed the shard and confirm it is present before the delete.
    provider.put::<tables::AccountsHistory>(shard_key.clone(), &block_list).unwrap();
    let seeded = provider.get::<tables::AccountsHistory>(shard_key.clone()).unwrap();
    assert!(seeded.is_some());

    {
        let mut batch = provider.batch();
        batch.delete::<tables::AccountsHistory>(shard_key.clone()).unwrap();
        batch.commit().unwrap();
    }

    let remaining = provider.get::<tables::AccountsHistory>(shard_key).unwrap();
    assert_eq!(remaining, None);
}
1376
/// One history lookup used by the scenario runners, together with its expected outcome.
struct HistoryQuery {
    /// Block number passed to the `*_history_info` lookup.
    block_number: BlockNumber,
    /// Optional lowest-available block forwarded unchanged to the lookup.
    /// NOTE(review): presumably the pruning boundary — confirm against `history_info`.
    lowest_available: Option<BlockNumber>,
    /// Result the lookup is expected to return.
    expected: HistoryInfo,
}
1388
/// MDBX cursor over `tables::AccountsHistory` parameterised with the `RW` transaction kind.
type AccountsHistoryWriteCursor =
    reth_db::mdbx::cursor::Cursor<reth_db::mdbx::RW, tables::AccountsHistory>;
/// MDBX cursor over `tables::StoragesHistory` parameterised with the `RW` transaction kind.
type StoragesHistoryWriteCursor =
    reth_db::mdbx::cursor::Cursor<reth_db::mdbx::RW, tables::StoragesHistory>;
/// MDBX cursor over `tables::AccountsHistory` parameterised with the `RO` transaction kind.
type AccountsHistoryReadCursor =
    reth_db::mdbx::cursor::Cursor<reth_db::mdbx::RO, tables::AccountsHistory>;
/// MDBX cursor over `tables::StoragesHistory` parameterised with the `RO` transaction kind.
type StoragesHistoryReadCursor =
    reth_db::mdbx::cursor::Cursor<reth_db::mdbx::RO, tables::StoragesHistory>;
1398
1399 fn run_account_history_scenario(
1402 scenario_name: &str,
1403 address: Address,
1404 shards: &[(BlockNumber, Vec<BlockNumber>)], queries: &[HistoryQuery],
1406 ) {
1407 let factory = create_test_provider_factory();
1409 let mdbx_provider = factory.database_provider_rw().unwrap();
1410 let (temp_dir, rocks_provider) = create_rocksdb_provider();
1411
1412 let mut mdbx_writer: EitherWriter<'_, AccountsHistoryWriteCursor, EthPrimitives> =
1414 EitherWriter::Database(
1415 mdbx_provider.tx_ref().cursor_write::<tables::AccountsHistory>().unwrap(),
1416 );
1417 let mut rocks_writer: EitherWriter<'_, AccountsHistoryWriteCursor, EthPrimitives> =
1418 EitherWriter::RocksDB(rocks_provider.batch());
1419
1420 for (highest_block, blocks) in shards {
1422 let key = ShardedKey::new(address, *highest_block);
1423 let value = IntegerList::new(blocks.clone()).unwrap();
1424 mdbx_writer.upsert_account_history(key.clone(), &value).unwrap();
1425 rocks_writer.upsert_account_history(key, &value).unwrap();
1426 }
1427
1428 drop(mdbx_writer);
1430 mdbx_provider.commit().unwrap();
1431 if let EitherWriter::RocksDB(batch) = rocks_writer {
1432 batch.commit().unwrap();
1433 }
1434
1435 let mdbx_ro = factory.database_provider_ro().unwrap();
1437 let rocks_snapshot = rocks_provider.snapshot();
1438
1439 for (i, query) in queries.iter().enumerate() {
1440 let mut mdbx_reader: EitherReader<'_, AccountsHistoryReadCursor, EthPrimitives> =
1442 EitherReader::Database(
1443 mdbx_ro.tx_ref().cursor_read::<tables::AccountsHistory>().unwrap(),
1444 PhantomData,
1445 );
1446 let mdbx_result = mdbx_reader
1447 .account_history_info(address, query.block_number, query.lowest_available, u64::MAX)
1448 .unwrap();
1449
1450 let rocks_result = rocks_snapshot
1452 .account_history_info(address, query.block_number, query.lowest_available, u64::MAX)
1453 .unwrap();
1454
1455 assert_eq!(
1457 mdbx_result,
1458 rocks_result,
1459 "Backend mismatch in scenario '{}' query {}: block={}, lowest={:?}\n\
1460 MDBX: {:?}, RocksDB: {:?}",
1461 scenario_name,
1462 i,
1463 query.block_number,
1464 query.lowest_available,
1465 mdbx_result,
1466 rocks_result
1467 );
1468
1469 assert_eq!(
1471 mdbx_result,
1472 query.expected,
1473 "Unexpected result in scenario '{}' query {}: block={}, lowest={:?}\n\
1474 Got: {:?}, Expected: {:?}",
1475 scenario_name,
1476 i,
1477 query.block_number,
1478 query.lowest_available,
1479 mdbx_result,
1480 query.expected
1481 );
1482 }
1483
1484 drop(temp_dir);
1485 }
1486
1487 fn run_storage_history_scenario(
1490 scenario_name: &str,
1491 address: Address,
1492 storage_key: B256,
1493 shards: &[(BlockNumber, Vec<BlockNumber>)], queries: &[HistoryQuery],
1495 ) {
1496 let factory = create_test_provider_factory();
1498 let mdbx_provider = factory.database_provider_rw().unwrap();
1499 let (temp_dir, rocks_provider) = create_rocksdb_provider();
1500
1501 let mut mdbx_writer: EitherWriter<'_, StoragesHistoryWriteCursor, EthPrimitives> =
1503 EitherWriter::Database(
1504 mdbx_provider.tx_ref().cursor_write::<tables::StoragesHistory>().unwrap(),
1505 );
1506 let mut rocks_writer: EitherWriter<'_, StoragesHistoryWriteCursor, EthPrimitives> =
1507 EitherWriter::RocksDB(rocks_provider.batch());
1508
1509 for (highest_block, blocks) in shards {
1511 let key = StorageShardedKey::new(address, storage_key, *highest_block);
1512 let value = IntegerList::new(blocks.clone()).unwrap();
1513 mdbx_writer.put_storage_history(key.clone(), &value).unwrap();
1514 rocks_writer.put_storage_history(key, &value).unwrap();
1515 }
1516
1517 drop(mdbx_writer);
1519 mdbx_provider.commit().unwrap();
1520 if let EitherWriter::RocksDB(batch) = rocks_writer {
1521 batch.commit().unwrap();
1522 }
1523
1524 let mdbx_ro = factory.database_provider_ro().unwrap();
1526 let rocks_snapshot = rocks_provider.snapshot();
1527
1528 for (i, query) in queries.iter().enumerate() {
1529 let mut mdbx_reader: EitherReader<'_, StoragesHistoryReadCursor, EthPrimitives> =
1531 EitherReader::Database(
1532 mdbx_ro.tx_ref().cursor_read::<tables::StoragesHistory>().unwrap(),
1533 PhantomData,
1534 );
1535 let mdbx_result = mdbx_reader
1536 .storage_history_info(
1537 address,
1538 storage_key,
1539 query.block_number,
1540 query.lowest_available,
1541 u64::MAX,
1542 )
1543 .unwrap();
1544
1545 let rocks_result = rocks_snapshot
1547 .storage_history_info(
1548 address,
1549 storage_key,
1550 query.block_number,
1551 query.lowest_available,
1552 u64::MAX,
1553 )
1554 .unwrap();
1555
1556 assert_eq!(
1558 mdbx_result,
1559 rocks_result,
1560 "Backend mismatch in scenario '{}' query {}: block={}, lowest={:?}\n\
1561 MDBX: {:?}, RocksDB: {:?}",
1562 scenario_name,
1563 i,
1564 query.block_number,
1565 query.lowest_available,
1566 mdbx_result,
1567 rocks_result
1568 );
1569
1570 assert_eq!(
1572 mdbx_result,
1573 query.expected,
1574 "Unexpected result in scenario '{}' query {}: block={}, lowest={:?}\n\
1575 Got: {:?}, Expected: {:?}",
1576 scenario_name,
1577 i,
1578 query.block_number,
1579 query.lowest_available,
1580 mdbx_result,
1581 query.expected
1582 );
1583 }
1584
1585 drop(temp_dir);
1586 }
1587
/// Drives `run_account_history_scenario` through four scenarios: a single shard, two shards,
/// an address with no history at all, and a single shard queried with
/// `lowest_available = Some(100)`.
#[test]
fn test_account_history_info_both_backends() {
    /// Shorthand constructor for a [`HistoryQuery`].
    fn query(
        block_number: BlockNumber,
        lowest_available: Option<BlockNumber>,
        expected: HistoryInfo,
    ) -> HistoryQuery {
        HistoryQuery { block_number, lowest_available, expected }
    }

    let address = Address::from([0x42; 20]);

    // One shard keyed by `u64::MAX`; shared by the first and the last scenario.
    let single_shard: Vec<(BlockNumber, Vec<BlockNumber>)> =
        vec![(u64::MAX, vec![100, 200, 300])];

    run_account_history_scenario(
        "single_shard",
        address,
        &single_shard,
        &[
            query(50, None, HistoryInfo::NotYetWritten),
            query(150, None, HistoryInfo::InChangeset(200)),
            query(300, None, HistoryInfo::InChangeset(300)),
            query(500, None, HistoryInfo::InPlainState),
        ],
    );

    // A shard keyed by 500 followed by a shard keyed by `u64::MAX`.
    let two_shards: Vec<(BlockNumber, Vec<BlockNumber>)> =
        vec![(500, vec![100, 200, 300, 400, 500]), (u64::MAX, vec![600, 700, 800])];

    run_account_history_scenario(
        "multiple_shards",
        address,
        &two_shards,
        &[
            query(50, None, HistoryInfo::NotYetWritten),
            query(150, None, HistoryInfo::InChangeset(200)),
            query(550, None, HistoryInfo::InChangeset(600)),
            query(900, None, HistoryInfo::InPlainState),
        ],
    );

    // No shard is written for this address.
    let address_without_history = Address::from([0x43; 20]);
    run_account_history_scenario(
        "no_history",
        address_without_history,
        &[],
        &[query(150, None, HistoryInfo::NotYetWritten)],
    );

    run_account_history_scenario(
        "with_pruning_boundary",
        address,
        &single_shard,
        &[
            query(100, Some(100), HistoryInfo::InChangeset(100)),
            query(150, Some(100), HistoryInfo::InChangeset(200)),
        ],
    );
}
1706
/// Drives `run_storage_history_scenario` through two scenarios: a storage slot with a single
/// shard, and a different slot of the same address with no history written.
#[test]
fn test_storage_history_info_both_backends() {
    /// Shorthand constructor for a [`HistoryQuery`] without a `lowest_available` bound.
    fn query(block_number: BlockNumber, expected: HistoryInfo) -> HistoryQuery {
        HistoryQuery { block_number, lowest_available: None, expected }
    }

    let address = Address::from([0x42; 20]);
    let storage_key = B256::from([0x01; 32]);
    let other_storage_key = B256::from([0x02; 32]);

    // One shard keyed by `u64::MAX`.
    let single_shard: Vec<(BlockNumber, Vec<BlockNumber>)> =
        vec![(u64::MAX, vec![100, 200, 300])];

    run_storage_history_scenario(
        "storage_single_shard",
        address,
        storage_key,
        &single_shard,
        &[
            query(50, HistoryInfo::NotYetWritten),
            query(150, HistoryInfo::InChangeset(200)),
            query(500, HistoryInfo::InPlainState),
        ],
    );

    // No shard is written for this slot.
    run_storage_history_scenario(
        "storage_no_history",
        address,
        other_storage_key,
        &[],
        &[query(150, HistoryInfo::NotYetWritten)],
    );
}
1755
/// Checks that entries written through an [`EitherWriter`] and handed to the provider as a
/// pending RocksDB batch are not readable from RocksDB before `provider.commit()` and are
/// readable afterwards.
#[test]
fn test_rocksdb_commits_at_provider_level() {
    let factory = create_test_provider_factory();
    factory.set_storage_settings_cache(StorageSettings::v2());

    // (hash, tx number) pairs written through the writer.
    let entries = [(B256::from([1u8; 32]), 100u64), (B256::from([2u8; 32]), 200u64)];
    let [(first_hash, _), _] = entries;

    let rocksdb_handle = factory.rocksdb_provider();
    let rocks_batch = rocksdb_handle.batch();

    let provider_rw = factory.database_provider_rw().unwrap();
    let mut hash_writer =
        EitherWriter::new_transaction_hash_numbers(&provider_rw, rocks_batch).unwrap();

    for (hash, tx_num) in entries {
        hash_writer.put_transaction_hash_number(hash, tx_num, false).unwrap();
    }

    // Hand the raw batch (if the writer produced one) to the provider; nothing is committed yet.
    if let Some(raw_batch) = hash_writer.into_raw_rocksdb_batch() {
        provider_rw.set_pending_rocksdb_batch(raw_batch);
    }

    let before_commit =
        factory.rocksdb_provider().get::<tables::TransactionHashNumbers>(first_hash).unwrap();
    assert_eq!(before_commit, None, "Data should not be visible before provider.commit()");

    provider_rw.commit().unwrap();

    let committed = factory.rocksdb_provider();
    for (hash, tx_num) in entries {
        assert_eq!(
            committed.get::<tables::TransactionHashNumbers>(hash).unwrap(),
            Some(tx_num),
            "Data should be visible after provider.commit()"
        );
    }
}
1812
/// Expects `EitherReader::new_accounts_history` to panic with the given message when
/// `StorageSettings::v2()` is cached on the factory but `None` is passed as its second
/// argument.
#[test]
#[should_panic(expected = "account_history_in_rocksdb requires rocksdb snapshot")]
fn test_settings_mismatch_panics() {
    let factory = create_test_provider_factory();
    factory.set_storage_settings_cache(StorageSettings::v2());

    let provider_ro = factory.database_provider_ro().unwrap();

    // The return value is irrelevant: the call itself is expected to panic.
    drop(EitherReader::<(), ()>::new_accounts_history(&provider_ro, None));
}
1826}