Skip to main content

reth_provider/providers/database/
mod.rs

1use crate::{
2    providers::{
3        state::latest::LatestStateProvider, NodeTypesForProvider, RocksDBProvider,
4        StaticFileProvider, StaticFileProviderRWRefMut,
5    },
6    to_range,
7    traits::{BlockSource, ReceiptProvider},
8    BlockHashReader, BlockNumReader, BlockReader, ChainSpecProvider, DatabaseProviderFactory,
9    EitherWriterDestination, HashedPostStateProvider, HeaderProvider, HeaderSyncGapProvider,
10    MetadataProvider, ProviderError, PruneCheckpointReader, RocksDBProviderFactory,
11    StageCheckpointReader, StateProviderBox, StaticFileProviderFactory, StaticFileWriter,
12    TransactionVariant, TransactionsProvider,
13};
14use alloy_consensus::transaction::TransactionMeta;
15use alloy_eips::BlockHashOrNumber;
16use alloy_primitives::{Address, BlockHash, BlockNumber, TxHash, TxNumber, B256};
17use core::fmt;
18use parking_lot::RwLock;
19use reth_chainspec::ChainInfo;
20use reth_db::{init_db, mdbx::DatabaseArguments, DatabaseEnv};
21use reth_db_api::{database::Database, models::StoredBlockBodyIndices};
22use reth_errors::{RethError, RethResult};
23use reth_node_types::{
24    BlockTy, HeaderTy, NodeTypesWithDB, NodeTypesWithDBAdapter, ReceiptTy, TxTy,
25};
26use reth_primitives_traits::{RecoveredBlock, SealedHeader};
27use reth_prune_types::{PruneCheckpoint, PruneModes, PruneSegment};
28use reth_stages_types::{PipelineTarget, StageCheckpoint, StageId};
29use reth_static_file_types::StaticFileSegment;
30use reth_storage_api::{
31    BlockBodyIndicesProvider, NodePrimitivesProvider, StorageSettings, StorageSettingsCache,
32    TryIntoHistoricalStateProvider,
33};
34use reth_storage_errors::provider::ProviderResult;
35use reth_trie::HashedPostState;
36use reth_trie_db::ChangesetCache;
37use revm_database::BundleState;
38use std::{
39    ops::{RangeBounds, RangeInclusive},
40    path::Path,
41    sync::Arc,
42};
43use tracing::{instrument, trace};
44
45mod provider;
46pub use provider::{
47    CommitOrder, DatabaseProvider, DatabaseProviderRO, DatabaseProviderRW, SaveBlocksMode,
48};
49
50use super::ProviderNodeTypes;
51use reth_trie::KeccakKeyHasher;
52
53mod builder;
54pub use builder::{ProviderFactoryBuilder, ReadOnlyConfig};
55
56mod metrics;
57
58mod chain;
59pub use chain::*;
60
61/// A common provider that fetches data from a database or static file.
62///
63/// This provider implements most provider or provider factory traits.
64pub struct ProviderFactory<N: NodeTypesWithDB> {
65    /// Database instance
66    db: N::DB,
67    /// Chain spec
68    chain_spec: Arc<N::ChainSpec>,
69    /// Static File Provider
70    static_file_provider: StaticFileProvider<N::Primitives>,
71    /// Optional pruning configuration
72    prune_modes: PruneModes,
73    /// The node storage handler.
74    storage: Arc<N::Storage>,
75    /// Storage configuration settings for this node
76    storage_settings: Arc<RwLock<StorageSettings>>,
77    /// `RocksDB` provider
78    rocksdb_provider: RocksDBProvider,
79    /// Changeset cache for trie unwinding
80    changeset_cache: ChangesetCache,
81    /// Task runtime for spawning parallel I/O work.
82    runtime: reth_tasks::Runtime,
83}
84
85impl<N: NodeTypesForProvider> ProviderFactory<NodeTypesWithDBAdapter<N, DatabaseEnv>> {
86    /// Instantiates the builder for this type
87    pub fn builder() -> ProviderFactoryBuilder<N> {
88        ProviderFactoryBuilder::default()
89    }
90}
91
92impl<N: ProviderNodeTypes> ProviderFactory<N> {
93    /// Create new database provider factory.
94    ///
95    /// The storage backends used by the produced factory MAY be inconsistent.
96    /// It is recommended to call [`Self::check_consistency`] after
97    /// creation to ensure consistency between the database and static files.
98    /// If the function returns unwind targets, the caller MUST unwind the
99    /// inner database to the minimum of the two targets to ensure consistency.
100    pub fn new(
101        db: N::DB,
102        chain_spec: Arc<N::ChainSpec>,
103        static_file_provider: StaticFileProvider<N::Primitives>,
104        rocksdb_provider: RocksDBProvider,
105        runtime: reth_tasks::Runtime,
106    ) -> ProviderResult<Self> {
107        // Load storage settings from database at init time. Creates a temporary provider
108        // to read persisted settings, falling back to legacy defaults if none exist.
109        //
110        // Both factory and all providers it creates should share these cached settings.
111        let legacy_settings = StorageSettings::v1();
112        let storage_settings = DatabaseProvider::<_, N>::new(
113            db.tx()?,
114            chain_spec.clone(),
115            static_file_provider.clone(),
116            Default::default(),
117            Default::default(),
118            Arc::new(RwLock::new(legacy_settings)),
119            rocksdb_provider.clone(),
120            ChangesetCache::new(),
121            runtime.clone(),
122        )
123        .storage_settings()?
124        .unwrap_or(legacy_settings);
125
126        Ok(Self {
127            db,
128            chain_spec,
129            static_file_provider,
130            prune_modes: PruneModes::default(),
131            storage: Default::default(),
132            storage_settings: Arc::new(RwLock::new(storage_settings)),
133            rocksdb_provider,
134            changeset_cache: ChangesetCache::new(),
135            runtime,
136        })
137    }
138
139    /// Create new database provider factory and perform consistency checks.
140    ///
141    /// This will call [`Self::check_consistency`] internally and return
142    /// [`ProviderError::MustUnwind`] if inconsistencies are found. It may also
143    /// return any [`ProviderError`] that [`Self::new`] may return, or that are
144    /// encountered during consistency checks.
145    pub fn new_checked(
146        db: N::DB,
147        chain_spec: Arc<N::ChainSpec>,
148        static_file_provider: StaticFileProvider<N::Primitives>,
149        rocksdb_provider: RocksDBProvider,
150        runtime: reth_tasks::Runtime,
151    ) -> ProviderResult<Self> {
152        Self::new(db, chain_spec, static_file_provider, rocksdb_provider, runtime)
153            .and_then(Self::assert_consistent)
154    }
155}
156
157impl<N: NodeTypesWithDB> ProviderFactory<N> {
158    /// Sets the pruning configuration for an existing [`ProviderFactory`].
159    pub fn with_prune_modes(mut self, prune_modes: PruneModes) -> Self {
160        self.prune_modes = prune_modes;
161        self
162    }
163
164    /// Sets the changeset cache for an existing [`ProviderFactory`].
165    pub fn with_changeset_cache(mut self, changeset_cache: ChangesetCache) -> Self {
166        self.changeset_cache = changeset_cache;
167        self
168    }
169
170    /// Returns reference to the underlying database.
171    pub const fn db_ref(&self) -> &N::DB {
172        &self.db
173    }
174
175    #[cfg(any(test, feature = "test-utils"))]
176    /// Consumes Self and returns DB
177    pub fn into_db(self) -> N::DB {
178        self.db
179    }
180}
181
182impl<N: NodeTypesWithDB> StorageSettingsCache for ProviderFactory<N> {
183    fn cached_storage_settings(&self) -> StorageSettings {
184        *self.storage_settings.read()
185    }
186
187    fn set_storage_settings_cache(&self, settings: StorageSettings) {
188        *self.storage_settings.write() = settings;
189    }
190}
191
192impl<N: NodeTypesWithDB> RocksDBProviderFactory for ProviderFactory<N> {
193    fn rocksdb_provider(&self) -> RocksDBProvider {
194        self.rocksdb_provider.clone()
195    }
196
197    #[cfg(all(unix, feature = "rocksdb"))]
198    fn set_pending_rocksdb_batch(&self, _batch: rocksdb::WriteBatchWithTransaction<true>) {
199        unimplemented!("ProviderFactory is a factory, not a provider - use DatabaseProvider::set_pending_rocksdb_batch instead")
200    }
201
202    #[cfg(all(unix, feature = "rocksdb"))]
203    fn commit_pending_rocksdb_batches(&self) -> ProviderResult<()> {
204        unimplemented!("ProviderFactory is a factory, not a provider - use DatabaseProvider::commit_pending_rocksdb_batches instead")
205    }
206}
207
208impl<N: ProviderNodeTypes<DB = DatabaseEnv>> ProviderFactory<N> {
209    /// Create new database provider by passing a path. [`ProviderFactory`] will own the database
210    /// instance.
211    pub fn new_with_database_path<P: AsRef<Path>>(
212        path: P,
213        chain_spec: Arc<N::ChainSpec>,
214        args: DatabaseArguments,
215        static_file_provider: StaticFileProvider<N::Primitives>,
216        rocksdb_provider: RocksDBProvider,
217        runtime: reth_tasks::Runtime,
218    ) -> RethResult<Self> {
219        Self::new(
220            init_db(path, args).map_err(RethError::msg)?,
221            chain_spec,
222            static_file_provider,
223            rocksdb_provider,
224            runtime,
225        )
226        .map_err(RethError::Provider)
227    }
228}
229
230impl<N: ProviderNodeTypes> ProviderFactory<N> {
231    /// Returns a provider with a created `DbTx` inside, which allows fetching data from the
232    /// database using different types of providers. Example: [`HeaderProvider`]
233    /// [`BlockHashReader`]. This may fail if the inner read database transaction fails to open.
234    ///
235    /// This sets the [`PruneModes`] to [`None`], because they should only be relevant for writing
236    /// data.
237    #[track_caller]
238    pub fn provider(&self) -> ProviderResult<DatabaseProviderRO<N::DB, N>> {
239        Ok(DatabaseProvider::new(
240            self.db.tx()?,
241            self.chain_spec.clone(),
242            self.static_file_provider.clone(),
243            self.prune_modes.clone(),
244            self.storage.clone(),
245            self.storage_settings.clone(),
246            self.rocksdb_provider.clone(),
247            self.changeset_cache.clone(),
248            self.runtime.clone(),
249        ))
250    }
251
252    /// Returns a provider with a created `DbTxMut` inside, which allows fetching and updating
253    /// data from the database using different types of providers. Example: [`HeaderProvider`]
254    /// [`BlockHashReader`].  This may fail if the inner read/write database transaction fails to
255    /// open.
256    #[track_caller]
257    pub fn provider_rw(&self) -> ProviderResult<DatabaseProviderRW<N::DB, N>> {
258        Ok(DatabaseProviderRW(DatabaseProvider::new_rw(
259            self.db.tx_mut()?,
260            self.chain_spec.clone(),
261            self.static_file_provider.clone(),
262            self.prune_modes.clone(),
263            self.storage.clone(),
264            self.storage_settings.clone(),
265            self.rocksdb_provider.clone(),
266            self.changeset_cache.clone(),
267            self.runtime.clone(),
268        )))
269    }
270
271    /// Returns a provider with a created `DbTxMut` inside, configured for unwind operations.
272    /// Uses unwind commit order (MDBX first, then `RocksDB`, then static files) to allow
273    /// recovery by truncating static files on restart if interrupted.
274    #[track_caller]
275    pub fn unwind_provider_rw(
276        &self,
277    ) -> ProviderResult<DatabaseProvider<<N::DB as Database>::TXMut, N>> {
278        Ok(DatabaseProvider::new_unwind_rw(
279            self.db.tx_mut()?,
280            self.chain_spec.clone(),
281            self.static_file_provider.clone(),
282            self.prune_modes.clone(),
283            self.storage.clone(),
284            self.storage_settings.clone(),
285            self.rocksdb_provider.clone(),
286            self.changeset_cache.clone(),
287            self.runtime.clone(),
288        ))
289    }
290
291    /// State provider for latest block
292    #[track_caller]
293    pub fn latest(&self) -> ProviderResult<StateProviderBox> {
294        trace!(target: "providers::db", "Returning latest state provider");
295        Ok(Box::new(LatestStateProvider::new(self.database_provider_ro()?)))
296    }
297
298    /// Storage provider for state at that given block
299    pub fn history_by_block_number(
300        &self,
301        block_number: BlockNumber,
302    ) -> ProviderResult<StateProviderBox> {
303        let state_provider = self.provider()?.try_into_history_at_block(block_number)?;
304        trace!(target: "providers::db", ?block_number, "Returning historical state provider for block number");
305        Ok(state_provider)
306    }
307
308    /// Storage provider for state at that given block hash
309    pub fn history_by_block_hash(&self, block_hash: BlockHash) -> ProviderResult<StateProviderBox> {
310        let provider = self.provider()?;
311
312        let block_number = provider
313            .block_number(block_hash)?
314            .ok_or(ProviderError::BlockHashNotFound(block_hash))?;
315
316        let state_provider = provider.try_into_history_at_block(block_number)?;
317        trace!(target: "providers::db", ?block_number, %block_hash, "Returning historical state provider for block hash");
318        Ok(state_provider)
319    }
320
321    /// Asserts that the static files and database are consistent. If not,
322    /// returns [`ProviderError::MustUnwind`] with the appropriate unwind
323    /// target. May also return any [`ProviderError`] that
324    /// [`Self::check_consistency`] may return.
325    pub fn assert_consistent(self) -> ProviderResult<Self> {
326        let (rocksdb_unwind, static_file_unwind) = self.check_consistency()?;
327
328        let source = match (rocksdb_unwind, static_file_unwind) {
329            (None, None) => return Ok(self),
330            (Some(_), Some(_)) => "RocksDB and Static Files",
331            (Some(_), None) => "RocksDB",
332            (None, Some(_)) => "Static Files",
333        };
334
335        Err(ProviderError::MustUnwind {
336            data_source: source,
337            unwind_to: rocksdb_unwind
338                .into_iter()
339                .chain(static_file_unwind)
340                .min()
341                .expect("at least one unwind target must be Some"),
342        })
343    }
344
345    /// Checks the consistency between the static files and the database. This
346    /// may result in static files being pruned or otherwise healed to ensure
347    /// consistency. I.e. this MAY result in writes to the static files.
348    #[instrument(err, skip(self))]
349    pub fn check_consistency(&self) -> ProviderResult<(Option<u64>, Option<u64>)> {
350        let provider_ro = self.database_provider_ro()?;
351
352        // Step 1: heal file-level inconsistencies (no pruning)
353        self.static_file_provider().check_file_consistency(&provider_ro)?;
354
355        // Step 2: RocksDB consistency check (needs static files tx data)
356        let rocksdb_unwind = self.rocksdb_provider().check_consistency(&provider_ro)?;
357
358        // Step 3: Static file checkpoint consistency (may prune)
359        let static_file_unwind = self.static_file_provider().check_consistency(&provider_ro)?.map(
360            |target| match target {
361                PipelineTarget::Unwind(block) => block,
362                PipelineTarget::Sync(_) => unreachable!("check_consistency returns Unwind"),
363            },
364        );
365
366        Ok((rocksdb_unwind, static_file_unwind))
367    }
368}
369
370impl<N: NodeTypesWithDB> NodePrimitivesProvider for ProviderFactory<N> {
371    type Primitives = N::Primitives;
372}
373
374impl<N: ProviderNodeTypes> DatabaseProviderFactory for ProviderFactory<N> {
375    type DB = N::DB;
376    type Provider = DatabaseProvider<<N::DB as Database>::TX, N>;
377    type ProviderRW = DatabaseProvider<<N::DB as Database>::TXMut, N>;
378
379    fn database_provider_ro(&self) -> ProviderResult<Self::Provider> {
380        self.provider()
381    }
382
383    fn database_provider_rw(&self) -> ProviderResult<Self::ProviderRW> {
384        self.provider_rw().map(|provider| provider.0)
385    }
386}
387
388impl<N: NodeTypesWithDB> StaticFileProviderFactory for ProviderFactory<N> {
389    /// Returns static file provider
390    fn static_file_provider(&self) -> StaticFileProvider<Self::Primitives> {
391        self.static_file_provider.clone()
392    }
393
394    fn get_static_file_writer(
395        &self,
396        block: BlockNumber,
397        segment: StaticFileSegment,
398    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
399        self.static_file_provider.get_writer(block, segment)
400    }
401}
402
403impl<N: ProviderNodeTypes> HeaderSyncGapProvider for ProviderFactory<N> {
404    type Header = HeaderTy<N>;
405    fn local_tip_header(
406        &self,
407        highest_uninterrupted_block: BlockNumber,
408    ) -> ProviderResult<SealedHeader<Self::Header>> {
409        self.provider()?.local_tip_header(highest_uninterrupted_block)
410    }
411}
412
413impl<N: ProviderNodeTypes> HeaderProvider for ProviderFactory<N> {
414    type Header = HeaderTy<N>;
415
416    fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
417        self.provider()?.header(block_hash)
418    }
419
420    fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
421        self.static_file_provider.header_by_number(num)
422    }
423
424    fn headers_range(
425        &self,
426        range: impl RangeBounds<BlockNumber>,
427    ) -> ProviderResult<Vec<Self::Header>> {
428        self.static_file_provider.headers_range(range)
429    }
430
431    fn sealed_header(
432        &self,
433        number: BlockNumber,
434    ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
435        self.static_file_provider.sealed_header(number)
436    }
437
438    fn sealed_headers_range(
439        &self,
440        range: impl RangeBounds<BlockNumber>,
441    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
442        self.static_file_provider.sealed_headers_range(range)
443    }
444
445    fn sealed_headers_while(
446        &self,
447        range: impl RangeBounds<BlockNumber>,
448        predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
449    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
450        self.static_file_provider.sealed_headers_while(range, predicate)
451    }
452}
453
454impl<N: ProviderNodeTypes> BlockHashReader for ProviderFactory<N> {
455    fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
456        self.static_file_provider.block_hash(number)
457    }
458
459    fn canonical_hashes_range(
460        &self,
461        start: BlockNumber,
462        end: BlockNumber,
463    ) -> ProviderResult<Vec<B256>> {
464        self.static_file_provider.canonical_hashes_range(start, end)
465    }
466}
467
468impl<N: ProviderNodeTypes> BlockNumReader for ProviderFactory<N> {
469    fn chain_info(&self) -> ProviderResult<ChainInfo> {
470        self.provider()?.chain_info()
471    }
472
473    fn best_block_number(&self) -> ProviderResult<BlockNumber> {
474        self.provider()?.best_block_number()
475    }
476
477    fn last_block_number(&self) -> ProviderResult<BlockNumber> {
478        self.static_file_provider.last_block_number()
479    }
480
481    fn earliest_block_number(&self) -> ProviderResult<BlockNumber> {
482        // earliest history height tracks the lowest block number that has __not__ been expired, in
483        // other words, the first/earliest available block.
484        Ok(self.static_file_provider.earliest_history_height())
485    }
486
487    fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
488        self.provider()?.block_number(hash)
489    }
490}
491
492impl<N: ProviderNodeTypes> BlockReader for ProviderFactory<N> {
493    type Block = BlockTy<N>;
494
495    fn find_block_by_hash(
496        &self,
497        hash: B256,
498        source: BlockSource,
499    ) -> ProviderResult<Option<Self::Block>> {
500        self.provider()?.find_block_by_hash(hash, source)
501    }
502
503    fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
504        self.provider()?.block(id)
505    }
506
507    fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
508        self.provider()?.pending_block()
509    }
510
511    fn pending_block_and_receipts(
512        &self,
513    ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
514        self.provider()?.pending_block_and_receipts()
515    }
516
517    fn recovered_block(
518        &self,
519        id: BlockHashOrNumber,
520        transaction_kind: TransactionVariant,
521    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
522        self.provider()?.recovered_block(id, transaction_kind)
523    }
524
525    fn sealed_block_with_senders(
526        &self,
527        id: BlockHashOrNumber,
528        transaction_kind: TransactionVariant,
529    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
530        self.provider()?.sealed_block_with_senders(id, transaction_kind)
531    }
532
533    fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
534        self.provider()?.block_range(range)
535    }
536
537    fn block_with_senders_range(
538        &self,
539        range: RangeInclusive<BlockNumber>,
540    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
541        self.provider()?.block_with_senders_range(range)
542    }
543
544    fn recovered_block_range(
545        &self,
546        range: RangeInclusive<BlockNumber>,
547    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
548        self.provider()?.recovered_block_range(range)
549    }
550
551    fn block_by_transaction_id(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
552        self.provider()?.block_by_transaction_id(id)
553    }
554}
555
556impl<N: ProviderNodeTypes> TransactionsProvider for ProviderFactory<N> {
557    type Transaction = TxTy<N>;
558
559    fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
560        self.provider()?.transaction_id(tx_hash)
561    }
562
563    fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
564        self.static_file_provider.transaction_by_id(id)
565    }
566
567    fn transaction_by_id_unhashed(
568        &self,
569        id: TxNumber,
570    ) -> ProviderResult<Option<Self::Transaction>> {
571        self.static_file_provider.transaction_by_id_unhashed(id)
572    }
573
574    fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
575        self.provider()?.transaction_by_hash(hash)
576    }
577
578    fn transaction_by_hash_with_meta(
579        &self,
580        tx_hash: TxHash,
581    ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
582        self.provider()?.transaction_by_hash_with_meta(tx_hash)
583    }
584
585    fn transactions_by_block(
586        &self,
587        id: BlockHashOrNumber,
588    ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
589        self.provider()?.transactions_by_block(id)
590    }
591
592    fn transactions_by_block_range(
593        &self,
594        range: impl RangeBounds<BlockNumber>,
595    ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
596        self.provider()?.transactions_by_block_range(range)
597    }
598
599    fn transactions_by_tx_range(
600        &self,
601        range: impl RangeBounds<TxNumber>,
602    ) -> ProviderResult<Vec<Self::Transaction>> {
603        self.static_file_provider.transactions_by_tx_range(range)
604    }
605
606    fn senders_by_tx_range(
607        &self,
608        range: impl RangeBounds<TxNumber>,
609    ) -> ProviderResult<Vec<Address>> {
610        if EitherWriterDestination::senders(self).is_static_file() {
611            self.static_file_provider.senders_by_tx_range(range)
612        } else {
613            self.provider()?.senders_by_tx_range(range)
614        }
615    }
616
617    fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
618        if EitherWriterDestination::senders(self).is_static_file() {
619            self.static_file_provider.transaction_sender(id)
620        } else {
621            self.provider()?.transaction_sender(id)
622        }
623    }
624}
625
626impl<N: ProviderNodeTypes> ReceiptProvider for ProviderFactory<N> {
627    type Receipt = ReceiptTy<N>;
628
629    fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
630        self.static_file_provider.get_with_static_file_or_database(
631            StaticFileSegment::Receipts,
632            id,
633            |static_file| static_file.receipt(id),
634            || self.provider()?.receipt(id),
635        )
636    }
637
638    fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
639        self.provider()?.receipt_by_hash(hash)
640    }
641
642    fn receipts_by_block(
643        &self,
644        block: BlockHashOrNumber,
645    ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
646        self.provider()?.receipts_by_block(block)
647    }
648
649    fn receipts_by_tx_range(
650        &self,
651        range: impl RangeBounds<TxNumber>,
652    ) -> ProviderResult<Vec<Self::Receipt>> {
653        self.static_file_provider.get_range_with_static_file_or_database(
654            StaticFileSegment::Receipts,
655            to_range(range),
656            |static_file, range, _| static_file.receipts_by_tx_range(range),
657            |range, _| self.provider()?.receipts_by_tx_range(range),
658            |_| true,
659        )
660    }
661
662    fn receipts_by_block_range(
663        &self,
664        block_range: RangeInclusive<BlockNumber>,
665    ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
666        self.provider()?.receipts_by_block_range(block_range)
667    }
668}
669
670impl<N: ProviderNodeTypes> BlockBodyIndicesProvider for ProviderFactory<N> {
671    fn block_body_indices(
672        &self,
673        number: BlockNumber,
674    ) -> ProviderResult<Option<StoredBlockBodyIndices>> {
675        self.provider()?.block_body_indices(number)
676    }
677
678    fn block_body_indices_range(
679        &self,
680        range: RangeInclusive<BlockNumber>,
681    ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
682        self.provider()?.block_body_indices_range(range)
683    }
684}
685
686impl<N: ProviderNodeTypes> StageCheckpointReader for ProviderFactory<N> {
687    fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
688        self.provider()?.get_stage_checkpoint(id)
689    }
690
691    fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
692        self.provider()?.get_stage_checkpoint_progress(id)
693    }
694    fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
695        self.provider()?.get_all_checkpoints()
696    }
697}
698
699impl<N: NodeTypesWithDB> ChainSpecProvider for ProviderFactory<N> {
700    type ChainSpec = N::ChainSpec;
701
702    fn chain_spec(&self) -> Arc<N::ChainSpec> {
703        self.chain_spec.clone()
704    }
705}
706
707impl<N: ProviderNodeTypes> PruneCheckpointReader for ProviderFactory<N> {
708    fn get_prune_checkpoint(
709        &self,
710        segment: PruneSegment,
711    ) -> ProviderResult<Option<PruneCheckpoint>> {
712        self.provider()?.get_prune_checkpoint(segment)
713    }
714
715    fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
716        self.provider()?.get_prune_checkpoints()
717    }
718}
719
720impl<N: ProviderNodeTypes> HashedPostStateProvider for ProviderFactory<N> {
721    fn hashed_post_state(&self, bundle_state: &BundleState) -> HashedPostState {
722        HashedPostState::from_bundle_state::<KeccakKeyHasher>(bundle_state.state())
723    }
724}
725
726impl<N: ProviderNodeTypes> MetadataProvider for ProviderFactory<N> {
727    fn get_metadata(&self, key: &str) -> ProviderResult<Option<Vec<u8>>> {
728        self.provider()?.get_metadata(key)
729    }
730}
731
732impl<N> fmt::Debug for ProviderFactory<N>
733where
734    N: NodeTypesWithDB<DB: fmt::Debug, ChainSpec: fmt::Debug, Storage: fmt::Debug>,
735{
736    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
737        let Self {
738            db,
739            chain_spec,
740            static_file_provider,
741            prune_modes,
742            storage,
743            storage_settings,
744            rocksdb_provider,
745            changeset_cache,
746            runtime,
747        } = self;
748        f.debug_struct("ProviderFactory")
749            .field("db", &db)
750            .field("chain_spec", &chain_spec)
751            .field("static_file_provider", &static_file_provider)
752            .field("prune_modes", &prune_modes)
753            .field("storage", &storage)
754            .field("storage_settings", &*storage_settings.read())
755            .field("rocksdb_provider", &rocksdb_provider)
756            .field("changeset_cache", &changeset_cache)
757            .field("runtime", &runtime)
758            .finish()
759    }
760}
761
762impl<N: NodeTypesWithDB> Clone for ProviderFactory<N> {
763    fn clone(&self) -> Self {
764        Self {
765            db: self.db.clone(),
766            chain_spec: self.chain_spec.clone(),
767            static_file_provider: self.static_file_provider.clone(),
768            prune_modes: self.prune_modes.clone(),
769            storage: self.storage.clone(),
770            storage_settings: self.storage_settings.clone(),
771            rocksdb_provider: self.rocksdb_provider.clone(),
772            changeset_cache: self.changeset_cache.clone(),
773            runtime: self.runtime.clone(),
774        }
775    }
776}
777
778#[cfg(test)]
779mod tests {
780    use super::*;
781    use crate::{
782        providers::{StaticFileProvider, StaticFileWriter},
783        test_utils::{blocks::TEST_BLOCK, create_test_provider_factory, MockNodeTypesWithDB},
784        BlockHashReader, BlockNumReader, BlockWriter, DBProvider, HeaderSyncGapProvider,
785        TransactionsProvider,
786    };
787    use alloy_primitives::{TxNumber, B256};
788    use assert_matches::assert_matches;
789    use reth_chainspec::ChainSpecBuilder;
790    use reth_db::{
791        mdbx::DatabaseArguments,
792        test_utils::{create_test_rocksdb_dir, create_test_static_files_dir, ERROR_TEMPDIR},
793    };
794    use reth_db_api::tables;
795    use reth_primitives_traits::SignerRecoverable;
796    use reth_prune_types::{PruneMode, PruneModes};
797    use reth_storage_errors::provider::ProviderError;
798    use reth_testing_utils::generators::{self, random_block, random_header, BlockParams};
799    use std::{ops::RangeInclusive, sync::Arc};
800
801    #[test]
802    fn common_history_provider() {
803        let factory = create_test_provider_factory();
804        let _ = factory.latest();
805    }
806
807    #[test]
808    fn default_chain_info() {
809        let factory = create_test_provider_factory();
810        let provider = factory.provider().unwrap();
811
812        let chain_info = provider.chain_info().expect("should be ok");
813        assert_eq!(chain_info.best_number, 0);
814        assert_eq!(chain_info.best_hash, B256::ZERO);
815    }
816
817    #[test]
818    fn provider_flow() {
819        let factory = create_test_provider_factory();
820        let provider = factory.provider().unwrap();
821        provider.block_hash(0).unwrap();
822        let provider_rw = factory.provider_rw().unwrap();
823        provider_rw.block_hash(0).unwrap();
824        provider.block_hash(0).unwrap();
825    }
826
827    #[test]
828    fn provider_factory_with_database_path() {
829        let chain_spec = ChainSpecBuilder::mainnet().build();
830        let (_static_dir, static_dir_path) = create_test_static_files_dir();
831        let (_rocksdb_dir, rocksdb_path) = create_test_rocksdb_dir();
832        let _db_tempdir = tempfile::TempDir::new().expect(ERROR_TEMPDIR);
833        let factory = ProviderFactory::<MockNodeTypesWithDB<DatabaseEnv>>::new_with_database_path(
834            _db_tempdir.path(),
835            Arc::new(chain_spec),
836            DatabaseArguments::new(Default::default()),
837            StaticFileProvider::read_write(static_dir_path).unwrap(),
838            RocksDBProvider::builder(&rocksdb_path).build().unwrap(),
839            reth_tasks::Runtime::test(),
840        )
841        .unwrap();
842        let provider = factory.provider().unwrap();
843        provider.block_hash(0).unwrap();
844        let provider_rw = factory.provider_rw().unwrap();
845        provider_rw.block_hash(0).unwrap();
846        provider.block_hash(0).unwrap();
847    }
848
849    #[test]
850    fn insert_block_with_prune_modes() {
851        let block = TEST_BLOCK.clone();
852
853        {
854            let factory = create_test_provider_factory();
855            let provider = factory.provider_rw().unwrap();
856            assert_matches!(provider.insert_block(&block.clone().try_recover().unwrap()), Ok(_));
857            assert_matches!(
858                provider.transaction_sender(0), Ok(Some(sender))
859                if sender == block.body().transactions[0].recover_signer().unwrap()
860            );
861            assert_matches!(
862                provider.transaction_id(*block.body().transactions[0].tx_hash()),
863                Ok(Some(0))
864            );
865        }
866
867        {
868            let prune_modes = PruneModes {
869                sender_recovery: Some(PruneMode::Full),
870                transaction_lookup: Some(PruneMode::Full),
871                ..PruneModes::default()
872            };
873            // Keep factory alive until provider is dropped to prevent TempDatabase cleanup
874            let factory = create_test_provider_factory().with_prune_modes(prune_modes);
875            let provider = factory.provider_rw().unwrap();
876            assert_matches!(provider.insert_block(&block.clone().try_recover().unwrap()), Ok(_));
877            assert_matches!(provider.transaction_sender(0), Ok(None));
878            assert_matches!(
879                provider.transaction_id(*block.body().transactions[0].tx_hash()),
880                Ok(None)
881            );
882        }
883    }
884
885    #[test]
886    fn take_block_transaction_range_recover_senders() {
887        let mut rng = generators::rng();
888        let block =
889            random_block(&mut rng, 0, BlockParams { tx_count: Some(3), ..Default::default() });
890
891        let tx_ranges: Vec<RangeInclusive<TxNumber>> = vec![0..=0, 1..=1, 2..=2, 0..=1, 1..=2];
892        for range in tx_ranges {
893            let factory = create_test_provider_factory();
894            let provider = factory.provider_rw().unwrap();
895
896            assert_matches!(provider.insert_block(&block.clone().try_recover().unwrap()), Ok(_));
897
898            let senders = provider.take::<tables::TransactionSenders>(range.clone()).unwrap();
899            assert_eq!(
900                senders,
901                range
902                    .clone()
903                    .map(|tx_number| (
904                        tx_number,
905                        block.body().transactions[tx_number as usize].recover_signer().unwrap()
906                    ))
907                    .collect::<Vec<_>>()
908            );
909
910            let db_senders = provider.senders_by_tx_range(range);
911            assert!(matches!(db_senders, Ok(ref v) if v.is_empty()));
912        }
913    }
914
915    #[test]
916    fn header_sync_gap_lookup() {
917        let factory = create_test_provider_factory();
918        let provider = factory.provider_rw().unwrap();
919
920        let mut rng = generators::rng();
921
922        // Genesis
923        let checkpoint = 0;
924        let head = random_header(&mut rng, 0, None);
925
926        // Empty database
927        assert_matches!(
928            provider.local_tip_header(checkpoint),
929            Err(ProviderError::HeaderNotFound(block_number))
930                if block_number.as_number().unwrap() == checkpoint
931        );
932
933        // Checkpoint and no gap
934        let static_file_provider = provider.static_file_provider();
935        let mut static_file_writer =
936            static_file_provider.latest_writer(StaticFileSegment::Headers).unwrap();
937        static_file_writer.append_header(head.header(), &head.hash()).unwrap();
938        static_file_writer.commit().unwrap();
939        drop(static_file_writer);
940
941        let local_head = provider.local_tip_header(checkpoint).unwrap();
942
943        assert_eq!(local_head, head);
944    }
945}