reth_provider/traits/
rocksdb_provider.rs1use crate::{
2 either_writer::{RawRocksDBBatch, RocksBatchArg, RocksTxRefArg},
3 providers::RocksDBProvider,
4};
5use reth_storage_api::StorageSettingsCache;
6use reth_storage_errors::provider::ProviderResult;
7
8pub trait RocksDBProviderFactory {
12 fn rocksdb_provider(&self) -> RocksDBProvider;
14
15 #[cfg(all(unix, feature = "rocksdb"))]
20 fn set_pending_rocksdb_batch(&self, batch: rocksdb::WriteBatchWithTransaction<true>);
21
22 #[cfg(all(unix, feature = "rocksdb"))]
28 fn commit_pending_rocksdb_batches(&self) -> ProviderResult<()>;
29
30 fn with_rocksdb_tx<F, R>(&self, f: F) -> ProviderResult<R>
36 where
37 Self: StorageSettingsCache,
38 F: FnOnce(RocksTxRefArg<'_>) -> ProviderResult<R>,
39 {
40 #[cfg(all(unix, feature = "rocksdb"))]
41 {
42 if self.cached_storage_settings().storage_v2 {
43 let rocksdb = self.rocksdb_provider();
44 let tx = rocksdb.tx();
45 return f(Some(&tx));
46 }
47 f(None)
48 }
49 #[cfg(not(all(unix, feature = "rocksdb")))]
50 f(())
51 }
52
53 fn with_rocksdb_batch<F, R>(&self, f: F) -> ProviderResult<R>
57 where
58 F: FnOnce(RocksBatchArg<'_>) -> ProviderResult<(R, Option<RawRocksDBBatch>)>,
59 {
60 #[cfg(all(unix, feature = "rocksdb"))]
61 {
62 let rocksdb = self.rocksdb_provider();
63 let batch = rocksdb.batch();
64 let (result, raw_batch) = f(batch)?;
65 if let Some(b) = raw_batch {
66 self.set_pending_rocksdb_batch(b);
67 }
68 Ok(result)
69 }
70 #[cfg(not(all(unix, feature = "rocksdb")))]
71 {
72 let (result, _) = f(())?;
73 Ok(result)
74 }
75 }
76
77 fn with_rocksdb_batch_auto_commit<F, R>(&self, f: F) -> ProviderResult<R>
83 where
84 F: FnOnce(RocksBatchArg<'_>) -> ProviderResult<(R, Option<RawRocksDBBatch>)>,
85 {
86 #[cfg(all(unix, feature = "rocksdb"))]
87 {
88 let rocksdb = self.rocksdb_provider();
89 let batch = rocksdb.batch_with_auto_commit();
90 let (result, raw_batch) = f(batch)?;
91 if let Some(b) = raw_batch {
92 self.set_pending_rocksdb_batch(b);
93 }
94 Ok(result)
95 }
96 #[cfg(not(all(unix, feature = "rocksdb")))]
97 {
98 let (result, _) = f(())?;
99 Ok(result)
100 }
101 }
102}
103
104#[cfg(all(test, unix, feature = "rocksdb"))]
105mod tests {
106 use super::*;
107 use reth_db_api::models::StorageSettings;
108 use std::sync::atomic::{AtomicUsize, Ordering};
109
110 struct MockRocksDBProvider {
112 tx_call_count: AtomicUsize,
113 }
114
115 impl MockRocksDBProvider {
116 const fn new() -> Self {
117 Self { tx_call_count: AtomicUsize::new(0) }
118 }
119
120 fn tx_call_count(&self) -> usize {
121 self.tx_call_count.load(Ordering::SeqCst)
122 }
123
124 fn increment_tx_count(&self) {
125 self.tx_call_count.fetch_add(1, Ordering::SeqCst);
126 }
127 }
128
129 struct TestProvider {
131 settings: StorageSettings,
132 mock_rocksdb: MockRocksDBProvider,
133 temp_dir: tempfile::TempDir,
134 }
135
136 impl TestProvider {
137 fn new(settings: StorageSettings) -> Self {
138 Self {
139 settings,
140 mock_rocksdb: MockRocksDBProvider::new(),
141 temp_dir: tempfile::TempDir::new().unwrap(),
142 }
143 }
144
145 fn tx_call_count(&self) -> usize {
146 self.mock_rocksdb.tx_call_count()
147 }
148 }
149
150 impl StorageSettingsCache for TestProvider {
151 fn cached_storage_settings(&self) -> StorageSettings {
152 self.settings
153 }
154
155 fn set_storage_settings_cache(&self, _settings: StorageSettings) {}
156 }
157
158 impl RocksDBProviderFactory for TestProvider {
159 fn rocksdb_provider(&self) -> RocksDBProvider {
160 self.mock_rocksdb.increment_tx_count();
161 RocksDBProvider::new(self.temp_dir.path()).unwrap()
162 }
163
164 fn set_pending_rocksdb_batch(&self, _batch: rocksdb::WriteBatchWithTransaction<true>) {}
165
166 fn commit_pending_rocksdb_batches(&self) -> ProviderResult<()> {
167 Ok(())
168 }
169 }
170
171 #[test]
172 fn test_legacy_settings_skip_rocksdb_tx_creation() {
173 let provider = TestProvider::new(StorageSettings::v1());
174
175 let result = provider.with_rocksdb_tx(|tx| {
176 assert!(tx.is_none(), "legacy settings should pass None tx");
177 Ok(42)
178 });
179
180 assert_eq!(result.unwrap(), 42);
181 assert_eq!(provider.tx_call_count(), 0, "should not create RocksDB tx for legacy settings");
182 }
183
184 #[test]
185 fn test_rocksdb_settings_create_tx() {
186 let settings = StorageSettings::v2();
187 let provider = TestProvider::new(settings);
188
189 let result = provider.with_rocksdb_tx(|tx| {
190 assert!(tx.is_some(), "rocksdb settings should pass Some tx");
191 Ok(42)
192 });
193
194 assert_eq!(result.unwrap(), 42);
195 assert_eq!(
196 provider.tx_call_count(),
197 1,
198 "should create RocksDB tx when any_in_rocksdb is true"
199 );
200 }
201}