// reth_provider/providers/database/mod.rs

1use crate::{
2    providers::{
3        state::latest::LatestStateProvider, NodeTypesForProvider, RocksDBProvider,
4        StaticFileProvider, StaticFileProviderRWRefMut,
5    },
6    to_range,
7    traits::{BlockSource, ReceiptProvider},
8    BalProvider, BalStoreHandle, BlockHashReader, BlockNumReader, BlockReader, ChainSpecProvider,
9    DatabaseProviderFactory, EitherWriterDestination, HashedPostStateProvider, HeaderProvider,
10    HeaderSyncGapProvider, InMemoryBalStore, MetadataProvider, ProviderError,
11    PruneCheckpointReader, RocksDBProviderFactory, StageCheckpointReader, StateProviderBox,
12    StaticFileProviderFactory, StaticFileWriter, TransactionVariant, TransactionsProvider,
13};
14use alloy_consensus::transaction::TransactionMeta;
15use alloy_eips::BlockHashOrNumber;
16use alloy_primitives::{Address, BlockHash, BlockNumber, TxHash, TxNumber, B256};
17use core::fmt;
18use notify::{RecommendedWatcher, RecursiveMode, Watcher};
19use parking_lot::RwLock;
20use reth_chainspec::ChainInfo;
21use reth_db::{init_db, mdbx::DatabaseArguments, DatabaseEnv};
22use reth_db_api::{database::Database, models::StoredBlockBodyIndices};
23use reth_errors::{RethError, RethResult};
24use reth_node_types::{
25    BlockTy, HeaderTy, NodeTypesWithDB, NodeTypesWithDBAdapter, ReceiptTy, TxTy,
26};
27use reth_primitives_traits::{RecoveredBlock, SealedHeader};
28use reth_prune_types::{PruneCheckpoint, PruneModes, PruneSegment, MINIMUM_UNWIND_SAFE_DISTANCE};
29use reth_stages_types::{PipelineTarget, StageCheckpoint, StageId};
30use reth_static_file_types::StaticFileSegment;
31use reth_storage_api::{
32    BlockBodyIndicesProvider, ChainStateBlockReader, ChainStateBlockWriter, DBProvider,
33    NodePrimitivesProvider, StorageSettings, StorageSettingsCache, TryIntoHistoricalStateProvider,
34};
35use reth_storage_errors::provider::ProviderResult;
36use reth_trie::HashedPostState;
37use reth_trie_db::ChangesetCache;
38use revm_database::BundleState;
39use std::{
40    ops::{RangeBounds, RangeInclusive},
41    path::Path,
42    sync::{
43        atomic::{AtomicU64, Ordering},
44        Arc, Mutex,
45    },
46};
47use tracing::{info, instrument, trace, warn};
48
49mod provider;
50pub use provider::{
51    CommitOrder, DatabaseProvider, DatabaseProviderRO, DatabaseProviderRW, SaveBlocksMode,
52};
53
54use super::ProviderNodeTypes;
55use reth_trie::KeccakKeyHasher;
56
57mod builder;
58pub use builder::{ProviderFactoryBuilder, ReadOnlyConfig};
59
60mod metrics;
61
62mod chain;
63pub use chain::*;
64
/// Sync state for read-only [`ProviderFactory`] instances.
///
/// Tracks the MDBX transaction counter up to which the auxiliary stores (`RocksDB`
/// secondary instance and static file index) have been caught up, so that
/// `sync_providers_if_needed` can take a cheap fast path when nothing has changed.
struct ReadOnlySyncState {
    /// Last MDBX txn ID we synced `RocksDB` secondary / static file indexes to.
    last_synced_txnid: AtomicU64,
    /// Serializes the slow-path catch-up (`RocksDB` + static file re-init).
    sync_lock: Mutex<()>,
}
72
/// A common provider that fetches data from a database or static file.
///
/// This provider implements most provider or provider factory traits.
///
/// Constructed via [`ProviderFactory::new`] (or [`ProviderFactory::new_checked`] to also run
/// consistency checks) and configured further with the `with_*` builder-style setters.
pub struct ProviderFactory<N: NodeTypesWithDB> {
    /// Database instance
    db: N::DB,
    /// Chain spec
    chain_spec: Arc<N::ChainSpec>,
    /// Static File Provider
    static_file_provider: StaticFileProvider<N::Primitives>,
    /// Optional pruning configuration
    prune_modes: PruneModes,
    /// The node storage handler.
    storage: Arc<N::Storage>,
    /// Storage configuration settings for this node
    ///
    /// Shared (via `Arc`) with every provider this factory creates, so cache updates are
    /// visible factory-wide.
    storage_settings: Arc<RwLock<StorageSettings>>,
    /// `RocksDB` provider
    rocksdb_provider: RocksDBProvider,
    /// Changeset cache for trie unwinding
    changeset_cache: ChangesetCache,
    /// Store for block access lists.
    bal_store: BalStoreHandle,
    /// Task runtime for spawning parallel I/O work.
    runtime: reth_tasks::Runtime,
    /// Minimum distance from tip required before pruning can occur.
    minimum_pruning_distance: u64,
    /// State for on-demand syncing of `RocksDB` secondary and static file indexes.
    ///
    /// Only set for read-only factories. Can be disabled if there is no concurrent read-write
    /// factory writing to the database (e.g as part of a running reth node).
    read_only_sync: Option<Arc<ReadOnlySyncState>>,
}
105
106impl<N: NodeTypesForProvider> ProviderFactory<NodeTypesWithDBAdapter<N, DatabaseEnv>> {
107    /// Instantiates the builder for this type
108    pub fn builder() -> ProviderFactoryBuilder<N> {
109        ProviderFactoryBuilder::default()
110    }
111}
112
impl<N: ProviderNodeTypes> ProviderFactory<N> {
    /// Create new database provider factory.
    ///
    /// The storage backends used by the produced factory MAY be inconsistent.
    /// It is recommended to call [`Self::check_consistency`] after
    /// creation to ensure consistency between the database and static files.
    /// If the function returns unwind targets, the caller MUST unwind the
    /// inner database to the minimum of the two targets to ensure consistency.
    pub fn new(
        db: N::DB,
        chain_spec: Arc<N::ChainSpec>,
        static_file_provider: StaticFileProvider<N::Primitives>,
        rocksdb_provider: RocksDBProvider,
        runtime: reth_tasks::Runtime,
    ) -> ProviderResult<Self> {
        // Load storage settings from database at init time. Creates a temporary provider
        // to read persisted settings, falling back to legacy defaults if none exist.
        //
        // Both factory and all providers it creates should share these cached settings.
        let legacy_settings = StorageSettings::v1();
        // NOTE: this short-lived provider is built with default prune modes / storage handler;
        // it exists only to call `storage_settings()` and is dropped (along with its read
        // transaction) at the end of this expression.
        let storage_settings = DatabaseProvider::<_, N>::new(
            db.tx()?,
            chain_spec.clone(),
            static_file_provider.clone(),
            Default::default(),
            Default::default(),
            Arc::new(RwLock::new(legacy_settings)),
            rocksdb_provider.clone(),
            ChangesetCache::new(),
            runtime.clone(),
            db.path(),
        )
        .storage_settings()?
        .unwrap_or(legacy_settings);

        Ok(Self {
            db,
            chain_spec,
            static_file_provider,
            prune_modes: PruneModes::default(),
            storage: Default::default(),
            storage_settings: Arc::new(RwLock::new(storage_settings)),
            rocksdb_provider,
            changeset_cache: ChangesetCache::new(),
            bal_store: BalStoreHandle::new(InMemoryBalStore::default()),
            runtime,
            minimum_pruning_distance: MINIMUM_UNWIND_SAFE_DISTANCE,
            // Read-only sync is opt-in via `with_read_only_sync`.
            read_only_sync: None,
        })
    }

    /// Create new database provider factory and perform consistency checks.
    ///
    /// This will call [`Self::check_consistency`] internally and return
    /// [`ProviderError::MustUnwind`] if inconsistencies are found. It may also
    /// return any [`ProviderError`] that [`Self::new`] may return, or that are
    /// encountered during consistency checks.
    pub fn new_checked(
        db: N::DB,
        chain_spec: Arc<N::ChainSpec>,
        static_file_provider: StaticFileProvider<N::Primitives>,
        rocksdb_provider: RocksDBProvider,
        runtime: reth_tasks::Runtime,
    ) -> ProviderResult<Self> {
        Self::new(db, chain_spec, static_file_provider, rocksdb_provider, runtime)
            .and_then(Self::assert_consistent)
    }
}
181
182impl<N: NodeTypesWithDB> ProviderFactory<N> {
183    /// Sets the pruning configuration for an existing [`ProviderFactory`].
184    pub fn with_prune_modes(mut self, prune_modes: PruneModes) -> Self {
185        self.prune_modes = prune_modes;
186        self
187    }
188
189    /// Sets the BAL store for an existing [`ProviderFactory`].
190    pub fn with_bal_store(mut self, bal_store: BalStoreHandle) -> Self {
191        self.bal_store = bal_store;
192        self
193    }
194
195    /// Sets the changeset cache for an existing [`ProviderFactory`].
196    pub fn with_changeset_cache(mut self, changeset_cache: ChangesetCache) -> Self {
197        self.changeset_cache = changeset_cache;
198        self
199    }
200
201    /// Sets the minimum pruning distance for an existing [`ProviderFactory`].
202    ///
203    /// This controls the minimum distance from tip required before pruning can occur.
204    /// The default is [`MINIMUM_UNWIND_SAFE_DISTANCE`].
205    pub const fn with_minimum_pruning_distance(mut self, distance: u64) -> Self {
206        self.minimum_pruning_distance = distance;
207        self
208    }
209
210    /// Enables on-demand syncing of `RocksDB` secondary and static file indexes for read-only
211    /// factories. Initializes the tracker to the current MDBX txn ID.
212    ///
213    /// Should be used for read-only factories that are running concurrently to a reth node writing
214    /// new data to the database. Would effectively be a no-op if database directory is unchanged.
215    pub fn with_read_only_sync(mut self, watch: bool) -> Self
216    where
217        N::DB: Database,
218    {
219        // Initialize to 0 so the first `sync_providers_if_needed` call always
220        // triggers a RocksDB/static-file catch-up, regardless of what MDBX txnid
221        // the database was at when we opened it.
222        let state = Arc::new(ReadOnlySyncState {
223            last_synced_txnid: AtomicU64::new(0),
224            sync_lock: Mutex::new(()),
225        });
226        self.read_only_sync = Some(state);
227
228        if watch {
229            self.watch_db_directory();
230        }
231        self
232    }
233
234    /// Watches the MDBX data directory for changes and eagerly syncs `RocksDB` secondary and
235    /// static file indexes when modifications are detected.
236    fn watch_db_directory(&self)
237    where
238        N::DB: Database,
239    {
240        let factory = self.clone();
241        let db_path = self.db.path();
242        reth_tasks::spawn_os_thread("ro-sync", move || {
243            let (tx, rx) = std::sync::mpsc::channel();
244            let mut watcher = RecommendedWatcher::new(
245                move |res| {
246                    let _ = tx.send(res);
247                },
248                notify::Config::default(),
249            )
250            .expect("failed to create watcher");
251
252            watcher
253                .watch(&db_path, RecursiveMode::NonRecursive)
254                .expect("failed to watch MDBX path");
255
256            while let Ok(res) = rx.recv() {
257                match res {
258                    Ok(event) => {
259                        if !matches!(
260                            event.kind,
261                            notify::EventKind::Modify(_) | notify::EventKind::Create(_)
262                        ) {
263                            continue;
264                        }
265
266                        if let Err(err) = factory.sync_providers_if_needed() {
267                            warn!(target: "reth::provider", %err, "background ro-sync failed");
268                        }
269                    }
270                    Err(err) => {
271                        warn!(target: "reth::provider", ?err, "MDBX directory watcher error");
272                    }
273                }
274            }
275        });
276    }
277
278    /// For read-only factories, checks whether the MDBX committed txn ID has advanced since the
279    /// last sync and, if so, catches up the `RocksDB` secondary instance and re-initializes the
280    /// static file index.
281    ///
282    /// No-op for read-write factories.
283    pub fn sync_providers_if_needed(&self) -> ProviderResult<()> {
284        let Some(sync_state) = &self.read_only_sync else { return Ok(()) };
285        let current_txnid = self.db.last_txnid().unwrap_or(0);
286
287        // Fast path: no contention when nothing changed.
288        if current_txnid == sync_state.last_synced_txnid.load(Ordering::Relaxed) {
289            return Ok(());
290        }
291
292        // Slow path: serialize the actual catch-up I/O.
293        let _guard = sync_state.sync_lock.lock().unwrap_or_else(|e| e.into_inner());
294
295        // Double-check after acquiring the lock — another thread may have already synced.
296        if current_txnid == sync_state.last_synced_txnid.load(Ordering::Relaxed) {
297            return Ok(());
298        }
299
300        self.rocksdb_provider.try_catch_up_with_primary()?;
301        self.static_file_provider.initialize_index()?;
302        sync_state.last_synced_txnid.store(current_txnid, Ordering::Relaxed);
303        Ok(())
304    }
305
306    /// Returns reference to the underlying database.
307    pub const fn db_ref(&self) -> &N::DB {
308        &self.db
309    }
310
311    #[cfg(any(test, feature = "test-utils"))]
312    /// Consumes Self and returns DB
313    pub fn into_db(self) -> N::DB {
314        self.db
315    }
316}
317
318impl<N: NodeTypesWithDB> StorageSettingsCache for ProviderFactory<N> {
319    fn cached_storage_settings(&self) -> StorageSettings {
320        *self.storage_settings.read()
321    }
322
323    fn set_storage_settings_cache(&self, settings: StorageSettings) {
324        *self.storage_settings.write() = settings;
325    }
326}
327
impl<N: NodeTypesWithDB> RocksDBProviderFactory for ProviderFactory<N> {
    /// Returns a clone of the shared `RocksDB` provider handle.
    fn rocksdb_provider(&self) -> RocksDBProvider {
        self.rocksdb_provider.clone()
    }

    /// Pending batches are a per-provider (per-transaction) concern; the factory holds no
    /// transaction of its own, so this is intentionally unsupported here.
    fn set_pending_rocksdb_batch(&self, _batch: rocksdb::WriteBatchWithTransaction<true>) {
        unimplemented!("ProviderFactory is a factory, not a provider - use DatabaseProvider::set_pending_rocksdb_batch instead")
    }

    /// Intentionally unsupported for the same reason as `set_pending_rocksdb_batch`.
    fn commit_pending_rocksdb_batches(&self) -> ProviderResult<()> {
        unimplemented!("ProviderFactory is a factory, not a provider - use DatabaseProvider::commit_pending_rocksdb_batches instead")
    }
}
341
342impl<N: ProviderNodeTypes<DB = DatabaseEnv>> ProviderFactory<N> {
343    /// Create new database provider by passing a path. [`ProviderFactory`] will own the database
344    /// instance.
345    pub fn new_with_database_path<P: AsRef<Path>>(
346        path: P,
347        chain_spec: Arc<N::ChainSpec>,
348        args: DatabaseArguments,
349        static_file_provider: StaticFileProvider<N::Primitives>,
350        rocksdb_provider: RocksDBProvider,
351        runtime: reth_tasks::Runtime,
352    ) -> RethResult<Self> {
353        Self::new(
354            init_db(path, args).map_err(RethError::msg)?,
355            chain_spec,
356            static_file_provider,
357            rocksdb_provider,
358            runtime,
359        )
360        .map_err(RethError::Provider)
361    }
362}
363
impl<N: ProviderNodeTypes> ProviderFactory<N> {
    /// Returns a provider with a created `DbTx` inside, which allows fetching data from the
    /// database using different types of providers. Example: [`HeaderProvider`]
    /// [`BlockHashReader`]. This may fail if the inner read database transaction fails to open.
    ///
    /// NOTE(review): the factory's configured [`PruneModes`] are forwarded to the provider
    /// below, even though they are primarily relevant when writing data.
    #[track_caller]
    pub fn provider(&self) -> ProviderResult<DatabaseProviderRO<N::DB, N>> {
        let db_tx = self.db.tx()?;

        // Sync providers after opening the database transaction to make
        // sure that no data is pruned from rocksdb or static files.
        //
        // Reorg logic ensures that no data is pruned from rocksdb or static files while there is an
        // mdbx transaction open that might rely on this data.
        self.sync_providers_if_needed()?;

        Ok(DatabaseProvider::new(
            db_tx,
            self.chain_spec.clone(),
            self.static_file_provider.clone(),
            self.prune_modes.clone(),
            self.storage.clone(),
            self.storage_settings.clone(),
            self.rocksdb_provider.clone(),
            self.changeset_cache.clone(),
            self.runtime.clone(),
            self.db.path(),
        )
        .with_minimum_pruning_distance(self.minimum_pruning_distance))
    }

    /// Returns a provider with a created `DbTxMut` inside, which allows fetching and updating
    /// data from the database using different types of providers. Example: [`HeaderProvider`]
    /// [`BlockHashReader`].  This may fail if the inner read/write database transaction fails to
    /// open.
    #[track_caller]
    pub fn provider_rw(&self) -> ProviderResult<DatabaseProviderRW<N::DB, N>> {
        // No `sync_providers_if_needed` here: read-only sync state is only set on
        // read-only factories, which never hand out write providers.
        Ok(DatabaseProviderRW(
            DatabaseProvider::new_rw(
                self.db.tx_mut()?,
                self.chain_spec.clone(),
                self.static_file_provider.clone(),
                self.prune_modes.clone(),
                self.storage.clone(),
                self.storage_settings.clone(),
                self.rocksdb_provider.clone(),
                self.changeset_cache.clone(),
                self.runtime.clone(),
                self.db.path(),
            )
            .with_reader_txn_tracker(self.db.clone())
            .with_minimum_pruning_distance(self.minimum_pruning_distance),
        ))
    }

    /// Returns a provider with a created `DbTxMut` inside, configured for unwind operations.
    /// Uses unwind commit order (MDBX first, then `RocksDB`, then static files) to allow
    /// recovery by truncating static files on restart if interrupted.
    ///
    /// Unwind commits may wait for pre-existing readers to drain before finishing later
    /// cross-store steps. Drop any long-lived read providers before committing this provider.
    #[track_caller]
    pub fn unwind_provider_rw(
        &self,
    ) -> ProviderResult<DatabaseProvider<<N::DB as Database>::TXMut, N>> {
        Ok(DatabaseProvider::new_unwind_rw(
            self.db.tx_mut()?,
            self.chain_spec.clone(),
            self.static_file_provider.clone(),
            self.prune_modes.clone(),
            self.storage.clone(),
            self.storage_settings.clone(),
            self.rocksdb_provider.clone(),
            self.changeset_cache.clone(),
            self.runtime.clone(),
            self.db.path(),
        )
        .with_reader_txn_tracker(self.db.clone())
        .with_minimum_pruning_distance(self.minimum_pruning_distance))
    }

    /// State provider for latest block
    #[track_caller]
    pub fn latest(&self) -> ProviderResult<StateProviderBox> {
        trace!(target: "providers::db", "Returning latest state provider");
        Ok(Box::new(LatestStateProvider::new(self.database_provider_ro()?)))
    }

    /// Storage provider for state at that given block
    pub fn history_by_block_number(
        &self,
        block_number: BlockNumber,
    ) -> ProviderResult<StateProviderBox> {
        let state_provider = self.provider()?.try_into_history_at_block(block_number)?;
        trace!(target: "providers::db", ?block_number, "Returning historical state provider for block number");
        Ok(state_provider)
    }

    /// Storage provider for state at that given block hash
    pub fn history_by_block_hash(&self, block_hash: BlockHash) -> ProviderResult<StateProviderBox> {
        let provider = self.provider()?;

        // Resolve the hash to a block number first; history lookup is number-based.
        let block_number = provider
            .block_number(block_hash)?
            .ok_or(ProviderError::BlockHashNotFound(block_hash))?;

        let state_provider = provider.try_into_history_at_block(block_number)?;
        trace!(target: "providers::db", ?block_number, %block_hash, "Returning historical state provider for block hash");
        Ok(state_provider)
    }

    /// Asserts that the static files and database are consistent. If not,
    /// returns [`ProviderError::MustUnwind`] with the appropriate unwind
    /// target. May also return any [`ProviderError`] that
    /// [`Self::check_consistency`] may return.
    pub fn assert_consistent(self) -> ProviderResult<Self> {
        let (rocksdb_unwind, static_file_unwind) = self.check_consistency()?;

        let source = match (rocksdb_unwind, static_file_unwind) {
            (None, None) => return Ok(self),
            (Some(_), Some(_)) => "RocksDB and Static Files",
            (Some(_), None) => "RocksDB",
            (None, Some(_)) => "Static Files",
        };

        // Unwind to the minimum of the targets so both stores end up consistent.
        Err(ProviderError::MustUnwind {
            data_source: source,
            unwind_to: rocksdb_unwind
                .into_iter()
                .chain(static_file_unwind)
                .min()
                .expect("at least one unwind target must be Some"),
        })
    }

    /// Checks the consistency between the static files and the database. This
    /// may result in static files being pruned or otherwise healed to ensure
    /// consistency. I.e. this MAY result in writes to the static files.
    #[instrument(err, skip(self))]
    pub fn check_consistency(&self) -> ProviderResult<(Option<u64>, Option<u64>)> {
        let provider_ro = self
            .database_provider_ro()?
            // Healing can run long-lived read transactions (e.g., iterating changesets
            // over millions of blocks). Disable the default timeout so MDBX doesn't
            // kill the transaction mid-heal, which causes a crash loop on startup.
            .disable_long_read_transaction_safety();

        // Step 1: heal file-level inconsistencies (no pruning)
        self.static_file_provider().check_file_consistency(&provider_ro)?;

        // Step 2: RocksDB consistency check (needs static files tx data)
        let rocksdb_unwind = self.rocksdb_provider().check_consistency(&provider_ro)?;

        // Step 3: Static file checkpoint consistency (may prune)
        let static_file_unwind = self.static_file_provider().check_consistency(&provider_ro)?.map(
            |target| match target {
                PipelineTarget::Unwind(block) => block,
                PipelineTarget::Sync(_) => unreachable!("check_consistency returns Unwind"),
            },
        );

        // Step 4: Heal finalized/safe block numbers that may be ahead of the
        // highest header on nodes coming from <=1.10.2.
        //
        // Unwinds already set it to the target block.
        if rocksdb_unwind.is_none() && static_file_unwind.is_none() {
            self.heal_chain_state_block_numbers(&provider_ro)?;
        }

        Ok((rocksdb_unwind, static_file_unwind))
    }

    /// If the stored finalized or safe block number is ahead of the highest
    /// header, resets it to the highest header.
    fn heal_chain_state_block_numbers(
        &self,
        provider_ro: &DatabaseProvider<<N::DB as Database>::TX, N>,
    ) -> ProviderResult<()> {
        let highest_header = self.last_block_number()?;

        let finalized = provider_ro.last_finalized_block_number()?;
        let safe = provider_ro.last_safe_block_number()?;

        // Nothing to heal: both markers are absent or at/below the highest header.
        if finalized.is_none_or(|f| f <= highest_header) && safe.is_none_or(|s| s <= highest_header)
        {
            return Ok(());
        }

        let provider_rw = self.provider_rw()?;

        if let Some(finalized) = finalized.filter(|&f| f > highest_header) {
            info!(
                target: "providers::db",
                finalized,
                highest_header,
                "Healing finalized block number",
            );
            provider_rw.save_finalized_block_number(highest_header)?;
        }

        if let Some(safe) = safe.filter(|&s| s > highest_header) {
            info!(
                target: "providers::db",
                safe,
                highest_header,
                "Healing safe block number",
            );
            provider_rw.save_safe_block_number(highest_header)?;
        }

        provider_rw.commit()?;

        Ok(())
    }

    /// Returns a static file provider. For read-only instances, this will also invoke
    /// [`Self::sync_providers_if_needed`] to make sure that the static file provider is up to date.
    pub fn caught_up_static_file_provider(
        &self,
    ) -> ProviderResult<StaticFileProvider<N::Primitives>> {
        self.sync_providers_if_needed()?;
        Ok(self.static_file_provider.clone())
    }
}
590
// The factory exposes the node's primitive types directly from its `NodeTypesWithDB`.
impl<N: NodeTypesWithDB> NodePrimitivesProvider for ProviderFactory<N> {
    type Primitives = N::Primitives;
}
594
impl<N: NodeTypesWithDB> BalProvider for ProviderFactory<N> {
    /// Returns a reference to the factory's block access list store.
    fn bal_store(&self) -> &BalStoreHandle {
        &self.bal_store
    }
}
600
601impl<N: ProviderNodeTypes> DatabaseProviderFactory for ProviderFactory<N> {
602    type DB = N::DB;
603    type Provider = DatabaseProvider<<N::DB as Database>::TX, N>;
604    type ProviderRW = DatabaseProvider<<N::DB as Database>::TXMut, N>;
605
606    fn database_provider_ro(&self) -> ProviderResult<Self::Provider> {
607        self.provider()
608    }
609
610    fn database_provider_rw(&self) -> ProviderResult<Self::ProviderRW> {
611        self.provider_rw().map(|provider| provider.0)
612    }
613}
614
impl<N: NodeTypesWithDB> StaticFileProviderFactory for ProviderFactory<N> {
    /// Returns static file provider
    fn static_file_provider(&self) -> StaticFileProvider<Self::Primitives> {
        self.static_file_provider.clone()
    }

    /// Returns a mutable writer for `segment` positioned at `block`.
    ///
    /// The returned writer borrows from the factory's static file provider, tying its
    /// lifetime to `&self`.
    fn get_static_file_writer(
        &self,
        block: BlockNumber,
        segment: StaticFileSegment,
    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
        self.static_file_provider.get_writer(block, segment)
    }
}
629
impl<N: ProviderNodeTypes> HeaderSyncGapProvider for ProviderFactory<N> {
    type Header = HeaderTy<N>;
    /// Delegates to a freshly opened read provider.
    fn local_tip_header(
        &self,
        highest_uninterrupted_block: BlockNumber,
    ) -> ProviderResult<SealedHeader<Self::Header>> {
        self.provider()?.local_tip_header(highest_uninterrupted_block)
    }
}
639
640impl<N: ProviderNodeTypes> HeaderProvider for ProviderFactory<N> {
641    type Header = HeaderTy<N>;
642
643    fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
644        self.provider()?.header(block_hash)
645    }
646
647    fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
648        self.caught_up_static_file_provider()?.header_by_number(num)
649    }
650
651    fn headers_range(
652        &self,
653        range: impl RangeBounds<BlockNumber>,
654    ) -> ProviderResult<Vec<Self::Header>> {
655        self.caught_up_static_file_provider()?.headers_range(range)
656    }
657
658    fn sealed_header(
659        &self,
660        number: BlockNumber,
661    ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
662        self.caught_up_static_file_provider()?.sealed_header(number)
663    }
664
665    fn sealed_headers_range(
666        &self,
667        range: impl RangeBounds<BlockNumber>,
668    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
669        self.caught_up_static_file_provider()?.sealed_headers_range(range)
670    }
671
672    fn sealed_headers_while(
673        &self,
674        range: impl RangeBounds<BlockNumber>,
675        predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
676    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
677        self.caught_up_static_file_provider()?.sealed_headers_while(range, predicate)
678    }
679}
680
681impl<N: ProviderNodeTypes> BlockHashReader for ProviderFactory<N> {
682    fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
683        self.caught_up_static_file_provider()?.block_hash(number)
684    }
685
686    fn canonical_hashes_range(
687        &self,
688        start: BlockNumber,
689        end: BlockNumber,
690    ) -> ProviderResult<Vec<B256>> {
691        self.caught_up_static_file_provider()?.canonical_hashes_range(start, end)
692    }
693}
694
695impl<N: ProviderNodeTypes> BlockNumReader for ProviderFactory<N> {
696    fn chain_info(&self) -> ProviderResult<ChainInfo> {
697        self.provider()?.chain_info()
698    }
699
700    fn best_block_number(&self) -> ProviderResult<BlockNumber> {
701        self.provider()?.best_block_number()
702    }
703
704    fn last_block_number(&self) -> ProviderResult<BlockNumber> {
705        self.caught_up_static_file_provider()?.last_block_number()
706    }
707
708    fn earliest_block_number(&self) -> ProviderResult<BlockNumber> {
709        // earliest history height tracks the lowest block number that has __not__ been expired, in
710        // other words, the first/earliest available block.
711        Ok(self.caught_up_static_file_provider()?.earliest_history_height())
712    }
713
714    fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
715        self.provider()?.block_number(hash)
716    }
717}
718
719impl<N: ProviderNodeTypes> BlockReader for ProviderFactory<N> {
720    type Block = BlockTy<N>;
721
722    fn find_block_by_hash(
723        &self,
724        hash: B256,
725        source: BlockSource,
726    ) -> ProviderResult<Option<Self::Block>> {
727        self.provider()?.find_block_by_hash(hash, source)
728    }
729
730    fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
731        self.provider()?.block(id)
732    }
733
734    fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
735        self.provider()?.pending_block()
736    }
737
738    fn pending_block_and_receipts(
739        &self,
740    ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
741        self.provider()?.pending_block_and_receipts()
742    }
743
744    fn recovered_block(
745        &self,
746        id: BlockHashOrNumber,
747        transaction_kind: TransactionVariant,
748    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
749        self.provider()?.recovered_block(id, transaction_kind)
750    }
751
752    fn sealed_block_with_senders(
753        &self,
754        id: BlockHashOrNumber,
755        transaction_kind: TransactionVariant,
756    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
757        self.provider()?.sealed_block_with_senders(id, transaction_kind)
758    }
759
760    fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
761        self.provider()?.block_range(range)
762    }
763
764    fn block_with_senders_range(
765        &self,
766        range: RangeInclusive<BlockNumber>,
767    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
768        self.provider()?.block_with_senders_range(range)
769    }
770
771    fn recovered_block_range(
772        &self,
773        range: RangeInclusive<BlockNumber>,
774    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
775        self.provider()?.recovered_block_range(range)
776    }
777
778    fn block_by_transaction_id(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
779        self.provider()?.block_by_transaction_id(id)
780    }
781}
782
/// Transaction reads on the factory: number-keyed lookups are routed to static
/// files, hash-keyed lookups to the database (which holds the hash index), and
/// sender reads to wherever the writer is configured to store senders.
impl<N: ProviderNodeTypes> TransactionsProvider for ProviderFactory<N> {
    type Transaction = TxTy<N>;

    fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
        // Hash -> number: goes through the database provider.
        self.provider()?.transaction_id(tx_hash)
    }

    fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
        // Number-keyed read, served via static files. NOTE(review): the
        // `caught_up_` variant presumably guards against static files lagging
        // behind the database — confirm against its definition.
        self.caught_up_static_file_provider()?.transaction_by_id(id)
    }

    fn transaction_by_id_unhashed(
        &self,
        id: TxNumber,
    ) -> ProviderResult<Option<Self::Transaction>> {
        // Same routing as `transaction_by_id`, without the hash-carrying wrapper.
        self.caught_up_static_file_provider()?.transaction_by_id_unhashed(id)
    }

    fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
        // Hash-keyed read: goes through the database provider.
        self.provider()?.transaction_by_hash(hash)
    }

    fn transaction_by_hash_with_meta(
        &self,
        tx_hash: TxHash,
    ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
        self.provider()?.transaction_by_hash_with_meta(tx_hash)
    }

    fn transactions_by_block(
        &self,
        id: BlockHashOrNumber,
    ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
        self.provider()?.transactions_by_block(id)
    }

    fn transactions_by_block_range(
        &self,
        range: impl RangeBounds<BlockNumber>,
    ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
        self.provider()?.transactions_by_block_range(range)
    }

    fn transactions_by_tx_range(
        &self,
        range: impl RangeBounds<TxNumber>,
    ) -> ProviderResult<Vec<Self::Transaction>> {
        // Number-keyed range read, served via static files.
        self.caught_up_static_file_provider()?.transactions_by_tx_range(range)
    }

    fn senders_by_tx_range(
        &self,
        range: impl RangeBounds<TxNumber>,
    ) -> ProviderResult<Vec<Address>> {
        // Senders can live either in static files or in the database depending
        // on storage settings; read from wherever the writer destination points.
        if EitherWriterDestination::senders(self).is_static_file() {
            self.caught_up_static_file_provider()?.senders_by_tx_range(range)
        } else {
            self.provider()?.senders_by_tx_range(range)
        }
    }

    fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
        // Same destination-dependent routing as `senders_by_tx_range`.
        if EitherWriterDestination::senders(self).is_static_file() {
            self.caught_up_static_file_provider()?.transaction_sender(id)
        } else {
            self.provider()?.transaction_sender(id)
        }
    }
}
852
/// Receipt reads on the factory: number-keyed lookups prefer the receipts
/// static-file segment with a database fallback; hash/block-keyed lookups go
/// through the database provider.
impl<N: ProviderNodeTypes> ReceiptProvider for ProviderFactory<N> {
    type Receipt = ReceiptTy<N>;

    fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
        // Try the receipts static-file segment first; the database closure is
        // the fallback (the helper decides when the fallback is used).
        self.caught_up_static_file_provider()?.get_with_static_file_or_database(
            StaticFileSegment::Receipts,
            id,
            |static_file| static_file.receipt(id),
            || self.provider()?.receipt(id),
        )
    }

    fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
        // Hash-keyed read: goes through the database provider.
        self.provider()?.receipt_by_hash(hash)
    }

    fn receipts_by_block(
        &self,
        block: BlockHashOrNumber,
    ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
        self.provider()?.receipts_by_block(block)
    }

    fn receipts_by_tx_range(
        &self,
        range: impl RangeBounds<TxNumber>,
    ) -> ProviderResult<Vec<Self::Receipt>> {
        // Range read that may span static files and the database. NOTE(review):
        // the trailing `|_| true` predicate appears to accept everything —
        // confirm its meaning against the helper's definition.
        self.caught_up_static_file_provider()?.get_range_with_static_file_or_database(
            StaticFileSegment::Receipts,
            to_range(range),
            |static_file, range, _| static_file.receipts_by_tx_range(range),
            |range, _| self.provider()?.receipts_by_tx_range(range),
            |_| true,
        )
    }

    fn receipts_by_block_range(
        &self,
        block_range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
        self.provider()?.receipts_by_block_range(block_range)
    }
}
896
897impl<N: ProviderNodeTypes> BlockBodyIndicesProvider for ProviderFactory<N> {
898    fn block_body_indices(
899        &self,
900        number: BlockNumber,
901    ) -> ProviderResult<Option<StoredBlockBodyIndices>> {
902        self.provider()?.block_body_indices(number)
903    }
904
905    fn block_body_indices_range(
906        &self,
907        range: RangeInclusive<BlockNumber>,
908    ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
909        self.provider()?.block_body_indices_range(range)
910    }
911}
912
913impl<N: ProviderNodeTypes> StageCheckpointReader for ProviderFactory<N> {
914    fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
915        self.provider()?.get_stage_checkpoint(id)
916    }
917
918    fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
919        self.provider()?.get_stage_checkpoint_progress(id)
920    }
921    fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
922        self.provider()?.get_all_checkpoints()
923    }
924}
925
926impl<N: NodeTypesWithDB> ChainSpecProvider for ProviderFactory<N> {
927    type ChainSpec = N::ChainSpec;
928
929    fn chain_spec(&self) -> Arc<N::ChainSpec> {
930        self.chain_spec.clone()
931    }
932}
933
934impl<N: ProviderNodeTypes> PruneCheckpointReader for ProviderFactory<N> {
935    fn get_prune_checkpoint(
936        &self,
937        segment: PruneSegment,
938    ) -> ProviderResult<Option<PruneCheckpoint>> {
939        self.provider()?.get_prune_checkpoint(segment)
940    }
941
942    fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
943        self.provider()?.get_prune_checkpoints()
944    }
945}
946
947impl<N: ProviderNodeTypes> HashedPostStateProvider for ProviderFactory<N> {
948    fn hashed_post_state(&self, bundle_state: &BundleState) -> HashedPostState {
949        HashedPostState::from_bundle_state::<KeccakKeyHasher>(bundle_state.state())
950    }
951}
952
953impl<N: ProviderNodeTypes> MetadataProvider for ProviderFactory<N> {
954    fn get_metadata(&self, key: &str) -> ProviderResult<Option<Vec<u8>>> {
955        self.provider()?.get_metadata(key)
956    }
957}
958
impl<N> fmt::Debug for ProviderFactory<N>
where
    N: NodeTypesWithDB<DB: fmt::Debug, ChainSpec: fmt::Debug, Storage: fmt::Debug>,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Exhaustively destructure `self` so that adding a field to
        // `ProviderFactory` without updating this impl is a compile error
        // instead of a silent gap in the debug output.
        let Self {
            db,
            chain_spec,
            static_file_provider,
            prune_modes,
            storage,
            storage_settings,
            rocksdb_provider,
            changeset_cache,
            bal_store,
            runtime,
            minimum_pruning_distance,
            read_only_sync,
        } = self;
        f.debug_struct("ProviderFactory")
            .field("db", &db)
            .field("chain_spec", &chain_spec)
            .field("static_file_provider", &static_file_provider)
            .field("prune_modes", &prune_modes)
            .field("storage", &storage)
            // Take the read lock and print the settings snapshot rather than
            // the lock wrapper itself.
            .field("storage_settings", &*storage_settings.read())
            .field("rocksdb_provider", &rocksdb_provider)
            .field("changeset_cache", &changeset_cache)
            .field("bal_store", &bal_store)
            .field("runtime", &runtime)
            .field("minimum_pruning_distance", &minimum_pruning_distance)
            // Only the last-synced tx id is interesting here; a relaxed load is
            // sufficient for a best-effort debug snapshot.
            .field(
                "read_only_sync",
                &read_only_sync.as_ref().map(|s| s.last_synced_txnid.load(Ordering::Relaxed)),
            )
            .finish()
    }
}
997
998impl<N: NodeTypesWithDB> Clone for ProviderFactory<N> {
999    fn clone(&self) -> Self {
1000        Self {
1001            db: self.db.clone(),
1002            chain_spec: self.chain_spec.clone(),
1003            static_file_provider: self.static_file_provider.clone(),
1004            prune_modes: self.prune_modes.clone(),
1005            storage: self.storage.clone(),
1006            storage_settings: self.storage_settings.clone(),
1007            rocksdb_provider: self.rocksdb_provider.clone(),
1008            changeset_cache: self.changeset_cache.clone(),
1009            bal_store: self.bal_store.clone(),
1010            runtime: self.runtime.clone(),
1011            minimum_pruning_distance: self.minimum_pruning_distance,
1012            read_only_sync: self.read_only_sync.clone(),
1013        }
1014    }
1015}
1016
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        providers::{StaticFileProvider, StaticFileWriter},
        test_utils::{blocks::TEST_BLOCK, create_test_provider_factory, MockNodeTypesWithDB},
        BlockHashReader, BlockNumReader, BlockWriter, DBProvider, HeaderSyncGapProvider,
        TransactionsProvider,
    };
    use alloy_primitives::{TxNumber, B256};
    use assert_matches::assert_matches;
    use reth_chainspec::ChainSpecBuilder;
    use reth_db::{
        mdbx::DatabaseArguments,
        test_utils::{create_test_rocksdb_dir, create_test_static_files_dir, ERROR_TEMPDIR},
    };
    use reth_db_api::tables;
    use reth_primitives_traits::SignerRecoverable;
    use reth_prune_types::{PruneMode, PruneModes};
    use reth_storage_errors::provider::ProviderError;
    use reth_testing_utils::generators::{self, random_block, random_header, BlockParams};
    use std::{ops::RangeInclusive, sync::Arc};

    /// Smoke test: a fresh factory can hand out a latest-state provider.
    #[test]
    fn common_history_provider() {
        let factory = create_test_provider_factory();
        let _ = factory.latest();
    }

    /// An empty database reports genesis-like chain info: block 0, zero hash.
    #[test]
    fn default_chain_info() {
        let factory = create_test_provider_factory();
        let provider = factory.provider().unwrap();

        let chain_info = provider.chain_info().expect("should be ok");
        assert_eq!(chain_info.best_number, 0);
        assert_eq!(chain_info.best_hash, B256::ZERO);
    }

    /// A read-only provider keeps working while a read-write provider is open.
    #[test]
    fn provider_flow() {
        let factory = create_test_provider_factory();
        let provider = factory.provider().unwrap();
        provider.block_hash(0).unwrap();
        let provider_rw = factory.provider_rw().unwrap();
        provider_rw.block_hash(0).unwrap();
        provider.block_hash(0).unwrap();
    }

    /// Constructing a factory from a filesystem path (rather than an open env)
    /// yields working RO and RW providers.
    #[test]
    fn provider_factory_with_database_path() {
        let chain_spec = ChainSpecBuilder::mainnet().build();
        let (_static_dir, static_dir_path) = create_test_static_files_dir();
        let (_rocksdb_dir, rocksdb_path) = create_test_rocksdb_dir();
        // NOTE(review): `_db_tempdir` is actually used below; the leading
        // underscore only suppresses an unused-variable warning and is
        // misleading here. It must outlive the factory so the dir isn't
        // deleted while the database is open.
        let _db_tempdir = tempfile::TempDir::new().expect(ERROR_TEMPDIR);
        let factory = ProviderFactory::<MockNodeTypesWithDB<DatabaseEnv>>::new_with_database_path(
            _db_tempdir.path(),
            Arc::new(chain_spec),
            DatabaseArguments::new(Default::default()),
            StaticFileProvider::read_write(static_dir_path).unwrap(),
            RocksDBProvider::builder(&rocksdb_path).build().unwrap(),
            reth_tasks::Runtime::test(),
        )
        .unwrap();
        let provider = factory.provider().unwrap();
        provider.block_hash(0).unwrap();
        let provider_rw = factory.provider_rw().unwrap();
        provider_rw.block_hash(0).unwrap();
        provider.block_hash(0).unwrap();
    }

    /// Inserting a block without prune modes keeps sender and tx-lookup data;
    /// with full pruning of both segments, neither is persisted.
    #[test]
    fn insert_block_with_prune_modes() {
        let block = TEST_BLOCK.clone();

        // No prune modes: sender and tx-hash lookup entries must exist.
        {
            let factory = create_test_provider_factory();
            let provider = factory.provider_rw().unwrap();
            assert_matches!(provider.insert_block(&block.clone().try_recover().unwrap()), Ok(_));
            assert_matches!(
                provider.transaction_sender(0), Ok(Some(sender))
                if sender == block.body().transactions[0].recover_signer().unwrap()
            );
            assert_matches!(
                provider.transaction_id(*block.body().transactions[0].tx_hash()),
                Ok(Some(0))
            );
        }

        // Full pruning of sender recovery + tx lookup: both reads come back empty.
        {
            let prune_modes = PruneModes {
                sender_recovery: Some(PruneMode::Full),
                transaction_lookup: Some(PruneMode::Full),
                ..PruneModes::default()
            };
            // Keep factory alive until provider is dropped to prevent TempDatabase cleanup
            let factory = create_test_provider_factory().with_prune_modes(prune_modes);
            let provider = factory.provider_rw().unwrap();
            assert_matches!(provider.insert_block(&block.clone().try_recover().unwrap()), Ok(_));
            assert_matches!(provider.transaction_sender(0), Ok(None));
            assert_matches!(
                provider.transaction_id(*block.body().transactions[0].tx_hash()),
                Ok(None)
            );
        }
    }

    /// `take` removes sender entries from the table (returning the recovered
    /// senders), after which range reads of the table come back empty.
    #[test]
    fn take_block_transaction_range_recover_senders() {
        let mut rng = generators::rng();
        let block =
            random_block(&mut rng, 0, BlockParams { tx_count: Some(3), ..Default::default() });

        // Exercise single-element and overlapping two-element sub-ranges.
        let tx_ranges: Vec<RangeInclusive<TxNumber>> = vec![0..=0, 1..=1, 2..=2, 0..=1, 1..=2];
        for range in tx_ranges {
            let factory = create_test_provider_factory();
            let provider = factory.provider_rw().unwrap();

            assert_matches!(provider.insert_block(&block.clone().try_recover().unwrap()), Ok(_));

            // Taken entries must match the signers recovered from the block itself.
            let senders = provider.take::<tables::TransactionSenders>(range.clone()).unwrap();
            assert_eq!(
                senders,
                range
                    .clone()
                    .map(|tx_number| (
                        tx_number,
                        block.body().transactions[tx_number as usize].recover_signer().unwrap()
                    ))
                    .collect::<Vec<_>>()
            );

            // After `take`, the same range must be empty in the database.
            let db_senders = provider.senders_by_tx_range(range);
            assert!(matches!(db_senders, Ok(ref v) if v.is_empty()));
        }
    }

    /// `local_tip_header` errors on an empty store and returns the header once
    /// it has been appended to the headers static file.
    #[test]
    fn header_sync_gap_lookup() {
        let factory = create_test_provider_factory();
        let provider = factory.provider_rw().unwrap();

        let mut rng = generators::rng();

        // Genesis
        let checkpoint = 0;
        let head = random_header(&mut rng, 0, None);

        // Empty database
        assert_matches!(
            provider.local_tip_header(checkpoint),
            Err(ProviderError::HeaderNotFound(block_number))
                if block_number.as_number().unwrap() == checkpoint
        );

        // Checkpoint and no gap
        let static_file_provider = provider.static_file_provider();
        let mut static_file_writer =
            static_file_provider.latest_writer(StaticFileSegment::Headers).unwrap();
        static_file_writer.append_header(head.header(), &head.hash()).unwrap();
        static_file_writer.commit().unwrap();
        drop(static_file_writer);

        let local_head = provider.local_tip_header(checkpoint).unwrap();

        assert_eq!(local_head, head);
    }
}