//! Database-backed [`ProviderFactory`] and its provider/factory trait
//! implementations (`reth_provider/providers/database/mod.rs`).
1use crate::{
2    providers::{
3        state::latest::LatestStateProvider, NodeTypesForProvider, RocksDBProvider,
4        StaticFileProvider, StaticFileProviderRWRefMut,
5    },
6    to_range,
7    traits::{BlockSource, ReceiptProvider},
8    BlockHashReader, BlockNumReader, BlockReader, ChainSpecProvider, DatabaseProviderFactory,
9    EitherWriterDestination, HashedPostStateProvider, HeaderProvider, HeaderSyncGapProvider,
10    MetadataProvider, ProviderError, PruneCheckpointReader, RocksDBProviderFactory,
11    StageCheckpointReader, StateProviderBox, StaticFileProviderFactory, StaticFileWriter,
12    TransactionVariant, TransactionsProvider,
13};
14use alloy_consensus::transaction::TransactionMeta;
15use alloy_eips::BlockHashOrNumber;
16use alloy_primitives::{Address, BlockHash, BlockNumber, TxHash, TxNumber, B256};
17use core::fmt;
18use parking_lot::RwLock;
19use reth_chainspec::ChainInfo;
20use reth_db::{init_db, mdbx::DatabaseArguments, DatabaseEnv};
21use reth_db_api::{database::Database, models::StoredBlockBodyIndices};
22use reth_errors::{RethError, RethResult};
23use reth_node_types::{
24    BlockTy, HeaderTy, NodeTypesWithDB, NodeTypesWithDBAdapter, ReceiptTy, TxTy,
25};
26use reth_primitives_traits::{RecoveredBlock, SealedHeader};
27use reth_prune_types::{PruneCheckpoint, PruneModes, PruneSegment, MINIMUM_UNWIND_SAFE_DISTANCE};
28use reth_stages_types::{PipelineTarget, StageCheckpoint, StageId};
29use reth_static_file_types::StaticFileSegment;
30use reth_storage_api::{
31    BlockBodyIndicesProvider, ChainStateBlockReader, ChainStateBlockWriter, DBProvider,
32    NodePrimitivesProvider, StorageSettings, StorageSettingsCache, TryIntoHistoricalStateProvider,
33};
34use reth_storage_errors::provider::ProviderResult;
35use reth_trie::HashedPostState;
36use reth_trie_db::ChangesetCache;
37use revm_database::BundleState;
38use std::{
39    ops::{RangeBounds, RangeInclusive},
40    path::Path,
41    sync::Arc,
42};
43use tracing::{info, instrument, trace};
44
45mod provider;
46pub use provider::{
47    CommitOrder, DatabaseProvider, DatabaseProviderRO, DatabaseProviderRW, SaveBlocksMode,
48};
49
50use super::ProviderNodeTypes;
51use reth_trie::KeccakKeyHasher;
52
53mod builder;
54pub use builder::{ProviderFactoryBuilder, ReadOnlyConfig};
55
56mod metrics;
57
58mod chain;
59pub use chain::*;
60
/// A common provider that fetches data from a database or static file.
///
/// This provider implements most provider or provider factory traits.
pub struct ProviderFactory<N: NodeTypesWithDB> {
    /// Database instance shared by every provider created from this factory.
    db: N::DB,
    /// Chain spec
    chain_spec: Arc<N::ChainSpec>,
    /// Static File Provider
    static_file_provider: StaticFileProvider<N::Primitives>,
    /// Optional pruning configuration; defaults to `PruneModes::default()` and is
    /// propagated to every provider this factory creates.
    prune_modes: PruneModes,
    /// The node storage handler.
    storage: Arc<N::Storage>,
    /// Storage configuration settings for this node. Wrapped in `Arc<RwLock<_>>`
    /// so the factory and all providers share one mutable cache.
    storage_settings: Arc<RwLock<StorageSettings>>,
    /// `RocksDB` provider
    rocksdb_provider: RocksDBProvider,
    /// Changeset cache for trie unwinding
    changeset_cache: ChangesetCache,
    /// Task runtime for spawning parallel I/O work.
    runtime: reth_tasks::Runtime,
    /// Minimum distance from tip required before pruning can occur.
    /// Initialized to [`MINIMUM_UNWIND_SAFE_DISTANCE`] by [`Self::new`].
    minimum_pruning_distance: u64,
}
86
87impl<N: NodeTypesForProvider> ProviderFactory<NodeTypesWithDBAdapter<N, DatabaseEnv>> {
88    /// Instantiates the builder for this type
89    pub fn builder() -> ProviderFactoryBuilder<N> {
90        ProviderFactoryBuilder::default()
91    }
92}
93
94impl<N: ProviderNodeTypes> ProviderFactory<N> {
95    /// Create new database provider factory.
96    ///
97    /// The storage backends used by the produced factory MAY be inconsistent.
98    /// It is recommended to call [`Self::check_consistency`] after
99    /// creation to ensure consistency between the database and static files.
100    /// If the function returns unwind targets, the caller MUST unwind the
101    /// inner database to the minimum of the two targets to ensure consistency.
102    pub fn new(
103        db: N::DB,
104        chain_spec: Arc<N::ChainSpec>,
105        static_file_provider: StaticFileProvider<N::Primitives>,
106        rocksdb_provider: RocksDBProvider,
107        runtime: reth_tasks::Runtime,
108    ) -> ProviderResult<Self> {
109        // Load storage settings from database at init time. Creates a temporary provider
110        // to read persisted settings, falling back to legacy defaults if none exist.
111        //
112        // Both factory and all providers it creates should share these cached settings.
113        let legacy_settings = StorageSettings::v1();
114        let storage_settings = DatabaseProvider::<_, N>::new(
115            db.tx()?,
116            chain_spec.clone(),
117            static_file_provider.clone(),
118            Default::default(),
119            Default::default(),
120            Arc::new(RwLock::new(legacy_settings)),
121            rocksdb_provider.clone(),
122            ChangesetCache::new(),
123            runtime.clone(),
124            db.path(),
125        )
126        .storage_settings()?
127        .unwrap_or(legacy_settings);
128
129        Ok(Self {
130            db,
131            chain_spec,
132            static_file_provider,
133            prune_modes: PruneModes::default(),
134            storage: Default::default(),
135            storage_settings: Arc::new(RwLock::new(storage_settings)),
136            rocksdb_provider,
137            changeset_cache: ChangesetCache::new(),
138            runtime,
139            minimum_pruning_distance: MINIMUM_UNWIND_SAFE_DISTANCE,
140        })
141    }
142
143    /// Create new database provider factory and perform consistency checks.
144    ///
145    /// This will call [`Self::check_consistency`] internally and return
146    /// [`ProviderError::MustUnwind`] if inconsistencies are found. It may also
147    /// return any [`ProviderError`] that [`Self::new`] may return, or that are
148    /// encountered during consistency checks.
149    pub fn new_checked(
150        db: N::DB,
151        chain_spec: Arc<N::ChainSpec>,
152        static_file_provider: StaticFileProvider<N::Primitives>,
153        rocksdb_provider: RocksDBProvider,
154        runtime: reth_tasks::Runtime,
155    ) -> ProviderResult<Self> {
156        Self::new(db, chain_spec, static_file_provider, rocksdb_provider, runtime)
157            .and_then(Self::assert_consistent)
158    }
159}
160
161impl<N: NodeTypesWithDB> ProviderFactory<N> {
162    /// Sets the pruning configuration for an existing [`ProviderFactory`].
163    pub fn with_prune_modes(mut self, prune_modes: PruneModes) -> Self {
164        self.prune_modes = prune_modes;
165        self
166    }
167
168    /// Sets the changeset cache for an existing [`ProviderFactory`].
169    pub fn with_changeset_cache(mut self, changeset_cache: ChangesetCache) -> Self {
170        self.changeset_cache = changeset_cache;
171        self
172    }
173
174    /// Sets the minimum pruning distance for an existing [`ProviderFactory`].
175    ///
176    /// This controls the minimum distance from tip required before pruning can occur.
177    /// The default is [`MINIMUM_UNWIND_SAFE_DISTANCE`].
178    pub const fn with_minimum_pruning_distance(mut self, distance: u64) -> Self {
179        self.minimum_pruning_distance = distance;
180        self
181    }
182
183    /// Returns reference to the underlying database.
184    pub const fn db_ref(&self) -> &N::DB {
185        &self.db
186    }
187
188    #[cfg(any(test, feature = "test-utils"))]
189    /// Consumes Self and returns DB
190    pub fn into_db(self) -> N::DB {
191        self.db
192    }
193}
194
195impl<N: NodeTypesWithDB> StorageSettingsCache for ProviderFactory<N> {
196    fn cached_storage_settings(&self) -> StorageSettings {
197        *self.storage_settings.read()
198    }
199
200    fn set_storage_settings_cache(&self, settings: StorageSettings) {
201        *self.storage_settings.write() = settings;
202    }
203}
204
impl<N: NodeTypesWithDB> RocksDBProviderFactory for ProviderFactory<N> {
    /// Returns a clone of the shared `RocksDB` provider handle.
    fn rocksdb_provider(&self) -> RocksDBProvider {
        self.rocksdb_provider.clone()
    }

    /// Always panics: pending-batch state belongs to an individual provider,
    /// not the factory. Call this on a `DatabaseProvider` instead.
    fn set_pending_rocksdb_batch(&self, _batch: rocksdb::WriteBatchWithTransaction<true>) {
        unimplemented!("ProviderFactory is a factory, not a provider - use DatabaseProvider::set_pending_rocksdb_batch instead")
    }

    /// Always panics: committing pending batches is a per-provider operation.
    /// Call this on a `DatabaseProvider` instead.
    fn commit_pending_rocksdb_batches(&self) -> ProviderResult<()> {
        unimplemented!("ProviderFactory is a factory, not a provider - use DatabaseProvider::commit_pending_rocksdb_batches instead")
    }
}
218
219impl<N: ProviderNodeTypes<DB = DatabaseEnv>> ProviderFactory<N> {
220    /// Create new database provider by passing a path. [`ProviderFactory`] will own the database
221    /// instance.
222    pub fn new_with_database_path<P: AsRef<Path>>(
223        path: P,
224        chain_spec: Arc<N::ChainSpec>,
225        args: DatabaseArguments,
226        static_file_provider: StaticFileProvider<N::Primitives>,
227        rocksdb_provider: RocksDBProvider,
228        runtime: reth_tasks::Runtime,
229    ) -> RethResult<Self> {
230        Self::new(
231            init_db(path, args).map_err(RethError::msg)?,
232            chain_spec,
233            static_file_provider,
234            rocksdb_provider,
235            runtime,
236        )
237        .map_err(RethError::Provider)
238    }
239}
240
impl<N: ProviderNodeTypes> ProviderFactory<N> {
    /// Returns a provider with a created `DbTx` inside, which allows fetching data from the
    /// database using different types of providers. Example: [`HeaderProvider`]
    /// [`BlockHashReader`]. This may fail if the inner read database transaction fails to open.
    ///
    /// The provider inherits this factory's [`PruneModes`], storage settings,
    /// changeset cache and minimum pruning distance.
    #[track_caller]
    pub fn provider(&self) -> ProviderResult<DatabaseProviderRO<N::DB, N>> {
        Ok(DatabaseProvider::new(
            self.db.tx()?,
            self.chain_spec.clone(),
            self.static_file_provider.clone(),
            self.prune_modes.clone(),
            self.storage.clone(),
            self.storage_settings.clone(),
            self.rocksdb_provider.clone(),
            self.changeset_cache.clone(),
            self.runtime.clone(),
            self.db.path(),
        )
        .with_minimum_pruning_distance(self.minimum_pruning_distance))
    }

    /// Returns a provider with a created `DbTxMut` inside, which allows fetching and updating
    /// data from the database using different types of providers. Example: [`HeaderProvider`]
    /// [`BlockHashReader`].  This may fail if the inner read/write database transaction fails to
    /// open.
    #[track_caller]
    pub fn provider_rw(&self) -> ProviderResult<DatabaseProviderRW<N::DB, N>> {
        Ok(DatabaseProviderRW(
            DatabaseProvider::new_rw(
                self.db.tx_mut()?,
                self.chain_spec.clone(),
                self.static_file_provider.clone(),
                self.prune_modes.clone(),
                self.storage.clone(),
                self.storage_settings.clone(),
                self.rocksdb_provider.clone(),
                self.changeset_cache.clone(),
                self.runtime.clone(),
                self.db.path(),
            )
            .with_minimum_pruning_distance(self.minimum_pruning_distance),
        ))
    }

    /// Returns a provider with a created `DbTxMut` inside, configured for unwind operations.
    /// Uses unwind commit order (MDBX first, then `RocksDB`, then static files) to allow
    /// recovery by truncating static files on restart if interrupted.
    #[track_caller]
    pub fn unwind_provider_rw(
        &self,
    ) -> ProviderResult<DatabaseProvider<<N::DB as Database>::TXMut, N>> {
        Ok(DatabaseProvider::new_unwind_rw(
            self.db.tx_mut()?,
            self.chain_spec.clone(),
            self.static_file_provider.clone(),
            self.prune_modes.clone(),
            self.storage.clone(),
            self.storage_settings.clone(),
            self.rocksdb_provider.clone(),
            self.changeset_cache.clone(),
            self.runtime.clone(),
            self.db.path(),
        )
        .with_minimum_pruning_distance(self.minimum_pruning_distance))
    }

    /// State provider for latest block
    #[track_caller]
    pub fn latest(&self) -> ProviderResult<StateProviderBox> {
        trace!(target: "providers::db", "Returning latest state provider");
        Ok(Box::new(LatestStateProvider::new(self.database_provider_ro()?)))
    }

    /// Storage provider for state at that given block
    pub fn history_by_block_number(
        &self,
        block_number: BlockNumber,
    ) -> ProviderResult<StateProviderBox> {
        let state_provider = self.provider()?.try_into_history_at_block(block_number)?;
        trace!(target: "providers::db", ?block_number, "Returning historical state provider for block number");
        Ok(state_provider)
    }

    /// Storage provider for state at that given block hash
    pub fn history_by_block_hash(&self, block_hash: BlockHash) -> ProviderResult<StateProviderBox> {
        let provider = self.provider()?;

        // Resolve the hash to a number first; the historical provider is keyed
        // by block number.
        let block_number = provider
            .block_number(block_hash)?
            .ok_or(ProviderError::BlockHashNotFound(block_hash))?;

        let state_provider = provider.try_into_history_at_block(block_number)?;
        trace!(target: "providers::db", ?block_number, %block_hash, "Returning historical state provider for block hash");
        Ok(state_provider)
    }

    /// Asserts that the static files and database are consistent. If not,
    /// returns [`ProviderError::MustUnwind`] with the appropriate unwind
    /// target. May also return any [`ProviderError`] that
    /// [`Self::check_consistency`] may return.
    pub fn assert_consistent(self) -> ProviderResult<Self> {
        let (rocksdb_unwind, static_file_unwind) = self.check_consistency()?;

        // Label the error with whichever backend(s) reported an unwind target.
        let source = match (rocksdb_unwind, static_file_unwind) {
            (None, None) => return Ok(self),
            (Some(_), Some(_)) => "RocksDB and Static Files",
            (Some(_), None) => "RocksDB",
            (None, Some(_)) => "Static Files",
        };

        // Use the lower of the two targets so both backends end up consistent.
        Err(ProviderError::MustUnwind {
            data_source: source,
            unwind_to: rocksdb_unwind
                .into_iter()
                .chain(static_file_unwind)
                .min()
                .expect("at least one unwind target must be Some"),
        })
    }

    /// Checks the consistency between the static files and the database. This
    /// may result in static files being pruned or otherwise healed to ensure
    /// consistency. I.e. this MAY result in writes to the static files.
    ///
    /// Returns `(rocksdb_unwind, static_file_unwind)` targets; `None` means
    /// the respective backend is consistent. The step order below matters.
    #[instrument(err, skip(self))]
    pub fn check_consistency(&self) -> ProviderResult<(Option<u64>, Option<u64>)> {
        let provider_ro = self
            .database_provider_ro()?
            // Healing can run long-lived read transactions (e.g., iterating changesets
            // over millions of blocks). Disable the default timeout so MDBX doesn't
            // kill the transaction mid-heal, which causes a crash loop on startup.
            .disable_long_read_transaction_safety();

        // Step 1: heal file-level inconsistencies (no pruning)
        self.static_file_provider().check_file_consistency(&provider_ro)?;

        // Step 2: RocksDB consistency check (needs static files tx data)
        let rocksdb_unwind = self.rocksdb_provider().check_consistency(&provider_ro)?;

        // Step 3: Static file checkpoint consistency (may prune)
        let static_file_unwind = self.static_file_provider().check_consistency(&provider_ro)?.map(
            |target| match target {
                PipelineTarget::Unwind(block) => block,
                PipelineTarget::Sync(_) => unreachable!("check_consistency returns Unwind"),
            },
        );

        // Step 4: Heal finalized/safe block numbers that may be ahead of the
        // highest header on nodes coming from <=1.10.2.
        //
        // Unwinds already set it to the target block.
        if rocksdb_unwind.is_none() && static_file_unwind.is_none() {
            self.heal_chain_state_block_numbers(&provider_ro)?;
        }

        Ok((rocksdb_unwind, static_file_unwind))
    }

    /// If the stored finalized or safe block number is ahead of the highest
    /// header, resets it to the highest header.
    fn heal_chain_state_block_numbers(
        &self,
        provider_ro: &DatabaseProvider<<N::DB as Database>::TX, N>,
    ) -> ProviderResult<()> {
        let highest_header = self.last_block_number()?;

        let finalized = provider_ro.last_finalized_block_number()?;
        let safe = provider_ro.last_safe_block_number()?;

        // Fast path: nothing stored is ahead of the highest header.
        if finalized.is_none_or(|f| f <= highest_header) && safe.is_none_or(|s| s <= highest_header)
        {
            return Ok(());
        }

        // Only open a write transaction when healing is actually required.
        let provider_rw = self.provider_rw()?;

        if let Some(finalized) = finalized.filter(|&f| f > highest_header) {
            info!(
                target: "providers::db",
                finalized,
                highest_header,
                "Healing finalized block number",
            );
            provider_rw.save_finalized_block_number(highest_header)?;
        }

        if let Some(safe) = safe.filter(|&s| s > highest_header) {
            info!(
                target: "providers::db",
                safe,
                highest_header,
                "Healing safe block number",
            );
            provider_rw.save_safe_block_number(highest_header)?;
        }

        provider_rw.commit()?;

        Ok(())
    }
}
444
// The factory's primitive types are exactly those of its node type bundle.
impl<N: NodeTypesWithDB> NodePrimitivesProvider for ProviderFactory<N> {
    type Primitives = N::Primitives;
}
448
449impl<N: ProviderNodeTypes> DatabaseProviderFactory for ProviderFactory<N> {
450    type DB = N::DB;
451    type Provider = DatabaseProvider<<N::DB as Database>::TX, N>;
452    type ProviderRW = DatabaseProvider<<N::DB as Database>::TXMut, N>;
453
454    fn database_provider_ro(&self) -> ProviderResult<Self::Provider> {
455        self.provider()
456    }
457
458    fn database_provider_rw(&self) -> ProviderResult<Self::ProviderRW> {
459        self.provider_rw().map(|provider| provider.0)
460    }
461}
462
463impl<N: NodeTypesWithDB> StaticFileProviderFactory for ProviderFactory<N> {
464    /// Returns static file provider
465    fn static_file_provider(&self) -> StaticFileProvider<Self::Primitives> {
466        self.static_file_provider.clone()
467    }
468
469    fn get_static_file_writer(
470        &self,
471        block: BlockNumber,
472        segment: StaticFileSegment,
473    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
474        self.static_file_provider.get_writer(block, segment)
475    }
476}
477
478impl<N: ProviderNodeTypes> HeaderSyncGapProvider for ProviderFactory<N> {
479    type Header = HeaderTy<N>;
480    fn local_tip_header(
481        &self,
482        highest_uninterrupted_block: BlockNumber,
483    ) -> ProviderResult<SealedHeader<Self::Header>> {
484        self.provider()?.local_tip_header(highest_uninterrupted_block)
485    }
486}
487
impl<N: ProviderNodeTypes> HeaderProvider for ProviderFactory<N> {
    type Header = HeaderTy<N>;

    // Hash-keyed lookup goes through the database provider; all number-keyed
    // lookups below are served directly from static files.
    fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
        self.provider()?.header(block_hash)
    }

    fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
        self.static_file_provider.header_by_number(num)
    }

    fn headers_range(
        &self,
        range: impl RangeBounds<BlockNumber>,
    ) -> ProviderResult<Vec<Self::Header>> {
        self.static_file_provider.headers_range(range)
    }

    fn sealed_header(
        &self,
        number: BlockNumber,
    ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
        self.static_file_provider.sealed_header(number)
    }

    fn sealed_headers_range(
        &self,
        range: impl RangeBounds<BlockNumber>,
    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
        self.static_file_provider.sealed_headers_range(range)
    }

    fn sealed_headers_while(
        &self,
        range: impl RangeBounds<BlockNumber>,
        predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
        self.static_file_provider.sealed_headers_while(range, predicate)
    }
}
528
// Canonical hashes are number-indexed, so both reads come from static files.
impl<N: ProviderNodeTypes> BlockHashReader for ProviderFactory<N> {
    fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
        self.static_file_provider.block_hash(number)
    }

    fn canonical_hashes_range(
        &self,
        start: BlockNumber,
        end: BlockNumber,
    ) -> ProviderResult<Vec<B256>> {
        self.static_file_provider.canonical_hashes_range(start, end)
    }
}
542
543impl<N: ProviderNodeTypes> BlockNumReader for ProviderFactory<N> {
544    fn chain_info(&self) -> ProviderResult<ChainInfo> {
545        self.provider()?.chain_info()
546    }
547
548    fn best_block_number(&self) -> ProviderResult<BlockNumber> {
549        self.provider()?.best_block_number()
550    }
551
552    fn last_block_number(&self) -> ProviderResult<BlockNumber> {
553        self.static_file_provider.last_block_number()
554    }
555
556    fn earliest_block_number(&self) -> ProviderResult<BlockNumber> {
557        // earliest history height tracks the lowest block number that has __not__ been expired, in
558        // other words, the first/earliest available block.
559        Ok(self.static_file_provider.earliest_history_height())
560    }
561
562    fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
563        self.provider()?.block_number(hash)
564    }
565}
566
567impl<N: ProviderNodeTypes> BlockReader for ProviderFactory<N> {
568    type Block = BlockTy<N>;
569
570    fn find_block_by_hash(
571        &self,
572        hash: B256,
573        source: BlockSource,
574    ) -> ProviderResult<Option<Self::Block>> {
575        self.provider()?.find_block_by_hash(hash, source)
576    }
577
578    fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
579        self.provider()?.block(id)
580    }
581
582    fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
583        self.provider()?.pending_block()
584    }
585
586    fn pending_block_and_receipts(
587        &self,
588    ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
589        self.provider()?.pending_block_and_receipts()
590    }
591
592    fn recovered_block(
593        &self,
594        id: BlockHashOrNumber,
595        transaction_kind: TransactionVariant,
596    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
597        self.provider()?.recovered_block(id, transaction_kind)
598    }
599
600    fn sealed_block_with_senders(
601        &self,
602        id: BlockHashOrNumber,
603        transaction_kind: TransactionVariant,
604    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
605        self.provider()?.sealed_block_with_senders(id, transaction_kind)
606    }
607
608    fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
609        self.provider()?.block_range(range)
610    }
611
612    fn block_with_senders_range(
613        &self,
614        range: RangeInclusive<BlockNumber>,
615    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
616        self.provider()?.block_with_senders_range(range)
617    }
618
619    fn recovered_block_range(
620        &self,
621        range: RangeInclusive<BlockNumber>,
622    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
623        self.provider()?.recovered_block_range(range)
624    }
625
626    fn block_by_transaction_id(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
627        self.provider()?.block_by_transaction_id(id)
628    }
629}
630
631impl<N: ProviderNodeTypes> TransactionsProvider for ProviderFactory<N> {
632    type Transaction = TxTy<N>;
633
634    fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
635        self.provider()?.transaction_id(tx_hash)
636    }
637
638    fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
639        self.static_file_provider.transaction_by_id(id)
640    }
641
642    fn transaction_by_id_unhashed(
643        &self,
644        id: TxNumber,
645    ) -> ProviderResult<Option<Self::Transaction>> {
646        self.static_file_provider.transaction_by_id_unhashed(id)
647    }
648
649    fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
650        self.provider()?.transaction_by_hash(hash)
651    }
652
653    fn transaction_by_hash_with_meta(
654        &self,
655        tx_hash: TxHash,
656    ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
657        self.provider()?.transaction_by_hash_with_meta(tx_hash)
658    }
659
660    fn transactions_by_block(
661        &self,
662        id: BlockHashOrNumber,
663    ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
664        self.provider()?.transactions_by_block(id)
665    }
666
667    fn transactions_by_block_range(
668        &self,
669        range: impl RangeBounds<BlockNumber>,
670    ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
671        self.provider()?.transactions_by_block_range(range)
672    }
673
674    fn transactions_by_tx_range(
675        &self,
676        range: impl RangeBounds<TxNumber>,
677    ) -> ProviderResult<Vec<Self::Transaction>> {
678        self.static_file_provider.transactions_by_tx_range(range)
679    }
680
681    fn senders_by_tx_range(
682        &self,
683        range: impl RangeBounds<TxNumber>,
684    ) -> ProviderResult<Vec<Address>> {
685        if EitherWriterDestination::senders(self).is_static_file() {
686            self.static_file_provider.senders_by_tx_range(range)
687        } else {
688            self.provider()?.senders_by_tx_range(range)
689        }
690    }
691
692    fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
693        if EitherWriterDestination::senders(self).is_static_file() {
694            self.static_file_provider.transaction_sender(id)
695        } else {
696            self.provider()?.transaction_sender(id)
697        }
698    }
699}
700
impl<N: ProviderNodeTypes> ReceiptProvider for ProviderFactory<N> {
    type Receipt = ReceiptTy<N>;

    // Tries the static file segment first and falls back to the database
    // closure when the id is not covered by static files.
    fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
        self.static_file_provider.get_with_static_file_or_database(
            StaticFileSegment::Receipts,
            id,
            |static_file| static_file.receipt(id),
            || self.provider()?.receipt(id),
        )
    }

    fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
        self.provider()?.receipt_by_hash(hash)
    }

    fn receipts_by_block(
        &self,
        block: BlockHashOrNumber,
    ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
        self.provider()?.receipts_by_block(block)
    }

    // Range variant of the static-file-first lookup; the range may be split
    // between the two backends by the static file provider.
    fn receipts_by_tx_range(
        &self,
        range: impl RangeBounds<TxNumber>,
    ) -> ProviderResult<Vec<Self::Receipt>> {
        self.static_file_provider.get_range_with_static_file_or_database(
            StaticFileSegment::Receipts,
            to_range(range),
            |static_file, range, _| static_file.receipts_by_tx_range(range),
            |range, _| self.provider()?.receipts_by_tx_range(range),
            |_| true,
        )
    }

    fn receipts_by_block_range(
        &self,
        block_range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
        self.provider()?.receipts_by_block_range(block_range)
    }
}
744
745impl<N: ProviderNodeTypes> BlockBodyIndicesProvider for ProviderFactory<N> {
746    fn block_body_indices(
747        &self,
748        number: BlockNumber,
749    ) -> ProviderResult<Option<StoredBlockBodyIndices>> {
750        self.provider()?.block_body_indices(number)
751    }
752
753    fn block_body_indices_range(
754        &self,
755        range: RangeInclusive<BlockNumber>,
756    ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
757        self.provider()?.block_body_indices_range(range)
758    }
759}
760
761impl<N: ProviderNodeTypes> StageCheckpointReader for ProviderFactory<N> {
762    fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
763        self.provider()?.get_stage_checkpoint(id)
764    }
765
766    fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
767        self.provider()?.get_stage_checkpoint_progress(id)
768    }
769    fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
770        self.provider()?.get_all_checkpoints()
771    }
772}
773
774impl<N: NodeTypesWithDB> ChainSpecProvider for ProviderFactory<N> {
775    type ChainSpec = N::ChainSpec;
776
777    fn chain_spec(&self) -> Arc<N::ChainSpec> {
778        self.chain_spec.clone()
779    }
780}
781
782impl<N: ProviderNodeTypes> PruneCheckpointReader for ProviderFactory<N> {
783    fn get_prune_checkpoint(
784        &self,
785        segment: PruneSegment,
786    ) -> ProviderResult<Option<PruneCheckpoint>> {
787        self.provider()?.get_prune_checkpoint(segment)
788    }
789
790    fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
791        self.provider()?.get_prune_checkpoints()
792    }
793}
794
795impl<N: ProviderNodeTypes> HashedPostStateProvider for ProviderFactory<N> {
796    fn hashed_post_state(&self, bundle_state: &BundleState) -> HashedPostState {
797        HashedPostState::from_bundle_state::<KeccakKeyHasher>(bundle_state.state())
798    }
799}
800
801impl<N: ProviderNodeTypes> MetadataProvider for ProviderFactory<N> {
802    fn get_metadata(&self, key: &str) -> ProviderResult<Option<Vec<u8>>> {
803        self.provider()?.get_metadata(key)
804    }
805}
806
impl<N> fmt::Debug for ProviderFactory<N>
where
    N: NodeTypesWithDB<DB: fmt::Debug, ChainSpec: fmt::Debug, Storage: fmt::Debug>,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Exhaustive destructuring (no `..`) so adding a struct field without
        // updating this impl becomes a compile error.
        let Self {
            db,
            chain_spec,
            static_file_provider,
            prune_modes,
            storage,
            storage_settings,
            rocksdb_provider,
            changeset_cache,
            runtime,
            minimum_pruning_distance,
        } = self;
        f.debug_struct("ProviderFactory")
            .field("db", &db)
            .field("chain_spec", &chain_spec)
            .field("static_file_provider", &static_file_provider)
            .field("prune_modes", &prune_modes)
            .field("storage", &storage)
            // Print the settings value behind the lock, not the lock itself.
            .field("storage_settings", &*storage_settings.read())
            .field("rocksdb_provider", &rocksdb_provider)
            .field("changeset_cache", &changeset_cache)
            .field("runtime", &runtime)
            .field("minimum_pruning_distance", &minimum_pruning_distance)
            .finish()
    }
}
838
839impl<N: NodeTypesWithDB> Clone for ProviderFactory<N> {
840    fn clone(&self) -> Self {
841        Self {
842            db: self.db.clone(),
843            chain_spec: self.chain_spec.clone(),
844            static_file_provider: self.static_file_provider.clone(),
845            prune_modes: self.prune_modes.clone(),
846            storage: self.storage.clone(),
847            storage_settings: self.storage_settings.clone(),
848            rocksdb_provider: self.rocksdb_provider.clone(),
849            changeset_cache: self.changeset_cache.clone(),
850            runtime: self.runtime.clone(),
851            minimum_pruning_distance: self.minimum_pruning_distance,
852        }
853    }
854}
855
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        providers::{StaticFileProvider, StaticFileWriter},
        test_utils::{blocks::TEST_BLOCK, create_test_provider_factory, MockNodeTypesWithDB},
        BlockHashReader, BlockNumReader, BlockWriter, DBProvider, HeaderSyncGapProvider,
        TransactionsProvider,
    };
    use alloy_primitives::{TxNumber, B256};
    use assert_matches::assert_matches;
    use reth_chainspec::ChainSpecBuilder;
    use reth_db::{
        mdbx::DatabaseArguments,
        test_utils::{create_test_rocksdb_dir, create_test_static_files_dir, ERROR_TEMPDIR},
    };
    use reth_db_api::tables;
    use reth_primitives_traits::SignerRecoverable;
    use reth_prune_types::{PruneMode, PruneModes};
    use reth_storage_errors::provider::ProviderError;
    use reth_testing_utils::generators::{self, random_block, random_header, BlockParams};
    use std::{ops::RangeInclusive, sync::Arc};

    // Smoke test: a freshly created factory can hand out a latest-state provider.
    #[test]
    fn common_history_provider() {
        let factory = create_test_provider_factory();
        let _ = factory.latest();
    }

    // On an empty database, chain info reports block number 0 with a zero best hash.
    #[test]
    fn default_chain_info() {
        let factory = create_test_provider_factory();
        let provider = factory.provider().unwrap();

        let chain_info = provider.chain_info().expect("should be ok");
        assert_eq!(chain_info.best_number, 0);
        assert_eq!(chain_info.best_hash, B256::ZERO);
    }

    // A read-only provider keeps working while a read-write provider is open on the
    // same factory.
    #[test]
    fn provider_flow() {
        let factory = create_test_provider_factory();
        let provider = factory.provider().unwrap();
        provider.block_hash(0).unwrap();
        let provider_rw = factory.provider_rw().unwrap();
        provider_rw.block_hash(0).unwrap();
        provider.block_hash(0).unwrap();
    }

    // Constructing a factory from an on-disk database path (rather than a pre-opened
    // env) yields working read-only and read-write providers.
    #[test]
    fn provider_factory_with_database_path() {
        let chain_spec = ChainSpecBuilder::mainnet().build();
        let (_static_dir, static_dir_path) = create_test_static_files_dir();
        let (_rocksdb_dir, rocksdb_path) = create_test_rocksdb_dir();
        // Keep the tempdir guard alive for the duration of the test so the DB path
        // isn't deleted underneath the factory.
        let _db_tempdir = tempfile::TempDir::new().expect(ERROR_TEMPDIR);
        let factory = ProviderFactory::<MockNodeTypesWithDB<DatabaseEnv>>::new_with_database_path(
            _db_tempdir.path(),
            Arc::new(chain_spec),
            DatabaseArguments::new(Default::default()),
            StaticFileProvider::read_write(static_dir_path).unwrap(),
            RocksDBProvider::builder(&rocksdb_path).build().unwrap(),
            reth_tasks::Runtime::test(),
        )
        .unwrap();
        let provider = factory.provider().unwrap();
        provider.block_hash(0).unwrap();
        let provider_rw = factory.provider_rw().unwrap();
        provider_rw.block_hash(0).unwrap();
        provider.block_hash(0).unwrap();
    }

    // Inserting a block without prune modes persists sender and tx-hash lookups;
    // with `PruneMode::Full` on those segments, neither is written.
    #[test]
    fn insert_block_with_prune_modes() {
        let block = TEST_BLOCK.clone();

        {
            // No prune modes: sender and tx-lookup entries must be present.
            let factory = create_test_provider_factory();
            let provider = factory.provider_rw().unwrap();
            assert_matches!(provider.insert_block(&block.clone().try_recover().unwrap()), Ok(_));
            assert_matches!(
                provider.transaction_sender(0), Ok(Some(sender))
                if sender == block.body().transactions[0].recover_signer().unwrap()
            );
            assert_matches!(
                provider.transaction_id(*block.body().transactions[0].tx_hash()),
                Ok(Some(0))
            );
        }

        {
            // Full pruning of sender recovery and tx lookup: entries must be absent.
            let prune_modes = PruneModes {
                sender_recovery: Some(PruneMode::Full),
                transaction_lookup: Some(PruneMode::Full),
                ..PruneModes::default()
            };
            // Keep factory alive until provider is dropped to prevent TempDatabase cleanup
            let factory = create_test_provider_factory().with_prune_modes(prune_modes);
            let provider = factory.provider_rw().unwrap();
            assert_matches!(provider.insert_block(&block.clone().try_recover().unwrap()), Ok(_));
            assert_matches!(provider.transaction_sender(0), Ok(None));
            assert_matches!(
                provider.transaction_id(*block.body().transactions[0].tx_hash()),
                Ok(None)
            );
        }
    }

    // `take` over the senders table returns the recovered senders for the requested
    // tx range and removes them, so a follow-up range read yields nothing.
    #[test]
    fn take_block_transaction_range_recover_senders() {
        let mut rng = generators::rng();
        let block =
            random_block(&mut rng, 0, BlockParams { tx_count: Some(3), ..Default::default() });

        // Exercise single-element and overlapping two-element ranges.
        let tx_ranges: Vec<RangeInclusive<TxNumber>> = vec![0..=0, 1..=1, 2..=2, 0..=1, 1..=2];
        for range in tx_ranges {
            let factory = create_test_provider_factory();
            let provider = factory.provider_rw().unwrap();

            assert_matches!(provider.insert_block(&block.clone().try_recover().unwrap()), Ok(_));

            let senders = provider.take::<tables::TransactionSenders>(range.clone()).unwrap();
            assert_eq!(
                senders,
                range
                    .clone()
                    .map(|tx_number| (
                        tx_number,
                        block.body().transactions[tx_number as usize].recover_signer().unwrap()
                    ))
                    .collect::<Vec<_>>()
            );

            // After `take`, the senders are gone from the table.
            let db_senders = provider.senders_by_tx_range(range);
            assert!(matches!(db_senders, Ok(ref v) if v.is_empty()));
        }
    }

    // `local_tip_header` errors with `HeaderNotFound` on an empty database and
    // returns the header once it has been written to the static-file segment.
    #[test]
    fn header_sync_gap_lookup() {
        let factory = create_test_provider_factory();
        let provider = factory.provider_rw().unwrap();

        let mut rng = generators::rng();

        // Genesis
        let checkpoint = 0;
        let head = random_header(&mut rng, 0, None);

        // Empty database
        assert_matches!(
            provider.local_tip_header(checkpoint),
            Err(ProviderError::HeaderNotFound(block_number))
                if block_number.as_number().unwrap() == checkpoint
        );

        // Checkpoint and no gap
        let static_file_provider = provider.static_file_provider();
        let mut static_file_writer =
            static_file_provider.latest_writer(StaticFileSegment::Headers).unwrap();
        static_file_writer.append_header(head.header(), &head.hash()).unwrap();
        static_file_writer.commit().unwrap();
        drop(static_file_writer);

        let local_head = provider.local_tip_header(checkpoint).unwrap();

        assert_eq!(local_head, head);
    }
}
1023}