// reth_provider/providers/database/mod.rs

1use crate::{
2    providers::{
3        state::latest::LatestStateProvider, NodeTypesForProvider, RocksDBProvider,
4        StaticFileProvider, StaticFileProviderRWRefMut,
5    },
6    to_range,
7    traits::{BlockSource, ReceiptProvider},
8    BlockHashReader, BlockNumReader, BlockReader, ChainSpecProvider, DatabaseProviderFactory,
9    EitherWriterDestination, HashedPostStateProvider, HeaderProvider, HeaderSyncGapProvider,
10    MetadataProvider, ProviderError, PruneCheckpointReader, RocksDBProviderFactory,
11    StageCheckpointReader, StateProviderBox, StaticFileProviderFactory, StaticFileWriter,
12    TransactionVariant, TransactionsProvider,
13};
14use alloy_consensus::transaction::TransactionMeta;
15use alloy_eips::BlockHashOrNumber;
16use alloy_primitives::{Address, BlockHash, BlockNumber, TxHash, TxNumber, B256};
17use core::fmt;
18use parking_lot::RwLock;
19use reth_chainspec::ChainInfo;
20use reth_db::{init_db, mdbx::DatabaseArguments, DatabaseEnv};
21use reth_db_api::{database::Database, models::StoredBlockBodyIndices};
22use reth_errors::{RethError, RethResult};
23use reth_node_types::{
24    BlockTy, HeaderTy, NodeTypesWithDB, NodeTypesWithDBAdapter, ReceiptTy, TxTy,
25};
26use reth_primitives_traits::{RecoveredBlock, SealedHeader};
27use reth_prune_types::{PruneCheckpoint, PruneModes, PruneSegment};
28use reth_stages_types::{PipelineTarget, StageCheckpoint, StageId};
29use reth_static_file_types::StaticFileSegment;
30use reth_storage_api::{
31    BlockBodyIndicesProvider, NodePrimitivesProvider, StorageSettings, StorageSettingsCache,
32    TryIntoHistoricalStateProvider,
33};
34use reth_storage_errors::provider::ProviderResult;
35use reth_trie::HashedPostState;
36use reth_trie_db::ChangesetCache;
37use revm_database::BundleState;
38use std::{
39    ops::{RangeBounds, RangeInclusive},
40    path::Path,
41    sync::Arc,
42};
43use tracing::{instrument, trace};
44
45mod provider;
46pub use provider::{
47    CommitOrder, DatabaseProvider, DatabaseProviderRO, DatabaseProviderRW, SaveBlocksMode,
48};
49
50use super::ProviderNodeTypes;
51use reth_trie::KeccakKeyHasher;
52
53mod builder;
54pub use builder::{ProviderFactoryBuilder, ReadOnlyConfig};
55
56mod metrics;
57
58mod chain;
59pub use chain::*;
60
/// A common provider that fetches data from a database or static file.
///
/// This provider implements most provider or provider factory traits.
pub struct ProviderFactory<N: NodeTypesWithDB> {
    /// Database instance
    db: N::DB,
    /// Chain spec
    chain_spec: Arc<N::ChainSpec>,
    /// Static File Provider
    static_file_provider: StaticFileProvider<N::Primitives>,
    /// Optional pruning configuration
    prune_modes: PruneModes,
    /// The node storage handler.
    storage: Arc<N::Storage>,
    /// Storage configuration settings for this node.
    ///
    /// Wrapped in `Arc<RwLock<_>>` and cloned into every provider this factory
    /// creates, so cache updates are visible to the factory and all providers.
    storage_settings: Arc<RwLock<StorageSettings>>,
    /// `RocksDB` provider
    rocksdb_provider: RocksDBProvider,
    /// Changeset cache for trie unwinding
    changeset_cache: ChangesetCache,
    /// Task runtime for spawning parallel I/O work.
    runtime: reth_tasks::Runtime,
}
84
85impl<N: NodeTypesForProvider> ProviderFactory<NodeTypesWithDBAdapter<N, DatabaseEnv>> {
86    /// Instantiates the builder for this type
87    pub fn builder() -> ProviderFactoryBuilder<N> {
88        ProviderFactoryBuilder::default()
89    }
90}
91
impl<N: ProviderNodeTypes> ProviderFactory<N> {
    /// Create new database provider factory.
    ///
    /// The storage backends used by the produced factory MAY be inconsistent.
    /// It is recommended to call [`Self::check_consistency`] after
    /// creation to ensure consistency between the database and static files.
    /// If the function returns unwind targets, the caller MUST unwind the
    /// inner database to the minimum of the two targets to ensure consistency.
    pub fn new(
        db: N::DB,
        chain_spec: Arc<N::ChainSpec>,
        static_file_provider: StaticFileProvider<N::Primitives>,
        rocksdb_provider: RocksDBProvider,
        runtime: reth_tasks::Runtime,
    ) -> ProviderResult<Self> {
        // Load storage settings from database at init time. Creates a temporary provider
        // to read persisted settings, falling back to legacy defaults if none exist.
        //
        // Both factory and all providers it creates should share these cached settings.
        let legacy_settings = StorageSettings::v1();
        // Throwaway read-only provider used solely to fetch persisted settings; it is
        // dropped at the end of this expression, releasing its read transaction.
        let storage_settings = DatabaseProvider::<_, N>::new(
            db.tx()?,
            chain_spec.clone(),
            static_file_provider.clone(),
            Default::default(), // prune modes — irrelevant for this read-only lookup
            Default::default(), // storage handler — default suffices here
            Arc::new(RwLock::new(legacy_settings)),
            rocksdb_provider.clone(),
            ChangesetCache::new(),
            runtime.clone(),
            db.path(),
        )
        .storage_settings()?
        // No settings persisted yet (e.g. a fresh datadir): fall back to legacy v1.
        .unwrap_or(legacy_settings);

        Ok(Self {
            db,
            chain_spec,
            static_file_provider,
            prune_modes: PruneModes::default(),
            storage: Default::default(),
            storage_settings: Arc::new(RwLock::new(storage_settings)),
            rocksdb_provider,
            changeset_cache: ChangesetCache::new(),
            runtime,
        })
    }

    /// Create new database provider factory and perform consistency checks.
    ///
    /// This will call [`Self::check_consistency`] internally and return
    /// [`ProviderError::MustUnwind`] if inconsistencies are found. It may also
    /// return any [`ProviderError`] that [`Self::new`] may return, or that are
    /// encountered during consistency checks.
    pub fn new_checked(
        db: N::DB,
        chain_spec: Arc<N::ChainSpec>,
        static_file_provider: StaticFileProvider<N::Primitives>,
        rocksdb_provider: RocksDBProvider,
        runtime: reth_tasks::Runtime,
    ) -> ProviderResult<Self> {
        Self::new(db, chain_spec, static_file_provider, rocksdb_provider, runtime)
            .and_then(Self::assert_consistent)
    }
}
157
158impl<N: NodeTypesWithDB> ProviderFactory<N> {
159    /// Sets the pruning configuration for an existing [`ProviderFactory`].
160    pub fn with_prune_modes(mut self, prune_modes: PruneModes) -> Self {
161        self.prune_modes = prune_modes;
162        self
163    }
164
165    /// Sets the changeset cache for an existing [`ProviderFactory`].
166    pub fn with_changeset_cache(mut self, changeset_cache: ChangesetCache) -> Self {
167        self.changeset_cache = changeset_cache;
168        self
169    }
170
171    /// Returns reference to the underlying database.
172    pub const fn db_ref(&self) -> &N::DB {
173        &self.db
174    }
175
176    #[cfg(any(test, feature = "test-utils"))]
177    /// Consumes Self and returns DB
178    pub fn into_db(self) -> N::DB {
179        self.db
180    }
181}
182
impl<N: NodeTypesWithDB> StorageSettingsCache for ProviderFactory<N> {
    /// Returns a copy of the currently cached storage settings (read lock).
    fn cached_storage_settings(&self) -> StorageSettings {
        *self.storage_settings.read()
    }

    /// Overwrites the cached storage settings in place (write lock); the new
    /// value is seen by every provider sharing this `Arc<RwLock<_>>`.
    fn set_storage_settings_cache(&self, settings: StorageSettings) {
        *self.storage_settings.write() = settings;
    }
}
192
impl<N: NodeTypesWithDB> RocksDBProviderFactory for ProviderFactory<N> {
    /// Returns a handle to the `RocksDB` provider.
    fn rocksdb_provider(&self) -> RocksDBProvider {
        self.rocksdb_provider.clone()
    }

    // The two batch methods below are provider-level operations; the factory
    // deliberately panics with `unimplemented!` to direct callers at
    // `DatabaseProvider` instead.
    #[cfg(all(unix, feature = "rocksdb"))]
    fn set_pending_rocksdb_batch(&self, _batch: rocksdb::WriteBatchWithTransaction<true>) {
        unimplemented!("ProviderFactory is a factory, not a provider - use DatabaseProvider::set_pending_rocksdb_batch instead")
    }

    #[cfg(all(unix, feature = "rocksdb"))]
    fn commit_pending_rocksdb_batches(&self) -> ProviderResult<()> {
        unimplemented!("ProviderFactory is a factory, not a provider - use DatabaseProvider::commit_pending_rocksdb_batches instead")
    }
}
208
209impl<N: ProviderNodeTypes<DB = DatabaseEnv>> ProviderFactory<N> {
210    /// Create new database provider by passing a path. [`ProviderFactory`] will own the database
211    /// instance.
212    pub fn new_with_database_path<P: AsRef<Path>>(
213        path: P,
214        chain_spec: Arc<N::ChainSpec>,
215        args: DatabaseArguments,
216        static_file_provider: StaticFileProvider<N::Primitives>,
217        rocksdb_provider: RocksDBProvider,
218        runtime: reth_tasks::Runtime,
219    ) -> RethResult<Self> {
220        Self::new(
221            init_db(path, args).map_err(RethError::msg)?,
222            chain_spec,
223            static_file_provider,
224            rocksdb_provider,
225            runtime,
226        )
227        .map_err(RethError::Provider)
228    }
229}
230
impl<N: ProviderNodeTypes> ProviderFactory<N> {
    /// Returns a provider with a created `DbTx` inside, which allows fetching data from the
    /// database using different types of providers. Example: [`HeaderProvider`]
    /// [`BlockHashReader`]. This may fail if the inner read database transaction fails to open.
    ///
    /// The factory's configured [`PruneModes`] are passed through to the provider.
    #[track_caller]
    pub fn provider(&self) -> ProviderResult<DatabaseProviderRO<N::DB, N>> {
        Ok(DatabaseProvider::new(
            self.db.tx()?,
            self.chain_spec.clone(),
            self.static_file_provider.clone(),
            self.prune_modes.clone(),
            self.storage.clone(),
            self.storage_settings.clone(),
            self.rocksdb_provider.clone(),
            self.changeset_cache.clone(),
            self.runtime.clone(),
            self.db.path(),
        ))
    }

    /// Returns a provider with a created `DbTxMut` inside, which allows fetching and updating
    /// data from the database using different types of providers. Example: [`HeaderProvider`]
    /// [`BlockHashReader`].  This may fail if the inner read/write database transaction fails to
    /// open.
    #[track_caller]
    pub fn provider_rw(&self) -> ProviderResult<DatabaseProviderRW<N::DB, N>> {
        Ok(DatabaseProviderRW(DatabaseProvider::new_rw(
            self.db.tx_mut()?,
            self.chain_spec.clone(),
            self.static_file_provider.clone(),
            self.prune_modes.clone(),
            self.storage.clone(),
            self.storage_settings.clone(),
            self.rocksdb_provider.clone(),
            self.changeset_cache.clone(),
            self.runtime.clone(),
            self.db.path(),
        )))
    }

    /// Returns a provider with a created `DbTxMut` inside, configured for unwind operations.
    /// Uses unwind commit order (MDBX first, then `RocksDB`, then static files) to allow
    /// recovery by truncating static files on restart if interrupted.
    #[track_caller]
    pub fn unwind_provider_rw(
        &self,
    ) -> ProviderResult<DatabaseProvider<<N::DB as Database>::TXMut, N>> {
        Ok(DatabaseProvider::new_unwind_rw(
            self.db.tx_mut()?,
            self.chain_spec.clone(),
            self.static_file_provider.clone(),
            self.prune_modes.clone(),
            self.storage.clone(),
            self.storage_settings.clone(),
            self.rocksdb_provider.clone(),
            self.changeset_cache.clone(),
            self.runtime.clone(),
            self.db.path(),
        ))
    }

    /// State provider for latest block
    #[track_caller]
    pub fn latest(&self) -> ProviderResult<StateProviderBox> {
        trace!(target: "providers::db", "Returning latest state provider");
        Ok(Box::new(LatestStateProvider::new(self.database_provider_ro()?)))
    }

    /// Storage provider for state at that given block
    pub fn history_by_block_number(
        &self,
        block_number: BlockNumber,
    ) -> ProviderResult<StateProviderBox> {
        let state_provider = self.provider()?.try_into_history_at_block(block_number)?;
        trace!(target: "providers::db", ?block_number, "Returning historical state provider for block number");
        Ok(state_provider)
    }

    /// Storage provider for state at that given block hash
    pub fn history_by_block_hash(&self, block_hash: BlockHash) -> ProviderResult<StateProviderBox> {
        let provider = self.provider()?;

        // Resolve the hash to a number first; a missing hash is an explicit error.
        let block_number = provider
            .block_number(block_hash)?
            .ok_or(ProviderError::BlockHashNotFound(block_hash))?;

        let state_provider = provider.try_into_history_at_block(block_number)?;
        trace!(target: "providers::db", ?block_number, %block_hash, "Returning historical state provider for block hash");
        Ok(state_provider)
    }

    /// Asserts that the static files and database are consistent. If not,
    /// returns [`ProviderError::MustUnwind`] with the appropriate unwind
    /// target. May also return any [`ProviderError`] that
    /// [`Self::check_consistency`] may return.
    pub fn assert_consistent(self) -> ProviderResult<Self> {
        let (rocksdb_unwind, static_file_unwind) = self.check_consistency()?;

        // Build a human-readable label naming which backend(s) are inconsistent.
        let source = match (rocksdb_unwind, static_file_unwind) {
            (None, None) => return Ok(self),
            (Some(_), Some(_)) => "RocksDB and Static Files",
            (Some(_), None) => "RocksDB",
            (None, Some(_)) => "Static Files",
        };

        Err(ProviderError::MustUnwind {
            data_source: source,
            // Unwind to the lower of the two targets so both backends end up consistent.
            unwind_to: rocksdb_unwind
                .into_iter()
                .chain(static_file_unwind)
                .min()
                .expect("at least one unwind target must be Some"),
        })
    }

    /// Checks the consistency between the static files and the database. This
    /// may result in static files being pruned or otherwise healed to ensure
    /// consistency. I.e. this MAY result in writes to the static files.
    ///
    /// Returns `(rocksdb_unwind_target, static_file_unwind_target)`, each
    /// `None` when the corresponding backend is already consistent.
    #[instrument(err, skip(self))]
    pub fn check_consistency(&self) -> ProviderResult<(Option<u64>, Option<u64>)> {
        let provider_ro = self.database_provider_ro()?;

        // Step 1: heal file-level inconsistencies (no pruning)
        self.static_file_provider().check_file_consistency(&provider_ro)?;

        // Step 2: RocksDB consistency check (needs static files tx data)
        let rocksdb_unwind = self.rocksdb_provider().check_consistency(&provider_ro)?;

        // Step 3: Static file checkpoint consistency (may prune)
        let static_file_unwind = self.static_file_provider().check_consistency(&provider_ro)?.map(
            |target| match target {
                PipelineTarget::Unwind(block) => block,
                PipelineTarget::Sync(_) => unreachable!("check_consistency returns Unwind"),
            },
        );

        Ok((rocksdb_unwind, static_file_unwind))
    }
}
373
// Expose the node's primitive types through the factory.
impl<N: NodeTypesWithDB> NodePrimitivesProvider for ProviderFactory<N> {
    type Primitives = N::Primitives;
}
377
378impl<N: ProviderNodeTypes> DatabaseProviderFactory for ProviderFactory<N> {
379    type DB = N::DB;
380    type Provider = DatabaseProvider<<N::DB as Database>::TX, N>;
381    type ProviderRW = DatabaseProvider<<N::DB as Database>::TXMut, N>;
382
383    fn database_provider_ro(&self) -> ProviderResult<Self::Provider> {
384        self.provider()
385    }
386
387    fn database_provider_rw(&self) -> ProviderResult<Self::ProviderRW> {
388        self.provider_rw().map(|provider| provider.0)
389    }
390}
391
impl<N: NodeTypesWithDB> StaticFileProviderFactory for ProviderFactory<N> {
    /// Returns static file provider
    fn static_file_provider(&self) -> StaticFileProvider<Self::Primitives> {
        self.static_file_provider.clone()
    }

    /// Returns a mutable writer for the given static file segment at `block`;
    /// delegates to the static file provider's `get_writer`.
    fn get_static_file_writer(
        &self,
        block: BlockNumber,
        segment: StaticFileSegment,
    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
        self.static_file_provider.get_writer(block, segment)
    }
}
406
// Delegates header-sync-gap queries to a freshly opened read-only provider.
impl<N: ProviderNodeTypes> HeaderSyncGapProvider for ProviderFactory<N> {
    type Header = HeaderTy<N>;
    fn local_tip_header(
        &self,
        highest_uninterrupted_block: BlockNumber,
    ) -> ProviderResult<SealedHeader<Self::Header>> {
        self.provider()?.local_tip_header(highest_uninterrupted_block)
    }
}
416
// Header lookups: hash-keyed queries need the database index, so they go
// through a fresh read-only provider; number-keyed queries read straight
// from static files.
impl<N: ProviderNodeTypes> HeaderProvider for ProviderFactory<N> {
    type Header = HeaderTy<N>;

    // Hash lookup requires the DB's hash->number index.
    fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
        self.provider()?.header(block_hash)
    }

    fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
        self.static_file_provider.header_by_number(num)
    }

    fn headers_range(
        &self,
        range: impl RangeBounds<BlockNumber>,
    ) -> ProviderResult<Vec<Self::Header>> {
        self.static_file_provider.headers_range(range)
    }

    fn sealed_header(
        &self,
        number: BlockNumber,
    ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
        self.static_file_provider.sealed_header(number)
    }

    fn sealed_headers_range(
        &self,
        range: impl RangeBounds<BlockNumber>,
    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
        self.static_file_provider.sealed_headers_range(range)
    }

    fn sealed_headers_while(
        &self,
        range: impl RangeBounds<BlockNumber>,
        predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
        self.static_file_provider.sealed_headers_while(range, predicate)
    }
}
457
// Canonical block hashes are served directly from static files.
impl<N: ProviderNodeTypes> BlockHashReader for ProviderFactory<N> {
    fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
        self.static_file_provider.block_hash(number)
    }

    fn canonical_hashes_range(
        &self,
        start: BlockNumber,
        end: BlockNumber,
    ) -> ProviderResult<Vec<B256>> {
        self.static_file_provider.canonical_hashes_range(start, end)
    }
}
471
// Block-number queries: hash/chain-info lookups go through the database;
// pure number bounds come from static files.
impl<N: ProviderNodeTypes> BlockNumReader for ProviderFactory<N> {
    fn chain_info(&self) -> ProviderResult<ChainInfo> {
        self.provider()?.chain_info()
    }

    fn best_block_number(&self) -> ProviderResult<BlockNumber> {
        self.provider()?.best_block_number()
    }

    fn last_block_number(&self) -> ProviderResult<BlockNumber> {
        self.static_file_provider.last_block_number()
    }

    fn earliest_block_number(&self) -> ProviderResult<BlockNumber> {
        // earliest history height tracks the lowest block number that has __not__ been expired, in
        // other words, the first/earliest available block.
        Ok(self.static_file_provider.earliest_history_height())
    }

    fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
        self.provider()?.block_number(hash)
    }
}
495
// Full-block reads: every method opens a fresh read-only provider and
// delegates, since block assembly needs database body indices.
impl<N: ProviderNodeTypes> BlockReader for ProviderFactory<N> {
    type Block = BlockTy<N>;

    fn find_block_by_hash(
        &self,
        hash: B256,
        source: BlockSource,
    ) -> ProviderResult<Option<Self::Block>> {
        self.provider()?.find_block_by_hash(hash, source)
    }

    fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
        self.provider()?.block(id)
    }

    fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        self.provider()?.pending_block()
    }

    fn pending_block_and_receipts(
        &self,
    ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
        self.provider()?.pending_block_and_receipts()
    }

    fn recovered_block(
        &self,
        id: BlockHashOrNumber,
        transaction_kind: TransactionVariant,
    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        self.provider()?.recovered_block(id, transaction_kind)
    }

    fn sealed_block_with_senders(
        &self,
        id: BlockHashOrNumber,
        transaction_kind: TransactionVariant,
    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        self.provider()?.sealed_block_with_senders(id, transaction_kind)
    }

    fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
        self.provider()?.block_range(range)
    }

    fn block_with_senders_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
        self.provider()?.block_with_senders_range(range)
    }

    fn recovered_block_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
        self.provider()?.recovered_block_range(range)
    }

    fn block_by_transaction_id(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
        self.provider()?.block_by_transaction_id(id)
    }
}
559
// Transaction reads: tx-number-keyed lookups are served by static files;
// hash-keyed and block-keyed lookups need the database index. Sender reads
// consult the configured writer destination to pick the backend.
impl<N: ProviderNodeTypes> TransactionsProvider for ProviderFactory<N> {
    type Transaction = TxTy<N>;

    fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
        self.provider()?.transaction_id(tx_hash)
    }

    fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
        self.static_file_provider.transaction_by_id(id)
    }

    fn transaction_by_id_unhashed(
        &self,
        id: TxNumber,
    ) -> ProviderResult<Option<Self::Transaction>> {
        self.static_file_provider.transaction_by_id_unhashed(id)
    }

    fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
        self.provider()?.transaction_by_hash(hash)
    }

    fn transaction_by_hash_with_meta(
        &self,
        tx_hash: TxHash,
    ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
        self.provider()?.transaction_by_hash_with_meta(tx_hash)
    }

    fn transactions_by_block(
        &self,
        id: BlockHashOrNumber,
    ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
        self.provider()?.transactions_by_block(id)
    }

    fn transactions_by_block_range(
        &self,
        range: impl RangeBounds<BlockNumber>,
    ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
        self.provider()?.transactions_by_block_range(range)
    }

    fn transactions_by_tx_range(
        &self,
        range: impl RangeBounds<TxNumber>,
    ) -> ProviderResult<Vec<Self::Transaction>> {
        self.static_file_provider.transactions_by_tx_range(range)
    }

    fn senders_by_tx_range(
        &self,
        range: impl RangeBounds<TxNumber>,
    ) -> ProviderResult<Vec<Address>> {
        // Senders may live in either backend depending on node configuration.
        if EitherWriterDestination::senders(self).is_static_file() {
            self.static_file_provider.senders_by_tx_range(range)
        } else {
            self.provider()?.senders_by_tx_range(range)
        }
    }

    fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
        // Same backend selection as `senders_by_tx_range`.
        if EitherWriterDestination::senders(self).is_static_file() {
            self.static_file_provider.transaction_sender(id)
        } else {
            self.provider()?.transaction_sender(id)
        }
    }
}
629
// Receipt reads: `receipt` and `receipts_by_tx_range` try static files first
// with a database fallback; the other methods delegate to a fresh read-only
// provider.
impl<N: ProviderNodeTypes> ReceiptProvider for ProviderFactory<N> {
    type Receipt = ReceiptTy<N>;

    fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
        self.static_file_provider.get_with_static_file_or_database(
            StaticFileSegment::Receipts,
            id,
            |static_file| static_file.receipt(id),
            || self.provider()?.receipt(id),
        )
    }

    fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
        self.provider()?.receipt_by_hash(hash)
    }

    fn receipts_by_block(
        &self,
        block: BlockHashOrNumber,
    ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
        self.provider()?.receipts_by_block(block)
    }

    fn receipts_by_tx_range(
        &self,
        range: impl RangeBounds<TxNumber>,
    ) -> ProviderResult<Vec<Self::Receipt>> {
        self.static_file_provider.get_range_with_static_file_or_database(
            StaticFileSegment::Receipts,
            to_range(range),
            |static_file, range, _| static_file.receipts_by_tx_range(range),
            |range, _| self.provider()?.receipts_by_tx_range(range),
            |_| true,
        )
    }

    fn receipts_by_block_range(
        &self,
        block_range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
        self.provider()?.receipts_by_block_range(block_range)
    }
}
673
// Block body indices live in the database; delegate to a read-only provider.
impl<N: ProviderNodeTypes> BlockBodyIndicesProvider for ProviderFactory<N> {
    fn block_body_indices(
        &self,
        number: BlockNumber,
    ) -> ProviderResult<Option<StoredBlockBodyIndices>> {
        self.provider()?.block_body_indices(number)
    }

    fn block_body_indices_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
        self.provider()?.block_body_indices_range(range)
    }
}
689
// Stage checkpoints are database-backed; delegate to a read-only provider.
impl<N: ProviderNodeTypes> StageCheckpointReader for ProviderFactory<N> {
    fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
        self.provider()?.get_stage_checkpoint(id)
    }

    fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
        self.provider()?.get_stage_checkpoint_progress(id)
    }
    fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
        self.provider()?.get_all_checkpoints()
    }
}
702
703impl<N: NodeTypesWithDB> ChainSpecProvider for ProviderFactory<N> {
704    type ChainSpec = N::ChainSpec;
705
706    fn chain_spec(&self) -> Arc<N::ChainSpec> {
707        self.chain_spec.clone()
708    }
709}
710
// Prune checkpoints are database-backed; delegate to a read-only provider.
impl<N: ProviderNodeTypes> PruneCheckpointReader for ProviderFactory<N> {
    fn get_prune_checkpoint(
        &self,
        segment: PruneSegment,
    ) -> ProviderResult<Option<PruneCheckpoint>> {
        self.provider()?.get_prune_checkpoint(segment)
    }

    fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
        self.provider()?.get_prune_checkpoints()
    }
}
723
impl<N: ProviderNodeTypes> HashedPostStateProvider for ProviderFactory<N> {
    /// Converts a [`BundleState`] into a [`HashedPostState`] using keccak key hashing.
    fn hashed_post_state(&self, bundle_state: &BundleState) -> HashedPostState {
        HashedPostState::from_bundle_state::<KeccakKeyHasher>(bundle_state.state())
    }
}
729
// Metadata key/value reads are database-backed; delegate to a read-only provider.
impl<N: ProviderNodeTypes> MetadataProvider for ProviderFactory<N> {
    fn get_metadata(&self, key: &str) -> ProviderResult<Option<Vec<u8>>> {
        self.provider()?.get_metadata(key)
    }
}
735
impl<N> fmt::Debug for ProviderFactory<N>
where
    N: NodeTypesWithDB<DB: fmt::Debug, ChainSpec: fmt::Debug, Storage: fmt::Debug>,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Exhaustive destructuring (no `..`) makes adding a struct field a
        // compile error here, so the Debug output cannot silently go stale.
        let Self {
            db,
            chain_spec,
            static_file_provider,
            prune_modes,
            storage,
            storage_settings,
            rocksdb_provider,
            changeset_cache,
            runtime,
        } = self;
        f.debug_struct("ProviderFactory")
            .field("db", &db)
            .field("chain_spec", &chain_spec)
            .field("static_file_provider", &static_file_provider)
            .field("prune_modes", &prune_modes)
            .field("storage", &storage)
            // Dereference through the lock guard to print the settings value itself.
            .field("storage_settings", &*storage_settings.read())
            .field("rocksdb_provider", &rocksdb_provider)
            .field("changeset_cache", &changeset_cache)
            .field("runtime", &runtime)
            .finish()
    }
}
765
// Manual `Clone`: a derived impl would require `N: Clone`; instead each field
// (all of which are cloneable handles) is cloned individually.
impl<N: NodeTypesWithDB> Clone for ProviderFactory<N> {
    fn clone(&self) -> Self {
        Self {
            db: self.db.clone(),
            chain_spec: self.chain_spec.clone(),
            static_file_provider: self.static_file_provider.clone(),
            prune_modes: self.prune_modes.clone(),
            storage: self.storage.clone(),
            // Shared: the clone observes the same cached settings.
            storage_settings: self.storage_settings.clone(),
            rocksdb_provider: self.rocksdb_provider.clone(),
            changeset_cache: self.changeset_cache.clone(),
            runtime: self.runtime.clone(),
        }
    }
}
781
782#[cfg(test)]
783mod tests {
784    use super::*;
785    use crate::{
786        providers::{StaticFileProvider, StaticFileWriter},
787        test_utils::{blocks::TEST_BLOCK, create_test_provider_factory, MockNodeTypesWithDB},
788        BlockHashReader, BlockNumReader, BlockWriter, DBProvider, HeaderSyncGapProvider,
789        TransactionsProvider,
790    };
791    use alloy_primitives::{TxNumber, B256};
792    use assert_matches::assert_matches;
793    use reth_chainspec::ChainSpecBuilder;
794    use reth_db::{
795        mdbx::DatabaseArguments,
796        test_utils::{create_test_rocksdb_dir, create_test_static_files_dir, ERROR_TEMPDIR},
797    };
798    use reth_db_api::tables;
799    use reth_primitives_traits::SignerRecoverable;
800    use reth_prune_types::{PruneMode, PruneModes};
801    use reth_storage_errors::provider::ProviderError;
802    use reth_testing_utils::generators::{self, random_block, random_header, BlockParams};
803    use std::{ops::RangeInclusive, sync::Arc};
804
    /// Smoke test: a freshly created factory can hand out a latest-state provider.
    #[test]
    fn common_history_provider() {
        let factory = create_test_provider_factory();
        let _ = factory.latest();
    }
810
    /// On an empty datadir the chain info defaults to block 0 with a zero hash.
    #[test]
    fn default_chain_info() {
        let factory = create_test_provider_factory();
        let provider = factory.provider().unwrap();

        let chain_info = provider.chain_info().expect("should be ok");
        assert_eq!(chain_info.best_number, 0);
        assert_eq!(chain_info.best_hash, B256::ZERO);
    }
820
    /// A read-only provider remains usable while a read-write provider is open.
    #[test]
    fn provider_flow() {
        let factory = create_test_provider_factory();
        let provider = factory.provider().unwrap();
        provider.block_hash(0).unwrap();
        let provider_rw = factory.provider_rw().unwrap();
        provider_rw.block_hash(0).unwrap();
        // The earlier read-only provider still works after opening the rw one.
        provider.block_hash(0).unwrap();
    }
830
    /// `new_with_database_path` opens/creates the database at a path and the
    /// resulting factory can serve both ro and rw providers.
    #[test]
    fn provider_factory_with_database_path() {
        let chain_spec = ChainSpecBuilder::mainnet().build();
        // Temp dirs are bound to locals so they live for the whole test.
        let (_static_dir, static_dir_path) = create_test_static_files_dir();
        let (_rocksdb_dir, rocksdb_path) = create_test_rocksdb_dir();
        let _db_tempdir = tempfile::TempDir::new().expect(ERROR_TEMPDIR);
        let factory = ProviderFactory::<MockNodeTypesWithDB<DatabaseEnv>>::new_with_database_path(
            _db_tempdir.path(),
            Arc::new(chain_spec),
            DatabaseArguments::new(Default::default()),
            StaticFileProvider::read_write(static_dir_path).unwrap(),
            RocksDBProvider::builder(&rocksdb_path).build().unwrap(),
            reth_tasks::Runtime::test(),
        )
        .unwrap();
        let provider = factory.provider().unwrap();
        provider.block_hash(0).unwrap();
        let provider_rw = factory.provider_rw().unwrap();
        provider_rw.block_hash(0).unwrap();
        provider.block_hash(0).unwrap();
    }
852
    #[test]
    fn insert_block_with_prune_modes() {
        // Verifies that `insert_block` honors prune modes: with no pruning,
        // sender and tx-hash lookup entries are written; with
        // `sender_recovery` and `transaction_lookup` set to `PruneMode::Full`,
        // those entries are skipped entirely.
        let block = TEST_BLOCK.clone();

        // Case 1: default prune modes — both lookup tables are populated.
        {
            let factory = create_test_provider_factory();
            let provider = factory.provider_rw().unwrap();
            assert_matches!(provider.insert_block(&block.clone().try_recover().unwrap()), Ok(_));
            // Sender of the first transaction was recovered and stored.
            assert_matches!(
                provider.transaction_sender(0), Ok(Some(sender))
                if sender == block.body().transactions[0].recover_signer().unwrap()
            );
            // Tx-hash -> tx-number mapping was written.
            assert_matches!(
                provider.transaction_id(*block.body().transactions[0].tx_hash()),
                Ok(Some(0))
            );
        }

        // Case 2: full pruning of sender recovery and tx lookup — neither
        // table receives an entry for the inserted block.
        {
            let prune_modes = PruneModes {
                sender_recovery: Some(PruneMode::Full),
                transaction_lookup: Some(PruneMode::Full),
                ..PruneModes::default()
            };
            // Keep factory alive until provider is dropped to prevent TempDatabase cleanup
            let factory = create_test_provider_factory().with_prune_modes(prune_modes);
            let provider = factory.provider_rw().unwrap();
            assert_matches!(provider.insert_block(&block.clone().try_recover().unwrap()), Ok(_));
            assert_matches!(provider.transaction_sender(0), Ok(None));
            assert_matches!(
                provider.transaction_id(*block.body().transactions[0].tx_hash()),
                Ok(None)
            );
        }
    }
888
889    #[test]
890    fn take_block_transaction_range_recover_senders() {
891        let mut rng = generators::rng();
892        let block =
893            random_block(&mut rng, 0, BlockParams { tx_count: Some(3), ..Default::default() });
894
895        let tx_ranges: Vec<RangeInclusive<TxNumber>> = vec![0..=0, 1..=1, 2..=2, 0..=1, 1..=2];
896        for range in tx_ranges {
897            let factory = create_test_provider_factory();
898            let provider = factory.provider_rw().unwrap();
899
900            assert_matches!(provider.insert_block(&block.clone().try_recover().unwrap()), Ok(_));
901
902            let senders = provider.take::<tables::TransactionSenders>(range.clone()).unwrap();
903            assert_eq!(
904                senders,
905                range
906                    .clone()
907                    .map(|tx_number| (
908                        tx_number,
909                        block.body().transactions[tx_number as usize].recover_signer().unwrap()
910                    ))
911                    .collect::<Vec<_>>()
912            );
913
914            let db_senders = provider.senders_by_tx_range(range);
915            assert!(matches!(db_senders, Ok(ref v) if v.is_empty()));
916        }
917    }
918
    #[test]
    fn header_sync_gap_lookup() {
        // Checks `local_tip_header` behavior for a checkpoint at genesis:
        // it errors with `HeaderNotFound` on an empty database, and returns
        // the stored header once one exists in the static files.
        let factory = create_test_provider_factory();
        let provider = factory.provider_rw().unwrap();

        let mut rng = generators::rng();

        // Genesis
        let checkpoint = 0;
        let head = random_header(&mut rng, 0, None);

        // Empty database
        assert_matches!(
            provider.local_tip_header(checkpoint),
            Err(ProviderError::HeaderNotFound(block_number))
                if block_number.as_number().unwrap() == checkpoint
        );

        // Checkpoint and no gap
        let static_file_provider = provider.static_file_provider();
        let mut static_file_writer =
            static_file_provider.latest_writer(StaticFileSegment::Headers).unwrap();
        static_file_writer.append_header(head.header(), &head.hash()).unwrap();
        static_file_writer.commit().unwrap();
        // Drop the writer before reading so the subsequent lookup doesn't
        // contend with the still-held writer handle.
        drop(static_file_writer);

        let local_head = provider.local_tip_header(checkpoint).unwrap();

        assert_eq!(local_head, head);
    }
949}