1use crate::{
2 providers::{
3 state::latest::LatestStateProvider, NodeTypesForProvider, RocksDBProvider,
4 StaticFileProvider, StaticFileProviderRWRefMut,
5 },
6 to_range,
7 traits::{BlockSource, ReceiptProvider},
8 BalProvider, BalStoreHandle, BlockHashReader, BlockNumReader, BlockReader, ChainSpecProvider,
9 DatabaseProviderFactory, EitherWriterDestination, HashedPostStateProvider, HeaderProvider,
10 HeaderSyncGapProvider, InMemoryBalStore, MetadataProvider, ProviderError,
11 PruneCheckpointReader, RocksDBProviderFactory, StageCheckpointReader, StateProviderBox,
12 StaticFileProviderFactory, StaticFileWriter, TransactionVariant, TransactionsProvider,
13};
14use alloy_consensus::transaction::TransactionMeta;
15use alloy_eips::BlockHashOrNumber;
16use alloy_primitives::{Address, BlockHash, BlockNumber, TxHash, TxNumber, B256};
17use core::fmt;
18use notify::{RecommendedWatcher, RecursiveMode, Watcher};
19use parking_lot::RwLock;
20use reth_chainspec::ChainInfo;
21use reth_db::{init_db, mdbx::DatabaseArguments, DatabaseEnv};
22use reth_db_api::{database::Database, models::StoredBlockBodyIndices};
23use reth_errors::{RethError, RethResult};
24use reth_node_types::{
25 BlockTy, HeaderTy, NodeTypesWithDB, NodeTypesWithDBAdapter, ReceiptTy, TxTy,
26};
27use reth_primitives_traits::{RecoveredBlock, SealedHeader};
28use reth_prune_types::{PruneCheckpoint, PruneModes, PruneSegment, MINIMUM_UNWIND_SAFE_DISTANCE};
29use reth_stages_types::{PipelineTarget, StageCheckpoint, StageId};
30use reth_static_file_types::StaticFileSegment;
31use reth_storage_api::{
32 BlockBodyIndicesProvider, ChainStateBlockReader, ChainStateBlockWriter, DBProvider,
33 NodePrimitivesProvider, StorageSettings, StorageSettingsCache, TryIntoHistoricalStateProvider,
34};
35use reth_storage_errors::provider::ProviderResult;
36use reth_trie::HashedPostState;
37use reth_trie_db::ChangesetCache;
38use revm_database::BundleState;
39use std::{
40 ops::{RangeBounds, RangeInclusive},
41 path::Path,
42 sync::{
43 atomic::{AtomicU64, Ordering},
44 Arc, Mutex,
45 },
46};
47use tracing::{info, instrument, trace, warn};
48
49mod provider;
50pub use provider::{
51 CommitOrder, DatabaseProvider, DatabaseProviderRO, DatabaseProviderRW, SaveBlocksMode,
52};
53
54use super::ProviderNodeTypes;
55use reth_trie::KeccakKeyHasher;
56
57mod builder;
58pub use builder::{ProviderFactoryBuilder, ReadOnlyConfig};
59
60mod metrics;
61
62mod chain;
63pub use chain::*;
64
65struct ReadOnlySyncState {
67 last_synced_txnid: AtomicU64,
69 sync_lock: Mutex<()>,
71}
72
73pub struct ProviderFactory<N: NodeTypesWithDB> {
77 db: N::DB,
79 chain_spec: Arc<N::ChainSpec>,
81 static_file_provider: StaticFileProvider<N::Primitives>,
83 prune_modes: PruneModes,
85 storage: Arc<N::Storage>,
87 storage_settings: Arc<RwLock<StorageSettings>>,
89 rocksdb_provider: RocksDBProvider,
91 changeset_cache: ChangesetCache,
93 bal_store: BalStoreHandle,
95 runtime: reth_tasks::Runtime,
97 minimum_pruning_distance: u64,
99 read_only_sync: Option<Arc<ReadOnlySyncState>>,
104}
105
106impl<N: NodeTypesForProvider> ProviderFactory<NodeTypesWithDBAdapter<N, DatabaseEnv>> {
107 pub fn builder() -> ProviderFactoryBuilder<N> {
109 ProviderFactoryBuilder::default()
110 }
111}
112
113impl<N: ProviderNodeTypes> ProviderFactory<N> {
114 pub fn new(
122 db: N::DB,
123 chain_spec: Arc<N::ChainSpec>,
124 static_file_provider: StaticFileProvider<N::Primitives>,
125 rocksdb_provider: RocksDBProvider,
126 runtime: reth_tasks::Runtime,
127 ) -> ProviderResult<Self> {
128 let legacy_settings = StorageSettings::v1();
133 let storage_settings = DatabaseProvider::<_, N>::new(
134 db.tx()?,
135 chain_spec.clone(),
136 static_file_provider.clone(),
137 Default::default(),
138 Default::default(),
139 Arc::new(RwLock::new(legacy_settings)),
140 rocksdb_provider.clone(),
141 ChangesetCache::new(),
142 runtime.clone(),
143 db.path(),
144 )
145 .storage_settings()?
146 .unwrap_or(legacy_settings);
147
148 Ok(Self {
149 db,
150 chain_spec,
151 static_file_provider,
152 prune_modes: PruneModes::default(),
153 storage: Default::default(),
154 storage_settings: Arc::new(RwLock::new(storage_settings)),
155 rocksdb_provider,
156 changeset_cache: ChangesetCache::new(),
157 bal_store: BalStoreHandle::new(InMemoryBalStore::default()),
158 runtime,
159 minimum_pruning_distance: MINIMUM_UNWIND_SAFE_DISTANCE,
160 read_only_sync: None,
161 })
162 }
163
164 pub fn new_checked(
171 db: N::DB,
172 chain_spec: Arc<N::ChainSpec>,
173 static_file_provider: StaticFileProvider<N::Primitives>,
174 rocksdb_provider: RocksDBProvider,
175 runtime: reth_tasks::Runtime,
176 ) -> ProviderResult<Self> {
177 Self::new(db, chain_spec, static_file_provider, rocksdb_provider, runtime)
178 .and_then(Self::assert_consistent)
179 }
180}
181
182impl<N: NodeTypesWithDB> ProviderFactory<N> {
183 pub fn with_prune_modes(mut self, prune_modes: PruneModes) -> Self {
185 self.prune_modes = prune_modes;
186 self
187 }
188
189 pub fn with_bal_store(mut self, bal_store: BalStoreHandle) -> Self {
191 self.bal_store = bal_store;
192 self
193 }
194
195 pub fn with_changeset_cache(mut self, changeset_cache: ChangesetCache) -> Self {
197 self.changeset_cache = changeset_cache;
198 self
199 }
200
201 pub const fn with_minimum_pruning_distance(mut self, distance: u64) -> Self {
206 self.minimum_pruning_distance = distance;
207 self
208 }
209
210 pub fn with_read_only_sync(mut self, watch: bool) -> Self
216 where
217 N::DB: Database,
218 {
219 let state = Arc::new(ReadOnlySyncState {
223 last_synced_txnid: AtomicU64::new(0),
224 sync_lock: Mutex::new(()),
225 });
226 self.read_only_sync = Some(state);
227
228 if watch {
229 self.watch_db_directory();
230 }
231 self
232 }
233
234 fn watch_db_directory(&self)
237 where
238 N::DB: Database,
239 {
240 let factory = self.clone();
241 let db_path = self.db.path();
242 reth_tasks::spawn_os_thread("ro-sync", move || {
243 let (tx, rx) = std::sync::mpsc::channel();
244 let mut watcher = RecommendedWatcher::new(
245 move |res| {
246 let _ = tx.send(res);
247 },
248 notify::Config::default(),
249 )
250 .expect("failed to create watcher");
251
252 watcher
253 .watch(&db_path, RecursiveMode::NonRecursive)
254 .expect("failed to watch MDBX path");
255
256 while let Ok(res) = rx.recv() {
257 match res {
258 Ok(event) => {
259 if !matches!(
260 event.kind,
261 notify::EventKind::Modify(_) | notify::EventKind::Create(_)
262 ) {
263 continue;
264 }
265
266 if let Err(err) = factory.sync_providers_if_needed() {
267 warn!(target: "reth::provider", %err, "background ro-sync failed");
268 }
269 }
270 Err(err) => {
271 warn!(target: "reth::provider", ?err, "MDBX directory watcher error");
272 }
273 }
274 }
275 });
276 }
277
278 pub fn sync_providers_if_needed(&self) -> ProviderResult<()> {
284 let Some(sync_state) = &self.read_only_sync else { return Ok(()) };
285 let current_txnid = self.db.last_txnid().unwrap_or(0);
286
287 if current_txnid == sync_state.last_synced_txnid.load(Ordering::Relaxed) {
289 return Ok(());
290 }
291
292 let _guard = sync_state.sync_lock.lock().unwrap_or_else(|e| e.into_inner());
294
295 if current_txnid == sync_state.last_synced_txnid.load(Ordering::Relaxed) {
297 return Ok(());
298 }
299
300 self.rocksdb_provider.try_catch_up_with_primary()?;
301 self.static_file_provider.initialize_index()?;
302 sync_state.last_synced_txnid.store(current_txnid, Ordering::Relaxed);
303 Ok(())
304 }
305
306 pub const fn db_ref(&self) -> &N::DB {
308 &self.db
309 }
310
311 #[cfg(any(test, feature = "test-utils"))]
312 pub fn into_db(self) -> N::DB {
314 self.db
315 }
316}
317
318impl<N: NodeTypesWithDB> StorageSettingsCache for ProviderFactory<N> {
319 fn cached_storage_settings(&self) -> StorageSettings {
320 *self.storage_settings.read()
321 }
322
323 fn set_storage_settings_cache(&self, settings: StorageSettings) {
324 *self.storage_settings.write() = settings;
325 }
326}
327
328impl<N: NodeTypesWithDB> RocksDBProviderFactory for ProviderFactory<N> {
329 fn rocksdb_provider(&self) -> RocksDBProvider {
330 self.rocksdb_provider.clone()
331 }
332
333 fn set_pending_rocksdb_batch(&self, _batch: rocksdb::WriteBatchWithTransaction<true>) {
334 unimplemented!("ProviderFactory is a factory, not a provider - use DatabaseProvider::set_pending_rocksdb_batch instead")
335 }
336
337 fn commit_pending_rocksdb_batches(&self) -> ProviderResult<()> {
338 unimplemented!("ProviderFactory is a factory, not a provider - use DatabaseProvider::commit_pending_rocksdb_batches instead")
339 }
340}
341
342impl<N: ProviderNodeTypes<DB = DatabaseEnv>> ProviderFactory<N> {
343 pub fn new_with_database_path<P: AsRef<Path>>(
346 path: P,
347 chain_spec: Arc<N::ChainSpec>,
348 args: DatabaseArguments,
349 static_file_provider: StaticFileProvider<N::Primitives>,
350 rocksdb_provider: RocksDBProvider,
351 runtime: reth_tasks::Runtime,
352 ) -> RethResult<Self> {
353 Self::new(
354 init_db(path, args).map_err(RethError::msg)?,
355 chain_spec,
356 static_file_provider,
357 rocksdb_provider,
358 runtime,
359 )
360 .map_err(RethError::Provider)
361 }
362}
363
364impl<N: ProviderNodeTypes> ProviderFactory<N> {
365 #[track_caller]
372 pub fn provider(&self) -> ProviderResult<DatabaseProviderRO<N::DB, N>> {
373 let db_tx = self.db.tx()?;
374
375 self.sync_providers_if_needed()?;
381
382 Ok(DatabaseProvider::new(
383 db_tx,
384 self.chain_spec.clone(),
385 self.static_file_provider.clone(),
386 self.prune_modes.clone(),
387 self.storage.clone(),
388 self.storage_settings.clone(),
389 self.rocksdb_provider.clone(),
390 self.changeset_cache.clone(),
391 self.runtime.clone(),
392 self.db.path(),
393 )
394 .with_minimum_pruning_distance(self.minimum_pruning_distance))
395 }
396
397 #[track_caller]
402 pub fn provider_rw(&self) -> ProviderResult<DatabaseProviderRW<N::DB, N>> {
403 Ok(DatabaseProviderRW(
404 DatabaseProvider::new_rw(
405 self.db.tx_mut()?,
406 self.chain_spec.clone(),
407 self.static_file_provider.clone(),
408 self.prune_modes.clone(),
409 self.storage.clone(),
410 self.storage_settings.clone(),
411 self.rocksdb_provider.clone(),
412 self.changeset_cache.clone(),
413 self.runtime.clone(),
414 self.db.path(),
415 )
416 .with_reader_txn_tracker(self.db.clone())
417 .with_minimum_pruning_distance(self.minimum_pruning_distance),
418 ))
419 }
420
421 #[track_caller]
428 pub fn unwind_provider_rw(
429 &self,
430 ) -> ProviderResult<DatabaseProvider<<N::DB as Database>::TXMut, N>> {
431 Ok(DatabaseProvider::new_unwind_rw(
432 self.db.tx_mut()?,
433 self.chain_spec.clone(),
434 self.static_file_provider.clone(),
435 self.prune_modes.clone(),
436 self.storage.clone(),
437 self.storage_settings.clone(),
438 self.rocksdb_provider.clone(),
439 self.changeset_cache.clone(),
440 self.runtime.clone(),
441 self.db.path(),
442 )
443 .with_reader_txn_tracker(self.db.clone())
444 .with_minimum_pruning_distance(self.minimum_pruning_distance))
445 }
446
447 #[track_caller]
449 pub fn latest(&self) -> ProviderResult<StateProviderBox> {
450 trace!(target: "providers::db", "Returning latest state provider");
451 Ok(Box::new(LatestStateProvider::new(self.database_provider_ro()?)))
452 }
453
454 pub fn history_by_block_number(
456 &self,
457 block_number: BlockNumber,
458 ) -> ProviderResult<StateProviderBox> {
459 let state_provider = self.provider()?.try_into_history_at_block(block_number)?;
460 trace!(target: "providers::db", ?block_number, "Returning historical state provider for block number");
461 Ok(state_provider)
462 }
463
464 pub fn history_by_block_hash(&self, block_hash: BlockHash) -> ProviderResult<StateProviderBox> {
466 let provider = self.provider()?;
467
468 let block_number = provider
469 .block_number(block_hash)?
470 .ok_or(ProviderError::BlockHashNotFound(block_hash))?;
471
472 let state_provider = provider.try_into_history_at_block(block_number)?;
473 trace!(target: "providers::db", ?block_number, %block_hash, "Returning historical state provider for block hash");
474 Ok(state_provider)
475 }
476
477 pub fn assert_consistent(self) -> ProviderResult<Self> {
482 let (rocksdb_unwind, static_file_unwind) = self.check_consistency()?;
483
484 let source = match (rocksdb_unwind, static_file_unwind) {
485 (None, None) => return Ok(self),
486 (Some(_), Some(_)) => "RocksDB and Static Files",
487 (Some(_), None) => "RocksDB",
488 (None, Some(_)) => "Static Files",
489 };
490
491 Err(ProviderError::MustUnwind {
492 data_source: source,
493 unwind_to: rocksdb_unwind
494 .into_iter()
495 .chain(static_file_unwind)
496 .min()
497 .expect("at least one unwind target must be Some"),
498 })
499 }
500
501 #[instrument(err, skip(self))]
505 pub fn check_consistency(&self) -> ProviderResult<(Option<u64>, Option<u64>)> {
506 let provider_ro = self
507 .database_provider_ro()?
508 .disable_long_read_transaction_safety();
512
513 self.static_file_provider().check_file_consistency(&provider_ro)?;
515
516 let rocksdb_unwind = self.rocksdb_provider().check_consistency(&provider_ro)?;
518
519 let static_file_unwind = self.static_file_provider().check_consistency(&provider_ro)?.map(
521 |target| match target {
522 PipelineTarget::Unwind(block) => block,
523 PipelineTarget::Sync(_) => unreachable!("check_consistency returns Unwind"),
524 },
525 );
526
527 if rocksdb_unwind.is_none() && static_file_unwind.is_none() {
532 self.heal_chain_state_block_numbers(&provider_ro)?;
533 }
534
535 Ok((rocksdb_unwind, static_file_unwind))
536 }
537
538 fn heal_chain_state_block_numbers(
541 &self,
542 provider_ro: &DatabaseProvider<<N::DB as Database>::TX, N>,
543 ) -> ProviderResult<()> {
544 let highest_header = self.last_block_number()?;
545
546 let finalized = provider_ro.last_finalized_block_number()?;
547 let safe = provider_ro.last_safe_block_number()?;
548
549 if finalized.is_none_or(|f| f <= highest_header) && safe.is_none_or(|s| s <= highest_header)
550 {
551 return Ok(());
552 }
553
554 let provider_rw = self.provider_rw()?;
555
556 if let Some(finalized) = finalized.filter(|&f| f > highest_header) {
557 info!(
558 target: "providers::db",
559 finalized,
560 highest_header,
561 "Healing finalized block number",
562 );
563 provider_rw.save_finalized_block_number(highest_header)?;
564 }
565
566 if let Some(safe) = safe.filter(|&s| s > highest_header) {
567 info!(
568 target: "providers::db",
569 safe,
570 highest_header,
571 "Healing safe block number",
572 );
573 provider_rw.save_safe_block_number(highest_header)?;
574 }
575
576 provider_rw.commit()?;
577
578 Ok(())
579 }
580
581 pub fn caught_up_static_file_provider(
584 &self,
585 ) -> ProviderResult<StaticFileProvider<N::Primitives>> {
586 self.sync_providers_if_needed()?;
587 Ok(self.static_file_provider.clone())
588 }
589}
590
591impl<N: NodeTypesWithDB> NodePrimitivesProvider for ProviderFactory<N> {
592 type Primitives = N::Primitives;
593}
594
595impl<N: NodeTypesWithDB> BalProvider for ProviderFactory<N> {
596 fn bal_store(&self) -> &BalStoreHandle {
597 &self.bal_store
598 }
599}
600
601impl<N: ProviderNodeTypes> DatabaseProviderFactory for ProviderFactory<N> {
602 type DB = N::DB;
603 type Provider = DatabaseProvider<<N::DB as Database>::TX, N>;
604 type ProviderRW = DatabaseProvider<<N::DB as Database>::TXMut, N>;
605
606 fn database_provider_ro(&self) -> ProviderResult<Self::Provider> {
607 self.provider()
608 }
609
610 fn database_provider_rw(&self) -> ProviderResult<Self::ProviderRW> {
611 self.provider_rw().map(|provider| provider.0)
612 }
613}
614
615impl<N: NodeTypesWithDB> StaticFileProviderFactory for ProviderFactory<N> {
616 fn static_file_provider(&self) -> StaticFileProvider<Self::Primitives> {
618 self.static_file_provider.clone()
619 }
620
621 fn get_static_file_writer(
622 &self,
623 block: BlockNumber,
624 segment: StaticFileSegment,
625 ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
626 self.static_file_provider.get_writer(block, segment)
627 }
628}
629
630impl<N: ProviderNodeTypes> HeaderSyncGapProvider for ProviderFactory<N> {
631 type Header = HeaderTy<N>;
632 fn local_tip_header(
633 &self,
634 highest_uninterrupted_block: BlockNumber,
635 ) -> ProviderResult<SealedHeader<Self::Header>> {
636 self.provider()?.local_tip_header(highest_uninterrupted_block)
637 }
638}
639
640impl<N: ProviderNodeTypes> HeaderProvider for ProviderFactory<N> {
641 type Header = HeaderTy<N>;
642
643 fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
644 self.provider()?.header(block_hash)
645 }
646
647 fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
648 self.caught_up_static_file_provider()?.header_by_number(num)
649 }
650
651 fn headers_range(
652 &self,
653 range: impl RangeBounds<BlockNumber>,
654 ) -> ProviderResult<Vec<Self::Header>> {
655 self.caught_up_static_file_provider()?.headers_range(range)
656 }
657
658 fn sealed_header(
659 &self,
660 number: BlockNumber,
661 ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
662 self.caught_up_static_file_provider()?.sealed_header(number)
663 }
664
665 fn sealed_headers_range(
666 &self,
667 range: impl RangeBounds<BlockNumber>,
668 ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
669 self.caught_up_static_file_provider()?.sealed_headers_range(range)
670 }
671
672 fn sealed_headers_while(
673 &self,
674 range: impl RangeBounds<BlockNumber>,
675 predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
676 ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
677 self.caught_up_static_file_provider()?.sealed_headers_while(range, predicate)
678 }
679}
680
681impl<N: ProviderNodeTypes> BlockHashReader for ProviderFactory<N> {
682 fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
683 self.caught_up_static_file_provider()?.block_hash(number)
684 }
685
686 fn canonical_hashes_range(
687 &self,
688 start: BlockNumber,
689 end: BlockNumber,
690 ) -> ProviderResult<Vec<B256>> {
691 self.caught_up_static_file_provider()?.canonical_hashes_range(start, end)
692 }
693}
694
695impl<N: ProviderNodeTypes> BlockNumReader for ProviderFactory<N> {
696 fn chain_info(&self) -> ProviderResult<ChainInfo> {
697 self.provider()?.chain_info()
698 }
699
700 fn best_block_number(&self) -> ProviderResult<BlockNumber> {
701 self.provider()?.best_block_number()
702 }
703
704 fn last_block_number(&self) -> ProviderResult<BlockNumber> {
705 self.caught_up_static_file_provider()?.last_block_number()
706 }
707
708 fn earliest_block_number(&self) -> ProviderResult<BlockNumber> {
709 Ok(self.caught_up_static_file_provider()?.earliest_history_height())
712 }
713
714 fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
715 self.provider()?.block_number(hash)
716 }
717}
718
719impl<N: ProviderNodeTypes> BlockReader for ProviderFactory<N> {
720 type Block = BlockTy<N>;
721
722 fn find_block_by_hash(
723 &self,
724 hash: B256,
725 source: BlockSource,
726 ) -> ProviderResult<Option<Self::Block>> {
727 self.provider()?.find_block_by_hash(hash, source)
728 }
729
730 fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
731 self.provider()?.block(id)
732 }
733
734 fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
735 self.provider()?.pending_block()
736 }
737
738 fn pending_block_and_receipts(
739 &self,
740 ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
741 self.provider()?.pending_block_and_receipts()
742 }
743
744 fn recovered_block(
745 &self,
746 id: BlockHashOrNumber,
747 transaction_kind: TransactionVariant,
748 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
749 self.provider()?.recovered_block(id, transaction_kind)
750 }
751
752 fn sealed_block_with_senders(
753 &self,
754 id: BlockHashOrNumber,
755 transaction_kind: TransactionVariant,
756 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
757 self.provider()?.sealed_block_with_senders(id, transaction_kind)
758 }
759
760 fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
761 self.provider()?.block_range(range)
762 }
763
764 fn block_with_senders_range(
765 &self,
766 range: RangeInclusive<BlockNumber>,
767 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
768 self.provider()?.block_with_senders_range(range)
769 }
770
771 fn recovered_block_range(
772 &self,
773 range: RangeInclusive<BlockNumber>,
774 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
775 self.provider()?.recovered_block_range(range)
776 }
777
778 fn block_by_transaction_id(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
779 self.provider()?.block_by_transaction_id(id)
780 }
781}
782
783impl<N: ProviderNodeTypes> TransactionsProvider for ProviderFactory<N> {
784 type Transaction = TxTy<N>;
785
786 fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
787 self.provider()?.transaction_id(tx_hash)
788 }
789
790 fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
791 self.caught_up_static_file_provider()?.transaction_by_id(id)
792 }
793
794 fn transaction_by_id_unhashed(
795 &self,
796 id: TxNumber,
797 ) -> ProviderResult<Option<Self::Transaction>> {
798 self.caught_up_static_file_provider()?.transaction_by_id_unhashed(id)
799 }
800
801 fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
802 self.provider()?.transaction_by_hash(hash)
803 }
804
805 fn transaction_by_hash_with_meta(
806 &self,
807 tx_hash: TxHash,
808 ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
809 self.provider()?.transaction_by_hash_with_meta(tx_hash)
810 }
811
812 fn transactions_by_block(
813 &self,
814 id: BlockHashOrNumber,
815 ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
816 self.provider()?.transactions_by_block(id)
817 }
818
819 fn transactions_by_block_range(
820 &self,
821 range: impl RangeBounds<BlockNumber>,
822 ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
823 self.provider()?.transactions_by_block_range(range)
824 }
825
826 fn transactions_by_tx_range(
827 &self,
828 range: impl RangeBounds<TxNumber>,
829 ) -> ProviderResult<Vec<Self::Transaction>> {
830 self.caught_up_static_file_provider()?.transactions_by_tx_range(range)
831 }
832
833 fn senders_by_tx_range(
834 &self,
835 range: impl RangeBounds<TxNumber>,
836 ) -> ProviderResult<Vec<Address>> {
837 if EitherWriterDestination::senders(self).is_static_file() {
838 self.caught_up_static_file_provider()?.senders_by_tx_range(range)
839 } else {
840 self.provider()?.senders_by_tx_range(range)
841 }
842 }
843
844 fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
845 if EitherWriterDestination::senders(self).is_static_file() {
846 self.caught_up_static_file_provider()?.transaction_sender(id)
847 } else {
848 self.provider()?.transaction_sender(id)
849 }
850 }
851}
852
853impl<N: ProviderNodeTypes> ReceiptProvider for ProviderFactory<N> {
854 type Receipt = ReceiptTy<N>;
855
856 fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
857 self.caught_up_static_file_provider()?.get_with_static_file_or_database(
858 StaticFileSegment::Receipts,
859 id,
860 |static_file| static_file.receipt(id),
861 || self.provider()?.receipt(id),
862 )
863 }
864
865 fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
866 self.provider()?.receipt_by_hash(hash)
867 }
868
869 fn receipts_by_block(
870 &self,
871 block: BlockHashOrNumber,
872 ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
873 self.provider()?.receipts_by_block(block)
874 }
875
876 fn receipts_by_tx_range(
877 &self,
878 range: impl RangeBounds<TxNumber>,
879 ) -> ProviderResult<Vec<Self::Receipt>> {
880 self.caught_up_static_file_provider()?.get_range_with_static_file_or_database(
881 StaticFileSegment::Receipts,
882 to_range(range),
883 |static_file, range, _| static_file.receipts_by_tx_range(range),
884 |range, _| self.provider()?.receipts_by_tx_range(range),
885 |_| true,
886 )
887 }
888
889 fn receipts_by_block_range(
890 &self,
891 block_range: RangeInclusive<BlockNumber>,
892 ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
893 self.provider()?.receipts_by_block_range(block_range)
894 }
895}
896
897impl<N: ProviderNodeTypes> BlockBodyIndicesProvider for ProviderFactory<N> {
898 fn block_body_indices(
899 &self,
900 number: BlockNumber,
901 ) -> ProviderResult<Option<StoredBlockBodyIndices>> {
902 self.provider()?.block_body_indices(number)
903 }
904
905 fn block_body_indices_range(
906 &self,
907 range: RangeInclusive<BlockNumber>,
908 ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
909 self.provider()?.block_body_indices_range(range)
910 }
911}
912
913impl<N: ProviderNodeTypes> StageCheckpointReader for ProviderFactory<N> {
914 fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
915 self.provider()?.get_stage_checkpoint(id)
916 }
917
918 fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
919 self.provider()?.get_stage_checkpoint_progress(id)
920 }
921 fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
922 self.provider()?.get_all_checkpoints()
923 }
924}
925
926impl<N: NodeTypesWithDB> ChainSpecProvider for ProviderFactory<N> {
927 type ChainSpec = N::ChainSpec;
928
929 fn chain_spec(&self) -> Arc<N::ChainSpec> {
930 self.chain_spec.clone()
931 }
932}
933
934impl<N: ProviderNodeTypes> PruneCheckpointReader for ProviderFactory<N> {
935 fn get_prune_checkpoint(
936 &self,
937 segment: PruneSegment,
938 ) -> ProviderResult<Option<PruneCheckpoint>> {
939 self.provider()?.get_prune_checkpoint(segment)
940 }
941
942 fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
943 self.provider()?.get_prune_checkpoints()
944 }
945}
946
947impl<N: ProviderNodeTypes> HashedPostStateProvider for ProviderFactory<N> {
948 fn hashed_post_state(&self, bundle_state: &BundleState) -> HashedPostState {
949 HashedPostState::from_bundle_state::<KeccakKeyHasher>(bundle_state.state())
950 }
951}
952
953impl<N: ProviderNodeTypes> MetadataProvider for ProviderFactory<N> {
954 fn get_metadata(&self, key: &str) -> ProviderResult<Option<Vec<u8>>> {
955 self.provider()?.get_metadata(key)
956 }
957}
958
959impl<N> fmt::Debug for ProviderFactory<N>
960where
961 N: NodeTypesWithDB<DB: fmt::Debug, ChainSpec: fmt::Debug, Storage: fmt::Debug>,
962{
963 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
964 let Self {
965 db,
966 chain_spec,
967 static_file_provider,
968 prune_modes,
969 storage,
970 storage_settings,
971 rocksdb_provider,
972 changeset_cache,
973 bal_store,
974 runtime,
975 minimum_pruning_distance,
976 read_only_sync,
977 } = self;
978 f.debug_struct("ProviderFactory")
979 .field("db", &db)
980 .field("chain_spec", &chain_spec)
981 .field("static_file_provider", &static_file_provider)
982 .field("prune_modes", &prune_modes)
983 .field("storage", &storage)
984 .field("storage_settings", &*storage_settings.read())
985 .field("rocksdb_provider", &rocksdb_provider)
986 .field("changeset_cache", &changeset_cache)
987 .field("bal_store", &bal_store)
988 .field("runtime", &runtime)
989 .field("minimum_pruning_distance", &minimum_pruning_distance)
990 .field(
991 "read_only_sync",
992 &read_only_sync.as_ref().map(|s| s.last_synced_txnid.load(Ordering::Relaxed)),
993 )
994 .finish()
995 }
996}
997
998impl<N: NodeTypesWithDB> Clone for ProviderFactory<N> {
999 fn clone(&self) -> Self {
1000 Self {
1001 db: self.db.clone(),
1002 chain_spec: self.chain_spec.clone(),
1003 static_file_provider: self.static_file_provider.clone(),
1004 prune_modes: self.prune_modes.clone(),
1005 storage: self.storage.clone(),
1006 storage_settings: self.storage_settings.clone(),
1007 rocksdb_provider: self.rocksdb_provider.clone(),
1008 changeset_cache: self.changeset_cache.clone(),
1009 bal_store: self.bal_store.clone(),
1010 runtime: self.runtime.clone(),
1011 minimum_pruning_distance: self.minimum_pruning_distance,
1012 read_only_sync: self.read_only_sync.clone(),
1013 }
1014 }
1015}
1016
1017#[cfg(test)]
1018mod tests {
1019 use super::*;
1020 use crate::{
1021 providers::{StaticFileProvider, StaticFileWriter},
1022 test_utils::{blocks::TEST_BLOCK, create_test_provider_factory, MockNodeTypesWithDB},
1023 BlockHashReader, BlockNumReader, BlockWriter, DBProvider, HeaderSyncGapProvider,
1024 TransactionsProvider,
1025 };
1026 use alloy_primitives::{TxNumber, B256};
1027 use assert_matches::assert_matches;
1028 use reth_chainspec::ChainSpecBuilder;
1029 use reth_db::{
1030 mdbx::DatabaseArguments,
1031 test_utils::{create_test_rocksdb_dir, create_test_static_files_dir, ERROR_TEMPDIR},
1032 };
1033 use reth_db_api::tables;
1034 use reth_primitives_traits::SignerRecoverable;
1035 use reth_prune_types::{PruneMode, PruneModes};
1036 use reth_storage_errors::provider::ProviderError;
1037 use reth_testing_utils::generators::{self, random_block, random_header, BlockParams};
1038 use std::{ops::RangeInclusive, sync::Arc};
1039
1040 #[test]
1041 fn common_history_provider() {
1042 let factory = create_test_provider_factory();
1043 let _ = factory.latest();
1044 }
1045
1046 #[test]
1047 fn default_chain_info() {
1048 let factory = create_test_provider_factory();
1049 let provider = factory.provider().unwrap();
1050
1051 let chain_info = provider.chain_info().expect("should be ok");
1052 assert_eq!(chain_info.best_number, 0);
1053 assert_eq!(chain_info.best_hash, B256::ZERO);
1054 }
1055
1056 #[test]
1057 fn provider_flow() {
1058 let factory = create_test_provider_factory();
1059 let provider = factory.provider().unwrap();
1060 provider.block_hash(0).unwrap();
1061 let provider_rw = factory.provider_rw().unwrap();
1062 provider_rw.block_hash(0).unwrap();
1063 provider.block_hash(0).unwrap();
1064 }
1065
1066 #[test]
1067 fn provider_factory_with_database_path() {
1068 let chain_spec = ChainSpecBuilder::mainnet().build();
1069 let (_static_dir, static_dir_path) = create_test_static_files_dir();
1070 let (_rocksdb_dir, rocksdb_path) = create_test_rocksdb_dir();
1071 let _db_tempdir = tempfile::TempDir::new().expect(ERROR_TEMPDIR);
1072 let factory = ProviderFactory::<MockNodeTypesWithDB<DatabaseEnv>>::new_with_database_path(
1073 _db_tempdir.path(),
1074 Arc::new(chain_spec),
1075 DatabaseArguments::new(Default::default()),
1076 StaticFileProvider::read_write(static_dir_path).unwrap(),
1077 RocksDBProvider::builder(&rocksdb_path).build().unwrap(),
1078 reth_tasks::Runtime::test(),
1079 )
1080 .unwrap();
1081 let provider = factory.provider().unwrap();
1082 provider.block_hash(0).unwrap();
1083 let provider_rw = factory.provider_rw().unwrap();
1084 provider_rw.block_hash(0).unwrap();
1085 provider.block_hash(0).unwrap();
1086 }
1087
1088 #[test]
1089 fn insert_block_with_prune_modes() {
1090 let block = TEST_BLOCK.clone();
1091
1092 {
1093 let factory = create_test_provider_factory();
1094 let provider = factory.provider_rw().unwrap();
1095 assert_matches!(provider.insert_block(&block.clone().try_recover().unwrap()), Ok(_));
1096 assert_matches!(
1097 provider.transaction_sender(0), Ok(Some(sender))
1098 if sender == block.body().transactions[0].recover_signer().unwrap()
1099 );
1100 assert_matches!(
1101 provider.transaction_id(*block.body().transactions[0].tx_hash()),
1102 Ok(Some(0))
1103 );
1104 }
1105
1106 {
1107 let prune_modes = PruneModes {
1108 sender_recovery: Some(PruneMode::Full),
1109 transaction_lookup: Some(PruneMode::Full),
1110 ..PruneModes::default()
1111 };
1112 let factory = create_test_provider_factory().with_prune_modes(prune_modes);
1114 let provider = factory.provider_rw().unwrap();
1115 assert_matches!(provider.insert_block(&block.clone().try_recover().unwrap()), Ok(_));
1116 assert_matches!(provider.transaction_sender(0), Ok(None));
1117 assert_matches!(
1118 provider.transaction_id(*block.body().transactions[0].tx_hash()),
1119 Ok(None)
1120 );
1121 }
1122 }
1123
1124 #[test]
1125 fn take_block_transaction_range_recover_senders() {
1126 let mut rng = generators::rng();
1127 let block =
1128 random_block(&mut rng, 0, BlockParams { tx_count: Some(3), ..Default::default() });
1129
1130 let tx_ranges: Vec<RangeInclusive<TxNumber>> = vec![0..=0, 1..=1, 2..=2, 0..=1, 1..=2];
1131 for range in tx_ranges {
1132 let factory = create_test_provider_factory();
1133 let provider = factory.provider_rw().unwrap();
1134
1135 assert_matches!(provider.insert_block(&block.clone().try_recover().unwrap()), Ok(_));
1136
1137 let senders = provider.take::<tables::TransactionSenders>(range.clone()).unwrap();
1138 assert_eq!(
1139 senders,
1140 range
1141 .clone()
1142 .map(|tx_number| (
1143 tx_number,
1144 block.body().transactions[tx_number as usize].recover_signer().unwrap()
1145 ))
1146 .collect::<Vec<_>>()
1147 );
1148
1149 let db_senders = provider.senders_by_tx_range(range);
1150 assert!(matches!(db_senders, Ok(ref v) if v.is_empty()));
1151 }
1152 }
1153
1154 #[test]
1155 fn header_sync_gap_lookup() {
1156 let factory = create_test_provider_factory();
1157 let provider = factory.provider_rw().unwrap();
1158
1159 let mut rng = generators::rng();
1160
1161 let checkpoint = 0;
1163 let head = random_header(&mut rng, 0, None);
1164
1165 assert_matches!(
1167 provider.local_tip_header(checkpoint),
1168 Err(ProviderError::HeaderNotFound(block_number))
1169 if block_number.as_number().unwrap() == checkpoint
1170 );
1171
1172 let static_file_provider = provider.static_file_provider();
1174 let mut static_file_writer =
1175 static_file_provider.latest_writer(StaticFileSegment::Headers).unwrap();
1176 static_file_writer.append_header(head.header(), &head.hash()).unwrap();
1177 static_file_writer.commit().unwrap();
1178 drop(static_file_writer);
1179
1180 let local_head = provider.local_tip_header(checkpoint).unwrap();
1181
1182 assert_eq!(local_head, head);
1183 }
1184}