Skip to main content

reth_provider/traits/
rocksdb_provider.rs

1use crate::{
2    either_writer::{RawRocksDBBatch, RocksBatchArg, RocksDBRefArg},
3    providers::RocksDBProvider,
4};
5use reth_storage_api::StorageSettingsCache;
6use reth_storage_errors::provider::ProviderResult;
7
8/// `RocksDB` provider factory.
9///
10/// This trait provides access to the `RocksDB` provider
11pub trait RocksDBProviderFactory {
12    /// Returns the `RocksDB` provider.
13    fn rocksdb_provider(&self) -> RocksDBProvider;
14
15    /// Adds a pending `RocksDB` batch to be committed when this provider is committed.
16    ///
17    /// This allows deferring `RocksDB` commits to happen at the same time as MDBX and static file
18    /// commits, ensuring atomicity across all storage backends.
19    fn set_pending_rocksdb_batch(&self, batch: rocksdb::WriteBatchWithTransaction<true>);
20
21    /// Takes all pending `RocksDB` batches and commits them.
22    ///
23    /// This drains the pending batches from the lock and commits each one using the `RocksDB`
24    /// provider. Can be called before flush to persist `RocksDB` writes independently of the
25    /// full commit path.
26    fn commit_pending_rocksdb_batches(&self) -> ProviderResult<()>;
27
28    /// Executes a closure with a `RocksDB` point-in-time snapshot for consistent reads.
29    ///
30    /// This helper encapsulates `RocksDB` access for read operations.
31    /// On legacy MDBX-only nodes (where `storage_v2` is false), this skips creating
32    /// the `RocksDB` snapshot entirely, avoiding unnecessary overhead.
33    ///
34    /// Unlike a transaction-based approach, this works in both read-only and read-write
35    /// modes since the snapshot provides a consistent view of the data at the time it
36    /// was created.
37    fn with_rocksdb_snapshot<F, R>(&self, f: F) -> ProviderResult<R>
38    where
39        Self: StorageSettingsCache,
40        F: FnOnce(RocksDBRefArg<'_>) -> ProviderResult<R>,
41    {
42        if self.cached_storage_settings().storage_v2 {
43            let rocksdb = self.rocksdb_provider();
44            let snapshot = rocksdb.snapshot();
45            return f(Some(snapshot));
46        }
47        f(None)
48    }
49
50    /// Executes a closure with a `RocksDB` batch, automatically registering it for commit.
51    ///
52    /// This helper encapsulates all the cfg-gated `RocksDB` batch handling.
53    fn with_rocksdb_batch<F, R>(&self, f: F) -> ProviderResult<R>
54    where
55        F: FnOnce(RocksBatchArg<'_>) -> ProviderResult<(R, Option<RawRocksDBBatch>)>,
56    {
57        let rocksdb = self.rocksdb_provider();
58        let batch = rocksdb.batch();
59        let (result, raw_batch) = f(batch)?;
60        if let Some(b) = raw_batch {
61            self.set_pending_rocksdb_batch(b);
62        }
63        Ok(result)
64    }
65
66    /// Executes a closure with a `RocksDB` batch that auto-commits on threshold.
67    ///
68    /// Unlike [`Self::with_rocksdb_batch`], this uses a batch that automatically commits
69    /// when it exceeds the size threshold, preventing OOM during large bulk writes.
70    /// The consistency check on startup heals any crash between auto-commits.
71    fn with_rocksdb_batch_auto_commit<F, R>(&self, f: F) -> ProviderResult<R>
72    where
73        F: FnOnce(RocksBatchArg<'_>) -> ProviderResult<(R, Option<RawRocksDBBatch>)>,
74    {
75        let rocksdb = self.rocksdb_provider();
76        let batch = rocksdb.batch_with_auto_commit();
77        let (result, raw_batch) = f(batch)?;
78        if let Some(b) = raw_batch {
79            self.set_pending_rocksdb_batch(b);
80        }
81        Ok(result)
82    }
83}
84
85#[cfg(test)]
86mod tests {
87    use super::*;
88    use reth_db_api::models::StorageSettings;
89    use std::sync::atomic::{AtomicUsize, Ordering};
90
91    /// Mock `RocksDB` provider that tracks snapshot creation calls.
92    struct MockRocksDBProvider {
93        tx_call_count: AtomicUsize,
94    }
95
96    impl MockRocksDBProvider {
97        const fn new() -> Self {
98            Self { tx_call_count: AtomicUsize::new(0) }
99        }
100
101        fn tx_call_count(&self) -> usize {
102            self.tx_call_count.load(Ordering::SeqCst)
103        }
104
105        fn increment_tx_count(&self) {
106            self.tx_call_count.fetch_add(1, Ordering::SeqCst);
107        }
108    }
109
110    /// Test provider that implements [`RocksDBProviderFactory`] + [`StorageSettingsCache`].
111    struct TestProvider {
112        settings: StorageSettings,
113        mock_rocksdb: MockRocksDBProvider,
114        temp_dir: tempfile::TempDir,
115    }
116
117    impl TestProvider {
118        fn new(settings: StorageSettings) -> Self {
119            Self {
120                settings,
121                mock_rocksdb: MockRocksDBProvider::new(),
122                temp_dir: tempfile::TempDir::new().unwrap(),
123            }
124        }
125
126        fn tx_call_count(&self) -> usize {
127            self.mock_rocksdb.tx_call_count()
128        }
129    }
130
131    impl StorageSettingsCache for TestProvider {
132        fn cached_storage_settings(&self) -> StorageSettings {
133            self.settings
134        }
135
136        fn set_storage_settings_cache(&self, _settings: StorageSettings) {}
137    }
138
139    impl RocksDBProviderFactory for TestProvider {
140        fn rocksdb_provider(&self) -> RocksDBProvider {
141            self.mock_rocksdb.increment_tx_count();
142            RocksDBProvider::new(self.temp_dir.path()).unwrap()
143        }
144
145        fn set_pending_rocksdb_batch(&self, _batch: rocksdb::WriteBatchWithTransaction<true>) {}
146
147        fn commit_pending_rocksdb_batches(&self) -> ProviderResult<()> {
148            Ok(())
149        }
150    }
151
152    #[test]
153    fn test_legacy_settings_skip_rocksdb_snapshot() {
154        let provider = TestProvider::new(StorageSettings::v1());
155
156        let result = provider.with_rocksdb_snapshot(|rocksdb| {
157            assert!(rocksdb.is_none(), "legacy settings should pass None");
158            Ok(42)
159        });
160
161        assert_eq!(result.unwrap(), 42);
162        assert_eq!(
163            provider.tx_call_count(),
164            0,
165            "should not create RocksDB provider for legacy settings"
166        );
167    }
168
169    #[test]
170    fn test_rocksdb_settings_create_snapshot() {
171        let settings = StorageSettings::v2();
172        let provider = TestProvider::new(settings);
173
174        let result = provider.with_rocksdb_snapshot(|rocksdb| {
175            assert!(rocksdb.is_some(), "rocksdb settings should pass Some snapshot");
176            Ok(42)
177        });
178
179        assert_eq!(result.unwrap(), 42);
180        assert_eq!(
181            provider.tx_call_count(),
182            1,
183            "should create RocksDB provider when storage_v2 is true"
184        );
185    }
186}