Skip to main content

reth_provider/providers/database/
mod.rs

1use crate::{
2    providers::{
3        state::latest::LatestStateProvider, NodeTypesForProvider, RocksDBProvider,
4        StaticFileProvider, StaticFileProviderRWRefMut,
5    },
6    to_range,
7    traits::{BlockSource, ReceiptProvider},
8    BlockHashReader, BlockNumReader, BlockReader, ChainSpecProvider, DatabaseProviderFactory,
9    EitherWriterDestination, HashedPostStateProvider, HeaderProvider, HeaderSyncGapProvider,
10    MetadataProvider, ProviderError, PruneCheckpointReader, RocksDBProviderFactory,
11    StageCheckpointReader, StateProviderBox, StaticFileProviderFactory, StaticFileWriter,
12    TransactionVariant, TransactionsProvider,
13};
14use alloy_consensus::transaction::TransactionMeta;
15use alloy_eips::BlockHashOrNumber;
16use alloy_primitives::{Address, BlockHash, BlockNumber, TxHash, TxNumber, B256};
17use core::fmt;
18use notify::{RecommendedWatcher, RecursiveMode, Watcher};
19use parking_lot::RwLock;
20use reth_chainspec::ChainInfo;
21use reth_db::{init_db, mdbx::DatabaseArguments, DatabaseEnv};
22use reth_db_api::{database::Database, models::StoredBlockBodyIndices};
23use reth_errors::{RethError, RethResult};
24use reth_node_types::{
25    BlockTy, HeaderTy, NodeTypesWithDB, NodeTypesWithDBAdapter, ReceiptTy, TxTy,
26};
27use reth_primitives_traits::{RecoveredBlock, SealedHeader};
28use reth_prune_types::{PruneCheckpoint, PruneModes, PruneSegment, MINIMUM_UNWIND_SAFE_DISTANCE};
29use reth_stages_types::{PipelineTarget, StageCheckpoint, StageId};
30use reth_static_file_types::StaticFileSegment;
31use reth_storage_api::{
32    BlockBodyIndicesProvider, ChainStateBlockReader, ChainStateBlockWriter, DBProvider,
33    NodePrimitivesProvider, StorageSettings, StorageSettingsCache, TryIntoHistoricalStateProvider,
34};
35use reth_storage_errors::provider::ProviderResult;
36use reth_trie::HashedPostState;
37use reth_trie_db::ChangesetCache;
38use revm_database::BundleState;
39use std::{
40    ops::{RangeBounds, RangeInclusive},
41    path::Path,
42    sync::{
43        atomic::{AtomicU64, Ordering},
44        Arc, Mutex,
45    },
46};
47use tracing::{info, instrument, trace, warn};
48
49mod provider;
50pub use provider::{
51    CommitOrder, DatabaseProvider, DatabaseProviderRO, DatabaseProviderRW, SaveBlocksMode,
52};
53
54use super::ProviderNodeTypes;
55use reth_trie::KeccakKeyHasher;
56
57mod builder;
58pub use builder::{ProviderFactoryBuilder, ReadOnlyConfig};
59
60mod metrics;
61
62mod chain;
63pub use chain::*;
64
/// Sync state for read-only [`ProviderFactory`] instances.
///
/// Used by `sync_providers_if_needed` to decide whether the `RocksDB` secondary
/// instance and static file index need to catch up with the primary writer.
struct ReadOnlySyncState {
    /// Last MDBX txn ID we synced `RocksDB` secondary / static file indexes to.
    /// Checked lock-free on the fast path before taking `sync_lock`.
    last_synced_txnid: AtomicU64,
    /// Serializes the slow-path catch-up (`RocksDB` + static file re-init).
    sync_lock: Mutex<()>,
}
72
/// A common provider that fetches data from a database or static file.
///
/// This provider implements most provider or provider factory traits.
pub struct ProviderFactory<N: NodeTypesWithDB> {
    /// Database instance
    db: N::DB,
    /// Chain spec shared with every provider created by this factory
    chain_spec: Arc<N::ChainSpec>,
    /// Static File Provider
    static_file_provider: StaticFileProvider<N::Primitives>,
    /// Optional pruning configuration, passed through to created providers
    prune_modes: PruneModes,
    /// The node storage handler.
    storage: Arc<N::Storage>,
    /// Storage configuration settings for this node; the cache is shared
    /// (via `Arc`) between the factory and all providers it creates
    storage_settings: Arc<RwLock<StorageSettings>>,
    /// `RocksDB` provider
    rocksdb_provider: RocksDBProvider,
    /// Changeset cache for trie unwinding
    changeset_cache: ChangesetCache,
    /// Task runtime for spawning parallel I/O work.
    runtime: reth_tasks::Runtime,
    /// Minimum distance from tip required before pruning can occur.
    minimum_pruning_distance: u64,
    /// State for on-demand syncing of `RocksDB` secondary and static file indexes.
    ///
    /// Only set for read-only factories. Can be disabled if there is no concurrent read-write
    /// factory writing to the database (e.g as part of a running reth node).
    read_only_sync: Option<Arc<ReadOnlySyncState>>,
}
103
104impl<N: NodeTypesForProvider> ProviderFactory<NodeTypesWithDBAdapter<N, DatabaseEnv>> {
105    /// Instantiates the builder for this type
106    pub fn builder() -> ProviderFactoryBuilder<N> {
107        ProviderFactoryBuilder::default()
108    }
109}
110
111impl<N: ProviderNodeTypes> ProviderFactory<N> {
112    /// Create new database provider factory.
113    ///
114    /// The storage backends used by the produced factory MAY be inconsistent.
115    /// It is recommended to call [`Self::check_consistency`] after
116    /// creation to ensure consistency between the database and static files.
117    /// If the function returns unwind targets, the caller MUST unwind the
118    /// inner database to the minimum of the two targets to ensure consistency.
119    pub fn new(
120        db: N::DB,
121        chain_spec: Arc<N::ChainSpec>,
122        static_file_provider: StaticFileProvider<N::Primitives>,
123        rocksdb_provider: RocksDBProvider,
124        runtime: reth_tasks::Runtime,
125    ) -> ProviderResult<Self> {
126        // Load storage settings from database at init time. Creates a temporary provider
127        // to read persisted settings, falling back to legacy defaults if none exist.
128        //
129        // Both factory and all providers it creates should share these cached settings.
130        let legacy_settings = StorageSettings::v1();
131        let storage_settings = DatabaseProvider::<_, N>::new(
132            db.tx()?,
133            chain_spec.clone(),
134            static_file_provider.clone(),
135            Default::default(),
136            Default::default(),
137            Arc::new(RwLock::new(legacy_settings)),
138            rocksdb_provider.clone(),
139            ChangesetCache::new(),
140            runtime.clone(),
141            db.path(),
142        )
143        .storage_settings()?
144        .unwrap_or(legacy_settings);
145
146        Ok(Self {
147            db,
148            chain_spec,
149            static_file_provider,
150            prune_modes: PruneModes::default(),
151            storage: Default::default(),
152            storage_settings: Arc::new(RwLock::new(storage_settings)),
153            rocksdb_provider,
154            changeset_cache: ChangesetCache::new(),
155            runtime,
156            minimum_pruning_distance: MINIMUM_UNWIND_SAFE_DISTANCE,
157            read_only_sync: None,
158        })
159    }
160
161    /// Create new database provider factory and perform consistency checks.
162    ///
163    /// This will call [`Self::check_consistency`] internally and return
164    /// [`ProviderError::MustUnwind`] if inconsistencies are found. It may also
165    /// return any [`ProviderError`] that [`Self::new`] may return, or that are
166    /// encountered during consistency checks.
167    pub fn new_checked(
168        db: N::DB,
169        chain_spec: Arc<N::ChainSpec>,
170        static_file_provider: StaticFileProvider<N::Primitives>,
171        rocksdb_provider: RocksDBProvider,
172        runtime: reth_tasks::Runtime,
173    ) -> ProviderResult<Self> {
174        Self::new(db, chain_spec, static_file_provider, rocksdb_provider, runtime)
175            .and_then(Self::assert_consistent)
176    }
177}
178
179impl<N: NodeTypesWithDB> ProviderFactory<N> {
180    /// Sets the pruning configuration for an existing [`ProviderFactory`].
181    pub fn with_prune_modes(mut self, prune_modes: PruneModes) -> Self {
182        self.prune_modes = prune_modes;
183        self
184    }
185
186    /// Sets the changeset cache for an existing [`ProviderFactory`].
187    pub fn with_changeset_cache(mut self, changeset_cache: ChangesetCache) -> Self {
188        self.changeset_cache = changeset_cache;
189        self
190    }
191
192    /// Sets the minimum pruning distance for an existing [`ProviderFactory`].
193    ///
194    /// This controls the minimum distance from tip required before pruning can occur.
195    /// The default is [`MINIMUM_UNWIND_SAFE_DISTANCE`].
196    pub const fn with_minimum_pruning_distance(mut self, distance: u64) -> Self {
197        self.minimum_pruning_distance = distance;
198        self
199    }
200
201    /// Enables on-demand syncing of `RocksDB` secondary and static file indexes for read-only
202    /// factories. Initializes the tracker to the current MDBX txn ID.
203    ///
204    /// Should be used for read-only factories that are running concurrently to a reth node writing
205    /// new data to the database. Would effectively be a no-op if database directory is unchanged.
206    pub fn with_read_only_sync(mut self, watch: bool) -> Self
207    where
208        N::DB: Database,
209    {
210        // Initialize to 0 so the first `sync_providers_if_needed` call always
211        // triggers a RocksDB/static-file catch-up, regardless of what MDBX txnid
212        // the database was at when we opened it.
213        let state = Arc::new(ReadOnlySyncState {
214            last_synced_txnid: AtomicU64::new(0),
215            sync_lock: Mutex::new(()),
216        });
217        self.read_only_sync = Some(state);
218
219        if watch {
220            self.watch_db_directory();
221        }
222        self
223    }
224
225    /// Watches the MDBX data directory for changes and eagerly syncs `RocksDB` secondary and
226    /// static file indexes when modifications are detected.
227    fn watch_db_directory(&self)
228    where
229        N::DB: Database,
230    {
231        let factory = self.clone();
232        let db_path = self.db.path();
233        reth_tasks::spawn_os_thread("ro-sync", move || {
234            let (tx, rx) = std::sync::mpsc::channel();
235            let mut watcher = RecommendedWatcher::new(
236                move |res| {
237                    let _ = tx.send(res);
238                },
239                notify::Config::default(),
240            )
241            .expect("failed to create watcher");
242
243            watcher
244                .watch(&db_path, RecursiveMode::NonRecursive)
245                .expect("failed to watch MDBX path");
246
247            while let Ok(res) = rx.recv() {
248                match res {
249                    Ok(event) => {
250                        if !matches!(
251                            event.kind,
252                            notify::EventKind::Modify(_) | notify::EventKind::Create(_)
253                        ) {
254                            continue;
255                        }
256
257                        if let Err(err) = factory.sync_providers_if_needed() {
258                            warn!(target: "reth::provider", %err, "background ro-sync failed");
259                        }
260                    }
261                    Err(err) => {
262                        warn!(target: "reth::provider", ?err, "MDBX directory watcher error");
263                    }
264                }
265            }
266        });
267    }
268
269    /// For read-only factories, checks whether the MDBX committed txn ID has advanced since the
270    /// last sync and, if so, catches up the `RocksDB` secondary instance and re-initializes the
271    /// static file index.
272    ///
273    /// No-op for read-write factories.
274    pub fn sync_providers_if_needed(&self) -> ProviderResult<()> {
275        let Some(sync_state) = &self.read_only_sync else { return Ok(()) };
276        let current_txnid = self.db.last_txnid().unwrap_or(0);
277
278        // Fast path: no contention when nothing changed.
279        if current_txnid == sync_state.last_synced_txnid.load(Ordering::Relaxed) {
280            return Ok(());
281        }
282
283        // Slow path: serialize the actual catch-up I/O.
284        let _guard = sync_state.sync_lock.lock().unwrap_or_else(|e| e.into_inner());
285
286        // Double-check after acquiring the lock — another thread may have already synced.
287        if current_txnid == sync_state.last_synced_txnid.load(Ordering::Relaxed) {
288            return Ok(());
289        }
290
291        self.rocksdb_provider.try_catch_up_with_primary()?;
292        self.static_file_provider.initialize_index()?;
293        sync_state.last_synced_txnid.store(current_txnid, Ordering::Relaxed);
294        Ok(())
295    }
296
297    /// Returns reference to the underlying database.
298    pub const fn db_ref(&self) -> &N::DB {
299        &self.db
300    }
301
302    #[cfg(any(test, feature = "test-utils"))]
303    /// Consumes Self and returns DB
304    pub fn into_db(self) -> N::DB {
305        self.db
306    }
307}
308
309impl<N: NodeTypesWithDB> StorageSettingsCache for ProviderFactory<N> {
310    fn cached_storage_settings(&self) -> StorageSettings {
311        *self.storage_settings.read()
312    }
313
314    fn set_storage_settings_cache(&self, settings: StorageSettings) {
315        *self.storage_settings.write() = settings;
316    }
317}
318
impl<N: NodeTypesWithDB> RocksDBProviderFactory for ProviderFactory<N> {
    /// Returns a handle to the underlying `RocksDB` provider.
    fn rocksdb_provider(&self) -> RocksDBProvider {
        self.rocksdb_provider.clone()
    }

    /// Pending-batch staging is a per-provider (per-transaction) concern; the factory
    /// holds no transaction state to attach a batch to, so this always panics.
    fn set_pending_rocksdb_batch(&self, _batch: rocksdb::WriteBatchWithTransaction<true>) {
        unimplemented!("ProviderFactory is a factory, not a provider - use DatabaseProvider::set_pending_rocksdb_batch instead")
    }

    /// See [`Self::set_pending_rocksdb_batch`]: committing batches is only meaningful on a
    /// provider with an open transaction, so this always panics.
    fn commit_pending_rocksdb_batches(&self) -> ProviderResult<()> {
        unimplemented!("ProviderFactory is a factory, not a provider - use DatabaseProvider::commit_pending_rocksdb_batches instead")
    }
}
332
333impl<N: ProviderNodeTypes<DB = DatabaseEnv>> ProviderFactory<N> {
334    /// Create new database provider by passing a path. [`ProviderFactory`] will own the database
335    /// instance.
336    pub fn new_with_database_path<P: AsRef<Path>>(
337        path: P,
338        chain_spec: Arc<N::ChainSpec>,
339        args: DatabaseArguments,
340        static_file_provider: StaticFileProvider<N::Primitives>,
341        rocksdb_provider: RocksDBProvider,
342        runtime: reth_tasks::Runtime,
343    ) -> RethResult<Self> {
344        Self::new(
345            init_db(path, args).map_err(RethError::msg)?,
346            chain_spec,
347            static_file_provider,
348            rocksdb_provider,
349            runtime,
350        )
351        .map_err(RethError::Provider)
352    }
353}
354
impl<N: ProviderNodeTypes> ProviderFactory<N> {
    /// Returns a provider with a created `DbTx` inside, which allows fetching data from the
    /// database using different types of providers. Example: [`HeaderProvider`]
    /// [`BlockHashReader`]. This may fail if the inner read database transaction fails to open.
    ///
    /// The provider is created with this factory's [`PruneModes`]; they should only be
    /// relevant for writing data.
    #[track_caller]
    pub fn provider(&self) -> ProviderResult<DatabaseProviderRO<N::DB, N>> {
        let db_tx = self.db.tx()?;

        // Sync providers after opening the database transaction to make
        // sure that no data is pruned from rocksdb or static files.
        //
        // Reorg logic ensures that no data is pruned from rocksdb or static files while there is an
        // mdbx transaction open that might rely on this data.
        self.sync_providers_if_needed()?;

        Ok(DatabaseProvider::new(
            db_tx,
            self.chain_spec.clone(),
            self.static_file_provider.clone(),
            self.prune_modes.clone(),
            self.storage.clone(),
            self.storage_settings.clone(),
            self.rocksdb_provider.clone(),
            self.changeset_cache.clone(),
            self.runtime.clone(),
            self.db.path(),
        )
        .with_minimum_pruning_distance(self.minimum_pruning_distance))
    }

    /// Returns a provider with a created `DbTxMut` inside, which allows fetching and updating
    /// data from the database using different types of providers. Example: [`HeaderProvider`]
    /// [`BlockHashReader`].  This may fail if the inner read/write database transaction fails to
    /// open.
    ///
    /// Note: unlike [`Self::provider`], this does not sync read-only providers — writable
    /// factories are expected to have `read_only_sync` disabled.
    #[track_caller]
    pub fn provider_rw(&self) -> ProviderResult<DatabaseProviderRW<N::DB, N>> {
        Ok(DatabaseProviderRW(
            DatabaseProvider::new_rw(
                self.db.tx_mut()?,
                self.chain_spec.clone(),
                self.static_file_provider.clone(),
                self.prune_modes.clone(),
                self.storage.clone(),
                self.storage_settings.clone(),
                self.rocksdb_provider.clone(),
                self.changeset_cache.clone(),
                self.runtime.clone(),
                self.db.path(),
            )
            .with_reader_txn_tracker(self.db.clone())
            .with_minimum_pruning_distance(self.minimum_pruning_distance),
        ))
    }

    /// Returns a provider with a created `DbTxMut` inside, configured for unwind operations.
    /// Uses unwind commit order (MDBX first, then `RocksDB`, then static files) to allow
    /// recovery by truncating static files on restart if interrupted.
    ///
    /// Unwind commits may wait for pre-existing readers to drain before finishing later
    /// cross-store steps. Drop any long-lived read providers before committing this provider.
    #[track_caller]
    pub fn unwind_provider_rw(
        &self,
    ) -> ProviderResult<DatabaseProvider<<N::DB as Database>::TXMut, N>> {
        Ok(DatabaseProvider::new_unwind_rw(
            self.db.tx_mut()?,
            self.chain_spec.clone(),
            self.static_file_provider.clone(),
            self.prune_modes.clone(),
            self.storage.clone(),
            self.storage_settings.clone(),
            self.rocksdb_provider.clone(),
            self.changeset_cache.clone(),
            self.runtime.clone(),
            self.db.path(),
        )
        .with_reader_txn_tracker(self.db.clone())
        .with_minimum_pruning_distance(self.minimum_pruning_distance))
    }

    /// State provider for latest block
    #[track_caller]
    pub fn latest(&self) -> ProviderResult<StateProviderBox> {
        trace!(target: "providers::db", "Returning latest state provider");
        Ok(Box::new(LatestStateProvider::new(self.database_provider_ro()?)))
    }

    /// Storage provider for state at that given block
    pub fn history_by_block_number(
        &self,
        block_number: BlockNumber,
    ) -> ProviderResult<StateProviderBox> {
        let state_provider = self.provider()?.try_into_history_at_block(block_number)?;
        trace!(target: "providers::db", ?block_number, "Returning historical state provider for block number");
        Ok(state_provider)
    }

    /// Storage provider for state at that given block hash
    pub fn history_by_block_hash(&self, block_hash: BlockHash) -> ProviderResult<StateProviderBox> {
        let provider = self.provider()?;

        // Resolve the hash to a canonical block number first; unknown hashes are an error.
        let block_number = provider
            .block_number(block_hash)?
            .ok_or(ProviderError::BlockHashNotFound(block_hash))?;

        let state_provider = provider.try_into_history_at_block(block_number)?;
        trace!(target: "providers::db", ?block_number, %block_hash, "Returning historical state provider for block hash");
        Ok(state_provider)
    }

    /// Asserts that the static files and database are consistent. If not,
    /// returns [`ProviderError::MustUnwind`] with the appropriate unwind
    /// target. May also return any [`ProviderError`] that
    /// [`Self::check_consistency`] may return.
    pub fn assert_consistent(self) -> ProviderResult<Self> {
        let (rocksdb_unwind, static_file_unwind) = self.check_consistency()?;

        // Build a human-readable description of which store(s) are inconsistent.
        let source = match (rocksdb_unwind, static_file_unwind) {
            (None, None) => return Ok(self),
            (Some(_), Some(_)) => "RocksDB and Static Files",
            (Some(_), None) => "RocksDB",
            (None, Some(_)) => "Static Files",
        };

        // The caller must unwind to the *minimum* of the two targets so both stores
        // end up consistent with the database.
        Err(ProviderError::MustUnwind {
            data_source: source,
            unwind_to: rocksdb_unwind
                .into_iter()
                .chain(static_file_unwind)
                .min()
                .expect("at least one unwind target must be Some"),
        })
    }

    /// Checks the consistency between the static files and the database. This
    /// may result in static files being pruned or otherwise healed to ensure
    /// consistency. I.e. this MAY result in writes to the static files.
    ///
    /// Returns `(rocksdb_unwind, static_file_unwind)` block-number targets; `None`
    /// means the corresponding store is consistent.
    #[instrument(err, skip(self))]
    pub fn check_consistency(&self) -> ProviderResult<(Option<u64>, Option<u64>)> {
        let provider_ro = self
            .database_provider_ro()?
            // Healing can run long-lived read transactions (e.g., iterating changesets
            // over millions of blocks). Disable the default timeout so MDBX doesn't
            // kill the transaction mid-heal, which causes a crash loop on startup.
            .disable_long_read_transaction_safety();

        // Step 1: heal file-level inconsistencies (no pruning)
        self.static_file_provider().check_file_consistency(&provider_ro)?;

        // Step 2: RocksDB consistency check (needs static files tx data)
        let rocksdb_unwind = self.rocksdb_provider().check_consistency(&provider_ro)?;

        // Step 3: Static file checkpoint consistency (may prune)
        let static_file_unwind = self.static_file_provider().check_consistency(&provider_ro)?.map(
            |target| match target {
                PipelineTarget::Unwind(block) => block,
                PipelineTarget::Sync(_) => unreachable!("check_consistency returns Unwind"),
            },
        );

        // Step 4: Heal finalized/safe block numbers that may be ahead of the
        // highest header on nodes coming from <=1.10.2.
        //
        // Unwinds already set it to the target block.
        if rocksdb_unwind.is_none() && static_file_unwind.is_none() {
            self.heal_chain_state_block_numbers(&provider_ro)?;
        }

        Ok((rocksdb_unwind, static_file_unwind))
    }

    /// If the stored finalized or safe block number is ahead of the highest
    /// header, resets it to the highest header.
    fn heal_chain_state_block_numbers(
        &self,
        provider_ro: &DatabaseProvider<<N::DB as Database>::TX, N>,
    ) -> ProviderResult<()> {
        let highest_header = self.last_block_number()?;

        let finalized = provider_ro.last_finalized_block_number()?;
        let safe = provider_ro.last_safe_block_number()?;

        // Fast exit: nothing to heal when both markers (if present) are at or below
        // the highest header.
        if finalized.is_none_or(|f| f <= highest_header) && safe.is_none_or(|s| s <= highest_header)
        {
            return Ok(());
        }

        // Only open a write transaction when at least one marker needs healing.
        let provider_rw = self.provider_rw()?;

        if let Some(finalized) = finalized.filter(|&f| f > highest_header) {
            info!(
                target: "providers::db",
                finalized,
                highest_header,
                "Healing finalized block number",
            );
            provider_rw.save_finalized_block_number(highest_header)?;
        }

        if let Some(safe) = safe.filter(|&s| s > highest_header) {
            info!(
                target: "providers::db",
                safe,
                highest_header,
                "Healing safe block number",
            );
            provider_rw.save_safe_block_number(highest_header)?;
        }

        provider_rw.commit()?;

        Ok(())
    }

    /// Returns a static file provider. For read-only instances, this will also invoke
    /// [`Self::sync_providers_if_needed`] to make sure that the static file provider is up to date.
    pub fn caught_up_static_file_provider(
        &self,
    ) -> ProviderResult<StaticFileProvider<N::Primitives>> {
        self.sync_providers_if_needed()?;
        Ok(self.static_file_provider.clone())
    }
}
581
impl<N: NodeTypesWithDB> NodePrimitivesProvider for ProviderFactory<N> {
    /// The factory exposes the node's primitive types directly.
    type Primitives = N::Primitives;
}
585
586impl<N: ProviderNodeTypes> DatabaseProviderFactory for ProviderFactory<N> {
587    type DB = N::DB;
588    type Provider = DatabaseProvider<<N::DB as Database>::TX, N>;
589    type ProviderRW = DatabaseProvider<<N::DB as Database>::TXMut, N>;
590
591    fn database_provider_ro(&self) -> ProviderResult<Self::Provider> {
592        self.provider()
593    }
594
595    fn database_provider_rw(&self) -> ProviderResult<Self::ProviderRW> {
596        self.provider_rw().map(|provider| provider.0)
597    }
598}
599
600impl<N: NodeTypesWithDB> StaticFileProviderFactory for ProviderFactory<N> {
601    /// Returns static file provider
602    fn static_file_provider(&self) -> StaticFileProvider<Self::Primitives> {
603        self.static_file_provider.clone()
604    }
605
606    fn get_static_file_writer(
607        &self,
608        block: BlockNumber,
609        segment: StaticFileSegment,
610    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
611        self.static_file_provider.get_writer(block, segment)
612    }
613}
614
615impl<N: ProviderNodeTypes> HeaderSyncGapProvider for ProviderFactory<N> {
616    type Header = HeaderTy<N>;
617    fn local_tip_header(
618        &self,
619        highest_uninterrupted_block: BlockNumber,
620    ) -> ProviderResult<SealedHeader<Self::Header>> {
621        self.provider()?.local_tip_header(highest_uninterrupted_block)
622    }
623}
624
625impl<N: ProviderNodeTypes> HeaderProvider for ProviderFactory<N> {
626    type Header = HeaderTy<N>;
627
628    fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
629        self.provider()?.header(block_hash)
630    }
631
632    fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
633        self.caught_up_static_file_provider()?.header_by_number(num)
634    }
635
636    fn headers_range(
637        &self,
638        range: impl RangeBounds<BlockNumber>,
639    ) -> ProviderResult<Vec<Self::Header>> {
640        self.caught_up_static_file_provider()?.headers_range(range)
641    }
642
643    fn sealed_header(
644        &self,
645        number: BlockNumber,
646    ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
647        self.caught_up_static_file_provider()?.sealed_header(number)
648    }
649
650    fn sealed_headers_range(
651        &self,
652        range: impl RangeBounds<BlockNumber>,
653    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
654        self.caught_up_static_file_provider()?.sealed_headers_range(range)
655    }
656
657    fn sealed_headers_while(
658        &self,
659        range: impl RangeBounds<BlockNumber>,
660        predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
661    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
662        self.caught_up_static_file_provider()?.sealed_headers_while(range, predicate)
663    }
664}
665
666impl<N: ProviderNodeTypes> BlockHashReader for ProviderFactory<N> {
667    fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
668        self.caught_up_static_file_provider()?.block_hash(number)
669    }
670
671    fn canonical_hashes_range(
672        &self,
673        start: BlockNumber,
674        end: BlockNumber,
675    ) -> ProviderResult<Vec<B256>> {
676        self.caught_up_static_file_provider()?.canonical_hashes_range(start, end)
677    }
678}
679
680impl<N: ProviderNodeTypes> BlockNumReader for ProviderFactory<N> {
681    fn chain_info(&self) -> ProviderResult<ChainInfo> {
682        self.provider()?.chain_info()
683    }
684
685    fn best_block_number(&self) -> ProviderResult<BlockNumber> {
686        self.provider()?.best_block_number()
687    }
688
689    fn last_block_number(&self) -> ProviderResult<BlockNumber> {
690        self.caught_up_static_file_provider()?.last_block_number()
691    }
692
693    fn earliest_block_number(&self) -> ProviderResult<BlockNumber> {
694        // earliest history height tracks the lowest block number that has __not__ been expired, in
695        // other words, the first/earliest available block.
696        Ok(self.caught_up_static_file_provider()?.earliest_history_height())
697    }
698
699    fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
700        self.provider()?.block_number(hash)
701    }
702}
703
704impl<N: ProviderNodeTypes> BlockReader for ProviderFactory<N> {
705    type Block = BlockTy<N>;
706
707    fn find_block_by_hash(
708        &self,
709        hash: B256,
710        source: BlockSource,
711    ) -> ProviderResult<Option<Self::Block>> {
712        self.provider()?.find_block_by_hash(hash, source)
713    }
714
715    fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
716        self.provider()?.block(id)
717    }
718
719    fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
720        self.provider()?.pending_block()
721    }
722
723    fn pending_block_and_receipts(
724        &self,
725    ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
726        self.provider()?.pending_block_and_receipts()
727    }
728
729    fn recovered_block(
730        &self,
731        id: BlockHashOrNumber,
732        transaction_kind: TransactionVariant,
733    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
734        self.provider()?.recovered_block(id, transaction_kind)
735    }
736
737    fn sealed_block_with_senders(
738        &self,
739        id: BlockHashOrNumber,
740        transaction_kind: TransactionVariant,
741    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
742        self.provider()?.sealed_block_with_senders(id, transaction_kind)
743    }
744
745    fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
746        self.provider()?.block_range(range)
747    }
748
749    fn block_with_senders_range(
750        &self,
751        range: RangeInclusive<BlockNumber>,
752    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
753        self.provider()?.block_with_senders_range(range)
754    }
755
756    fn recovered_block_range(
757        &self,
758        range: RangeInclusive<BlockNumber>,
759    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
760        self.provider()?.recovered_block_range(range)
761    }
762
763    fn block_by_transaction_id(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
764        self.provider()?.block_by_transaction_id(id)
765    }
766}
767
768impl<N: ProviderNodeTypes> TransactionsProvider for ProviderFactory<N> {
769    type Transaction = TxTy<N>;
770
771    fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
772        self.provider()?.transaction_id(tx_hash)
773    }
774
775    fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
776        self.caught_up_static_file_provider()?.transaction_by_id(id)
777    }
778
779    fn transaction_by_id_unhashed(
780        &self,
781        id: TxNumber,
782    ) -> ProviderResult<Option<Self::Transaction>> {
783        self.caught_up_static_file_provider()?.transaction_by_id_unhashed(id)
784    }
785
786    fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
787        self.provider()?.transaction_by_hash(hash)
788    }
789
790    fn transaction_by_hash_with_meta(
791        &self,
792        tx_hash: TxHash,
793    ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
794        self.provider()?.transaction_by_hash_with_meta(tx_hash)
795    }
796
797    fn transactions_by_block(
798        &self,
799        id: BlockHashOrNumber,
800    ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
801        self.provider()?.transactions_by_block(id)
802    }
803
804    fn transactions_by_block_range(
805        &self,
806        range: impl RangeBounds<BlockNumber>,
807    ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
808        self.provider()?.transactions_by_block_range(range)
809    }
810
811    fn transactions_by_tx_range(
812        &self,
813        range: impl RangeBounds<TxNumber>,
814    ) -> ProviderResult<Vec<Self::Transaction>> {
815        self.caught_up_static_file_provider()?.transactions_by_tx_range(range)
816    }
817
818    fn senders_by_tx_range(
819        &self,
820        range: impl RangeBounds<TxNumber>,
821    ) -> ProviderResult<Vec<Address>> {
822        if EitherWriterDestination::senders(self).is_static_file() {
823            self.caught_up_static_file_provider()?.senders_by_tx_range(range)
824        } else {
825            self.provider()?.senders_by_tx_range(range)
826        }
827    }
828
829    fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
830        if EitherWriterDestination::senders(self).is_static_file() {
831            self.caught_up_static_file_provider()?.transaction_sender(id)
832        } else {
833            self.provider()?.transaction_sender(id)
834        }
835    }
836}
837
838impl<N: ProviderNodeTypes> ReceiptProvider for ProviderFactory<N> {
839    type Receipt = ReceiptTy<N>;
840
841    fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
842        self.caught_up_static_file_provider()?.get_with_static_file_or_database(
843            StaticFileSegment::Receipts,
844            id,
845            |static_file| static_file.receipt(id),
846            || self.provider()?.receipt(id),
847        )
848    }
849
850    fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
851        self.provider()?.receipt_by_hash(hash)
852    }
853
854    fn receipts_by_block(
855        &self,
856        block: BlockHashOrNumber,
857    ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
858        self.provider()?.receipts_by_block(block)
859    }
860
861    fn receipts_by_tx_range(
862        &self,
863        range: impl RangeBounds<TxNumber>,
864    ) -> ProviderResult<Vec<Self::Receipt>> {
865        self.caught_up_static_file_provider()?.get_range_with_static_file_or_database(
866            StaticFileSegment::Receipts,
867            to_range(range),
868            |static_file, range, _| static_file.receipts_by_tx_range(range),
869            |range, _| self.provider()?.receipts_by_tx_range(range),
870            |_| true,
871        )
872    }
873
874    fn receipts_by_block_range(
875        &self,
876        block_range: RangeInclusive<BlockNumber>,
877    ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
878        self.provider()?.receipts_by_block_range(block_range)
879    }
880}
881
882impl<N: ProviderNodeTypes> BlockBodyIndicesProvider for ProviderFactory<N> {
883    fn block_body_indices(
884        &self,
885        number: BlockNumber,
886    ) -> ProviderResult<Option<StoredBlockBodyIndices>> {
887        self.provider()?.block_body_indices(number)
888    }
889
890    fn block_body_indices_range(
891        &self,
892        range: RangeInclusive<BlockNumber>,
893    ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
894        self.provider()?.block_body_indices_range(range)
895    }
896}
897
898impl<N: ProviderNodeTypes> StageCheckpointReader for ProviderFactory<N> {
899    fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
900        self.provider()?.get_stage_checkpoint(id)
901    }
902
903    fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
904        self.provider()?.get_stage_checkpoint_progress(id)
905    }
906    fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
907        self.provider()?.get_all_checkpoints()
908    }
909}
910
911impl<N: NodeTypesWithDB> ChainSpecProvider for ProviderFactory<N> {
912    type ChainSpec = N::ChainSpec;
913
914    fn chain_spec(&self) -> Arc<N::ChainSpec> {
915        self.chain_spec.clone()
916    }
917}
918
919impl<N: ProviderNodeTypes> PruneCheckpointReader for ProviderFactory<N> {
920    fn get_prune_checkpoint(
921        &self,
922        segment: PruneSegment,
923    ) -> ProviderResult<Option<PruneCheckpoint>> {
924        self.provider()?.get_prune_checkpoint(segment)
925    }
926
927    fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
928        self.provider()?.get_prune_checkpoints()
929    }
930}
931
932impl<N: ProviderNodeTypes> HashedPostStateProvider for ProviderFactory<N> {
933    fn hashed_post_state(&self, bundle_state: &BundleState) -> HashedPostState {
934        HashedPostState::from_bundle_state::<KeccakKeyHasher>(bundle_state.state())
935    }
936}
937
938impl<N: ProviderNodeTypes> MetadataProvider for ProviderFactory<N> {
939    fn get_metadata(&self, key: &str) -> ProviderResult<Option<Vec<u8>>> {
940        self.provider()?.get_metadata(key)
941    }
942}
943
944impl<N> fmt::Debug for ProviderFactory<N>
945where
946    N: NodeTypesWithDB<DB: fmt::Debug, ChainSpec: fmt::Debug, Storage: fmt::Debug>,
947{
948    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
949        let Self {
950            db,
951            chain_spec,
952            static_file_provider,
953            prune_modes,
954            storage,
955            storage_settings,
956            rocksdb_provider,
957            changeset_cache,
958            runtime,
959            minimum_pruning_distance,
960            read_only_sync,
961        } = self;
962        f.debug_struct("ProviderFactory")
963            .field("db", &db)
964            .field("chain_spec", &chain_spec)
965            .field("static_file_provider", &static_file_provider)
966            .field("prune_modes", &prune_modes)
967            .field("storage", &storage)
968            .field("storage_settings", &*storage_settings.read())
969            .field("rocksdb_provider", &rocksdb_provider)
970            .field("changeset_cache", &changeset_cache)
971            .field("runtime", &runtime)
972            .field("minimum_pruning_distance", &minimum_pruning_distance)
973            .field(
974                "read_only_sync",
975                &read_only_sync.as_ref().map(|s| s.last_synced_txnid.load(Ordering::Relaxed)),
976            )
977            .finish()
978    }
979}
980
impl<N: NodeTypesWithDB> Clone for ProviderFactory<N> {
    // Manual impl rather than `#[derive(Clone)]`: the derive would add an
    // `N: Clone` bound, which the node-types parameter is not expected to
    // satisfy; only the fields need to be clonable.
    fn clone(&self) -> Self {
        Self {
            db: self.db.clone(),
            chain_spec: self.chain_spec.clone(),
            static_file_provider: self.static_file_provider.clone(),
            prune_modes: self.prune_modes.clone(),
            storage: self.storage.clone(),
            storage_settings: self.storage_settings.clone(),
            rocksdb_provider: self.rocksdb_provider.clone(),
            changeset_cache: self.changeset_cache.clone(),
            runtime: self.runtime.clone(),
            minimum_pruning_distance: self.minimum_pruning_distance,
            read_only_sync: self.read_only_sync.clone(),
        }
    }
}
998
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        providers::{StaticFileProvider, StaticFileWriter},
        test_utils::{blocks::TEST_BLOCK, create_test_provider_factory, MockNodeTypesWithDB},
        BlockHashReader, BlockNumReader, BlockWriter, DBProvider, HeaderSyncGapProvider,
        TransactionsProvider,
    };
    use alloy_primitives::{TxNumber, B256};
    use assert_matches::assert_matches;
    use reth_chainspec::ChainSpecBuilder;
    use reth_db::{
        mdbx::DatabaseArguments,
        test_utils::{create_test_rocksdb_dir, create_test_static_files_dir, ERROR_TEMPDIR},
    };
    use reth_db_api::tables;
    use reth_primitives_traits::SignerRecoverable;
    use reth_prune_types::{PruneMode, PruneModes};
    use reth_storage_errors::provider::ProviderError;
    use reth_testing_utils::generators::{self, random_block, random_header, BlockParams};
    use std::{ops::RangeInclusive, sync::Arc};

    // Smoke test: a freshly created factory can hand out a latest-state provider.
    #[test]
    fn common_history_provider() {
        let factory = create_test_provider_factory();
        let _ = factory.latest();
    }

    // An empty database reports block 0 with a zero hash as the chain tip.
    #[test]
    fn default_chain_info() {
        let factory = create_test_provider_factory();
        let provider = factory.provider().unwrap();

        let chain_info = provider.chain_info().expect("should be ok");
        assert_eq!(chain_info.best_number, 0);
        assert_eq!(chain_info.best_hash, B256::ZERO);
    }

    // A read-only provider keeps working while a read-write provider is open.
    #[test]
    fn provider_flow() {
        let factory = create_test_provider_factory();
        let provider = factory.provider().unwrap();
        provider.block_hash(0).unwrap();
        let provider_rw = factory.provider_rw().unwrap();
        provider_rw.block_hash(0).unwrap();
        provider.block_hash(0).unwrap();
    }

    // Constructing a factory from a database path (rather than an already-open
    // environment) yields working read-only and read-write providers.
    #[test]
    fn provider_factory_with_database_path() {
        let chain_spec = ChainSpecBuilder::mainnet().build();
        let (_static_dir, static_dir_path) = create_test_static_files_dir();
        let (_rocksdb_dir, rocksdb_path) = create_test_rocksdb_dir();
        let _db_tempdir = tempfile::TempDir::new().expect(ERROR_TEMPDIR);
        let factory = ProviderFactory::<MockNodeTypesWithDB<DatabaseEnv>>::new_with_database_path(
            _db_tempdir.path(),
            Arc::new(chain_spec),
            DatabaseArguments::new(Default::default()),
            StaticFileProvider::read_write(static_dir_path).unwrap(),
            RocksDBProvider::builder(&rocksdb_path).build().unwrap(),
            reth_tasks::Runtime::test(),
        )
        .unwrap();
        let provider = factory.provider().unwrap();
        provider.block_hash(0).unwrap();
        let provider_rw = factory.provider_rw().unwrap();
        provider_rw.block_hash(0).unwrap();
        provider.block_hash(0).unwrap();
    }

    // Without prune modes, inserted blocks populate sender and tx-lookup data;
    // with full pruning for those segments, the same insert skips them.
    #[test]
    fn insert_block_with_prune_modes() {
        let block = TEST_BLOCK.clone();

        {
            let factory = create_test_provider_factory();
            let provider = factory.provider_rw().unwrap();
            assert_matches!(provider.insert_block(&block.clone().try_recover().unwrap()), Ok(_));
            assert_matches!(
                provider.transaction_sender(0), Ok(Some(sender))
                if sender == block.body().transactions[0].recover_signer().unwrap()
            );
            assert_matches!(
                provider.transaction_id(*block.body().transactions[0].tx_hash()),
                Ok(Some(0))
            );
        }

        {
            let prune_modes = PruneModes {
                sender_recovery: Some(PruneMode::Full),
                transaction_lookup: Some(PruneMode::Full),
                ..PruneModes::default()
            };
            // Keep factory alive until provider is dropped to prevent TempDatabase cleanup
            let factory = create_test_provider_factory().with_prune_modes(prune_modes);
            let provider = factory.provider_rw().unwrap();
            assert_matches!(provider.insert_block(&block.clone().try_recover().unwrap()), Ok(_));
            assert_matches!(provider.transaction_sender(0), Ok(None));
            assert_matches!(
                provider.transaction_id(*block.body().transactions[0].tx_hash()),
                Ok(None)
            );
        }
    }

    // `take` removes the senders from the table and returns them; a subsequent
    // range read over the same tx numbers then comes back empty.
    #[test]
    fn take_block_transaction_range_recover_senders() {
        let mut rng = generators::rng();
        let block =
            random_block(&mut rng, 0, BlockParams { tx_count: Some(3), ..Default::default() });

        let tx_ranges: Vec<RangeInclusive<TxNumber>> = vec![0..=0, 1..=1, 2..=2, 0..=1, 1..=2];
        for range in tx_ranges {
            let factory = create_test_provider_factory();
            let provider = factory.provider_rw().unwrap();

            assert_matches!(provider.insert_block(&block.clone().try_recover().unwrap()), Ok(_));

            let senders = provider.take::<tables::TransactionSenders>(range.clone()).unwrap();
            assert_eq!(
                senders,
                range
                    .clone()
                    .map(|tx_number| (
                        tx_number,
                        block.body().transactions[tx_number as usize].recover_signer().unwrap()
                    ))
                    .collect::<Vec<_>>()
            );

            let db_senders = provider.senders_by_tx_range(range);
            assert!(matches!(db_senders, Ok(ref v) if v.is_empty()));
        }
    }

    // `local_tip_header` errors on an empty database and returns the head once
    // a header has been written to the static files.
    #[test]
    fn header_sync_gap_lookup() {
        let factory = create_test_provider_factory();
        let provider = factory.provider_rw().unwrap();

        let mut rng = generators::rng();

        // Genesis
        let checkpoint = 0;
        let head = random_header(&mut rng, 0, None);

        // Empty database
        assert_matches!(
            provider.local_tip_header(checkpoint),
            Err(ProviderError::HeaderNotFound(block_number))
                if block_number.as_number().unwrap() == checkpoint
        );

        // Checkpoint and no gap
        let static_file_provider = provider.static_file_provider();
        let mut static_file_writer =
            static_file_provider.latest_writer(StaticFileSegment::Headers).unwrap();
        static_file_writer.append_header(head.header(), &head.hash()).unwrap();
        static_file_writer.commit().unwrap();
        drop(static_file_writer);

        let local_head = provider.local_tip_header(checkpoint).unwrap();

        assert_eq!(local_head, head);
    }
}