Skip to main content

reth_provider/traits/
rocksdb_provider.rs

1use crate::{
2    either_writer::{RawRocksDBBatch, RocksBatchArg, RocksTxRefArg},
3    providers::RocksDBProvider,
4};
5use reth_storage_api::StorageSettingsCache;
6use reth_storage_errors::provider::ProviderResult;
7
8/// `RocksDB` provider factory.
9///
10/// This trait provides access to the `RocksDB` provider
11pub trait RocksDBProviderFactory {
12    /// Returns the `RocksDB` provider.
13    fn rocksdb_provider(&self) -> RocksDBProvider;
14
15    /// Adds a pending `RocksDB` batch to be committed when this provider is committed.
16    ///
17    /// This allows deferring `RocksDB` commits to happen at the same time as MDBX and static file
18    /// commits, ensuring atomicity across all storage backends.
19    #[cfg(all(unix, feature = "rocksdb"))]
20    fn set_pending_rocksdb_batch(&self, batch: rocksdb::WriteBatchWithTransaction<true>);
21
22    /// Takes all pending `RocksDB` batches and commits them.
23    ///
24    /// This drains the pending batches from the lock and commits each one using the `RocksDB`
25    /// provider. Can be called before flush to persist `RocksDB` writes independently of the
26    /// full commit path.
27    #[cfg(all(unix, feature = "rocksdb"))]
28    fn commit_pending_rocksdb_batches(&self) -> ProviderResult<()>;
29
30    /// Executes a closure with a `RocksDB` transaction for reading.
31    ///
32    /// This helper encapsulates all the cfg-gated `RocksDB` transaction handling for reads.
33    /// On legacy MDBX-only nodes (where `any_in_rocksdb()` is false), this skips creating
34    /// the `RocksDB` transaction entirely, avoiding unnecessary overhead.
35    fn with_rocksdb_tx<F, R>(&self, f: F) -> ProviderResult<R>
36    where
37        Self: StorageSettingsCache,
38        F: FnOnce(RocksTxRefArg<'_>) -> ProviderResult<R>,
39    {
40        #[cfg(all(unix, feature = "rocksdb"))]
41        {
42            if self.cached_storage_settings().storage_v2 {
43                let rocksdb = self.rocksdb_provider();
44                let tx = rocksdb.tx();
45                return f(Some(&tx));
46            }
47            f(None)
48        }
49        #[cfg(not(all(unix, feature = "rocksdb")))]
50        f(())
51    }
52
53    /// Executes a closure with a `RocksDB` batch, automatically registering it for commit.
54    ///
55    /// This helper encapsulates all the cfg-gated `RocksDB` batch handling.
56    fn with_rocksdb_batch<F, R>(&self, f: F) -> ProviderResult<R>
57    where
58        F: FnOnce(RocksBatchArg<'_>) -> ProviderResult<(R, Option<RawRocksDBBatch>)>,
59    {
60        #[cfg(all(unix, feature = "rocksdb"))]
61        {
62            let rocksdb = self.rocksdb_provider();
63            let batch = rocksdb.batch();
64            let (result, raw_batch) = f(batch)?;
65            if let Some(b) = raw_batch {
66                self.set_pending_rocksdb_batch(b);
67            }
68            Ok(result)
69        }
70        #[cfg(not(all(unix, feature = "rocksdb")))]
71        {
72            let (result, _) = f(())?;
73            Ok(result)
74        }
75    }
76
77    /// Executes a closure with a `RocksDB` batch that auto-commits on threshold.
78    ///
79    /// Unlike [`Self::with_rocksdb_batch`], this uses a batch that automatically commits
80    /// when it exceeds the size threshold, preventing OOM during large bulk writes.
81    /// The consistency check on startup heals any crash between auto-commits.
82    fn with_rocksdb_batch_auto_commit<F, R>(&self, f: F) -> ProviderResult<R>
83    where
84        F: FnOnce(RocksBatchArg<'_>) -> ProviderResult<(R, Option<RawRocksDBBatch>)>,
85    {
86        #[cfg(all(unix, feature = "rocksdb"))]
87        {
88            let rocksdb = self.rocksdb_provider();
89            let batch = rocksdb.batch_with_auto_commit();
90            let (result, raw_batch) = f(batch)?;
91            if let Some(b) = raw_batch {
92                self.set_pending_rocksdb_batch(b);
93            }
94            Ok(result)
95        }
96        #[cfg(not(all(unix, feature = "rocksdb")))]
97        {
98            let (result, _) = f(())?;
99            Ok(result)
100        }
101    }
102}
103
104#[cfg(all(test, unix, feature = "rocksdb"))]
105mod tests {
106    use super::*;
107    use reth_db_api::models::StorageSettings;
108    use std::sync::atomic::{AtomicUsize, Ordering};
109
110    /// Mock `RocksDB` provider that tracks `tx()` calls.
111    struct MockRocksDBProvider {
112        tx_call_count: AtomicUsize,
113    }
114
115    impl MockRocksDBProvider {
116        const fn new() -> Self {
117            Self { tx_call_count: AtomicUsize::new(0) }
118        }
119
120        fn tx_call_count(&self) -> usize {
121            self.tx_call_count.load(Ordering::SeqCst)
122        }
123
124        fn increment_tx_count(&self) {
125            self.tx_call_count.fetch_add(1, Ordering::SeqCst);
126        }
127    }
128
129    /// Test provider that implements [`RocksDBProviderFactory`] + [`StorageSettingsCache`].
130    struct TestProvider {
131        settings: StorageSettings,
132        mock_rocksdb: MockRocksDBProvider,
133        temp_dir: tempfile::TempDir,
134    }
135
136    impl TestProvider {
137        fn new(settings: StorageSettings) -> Self {
138            Self {
139                settings,
140                mock_rocksdb: MockRocksDBProvider::new(),
141                temp_dir: tempfile::TempDir::new().unwrap(),
142            }
143        }
144
145        fn tx_call_count(&self) -> usize {
146            self.mock_rocksdb.tx_call_count()
147        }
148    }
149
150    impl StorageSettingsCache for TestProvider {
151        fn cached_storage_settings(&self) -> StorageSettings {
152            self.settings
153        }
154
155        fn set_storage_settings_cache(&self, _settings: StorageSettings) {}
156    }
157
158    impl RocksDBProviderFactory for TestProvider {
159        fn rocksdb_provider(&self) -> RocksDBProvider {
160            self.mock_rocksdb.increment_tx_count();
161            RocksDBProvider::new(self.temp_dir.path()).unwrap()
162        }
163
164        fn set_pending_rocksdb_batch(&self, _batch: rocksdb::WriteBatchWithTransaction<true>) {}
165
166        fn commit_pending_rocksdb_batches(&self) -> ProviderResult<()> {
167            Ok(())
168        }
169    }
170
171    #[test]
172    fn test_legacy_settings_skip_rocksdb_tx_creation() {
173        let provider = TestProvider::new(StorageSettings::v1());
174
175        let result = provider.with_rocksdb_tx(|tx| {
176            assert!(tx.is_none(), "legacy settings should pass None tx");
177            Ok(42)
178        });
179
180        assert_eq!(result.unwrap(), 42);
181        assert_eq!(provider.tx_call_count(), 0, "should not create RocksDB tx for legacy settings");
182    }
183
184    #[test]
185    fn test_rocksdb_settings_create_tx() {
186        let settings = StorageSettings::v2();
187        let provider = TestProvider::new(settings);
188
189        let result = provider.with_rocksdb_tx(|tx| {
190            assert!(tx.is_some(), "rocksdb settings should pass Some tx");
191            Ok(42)
192        });
193
194        assert_eq!(result.unwrap(), 42);
195        assert_eq!(
196            provider.tx_call_count(),
197            1,
198            "should create RocksDB tx when any_in_rocksdb is true"
199        );
200    }
201}