1use crate::{
2 providers::{
3 state::latest::LatestStateProvider, NodeTypesForProvider, RocksDBProvider,
4 StaticFileProvider, StaticFileProviderRWRefMut,
5 },
6 to_range,
7 traits::{BlockSource, ReceiptProvider},
8 BlockHashReader, BlockNumReader, BlockReader, ChainSpecProvider, DatabaseProviderFactory,
9 EitherWriterDestination, HashedPostStateProvider, HeaderProvider, HeaderSyncGapProvider,
10 MetadataProvider, ProviderError, PruneCheckpointReader, RocksDBProviderFactory,
11 StageCheckpointReader, StateProviderBox, StaticFileProviderFactory, StaticFileWriter,
12 TransactionVariant, TransactionsProvider,
13};
14use alloy_consensus::transaction::TransactionMeta;
15use alloy_eips::BlockHashOrNumber;
16use alloy_primitives::{Address, BlockHash, BlockNumber, TxHash, TxNumber, B256};
17use core::fmt;
18use notify::{RecommendedWatcher, RecursiveMode, Watcher};
19use parking_lot::RwLock;
20use reth_chainspec::ChainInfo;
21use reth_db::{init_db, mdbx::DatabaseArguments, DatabaseEnv};
22use reth_db_api::{database::Database, models::StoredBlockBodyIndices};
23use reth_errors::{RethError, RethResult};
24use reth_node_types::{
25 BlockTy, HeaderTy, NodeTypesWithDB, NodeTypesWithDBAdapter, ReceiptTy, TxTy,
26};
27use reth_primitives_traits::{RecoveredBlock, SealedHeader};
28use reth_prune_types::{PruneCheckpoint, PruneModes, PruneSegment, MINIMUM_UNWIND_SAFE_DISTANCE};
29use reth_stages_types::{PipelineTarget, StageCheckpoint, StageId};
30use reth_static_file_types::StaticFileSegment;
31use reth_storage_api::{
32 BlockBodyIndicesProvider, ChainStateBlockReader, ChainStateBlockWriter, DBProvider,
33 NodePrimitivesProvider, StorageSettings, StorageSettingsCache, TryIntoHistoricalStateProvider,
34};
35use reth_storage_errors::provider::ProviderResult;
36use reth_trie::HashedPostState;
37use reth_trie_db::ChangesetCache;
38use revm_database::BundleState;
39use std::{
40 ops::{RangeBounds, RangeInclusive},
41 path::Path,
42 sync::{
43 atomic::{AtomicU64, Ordering},
44 Arc, Mutex,
45 },
46};
47use tracing::{info, instrument, trace, warn};
48
49mod provider;
50pub use provider::{
51 CommitOrder, DatabaseProvider, DatabaseProviderRO, DatabaseProviderRW, SaveBlocksMode,
52};
53
54use super::ProviderNodeTypes;
55use reth_trie::KeccakKeyHasher;
56
57mod builder;
58pub use builder::{ProviderFactoryBuilder, ReadOnlyConfig};
59
60mod metrics;
61
62mod chain;
63pub use chain::*;
64
65struct ReadOnlySyncState {
67 last_synced_txnid: AtomicU64,
69 sync_lock: Mutex<()>,
71}
72
73pub struct ProviderFactory<N: NodeTypesWithDB> {
77 db: N::DB,
79 chain_spec: Arc<N::ChainSpec>,
81 static_file_provider: StaticFileProvider<N::Primitives>,
83 prune_modes: PruneModes,
85 storage: Arc<N::Storage>,
87 storage_settings: Arc<RwLock<StorageSettings>>,
89 rocksdb_provider: RocksDBProvider,
91 changeset_cache: ChangesetCache,
93 runtime: reth_tasks::Runtime,
95 minimum_pruning_distance: u64,
97 read_only_sync: Option<Arc<ReadOnlySyncState>>,
102}
103
104impl<N: NodeTypesForProvider> ProviderFactory<NodeTypesWithDBAdapter<N, DatabaseEnv>> {
105 pub fn builder() -> ProviderFactoryBuilder<N> {
107 ProviderFactoryBuilder::default()
108 }
109}
110
111impl<N: ProviderNodeTypes> ProviderFactory<N> {
112 pub fn new(
120 db: N::DB,
121 chain_spec: Arc<N::ChainSpec>,
122 static_file_provider: StaticFileProvider<N::Primitives>,
123 rocksdb_provider: RocksDBProvider,
124 runtime: reth_tasks::Runtime,
125 ) -> ProviderResult<Self> {
126 let legacy_settings = StorageSettings::v1();
131 let storage_settings = DatabaseProvider::<_, N>::new(
132 db.tx()?,
133 chain_spec.clone(),
134 static_file_provider.clone(),
135 Default::default(),
136 Default::default(),
137 Arc::new(RwLock::new(legacy_settings)),
138 rocksdb_provider.clone(),
139 ChangesetCache::new(),
140 runtime.clone(),
141 db.path(),
142 )
143 .storage_settings()?
144 .unwrap_or(legacy_settings);
145
146 Ok(Self {
147 db,
148 chain_spec,
149 static_file_provider,
150 prune_modes: PruneModes::default(),
151 storage: Default::default(),
152 storage_settings: Arc::new(RwLock::new(storage_settings)),
153 rocksdb_provider,
154 changeset_cache: ChangesetCache::new(),
155 runtime,
156 minimum_pruning_distance: MINIMUM_UNWIND_SAFE_DISTANCE,
157 read_only_sync: None,
158 })
159 }
160
161 pub fn new_checked(
168 db: N::DB,
169 chain_spec: Arc<N::ChainSpec>,
170 static_file_provider: StaticFileProvider<N::Primitives>,
171 rocksdb_provider: RocksDBProvider,
172 runtime: reth_tasks::Runtime,
173 ) -> ProviderResult<Self> {
174 Self::new(db, chain_spec, static_file_provider, rocksdb_provider, runtime)
175 .and_then(Self::assert_consistent)
176 }
177}
178
179impl<N: NodeTypesWithDB> ProviderFactory<N> {
180 pub fn with_prune_modes(mut self, prune_modes: PruneModes) -> Self {
182 self.prune_modes = prune_modes;
183 self
184 }
185
186 pub fn with_changeset_cache(mut self, changeset_cache: ChangesetCache) -> Self {
188 self.changeset_cache = changeset_cache;
189 self
190 }
191
192 pub const fn with_minimum_pruning_distance(mut self, distance: u64) -> Self {
197 self.minimum_pruning_distance = distance;
198 self
199 }
200
201 pub fn with_read_only_sync(mut self, watch: bool) -> Self
207 where
208 N::DB: Database,
209 {
210 let state = Arc::new(ReadOnlySyncState {
214 last_synced_txnid: AtomicU64::new(0),
215 sync_lock: Mutex::new(()),
216 });
217 self.read_only_sync = Some(state);
218
219 if watch {
220 self.watch_db_directory();
221 }
222 self
223 }
224
225 fn watch_db_directory(&self)
228 where
229 N::DB: Database,
230 {
231 let factory = self.clone();
232 let db_path = self.db.path();
233 reth_tasks::spawn_os_thread("ro-sync", move || {
234 let (tx, rx) = std::sync::mpsc::channel();
235 let mut watcher = RecommendedWatcher::new(
236 move |res| {
237 let _ = tx.send(res);
238 },
239 notify::Config::default(),
240 )
241 .expect("failed to create watcher");
242
243 watcher
244 .watch(&db_path, RecursiveMode::NonRecursive)
245 .expect("failed to watch MDBX path");
246
247 while let Ok(res) = rx.recv() {
248 match res {
249 Ok(event) => {
250 if !matches!(
251 event.kind,
252 notify::EventKind::Modify(_) | notify::EventKind::Create(_)
253 ) {
254 continue;
255 }
256
257 if let Err(err) = factory.sync_providers_if_needed() {
258 warn!(target: "reth::provider", %err, "background ro-sync failed");
259 }
260 }
261 Err(err) => {
262 warn!(target: "reth::provider", ?err, "MDBX directory watcher error");
263 }
264 }
265 }
266 });
267 }
268
269 pub fn sync_providers_if_needed(&self) -> ProviderResult<()> {
275 let Some(sync_state) = &self.read_only_sync else { return Ok(()) };
276 let current_txnid = self.db.last_txnid().unwrap_or(0);
277
278 if current_txnid == sync_state.last_synced_txnid.load(Ordering::Relaxed) {
280 return Ok(());
281 }
282
283 let _guard = sync_state.sync_lock.lock().unwrap_or_else(|e| e.into_inner());
285
286 if current_txnid == sync_state.last_synced_txnid.load(Ordering::Relaxed) {
288 return Ok(());
289 }
290
291 self.rocksdb_provider.try_catch_up_with_primary()?;
292 self.static_file_provider.initialize_index()?;
293 sync_state.last_synced_txnid.store(current_txnid, Ordering::Relaxed);
294 Ok(())
295 }
296
297 pub const fn db_ref(&self) -> &N::DB {
299 &self.db
300 }
301
302 #[cfg(any(test, feature = "test-utils"))]
303 pub fn into_db(self) -> N::DB {
305 self.db
306 }
307}
308
309impl<N: NodeTypesWithDB> StorageSettingsCache for ProviderFactory<N> {
310 fn cached_storage_settings(&self) -> StorageSettings {
311 *self.storage_settings.read()
312 }
313
314 fn set_storage_settings_cache(&self, settings: StorageSettings) {
315 *self.storage_settings.write() = settings;
316 }
317}
318
319impl<N: NodeTypesWithDB> RocksDBProviderFactory for ProviderFactory<N> {
320 fn rocksdb_provider(&self) -> RocksDBProvider {
321 self.rocksdb_provider.clone()
322 }
323
324 fn set_pending_rocksdb_batch(&self, _batch: rocksdb::WriteBatchWithTransaction<true>) {
325 unimplemented!("ProviderFactory is a factory, not a provider - use DatabaseProvider::set_pending_rocksdb_batch instead")
326 }
327
328 fn commit_pending_rocksdb_batches(&self) -> ProviderResult<()> {
329 unimplemented!("ProviderFactory is a factory, not a provider - use DatabaseProvider::commit_pending_rocksdb_batches instead")
330 }
331}
332
333impl<N: ProviderNodeTypes<DB = DatabaseEnv>> ProviderFactory<N> {
334 pub fn new_with_database_path<P: AsRef<Path>>(
337 path: P,
338 chain_spec: Arc<N::ChainSpec>,
339 args: DatabaseArguments,
340 static_file_provider: StaticFileProvider<N::Primitives>,
341 rocksdb_provider: RocksDBProvider,
342 runtime: reth_tasks::Runtime,
343 ) -> RethResult<Self> {
344 Self::new(
345 init_db(path, args).map_err(RethError::msg)?,
346 chain_spec,
347 static_file_provider,
348 rocksdb_provider,
349 runtime,
350 )
351 .map_err(RethError::Provider)
352 }
353}
354
355impl<N: ProviderNodeTypes> ProviderFactory<N> {
356 #[track_caller]
363 pub fn provider(&self) -> ProviderResult<DatabaseProviderRO<N::DB, N>> {
364 let db_tx = self.db.tx()?;
365
366 self.sync_providers_if_needed()?;
372
373 Ok(DatabaseProvider::new(
374 db_tx,
375 self.chain_spec.clone(),
376 self.static_file_provider.clone(),
377 self.prune_modes.clone(),
378 self.storage.clone(),
379 self.storage_settings.clone(),
380 self.rocksdb_provider.clone(),
381 self.changeset_cache.clone(),
382 self.runtime.clone(),
383 self.db.path(),
384 )
385 .with_minimum_pruning_distance(self.minimum_pruning_distance))
386 }
387
388 #[track_caller]
393 pub fn provider_rw(&self) -> ProviderResult<DatabaseProviderRW<N::DB, N>> {
394 Ok(DatabaseProviderRW(
395 DatabaseProvider::new_rw(
396 self.db.tx_mut()?,
397 self.chain_spec.clone(),
398 self.static_file_provider.clone(),
399 self.prune_modes.clone(),
400 self.storage.clone(),
401 self.storage_settings.clone(),
402 self.rocksdb_provider.clone(),
403 self.changeset_cache.clone(),
404 self.runtime.clone(),
405 self.db.path(),
406 )
407 .with_reader_txn_tracker(self.db.clone())
408 .with_minimum_pruning_distance(self.minimum_pruning_distance),
409 ))
410 }
411
412 #[track_caller]
419 pub fn unwind_provider_rw(
420 &self,
421 ) -> ProviderResult<DatabaseProvider<<N::DB as Database>::TXMut, N>> {
422 Ok(DatabaseProvider::new_unwind_rw(
423 self.db.tx_mut()?,
424 self.chain_spec.clone(),
425 self.static_file_provider.clone(),
426 self.prune_modes.clone(),
427 self.storage.clone(),
428 self.storage_settings.clone(),
429 self.rocksdb_provider.clone(),
430 self.changeset_cache.clone(),
431 self.runtime.clone(),
432 self.db.path(),
433 )
434 .with_reader_txn_tracker(self.db.clone())
435 .with_minimum_pruning_distance(self.minimum_pruning_distance))
436 }
437
438 #[track_caller]
440 pub fn latest(&self) -> ProviderResult<StateProviderBox> {
441 trace!(target: "providers::db", "Returning latest state provider");
442 Ok(Box::new(LatestStateProvider::new(self.database_provider_ro()?)))
443 }
444
445 pub fn history_by_block_number(
447 &self,
448 block_number: BlockNumber,
449 ) -> ProviderResult<StateProviderBox> {
450 let state_provider = self.provider()?.try_into_history_at_block(block_number)?;
451 trace!(target: "providers::db", ?block_number, "Returning historical state provider for block number");
452 Ok(state_provider)
453 }
454
455 pub fn history_by_block_hash(&self, block_hash: BlockHash) -> ProviderResult<StateProviderBox> {
457 let provider = self.provider()?;
458
459 let block_number = provider
460 .block_number(block_hash)?
461 .ok_or(ProviderError::BlockHashNotFound(block_hash))?;
462
463 let state_provider = provider.try_into_history_at_block(block_number)?;
464 trace!(target: "providers::db", ?block_number, %block_hash, "Returning historical state provider for block hash");
465 Ok(state_provider)
466 }
467
468 pub fn assert_consistent(self) -> ProviderResult<Self> {
473 let (rocksdb_unwind, static_file_unwind) = self.check_consistency()?;
474
475 let source = match (rocksdb_unwind, static_file_unwind) {
476 (None, None) => return Ok(self),
477 (Some(_), Some(_)) => "RocksDB and Static Files",
478 (Some(_), None) => "RocksDB",
479 (None, Some(_)) => "Static Files",
480 };
481
482 Err(ProviderError::MustUnwind {
483 data_source: source,
484 unwind_to: rocksdb_unwind
485 .into_iter()
486 .chain(static_file_unwind)
487 .min()
488 .expect("at least one unwind target must be Some"),
489 })
490 }
491
492 #[instrument(err, skip(self))]
496 pub fn check_consistency(&self) -> ProviderResult<(Option<u64>, Option<u64>)> {
497 let provider_ro = self
498 .database_provider_ro()?
499 .disable_long_read_transaction_safety();
503
504 self.static_file_provider().check_file_consistency(&provider_ro)?;
506
507 let rocksdb_unwind = self.rocksdb_provider().check_consistency(&provider_ro)?;
509
510 let static_file_unwind = self.static_file_provider().check_consistency(&provider_ro)?.map(
512 |target| match target {
513 PipelineTarget::Unwind(block) => block,
514 PipelineTarget::Sync(_) => unreachable!("check_consistency returns Unwind"),
515 },
516 );
517
518 if rocksdb_unwind.is_none() && static_file_unwind.is_none() {
523 self.heal_chain_state_block_numbers(&provider_ro)?;
524 }
525
526 Ok((rocksdb_unwind, static_file_unwind))
527 }
528
529 fn heal_chain_state_block_numbers(
532 &self,
533 provider_ro: &DatabaseProvider<<N::DB as Database>::TX, N>,
534 ) -> ProviderResult<()> {
535 let highest_header = self.last_block_number()?;
536
537 let finalized = provider_ro.last_finalized_block_number()?;
538 let safe = provider_ro.last_safe_block_number()?;
539
540 if finalized.is_none_or(|f| f <= highest_header) && safe.is_none_or(|s| s <= highest_header)
541 {
542 return Ok(());
543 }
544
545 let provider_rw = self.provider_rw()?;
546
547 if let Some(finalized) = finalized.filter(|&f| f > highest_header) {
548 info!(
549 target: "providers::db",
550 finalized,
551 highest_header,
552 "Healing finalized block number",
553 );
554 provider_rw.save_finalized_block_number(highest_header)?;
555 }
556
557 if let Some(safe) = safe.filter(|&s| s > highest_header) {
558 info!(
559 target: "providers::db",
560 safe,
561 highest_header,
562 "Healing safe block number",
563 );
564 provider_rw.save_safe_block_number(highest_header)?;
565 }
566
567 provider_rw.commit()?;
568
569 Ok(())
570 }
571
572 pub fn caught_up_static_file_provider(
575 &self,
576 ) -> ProviderResult<StaticFileProvider<N::Primitives>> {
577 self.sync_providers_if_needed()?;
578 Ok(self.static_file_provider.clone())
579 }
580}
581
582impl<N: NodeTypesWithDB> NodePrimitivesProvider for ProviderFactory<N> {
583 type Primitives = N::Primitives;
584}
585
586impl<N: ProviderNodeTypes> DatabaseProviderFactory for ProviderFactory<N> {
587 type DB = N::DB;
588 type Provider = DatabaseProvider<<N::DB as Database>::TX, N>;
589 type ProviderRW = DatabaseProvider<<N::DB as Database>::TXMut, N>;
590
591 fn database_provider_ro(&self) -> ProviderResult<Self::Provider> {
592 self.provider()
593 }
594
595 fn database_provider_rw(&self) -> ProviderResult<Self::ProviderRW> {
596 self.provider_rw().map(|provider| provider.0)
597 }
598}
599
600impl<N: NodeTypesWithDB> StaticFileProviderFactory for ProviderFactory<N> {
601 fn static_file_provider(&self) -> StaticFileProvider<Self::Primitives> {
603 self.static_file_provider.clone()
604 }
605
606 fn get_static_file_writer(
607 &self,
608 block: BlockNumber,
609 segment: StaticFileSegment,
610 ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
611 self.static_file_provider.get_writer(block, segment)
612 }
613}
614
615impl<N: ProviderNodeTypes> HeaderSyncGapProvider for ProviderFactory<N> {
616 type Header = HeaderTy<N>;
617 fn local_tip_header(
618 &self,
619 highest_uninterrupted_block: BlockNumber,
620 ) -> ProviderResult<SealedHeader<Self::Header>> {
621 self.provider()?.local_tip_header(highest_uninterrupted_block)
622 }
623}
624
625impl<N: ProviderNodeTypes> HeaderProvider for ProviderFactory<N> {
626 type Header = HeaderTy<N>;
627
628 fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
629 self.provider()?.header(block_hash)
630 }
631
632 fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
633 self.caught_up_static_file_provider()?.header_by_number(num)
634 }
635
636 fn headers_range(
637 &self,
638 range: impl RangeBounds<BlockNumber>,
639 ) -> ProviderResult<Vec<Self::Header>> {
640 self.caught_up_static_file_provider()?.headers_range(range)
641 }
642
643 fn sealed_header(
644 &self,
645 number: BlockNumber,
646 ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
647 self.caught_up_static_file_provider()?.sealed_header(number)
648 }
649
650 fn sealed_headers_range(
651 &self,
652 range: impl RangeBounds<BlockNumber>,
653 ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
654 self.caught_up_static_file_provider()?.sealed_headers_range(range)
655 }
656
657 fn sealed_headers_while(
658 &self,
659 range: impl RangeBounds<BlockNumber>,
660 predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
661 ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
662 self.caught_up_static_file_provider()?.sealed_headers_while(range, predicate)
663 }
664}
665
666impl<N: ProviderNodeTypes> BlockHashReader for ProviderFactory<N> {
667 fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
668 self.caught_up_static_file_provider()?.block_hash(number)
669 }
670
671 fn canonical_hashes_range(
672 &self,
673 start: BlockNumber,
674 end: BlockNumber,
675 ) -> ProviderResult<Vec<B256>> {
676 self.caught_up_static_file_provider()?.canonical_hashes_range(start, end)
677 }
678}
679
680impl<N: ProviderNodeTypes> BlockNumReader for ProviderFactory<N> {
681 fn chain_info(&self) -> ProviderResult<ChainInfo> {
682 self.provider()?.chain_info()
683 }
684
685 fn best_block_number(&self) -> ProviderResult<BlockNumber> {
686 self.provider()?.best_block_number()
687 }
688
689 fn last_block_number(&self) -> ProviderResult<BlockNumber> {
690 self.caught_up_static_file_provider()?.last_block_number()
691 }
692
693 fn earliest_block_number(&self) -> ProviderResult<BlockNumber> {
694 Ok(self.caught_up_static_file_provider()?.earliest_history_height())
697 }
698
699 fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
700 self.provider()?.block_number(hash)
701 }
702}
703
704impl<N: ProviderNodeTypes> BlockReader for ProviderFactory<N> {
705 type Block = BlockTy<N>;
706
707 fn find_block_by_hash(
708 &self,
709 hash: B256,
710 source: BlockSource,
711 ) -> ProviderResult<Option<Self::Block>> {
712 self.provider()?.find_block_by_hash(hash, source)
713 }
714
715 fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
716 self.provider()?.block(id)
717 }
718
719 fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
720 self.provider()?.pending_block()
721 }
722
723 fn pending_block_and_receipts(
724 &self,
725 ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
726 self.provider()?.pending_block_and_receipts()
727 }
728
729 fn recovered_block(
730 &self,
731 id: BlockHashOrNumber,
732 transaction_kind: TransactionVariant,
733 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
734 self.provider()?.recovered_block(id, transaction_kind)
735 }
736
737 fn sealed_block_with_senders(
738 &self,
739 id: BlockHashOrNumber,
740 transaction_kind: TransactionVariant,
741 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
742 self.provider()?.sealed_block_with_senders(id, transaction_kind)
743 }
744
745 fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
746 self.provider()?.block_range(range)
747 }
748
749 fn block_with_senders_range(
750 &self,
751 range: RangeInclusive<BlockNumber>,
752 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
753 self.provider()?.block_with_senders_range(range)
754 }
755
756 fn recovered_block_range(
757 &self,
758 range: RangeInclusive<BlockNumber>,
759 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
760 self.provider()?.recovered_block_range(range)
761 }
762
763 fn block_by_transaction_id(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
764 self.provider()?.block_by_transaction_id(id)
765 }
766}
767
768impl<N: ProviderNodeTypes> TransactionsProvider for ProviderFactory<N> {
769 type Transaction = TxTy<N>;
770
771 fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
772 self.provider()?.transaction_id(tx_hash)
773 }
774
775 fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
776 self.caught_up_static_file_provider()?.transaction_by_id(id)
777 }
778
779 fn transaction_by_id_unhashed(
780 &self,
781 id: TxNumber,
782 ) -> ProviderResult<Option<Self::Transaction>> {
783 self.caught_up_static_file_provider()?.transaction_by_id_unhashed(id)
784 }
785
786 fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
787 self.provider()?.transaction_by_hash(hash)
788 }
789
790 fn transaction_by_hash_with_meta(
791 &self,
792 tx_hash: TxHash,
793 ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
794 self.provider()?.transaction_by_hash_with_meta(tx_hash)
795 }
796
797 fn transactions_by_block(
798 &self,
799 id: BlockHashOrNumber,
800 ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
801 self.provider()?.transactions_by_block(id)
802 }
803
804 fn transactions_by_block_range(
805 &self,
806 range: impl RangeBounds<BlockNumber>,
807 ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
808 self.provider()?.transactions_by_block_range(range)
809 }
810
811 fn transactions_by_tx_range(
812 &self,
813 range: impl RangeBounds<TxNumber>,
814 ) -> ProviderResult<Vec<Self::Transaction>> {
815 self.caught_up_static_file_provider()?.transactions_by_tx_range(range)
816 }
817
818 fn senders_by_tx_range(
819 &self,
820 range: impl RangeBounds<TxNumber>,
821 ) -> ProviderResult<Vec<Address>> {
822 if EitherWriterDestination::senders(self).is_static_file() {
823 self.caught_up_static_file_provider()?.senders_by_tx_range(range)
824 } else {
825 self.provider()?.senders_by_tx_range(range)
826 }
827 }
828
829 fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
830 if EitherWriterDestination::senders(self).is_static_file() {
831 self.caught_up_static_file_provider()?.transaction_sender(id)
832 } else {
833 self.provider()?.transaction_sender(id)
834 }
835 }
836}
837
838impl<N: ProviderNodeTypes> ReceiptProvider for ProviderFactory<N> {
839 type Receipt = ReceiptTy<N>;
840
841 fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
842 self.caught_up_static_file_provider()?.get_with_static_file_or_database(
843 StaticFileSegment::Receipts,
844 id,
845 |static_file| static_file.receipt(id),
846 || self.provider()?.receipt(id),
847 )
848 }
849
850 fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
851 self.provider()?.receipt_by_hash(hash)
852 }
853
854 fn receipts_by_block(
855 &self,
856 block: BlockHashOrNumber,
857 ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
858 self.provider()?.receipts_by_block(block)
859 }
860
861 fn receipts_by_tx_range(
862 &self,
863 range: impl RangeBounds<TxNumber>,
864 ) -> ProviderResult<Vec<Self::Receipt>> {
865 self.caught_up_static_file_provider()?.get_range_with_static_file_or_database(
866 StaticFileSegment::Receipts,
867 to_range(range),
868 |static_file, range, _| static_file.receipts_by_tx_range(range),
869 |range, _| self.provider()?.receipts_by_tx_range(range),
870 |_| true,
871 )
872 }
873
874 fn receipts_by_block_range(
875 &self,
876 block_range: RangeInclusive<BlockNumber>,
877 ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
878 self.provider()?.receipts_by_block_range(block_range)
879 }
880}
881
882impl<N: ProviderNodeTypes> BlockBodyIndicesProvider for ProviderFactory<N> {
883 fn block_body_indices(
884 &self,
885 number: BlockNumber,
886 ) -> ProviderResult<Option<StoredBlockBodyIndices>> {
887 self.provider()?.block_body_indices(number)
888 }
889
890 fn block_body_indices_range(
891 &self,
892 range: RangeInclusive<BlockNumber>,
893 ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
894 self.provider()?.block_body_indices_range(range)
895 }
896}
897
898impl<N: ProviderNodeTypes> StageCheckpointReader for ProviderFactory<N> {
899 fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
900 self.provider()?.get_stage_checkpoint(id)
901 }
902
903 fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
904 self.provider()?.get_stage_checkpoint_progress(id)
905 }
906 fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
907 self.provider()?.get_all_checkpoints()
908 }
909}
910
911impl<N: NodeTypesWithDB> ChainSpecProvider for ProviderFactory<N> {
912 type ChainSpec = N::ChainSpec;
913
914 fn chain_spec(&self) -> Arc<N::ChainSpec> {
915 self.chain_spec.clone()
916 }
917}
918
919impl<N: ProviderNodeTypes> PruneCheckpointReader for ProviderFactory<N> {
920 fn get_prune_checkpoint(
921 &self,
922 segment: PruneSegment,
923 ) -> ProviderResult<Option<PruneCheckpoint>> {
924 self.provider()?.get_prune_checkpoint(segment)
925 }
926
927 fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
928 self.provider()?.get_prune_checkpoints()
929 }
930}
931
932impl<N: ProviderNodeTypes> HashedPostStateProvider for ProviderFactory<N> {
933 fn hashed_post_state(&self, bundle_state: &BundleState) -> HashedPostState {
934 HashedPostState::from_bundle_state::<KeccakKeyHasher>(bundle_state.state())
935 }
936}
937
938impl<N: ProviderNodeTypes> MetadataProvider for ProviderFactory<N> {
939 fn get_metadata(&self, key: &str) -> ProviderResult<Option<Vec<u8>>> {
940 self.provider()?.get_metadata(key)
941 }
942}
943
944impl<N> fmt::Debug for ProviderFactory<N>
945where
946 N: NodeTypesWithDB<DB: fmt::Debug, ChainSpec: fmt::Debug, Storage: fmt::Debug>,
947{
948 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
949 let Self {
950 db,
951 chain_spec,
952 static_file_provider,
953 prune_modes,
954 storage,
955 storage_settings,
956 rocksdb_provider,
957 changeset_cache,
958 runtime,
959 minimum_pruning_distance,
960 read_only_sync,
961 } = self;
962 f.debug_struct("ProviderFactory")
963 .field("db", &db)
964 .field("chain_spec", &chain_spec)
965 .field("static_file_provider", &static_file_provider)
966 .field("prune_modes", &prune_modes)
967 .field("storage", &storage)
968 .field("storage_settings", &*storage_settings.read())
969 .field("rocksdb_provider", &rocksdb_provider)
970 .field("changeset_cache", &changeset_cache)
971 .field("runtime", &runtime)
972 .field("minimum_pruning_distance", &minimum_pruning_distance)
973 .field(
974 "read_only_sync",
975 &read_only_sync.as_ref().map(|s| s.last_synced_txnid.load(Ordering::Relaxed)),
976 )
977 .finish()
978 }
979}
980
981impl<N: NodeTypesWithDB> Clone for ProviderFactory<N> {
982 fn clone(&self) -> Self {
983 Self {
984 db: self.db.clone(),
985 chain_spec: self.chain_spec.clone(),
986 static_file_provider: self.static_file_provider.clone(),
987 prune_modes: self.prune_modes.clone(),
988 storage: self.storage.clone(),
989 storage_settings: self.storage_settings.clone(),
990 rocksdb_provider: self.rocksdb_provider.clone(),
991 changeset_cache: self.changeset_cache.clone(),
992 runtime: self.runtime.clone(),
993 minimum_pruning_distance: self.minimum_pruning_distance,
994 read_only_sync: self.read_only_sync.clone(),
995 }
996 }
997}
998
999#[cfg(test)]
1000mod tests {
1001 use super::*;
1002 use crate::{
1003 providers::{StaticFileProvider, StaticFileWriter},
1004 test_utils::{blocks::TEST_BLOCK, create_test_provider_factory, MockNodeTypesWithDB},
1005 BlockHashReader, BlockNumReader, BlockWriter, DBProvider, HeaderSyncGapProvider,
1006 TransactionsProvider,
1007 };
1008 use alloy_primitives::{TxNumber, B256};
1009 use assert_matches::assert_matches;
1010 use reth_chainspec::ChainSpecBuilder;
1011 use reth_db::{
1012 mdbx::DatabaseArguments,
1013 test_utils::{create_test_rocksdb_dir, create_test_static_files_dir, ERROR_TEMPDIR},
1014 };
1015 use reth_db_api::tables;
1016 use reth_primitives_traits::SignerRecoverable;
1017 use reth_prune_types::{PruneMode, PruneModes};
1018 use reth_storage_errors::provider::ProviderError;
1019 use reth_testing_utils::generators::{self, random_block, random_header, BlockParams};
1020 use std::{ops::RangeInclusive, sync::Arc};
1021
1022 #[test]
1023 fn common_history_provider() {
1024 let factory = create_test_provider_factory();
1025 let _ = factory.latest();
1026 }
1027
1028 #[test]
1029 fn default_chain_info() {
1030 let factory = create_test_provider_factory();
1031 let provider = factory.provider().unwrap();
1032
1033 let chain_info = provider.chain_info().expect("should be ok");
1034 assert_eq!(chain_info.best_number, 0);
1035 assert_eq!(chain_info.best_hash, B256::ZERO);
1036 }
1037
1038 #[test]
1039 fn provider_flow() {
1040 let factory = create_test_provider_factory();
1041 let provider = factory.provider().unwrap();
1042 provider.block_hash(0).unwrap();
1043 let provider_rw = factory.provider_rw().unwrap();
1044 provider_rw.block_hash(0).unwrap();
1045 provider.block_hash(0).unwrap();
1046 }
1047
1048 #[test]
1049 fn provider_factory_with_database_path() {
1050 let chain_spec = ChainSpecBuilder::mainnet().build();
1051 let (_static_dir, static_dir_path) = create_test_static_files_dir();
1052 let (_rocksdb_dir, rocksdb_path) = create_test_rocksdb_dir();
1053 let _db_tempdir = tempfile::TempDir::new().expect(ERROR_TEMPDIR);
1054 let factory = ProviderFactory::<MockNodeTypesWithDB<DatabaseEnv>>::new_with_database_path(
1055 _db_tempdir.path(),
1056 Arc::new(chain_spec),
1057 DatabaseArguments::new(Default::default()),
1058 StaticFileProvider::read_write(static_dir_path).unwrap(),
1059 RocksDBProvider::builder(&rocksdb_path).build().unwrap(),
1060 reth_tasks::Runtime::test(),
1061 )
1062 .unwrap();
1063 let provider = factory.provider().unwrap();
1064 provider.block_hash(0).unwrap();
1065 let provider_rw = factory.provider_rw().unwrap();
1066 provider_rw.block_hash(0).unwrap();
1067 provider.block_hash(0).unwrap();
1068 }
1069
1070 #[test]
1071 fn insert_block_with_prune_modes() {
1072 let block = TEST_BLOCK.clone();
1073
1074 {
1075 let factory = create_test_provider_factory();
1076 let provider = factory.provider_rw().unwrap();
1077 assert_matches!(provider.insert_block(&block.clone().try_recover().unwrap()), Ok(_));
1078 assert_matches!(
1079 provider.transaction_sender(0), Ok(Some(sender))
1080 if sender == block.body().transactions[0].recover_signer().unwrap()
1081 );
1082 assert_matches!(
1083 provider.transaction_id(*block.body().transactions[0].tx_hash()),
1084 Ok(Some(0))
1085 );
1086 }
1087
1088 {
1089 let prune_modes = PruneModes {
1090 sender_recovery: Some(PruneMode::Full),
1091 transaction_lookup: Some(PruneMode::Full),
1092 ..PruneModes::default()
1093 };
1094 let factory = create_test_provider_factory().with_prune_modes(prune_modes);
1096 let provider = factory.provider_rw().unwrap();
1097 assert_matches!(provider.insert_block(&block.clone().try_recover().unwrap()), Ok(_));
1098 assert_matches!(provider.transaction_sender(0), Ok(None));
1099 assert_matches!(
1100 provider.transaction_id(*block.body().transactions[0].tx_hash()),
1101 Ok(None)
1102 );
1103 }
1104 }
1105
1106 #[test]
1107 fn take_block_transaction_range_recover_senders() {
1108 let mut rng = generators::rng();
1109 let block =
1110 random_block(&mut rng, 0, BlockParams { tx_count: Some(3), ..Default::default() });
1111
1112 let tx_ranges: Vec<RangeInclusive<TxNumber>> = vec![0..=0, 1..=1, 2..=2, 0..=1, 1..=2];
1113 for range in tx_ranges {
1114 let factory = create_test_provider_factory();
1115 let provider = factory.provider_rw().unwrap();
1116
1117 assert_matches!(provider.insert_block(&block.clone().try_recover().unwrap()), Ok(_));
1118
1119 let senders = provider.take::<tables::TransactionSenders>(range.clone()).unwrap();
1120 assert_eq!(
1121 senders,
1122 range
1123 .clone()
1124 .map(|tx_number| (
1125 tx_number,
1126 block.body().transactions[tx_number as usize].recover_signer().unwrap()
1127 ))
1128 .collect::<Vec<_>>()
1129 );
1130
1131 let db_senders = provider.senders_by_tx_range(range);
1132 assert!(matches!(db_senders, Ok(ref v) if v.is_empty()));
1133 }
1134 }
1135
1136 #[test]
1137 fn header_sync_gap_lookup() {
1138 let factory = create_test_provider_factory();
1139 let provider = factory.provider_rw().unwrap();
1140
1141 let mut rng = generators::rng();
1142
1143 let checkpoint = 0;
1145 let head = random_header(&mut rng, 0, None);
1146
1147 assert_matches!(
1149 provider.local_tip_header(checkpoint),
1150 Err(ProviderError::HeaderNotFound(block_number))
1151 if block_number.as_number().unwrap() == checkpoint
1152 );
1153
1154 let static_file_provider = provider.static_file_provider();
1156 let mut static_file_writer =
1157 static_file_provider.latest_writer(StaticFileSegment::Headers).unwrap();
1158 static_file_writer.append_header(head.header(), &head.hash()).unwrap();
1159 static_file_writer.commit().unwrap();
1160 drop(static_file_writer);
1161
1162 let local_head = provider.local_tip_header(checkpoint).unwrap();
1163
1164 assert_eq!(local_head, head);
1165 }
1166}