1use crate::{
2 providers::{
3 state::latest::LatestStateProvider, NodeTypesForProvider, RocksDBProvider,
4 StaticFileProvider, StaticFileProviderRWRefMut,
5 },
6 to_range,
7 traits::{BlockSource, ReceiptProvider},
8 BlockHashReader, BlockNumReader, BlockReader, ChainSpecProvider, DatabaseProviderFactory,
9 EitherWriterDestination, HashedPostStateProvider, HeaderProvider, HeaderSyncGapProvider,
10 MetadataProvider, ProviderError, PruneCheckpointReader, RocksDBProviderFactory,
11 StageCheckpointReader, StateProviderBox, StaticFileProviderFactory, StaticFileWriter,
12 TransactionVariant, TransactionsProvider,
13};
14use alloy_consensus::transaction::TransactionMeta;
15use alloy_eips::BlockHashOrNumber;
16use alloy_primitives::{Address, BlockHash, BlockNumber, TxHash, TxNumber, B256};
17use core::fmt;
18use parking_lot::RwLock;
19use reth_chainspec::ChainInfo;
20use reth_db::{init_db, mdbx::DatabaseArguments, DatabaseEnv};
21use reth_db_api::{database::Database, models::StoredBlockBodyIndices};
22use reth_errors::{RethError, RethResult};
23use reth_node_types::{
24 BlockTy, HeaderTy, NodeTypesWithDB, NodeTypesWithDBAdapter, ReceiptTy, TxTy,
25};
26use reth_primitives_traits::{RecoveredBlock, SealedHeader};
27use reth_prune_types::{PruneCheckpoint, PruneModes, PruneSegment, MINIMUM_UNWIND_SAFE_DISTANCE};
28use reth_stages_types::{PipelineTarget, StageCheckpoint, StageId};
29use reth_static_file_types::StaticFileSegment;
30use reth_storage_api::{
31 BlockBodyIndicesProvider, ChainStateBlockReader, ChainStateBlockWriter, DBProvider,
32 NodePrimitivesProvider, StorageSettings, StorageSettingsCache, TryIntoHistoricalStateProvider,
33};
34use reth_storage_errors::provider::ProviderResult;
35use reth_trie::HashedPostState;
36use reth_trie_db::ChangesetCache;
37use revm_database::BundleState;
38use std::{
39 ops::{RangeBounds, RangeInclusive},
40 path::Path,
41 sync::Arc,
42};
43use tracing::{info, instrument, trace};
44
45mod provider;
46pub use provider::{
47 CommitOrder, DatabaseProvider, DatabaseProviderRO, DatabaseProviderRW, SaveBlocksMode,
48};
49
50use super::ProviderNodeTypes;
51use reth_trie::KeccakKeyHasher;
52
53mod builder;
54pub use builder::{ProviderFactoryBuilder, ReadOnlyConfig};
55
56mod metrics;
57
58mod chain;
59pub use chain::*;
60
61pub struct ProviderFactory<N: NodeTypesWithDB> {
65 db: N::DB,
67 chain_spec: Arc<N::ChainSpec>,
69 static_file_provider: StaticFileProvider<N::Primitives>,
71 prune_modes: PruneModes,
73 storage: Arc<N::Storage>,
75 storage_settings: Arc<RwLock<StorageSettings>>,
77 rocksdb_provider: RocksDBProvider,
79 changeset_cache: ChangesetCache,
81 runtime: reth_tasks::Runtime,
83 minimum_pruning_distance: u64,
85}
86
87impl<N: NodeTypesForProvider> ProviderFactory<NodeTypesWithDBAdapter<N, DatabaseEnv>> {
88 pub fn builder() -> ProviderFactoryBuilder<N> {
90 ProviderFactoryBuilder::default()
91 }
92}
93
94impl<N: ProviderNodeTypes> ProviderFactory<N> {
95 pub fn new(
103 db: N::DB,
104 chain_spec: Arc<N::ChainSpec>,
105 static_file_provider: StaticFileProvider<N::Primitives>,
106 rocksdb_provider: RocksDBProvider,
107 runtime: reth_tasks::Runtime,
108 ) -> ProviderResult<Self> {
109 let legacy_settings = StorageSettings::v1();
114 let storage_settings = DatabaseProvider::<_, N>::new(
115 db.tx()?,
116 chain_spec.clone(),
117 static_file_provider.clone(),
118 Default::default(),
119 Default::default(),
120 Arc::new(RwLock::new(legacy_settings)),
121 rocksdb_provider.clone(),
122 ChangesetCache::new(),
123 runtime.clone(),
124 db.path(),
125 )
126 .storage_settings()?
127 .unwrap_or(legacy_settings);
128
129 Ok(Self {
130 db,
131 chain_spec,
132 static_file_provider,
133 prune_modes: PruneModes::default(),
134 storage: Default::default(),
135 storage_settings: Arc::new(RwLock::new(storage_settings)),
136 rocksdb_provider,
137 changeset_cache: ChangesetCache::new(),
138 runtime,
139 minimum_pruning_distance: MINIMUM_UNWIND_SAFE_DISTANCE,
140 })
141 }
142
143 pub fn new_checked(
150 db: N::DB,
151 chain_spec: Arc<N::ChainSpec>,
152 static_file_provider: StaticFileProvider<N::Primitives>,
153 rocksdb_provider: RocksDBProvider,
154 runtime: reth_tasks::Runtime,
155 ) -> ProviderResult<Self> {
156 Self::new(db, chain_spec, static_file_provider, rocksdb_provider, runtime)
157 .and_then(Self::assert_consistent)
158 }
159}
160
161impl<N: NodeTypesWithDB> ProviderFactory<N> {
162 pub fn with_prune_modes(mut self, prune_modes: PruneModes) -> Self {
164 self.prune_modes = prune_modes;
165 self
166 }
167
168 pub fn with_changeset_cache(mut self, changeset_cache: ChangesetCache) -> Self {
170 self.changeset_cache = changeset_cache;
171 self
172 }
173
174 pub const fn with_minimum_pruning_distance(mut self, distance: u64) -> Self {
179 self.minimum_pruning_distance = distance;
180 self
181 }
182
183 pub const fn db_ref(&self) -> &N::DB {
185 &self.db
186 }
187
188 #[cfg(any(test, feature = "test-utils"))]
189 pub fn into_db(self) -> N::DB {
191 self.db
192 }
193}
194
195impl<N: NodeTypesWithDB> StorageSettingsCache for ProviderFactory<N> {
196 fn cached_storage_settings(&self) -> StorageSettings {
197 *self.storage_settings.read()
198 }
199
200 fn set_storage_settings_cache(&self, settings: StorageSettings) {
201 *self.storage_settings.write() = settings;
202 }
203}
204
205impl<N: NodeTypesWithDB> RocksDBProviderFactory for ProviderFactory<N> {
206 fn rocksdb_provider(&self) -> RocksDBProvider {
207 self.rocksdb_provider.clone()
208 }
209
210 fn set_pending_rocksdb_batch(&self, _batch: rocksdb::WriteBatchWithTransaction<true>) {
211 unimplemented!("ProviderFactory is a factory, not a provider - use DatabaseProvider::set_pending_rocksdb_batch instead")
212 }
213
214 fn commit_pending_rocksdb_batches(&self) -> ProviderResult<()> {
215 unimplemented!("ProviderFactory is a factory, not a provider - use DatabaseProvider::commit_pending_rocksdb_batches instead")
216 }
217}
218
219impl<N: ProviderNodeTypes<DB = DatabaseEnv>> ProviderFactory<N> {
220 pub fn new_with_database_path<P: AsRef<Path>>(
223 path: P,
224 chain_spec: Arc<N::ChainSpec>,
225 args: DatabaseArguments,
226 static_file_provider: StaticFileProvider<N::Primitives>,
227 rocksdb_provider: RocksDBProvider,
228 runtime: reth_tasks::Runtime,
229 ) -> RethResult<Self> {
230 Self::new(
231 init_db(path, args).map_err(RethError::msg)?,
232 chain_spec,
233 static_file_provider,
234 rocksdb_provider,
235 runtime,
236 )
237 .map_err(RethError::Provider)
238 }
239}
240
241impl<N: ProviderNodeTypes> ProviderFactory<N> {
242 #[track_caller]
249 pub fn provider(&self) -> ProviderResult<DatabaseProviderRO<N::DB, N>> {
250 Ok(DatabaseProvider::new(
251 self.db.tx()?,
252 self.chain_spec.clone(),
253 self.static_file_provider.clone(),
254 self.prune_modes.clone(),
255 self.storage.clone(),
256 self.storage_settings.clone(),
257 self.rocksdb_provider.clone(),
258 self.changeset_cache.clone(),
259 self.runtime.clone(),
260 self.db.path(),
261 )
262 .with_minimum_pruning_distance(self.minimum_pruning_distance))
263 }
264
265 #[track_caller]
270 pub fn provider_rw(&self) -> ProviderResult<DatabaseProviderRW<N::DB, N>> {
271 Ok(DatabaseProviderRW(
272 DatabaseProvider::new_rw(
273 self.db.tx_mut()?,
274 self.chain_spec.clone(),
275 self.static_file_provider.clone(),
276 self.prune_modes.clone(),
277 self.storage.clone(),
278 self.storage_settings.clone(),
279 self.rocksdb_provider.clone(),
280 self.changeset_cache.clone(),
281 self.runtime.clone(),
282 self.db.path(),
283 )
284 .with_minimum_pruning_distance(self.minimum_pruning_distance),
285 ))
286 }
287
288 #[track_caller]
292 pub fn unwind_provider_rw(
293 &self,
294 ) -> ProviderResult<DatabaseProvider<<N::DB as Database>::TXMut, N>> {
295 Ok(DatabaseProvider::new_unwind_rw(
296 self.db.tx_mut()?,
297 self.chain_spec.clone(),
298 self.static_file_provider.clone(),
299 self.prune_modes.clone(),
300 self.storage.clone(),
301 self.storage_settings.clone(),
302 self.rocksdb_provider.clone(),
303 self.changeset_cache.clone(),
304 self.runtime.clone(),
305 self.db.path(),
306 )
307 .with_minimum_pruning_distance(self.minimum_pruning_distance))
308 }
309
310 #[track_caller]
312 pub fn latest(&self) -> ProviderResult<StateProviderBox> {
313 trace!(target: "providers::db", "Returning latest state provider");
314 Ok(Box::new(LatestStateProvider::new(self.database_provider_ro()?)))
315 }
316
317 pub fn history_by_block_number(
319 &self,
320 block_number: BlockNumber,
321 ) -> ProviderResult<StateProviderBox> {
322 let state_provider = self.provider()?.try_into_history_at_block(block_number)?;
323 trace!(target: "providers::db", ?block_number, "Returning historical state provider for block number");
324 Ok(state_provider)
325 }
326
327 pub fn history_by_block_hash(&self, block_hash: BlockHash) -> ProviderResult<StateProviderBox> {
329 let provider = self.provider()?;
330
331 let block_number = provider
332 .block_number(block_hash)?
333 .ok_or(ProviderError::BlockHashNotFound(block_hash))?;
334
335 let state_provider = provider.try_into_history_at_block(block_number)?;
336 trace!(target: "providers::db", ?block_number, %block_hash, "Returning historical state provider for block hash");
337 Ok(state_provider)
338 }
339
340 pub fn assert_consistent(self) -> ProviderResult<Self> {
345 let (rocksdb_unwind, static_file_unwind) = self.check_consistency()?;
346
347 let source = match (rocksdb_unwind, static_file_unwind) {
348 (None, None) => return Ok(self),
349 (Some(_), Some(_)) => "RocksDB and Static Files",
350 (Some(_), None) => "RocksDB",
351 (None, Some(_)) => "Static Files",
352 };
353
354 Err(ProviderError::MustUnwind {
355 data_source: source,
356 unwind_to: rocksdb_unwind
357 .into_iter()
358 .chain(static_file_unwind)
359 .min()
360 .expect("at least one unwind target must be Some"),
361 })
362 }
363
364 #[instrument(err, skip(self))]
368 pub fn check_consistency(&self) -> ProviderResult<(Option<u64>, Option<u64>)> {
369 let provider_ro = self
370 .database_provider_ro()?
371 .disable_long_read_transaction_safety();
375
376 self.static_file_provider().check_file_consistency(&provider_ro)?;
378
379 let rocksdb_unwind = self.rocksdb_provider().check_consistency(&provider_ro)?;
381
382 let static_file_unwind = self.static_file_provider().check_consistency(&provider_ro)?.map(
384 |target| match target {
385 PipelineTarget::Unwind(block) => block,
386 PipelineTarget::Sync(_) => unreachable!("check_consistency returns Unwind"),
387 },
388 );
389
390 if rocksdb_unwind.is_none() && static_file_unwind.is_none() {
395 self.heal_chain_state_block_numbers(&provider_ro)?;
396 }
397
398 Ok((rocksdb_unwind, static_file_unwind))
399 }
400
401 fn heal_chain_state_block_numbers(
404 &self,
405 provider_ro: &DatabaseProvider<<N::DB as Database>::TX, N>,
406 ) -> ProviderResult<()> {
407 let highest_header = self.last_block_number()?;
408
409 let finalized = provider_ro.last_finalized_block_number()?;
410 let safe = provider_ro.last_safe_block_number()?;
411
412 if finalized.is_none_or(|f| f <= highest_header) && safe.is_none_or(|s| s <= highest_header)
413 {
414 return Ok(());
415 }
416
417 let provider_rw = self.provider_rw()?;
418
419 if let Some(finalized) = finalized.filter(|&f| f > highest_header) {
420 info!(
421 target: "providers::db",
422 finalized,
423 highest_header,
424 "Healing finalized block number",
425 );
426 provider_rw.save_finalized_block_number(highest_header)?;
427 }
428
429 if let Some(safe) = safe.filter(|&s| s > highest_header) {
430 info!(
431 target: "providers::db",
432 safe,
433 highest_header,
434 "Healing safe block number",
435 );
436 provider_rw.save_safe_block_number(highest_header)?;
437 }
438
439 provider_rw.commit()?;
440
441 Ok(())
442 }
443}
444
445impl<N: NodeTypesWithDB> NodePrimitivesProvider for ProviderFactory<N> {
446 type Primitives = N::Primitives;
447}
448
449impl<N: ProviderNodeTypes> DatabaseProviderFactory for ProviderFactory<N> {
450 type DB = N::DB;
451 type Provider = DatabaseProvider<<N::DB as Database>::TX, N>;
452 type ProviderRW = DatabaseProvider<<N::DB as Database>::TXMut, N>;
453
454 fn database_provider_ro(&self) -> ProviderResult<Self::Provider> {
455 self.provider()
456 }
457
458 fn database_provider_rw(&self) -> ProviderResult<Self::ProviderRW> {
459 self.provider_rw().map(|provider| provider.0)
460 }
461}
462
463impl<N: NodeTypesWithDB> StaticFileProviderFactory for ProviderFactory<N> {
464 fn static_file_provider(&self) -> StaticFileProvider<Self::Primitives> {
466 self.static_file_provider.clone()
467 }
468
469 fn get_static_file_writer(
470 &self,
471 block: BlockNumber,
472 segment: StaticFileSegment,
473 ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
474 self.static_file_provider.get_writer(block, segment)
475 }
476}
477
478impl<N: ProviderNodeTypes> HeaderSyncGapProvider for ProviderFactory<N> {
479 type Header = HeaderTy<N>;
480 fn local_tip_header(
481 &self,
482 highest_uninterrupted_block: BlockNumber,
483 ) -> ProviderResult<SealedHeader<Self::Header>> {
484 self.provider()?.local_tip_header(highest_uninterrupted_block)
485 }
486}
487
488impl<N: ProviderNodeTypes> HeaderProvider for ProviderFactory<N> {
489 type Header = HeaderTy<N>;
490
491 fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
492 self.provider()?.header(block_hash)
493 }
494
495 fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
496 self.static_file_provider.header_by_number(num)
497 }
498
499 fn headers_range(
500 &self,
501 range: impl RangeBounds<BlockNumber>,
502 ) -> ProviderResult<Vec<Self::Header>> {
503 self.static_file_provider.headers_range(range)
504 }
505
506 fn sealed_header(
507 &self,
508 number: BlockNumber,
509 ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
510 self.static_file_provider.sealed_header(number)
511 }
512
513 fn sealed_headers_range(
514 &self,
515 range: impl RangeBounds<BlockNumber>,
516 ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
517 self.static_file_provider.sealed_headers_range(range)
518 }
519
520 fn sealed_headers_while(
521 &self,
522 range: impl RangeBounds<BlockNumber>,
523 predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
524 ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
525 self.static_file_provider.sealed_headers_while(range, predicate)
526 }
527}
528
529impl<N: ProviderNodeTypes> BlockHashReader for ProviderFactory<N> {
530 fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
531 self.static_file_provider.block_hash(number)
532 }
533
534 fn canonical_hashes_range(
535 &self,
536 start: BlockNumber,
537 end: BlockNumber,
538 ) -> ProviderResult<Vec<B256>> {
539 self.static_file_provider.canonical_hashes_range(start, end)
540 }
541}
542
543impl<N: ProviderNodeTypes> BlockNumReader for ProviderFactory<N> {
544 fn chain_info(&self) -> ProviderResult<ChainInfo> {
545 self.provider()?.chain_info()
546 }
547
548 fn best_block_number(&self) -> ProviderResult<BlockNumber> {
549 self.provider()?.best_block_number()
550 }
551
552 fn last_block_number(&self) -> ProviderResult<BlockNumber> {
553 self.static_file_provider.last_block_number()
554 }
555
556 fn earliest_block_number(&self) -> ProviderResult<BlockNumber> {
557 Ok(self.static_file_provider.earliest_history_height())
560 }
561
562 fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
563 self.provider()?.block_number(hash)
564 }
565}
566
567impl<N: ProviderNodeTypes> BlockReader for ProviderFactory<N> {
568 type Block = BlockTy<N>;
569
570 fn find_block_by_hash(
571 &self,
572 hash: B256,
573 source: BlockSource,
574 ) -> ProviderResult<Option<Self::Block>> {
575 self.provider()?.find_block_by_hash(hash, source)
576 }
577
578 fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
579 self.provider()?.block(id)
580 }
581
582 fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
583 self.provider()?.pending_block()
584 }
585
586 fn pending_block_and_receipts(
587 &self,
588 ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
589 self.provider()?.pending_block_and_receipts()
590 }
591
592 fn recovered_block(
593 &self,
594 id: BlockHashOrNumber,
595 transaction_kind: TransactionVariant,
596 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
597 self.provider()?.recovered_block(id, transaction_kind)
598 }
599
600 fn sealed_block_with_senders(
601 &self,
602 id: BlockHashOrNumber,
603 transaction_kind: TransactionVariant,
604 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
605 self.provider()?.sealed_block_with_senders(id, transaction_kind)
606 }
607
608 fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
609 self.provider()?.block_range(range)
610 }
611
612 fn block_with_senders_range(
613 &self,
614 range: RangeInclusive<BlockNumber>,
615 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
616 self.provider()?.block_with_senders_range(range)
617 }
618
619 fn recovered_block_range(
620 &self,
621 range: RangeInclusive<BlockNumber>,
622 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
623 self.provider()?.recovered_block_range(range)
624 }
625
626 fn block_by_transaction_id(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
627 self.provider()?.block_by_transaction_id(id)
628 }
629}
630
631impl<N: ProviderNodeTypes> TransactionsProvider for ProviderFactory<N> {
632 type Transaction = TxTy<N>;
633
634 fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
635 self.provider()?.transaction_id(tx_hash)
636 }
637
638 fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
639 self.static_file_provider.transaction_by_id(id)
640 }
641
642 fn transaction_by_id_unhashed(
643 &self,
644 id: TxNumber,
645 ) -> ProviderResult<Option<Self::Transaction>> {
646 self.static_file_provider.transaction_by_id_unhashed(id)
647 }
648
649 fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
650 self.provider()?.transaction_by_hash(hash)
651 }
652
653 fn transaction_by_hash_with_meta(
654 &self,
655 tx_hash: TxHash,
656 ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
657 self.provider()?.transaction_by_hash_with_meta(tx_hash)
658 }
659
660 fn transactions_by_block(
661 &self,
662 id: BlockHashOrNumber,
663 ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
664 self.provider()?.transactions_by_block(id)
665 }
666
667 fn transactions_by_block_range(
668 &self,
669 range: impl RangeBounds<BlockNumber>,
670 ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
671 self.provider()?.transactions_by_block_range(range)
672 }
673
674 fn transactions_by_tx_range(
675 &self,
676 range: impl RangeBounds<TxNumber>,
677 ) -> ProviderResult<Vec<Self::Transaction>> {
678 self.static_file_provider.transactions_by_tx_range(range)
679 }
680
681 fn senders_by_tx_range(
682 &self,
683 range: impl RangeBounds<TxNumber>,
684 ) -> ProviderResult<Vec<Address>> {
685 if EitherWriterDestination::senders(self).is_static_file() {
686 self.static_file_provider.senders_by_tx_range(range)
687 } else {
688 self.provider()?.senders_by_tx_range(range)
689 }
690 }
691
692 fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
693 if EitherWriterDestination::senders(self).is_static_file() {
694 self.static_file_provider.transaction_sender(id)
695 } else {
696 self.provider()?.transaction_sender(id)
697 }
698 }
699}
700
701impl<N: ProviderNodeTypes> ReceiptProvider for ProviderFactory<N> {
702 type Receipt = ReceiptTy<N>;
703
704 fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
705 self.static_file_provider.get_with_static_file_or_database(
706 StaticFileSegment::Receipts,
707 id,
708 |static_file| static_file.receipt(id),
709 || self.provider()?.receipt(id),
710 )
711 }
712
713 fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
714 self.provider()?.receipt_by_hash(hash)
715 }
716
717 fn receipts_by_block(
718 &self,
719 block: BlockHashOrNumber,
720 ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
721 self.provider()?.receipts_by_block(block)
722 }
723
724 fn receipts_by_tx_range(
725 &self,
726 range: impl RangeBounds<TxNumber>,
727 ) -> ProviderResult<Vec<Self::Receipt>> {
728 self.static_file_provider.get_range_with_static_file_or_database(
729 StaticFileSegment::Receipts,
730 to_range(range),
731 |static_file, range, _| static_file.receipts_by_tx_range(range),
732 |range, _| self.provider()?.receipts_by_tx_range(range),
733 |_| true,
734 )
735 }
736
737 fn receipts_by_block_range(
738 &self,
739 block_range: RangeInclusive<BlockNumber>,
740 ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
741 self.provider()?.receipts_by_block_range(block_range)
742 }
743}
744
745impl<N: ProviderNodeTypes> BlockBodyIndicesProvider for ProviderFactory<N> {
746 fn block_body_indices(
747 &self,
748 number: BlockNumber,
749 ) -> ProviderResult<Option<StoredBlockBodyIndices>> {
750 self.provider()?.block_body_indices(number)
751 }
752
753 fn block_body_indices_range(
754 &self,
755 range: RangeInclusive<BlockNumber>,
756 ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
757 self.provider()?.block_body_indices_range(range)
758 }
759}
760
761impl<N: ProviderNodeTypes> StageCheckpointReader for ProviderFactory<N> {
762 fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
763 self.provider()?.get_stage_checkpoint(id)
764 }
765
766 fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
767 self.provider()?.get_stage_checkpoint_progress(id)
768 }
769 fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
770 self.provider()?.get_all_checkpoints()
771 }
772}
773
774impl<N: NodeTypesWithDB> ChainSpecProvider for ProviderFactory<N> {
775 type ChainSpec = N::ChainSpec;
776
777 fn chain_spec(&self) -> Arc<N::ChainSpec> {
778 self.chain_spec.clone()
779 }
780}
781
782impl<N: ProviderNodeTypes> PruneCheckpointReader for ProviderFactory<N> {
783 fn get_prune_checkpoint(
784 &self,
785 segment: PruneSegment,
786 ) -> ProviderResult<Option<PruneCheckpoint>> {
787 self.provider()?.get_prune_checkpoint(segment)
788 }
789
790 fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
791 self.provider()?.get_prune_checkpoints()
792 }
793}
794
795impl<N: ProviderNodeTypes> HashedPostStateProvider for ProviderFactory<N> {
796 fn hashed_post_state(&self, bundle_state: &BundleState) -> HashedPostState {
797 HashedPostState::from_bundle_state::<KeccakKeyHasher>(bundle_state.state())
798 }
799}
800
801impl<N: ProviderNodeTypes> MetadataProvider for ProviderFactory<N> {
802 fn get_metadata(&self, key: &str) -> ProviderResult<Option<Vec<u8>>> {
803 self.provider()?.get_metadata(key)
804 }
805}
806
807impl<N> fmt::Debug for ProviderFactory<N>
808where
809 N: NodeTypesWithDB<DB: fmt::Debug, ChainSpec: fmt::Debug, Storage: fmt::Debug>,
810{
811 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
812 let Self {
813 db,
814 chain_spec,
815 static_file_provider,
816 prune_modes,
817 storage,
818 storage_settings,
819 rocksdb_provider,
820 changeset_cache,
821 runtime,
822 minimum_pruning_distance,
823 } = self;
824 f.debug_struct("ProviderFactory")
825 .field("db", &db)
826 .field("chain_spec", &chain_spec)
827 .field("static_file_provider", &static_file_provider)
828 .field("prune_modes", &prune_modes)
829 .field("storage", &storage)
830 .field("storage_settings", &*storage_settings.read())
831 .field("rocksdb_provider", &rocksdb_provider)
832 .field("changeset_cache", &changeset_cache)
833 .field("runtime", &runtime)
834 .field("minimum_pruning_distance", &minimum_pruning_distance)
835 .finish()
836 }
837}
838
839impl<N: NodeTypesWithDB> Clone for ProviderFactory<N> {
840 fn clone(&self) -> Self {
841 Self {
842 db: self.db.clone(),
843 chain_spec: self.chain_spec.clone(),
844 static_file_provider: self.static_file_provider.clone(),
845 prune_modes: self.prune_modes.clone(),
846 storage: self.storage.clone(),
847 storage_settings: self.storage_settings.clone(),
848 rocksdb_provider: self.rocksdb_provider.clone(),
849 changeset_cache: self.changeset_cache.clone(),
850 runtime: self.runtime.clone(),
851 minimum_pruning_distance: self.minimum_pruning_distance,
852 }
853 }
854}
855
856#[cfg(test)]
857mod tests {
858 use super::*;
859 use crate::{
860 providers::{StaticFileProvider, StaticFileWriter},
861 test_utils::{blocks::TEST_BLOCK, create_test_provider_factory, MockNodeTypesWithDB},
862 BlockHashReader, BlockNumReader, BlockWriter, DBProvider, HeaderSyncGapProvider,
863 TransactionsProvider,
864 };
865 use alloy_primitives::{TxNumber, B256};
866 use assert_matches::assert_matches;
867 use reth_chainspec::ChainSpecBuilder;
868 use reth_db::{
869 mdbx::DatabaseArguments,
870 test_utils::{create_test_rocksdb_dir, create_test_static_files_dir, ERROR_TEMPDIR},
871 };
872 use reth_db_api::tables;
873 use reth_primitives_traits::SignerRecoverable;
874 use reth_prune_types::{PruneMode, PruneModes};
875 use reth_storage_errors::provider::ProviderError;
876 use reth_testing_utils::generators::{self, random_block, random_header, BlockParams};
877 use std::{ops::RangeInclusive, sync::Arc};
878
879 #[test]
880 fn common_history_provider() {
881 let factory = create_test_provider_factory();
882 let _ = factory.latest();
883 }
884
885 #[test]
886 fn default_chain_info() {
887 let factory = create_test_provider_factory();
888 let provider = factory.provider().unwrap();
889
890 let chain_info = provider.chain_info().expect("should be ok");
891 assert_eq!(chain_info.best_number, 0);
892 assert_eq!(chain_info.best_hash, B256::ZERO);
893 }
894
895 #[test]
896 fn provider_flow() {
897 let factory = create_test_provider_factory();
898 let provider = factory.provider().unwrap();
899 provider.block_hash(0).unwrap();
900 let provider_rw = factory.provider_rw().unwrap();
901 provider_rw.block_hash(0).unwrap();
902 provider.block_hash(0).unwrap();
903 }
904
905 #[test]
906 fn provider_factory_with_database_path() {
907 let chain_spec = ChainSpecBuilder::mainnet().build();
908 let (_static_dir, static_dir_path) = create_test_static_files_dir();
909 let (_rocksdb_dir, rocksdb_path) = create_test_rocksdb_dir();
910 let _db_tempdir = tempfile::TempDir::new().expect(ERROR_TEMPDIR);
911 let factory = ProviderFactory::<MockNodeTypesWithDB<DatabaseEnv>>::new_with_database_path(
912 _db_tempdir.path(),
913 Arc::new(chain_spec),
914 DatabaseArguments::new(Default::default()),
915 StaticFileProvider::read_write(static_dir_path).unwrap(),
916 RocksDBProvider::builder(&rocksdb_path).build().unwrap(),
917 reth_tasks::Runtime::test(),
918 )
919 .unwrap();
920 let provider = factory.provider().unwrap();
921 provider.block_hash(0).unwrap();
922 let provider_rw = factory.provider_rw().unwrap();
923 provider_rw.block_hash(0).unwrap();
924 provider.block_hash(0).unwrap();
925 }
926
927 #[test]
928 fn insert_block_with_prune_modes() {
929 let block = TEST_BLOCK.clone();
930
931 {
932 let factory = create_test_provider_factory();
933 let provider = factory.provider_rw().unwrap();
934 assert_matches!(provider.insert_block(&block.clone().try_recover().unwrap()), Ok(_));
935 assert_matches!(
936 provider.transaction_sender(0), Ok(Some(sender))
937 if sender == block.body().transactions[0].recover_signer().unwrap()
938 );
939 assert_matches!(
940 provider.transaction_id(*block.body().transactions[0].tx_hash()),
941 Ok(Some(0))
942 );
943 }
944
945 {
946 let prune_modes = PruneModes {
947 sender_recovery: Some(PruneMode::Full),
948 transaction_lookup: Some(PruneMode::Full),
949 ..PruneModes::default()
950 };
951 let factory = create_test_provider_factory().with_prune_modes(prune_modes);
953 let provider = factory.provider_rw().unwrap();
954 assert_matches!(provider.insert_block(&block.clone().try_recover().unwrap()), Ok(_));
955 assert_matches!(provider.transaction_sender(0), Ok(None));
956 assert_matches!(
957 provider.transaction_id(*block.body().transactions[0].tx_hash()),
958 Ok(None)
959 );
960 }
961 }
962
963 #[test]
964 fn take_block_transaction_range_recover_senders() {
965 let mut rng = generators::rng();
966 let block =
967 random_block(&mut rng, 0, BlockParams { tx_count: Some(3), ..Default::default() });
968
969 let tx_ranges: Vec<RangeInclusive<TxNumber>> = vec![0..=0, 1..=1, 2..=2, 0..=1, 1..=2];
970 for range in tx_ranges {
971 let factory = create_test_provider_factory();
972 let provider = factory.provider_rw().unwrap();
973
974 assert_matches!(provider.insert_block(&block.clone().try_recover().unwrap()), Ok(_));
975
976 let senders = provider.take::<tables::TransactionSenders>(range.clone()).unwrap();
977 assert_eq!(
978 senders,
979 range
980 .clone()
981 .map(|tx_number| (
982 tx_number,
983 block.body().transactions[tx_number as usize].recover_signer().unwrap()
984 ))
985 .collect::<Vec<_>>()
986 );
987
988 let db_senders = provider.senders_by_tx_range(range);
989 assert!(matches!(db_senders, Ok(ref v) if v.is_empty()));
990 }
991 }
992
993 #[test]
994 fn header_sync_gap_lookup() {
995 let factory = create_test_provider_factory();
996 let provider = factory.provider_rw().unwrap();
997
998 let mut rng = generators::rng();
999
1000 let checkpoint = 0;
1002 let head = random_header(&mut rng, 0, None);
1003
1004 assert_matches!(
1006 provider.local_tip_header(checkpoint),
1007 Err(ProviderError::HeaderNotFound(block_number))
1008 if block_number.as_number().unwrap() == checkpoint
1009 );
1010
1011 let static_file_provider = provider.static_file_provider();
1013 let mut static_file_writer =
1014 static_file_provider.latest_writer(StaticFileSegment::Headers).unwrap();
1015 static_file_writer.append_header(head.header(), &head.hash()).unwrap();
1016 static_file_writer.commit().unwrap();
1017 drop(static_file_writer);
1018
1019 let local_head = provider.local_tip_header(checkpoint).unwrap();
1020
1021 assert_eq!(local_head, head);
1022 }
1023}