1use crate::common::CliNodeTypes;
4use alloy_primitives::Address;
5use clap::Parser;
6use reth_db::{
7 mdbx::{self, ffi},
8 models::StorageBeforeTx,
9 DatabaseEnv,
10};
11use reth_db_api::{
12 cursor::DbCursorRO,
13 database::Database,
14 table::Table,
15 tables,
16 transaction::{DbTx, DbTxMut},
17};
18use reth_node_builder::NodeTypesWithDBAdapter;
19use reth_provider::{
20 providers::ProviderNodeTypes, BlockNumReader, DBProvider, DatabaseProviderFactory,
21 MetadataProvider, MetadataWriter, ProviderFactory, PruneCheckpointReader,
22 RocksDBProviderFactory, StageCheckpointWriter, StaticFileProviderFactory, StaticFileWriter,
23 StorageSettings,
24};
25use reth_prune_types::PruneSegment;
26use reth_stages_types::{StageCheckpoint, StageId};
27use reth_static_file_types::StaticFileSegment;
28use reth_storage_api::StageCheckpointReader;
29use tracing::{info, warn};
30
31#[derive(Debug, Parser)]
33pub struct Command;
34
35impl Command {
36 pub async fn execute<N: CliNodeTypes>(
43 self,
44 provider_factory: ProviderFactory<NodeTypesWithDBAdapter<N, DatabaseEnv>>,
45 ) -> eyre::Result<()>
46 where
47 N::Primitives: reth_primitives_traits::NodePrimitives<
48 Receipt: reth_db_api::table::Value + reth_codecs::Compact,
49 >,
50 {
51 info!(target: "reth::cli", "Starting v1 → v2 storage migration");
53
54 let provider = provider_factory.provider()?;
55 let current_settings = provider.storage_settings()?;
56
57 if current_settings.is_some_and(|s| s.is_v2()) {
58 info!(target: "reth::cli", "Storage is already v2, nothing to do");
59 return Ok(());
60 }
61
62 let tip =
63 provider.get_stage_checkpoint(StageId::Execution)?.map(|c| c.block_number).unwrap_or(0);
64
65 info!(target: "reth::cli", tip, "Chain tip block number");
66
67 let sf_provider = provider_factory.static_file_provider();
68
69 for segment in [StaticFileSegment::AccountChangeSets, StaticFileSegment::StorageChangeSets]
70 {
71 if sf_provider.get_highest_static_file_block(segment).is_some() {
72 eyre::bail!(
73 "Static file segment {segment:?} already contains data. \
74 Cannot migrate — target must be empty."
75 );
76 }
77 }
78
79 drop(provider);
80
81 Self::migrate_account_changesets(&provider_factory, tip)?;
83 Self::migrate_storage_changesets(&provider_factory, tip)?;
84
85 Self::migrate_receipts::<NodeTypesWithDBAdapter<N, DatabaseEnv>>(&provider_factory, tip)?;
87
88 Self::migrate_to_rocksdb::<_, tables::TransactionHashNumbers>(&provider_factory)?;
90 Self::migrate_to_rocksdb::<_, tables::AccountsHistory>(&provider_factory)?;
91 Self::migrate_to_rocksdb::<_, tables::StoragesHistory>(&provider_factory)?;
92
93 info!(target: "reth::cli", "Writing StorageSettings v2 metadata");
95 {
96 let provider_rw = provider_factory.database_provider_rw()?;
97 provider_rw.write_storage_settings(StorageSettings::v2())?;
98 provider_rw.commit()?;
99 }
100 info!(target: "reth::cli", "Storage settings updated to v2");
101
102 Self::clear_migrated_and_recomputable_tables(&provider_factory)?;
104
105 let db_path = provider_factory.db_ref().path();
107 Self::compact_mdbx(provider_factory.db_ref())?;
108
109 drop(provider_factory);
111
112 let compact_path = db_path.with_file_name("db_compact");
113 Self::swap_compacted_db(&db_path, &compact_path)?;
114
115 info!(target: "reth::cli", "Migration complete. You should now restart the node and let it run the pipeline to rebuild the remaining data.");
120 Ok(())
121 }
122
123 fn migrate_account_changesets<N: ProviderNodeTypes>(
124 factory: &ProviderFactory<N>,
125 tip: u64,
126 ) -> eyre::Result<()> {
127 info!(target: "reth::cli", "Migrating AccountChangeSets → static files");
128 let provider = factory.provider()?.disable_long_read_transaction_safety();
129 let sf_provider = factory.static_file_provider();
130
131 let mut cursor = provider.tx_ref().cursor_read::<tables::AccountChangeSets>()?;
132
133 let first_block = provider
134 .get_prune_checkpoint(PruneSegment::AccountHistory)?
135 .and_then(|cp| cp.block_number)
136 .map_or(0, |b| b + 1);
137
138 let mut writer = sf_provider.latest_writer(StaticFileSegment::AccountChangeSets)?;
141 if first_block > 0 {
142 writer.ensure_at_block(first_block - 1)?;
143 }
144
145 let mut count = 0u64;
146 let mut walker = cursor.walk(Some(first_block))?.peekable();
147
148 for block in first_block..=tip {
149 let mut entries = Vec::new();
150
151 while let Some(Ok((block_number, _))) = walker.peek() {
152 if *block_number != block {
153 break;
154 }
155 let (_, entry) = walker.next().expect("peeked")?;
156 entries.push(entry);
157 }
158
159 count += entries.len() as u64;
160 writer.append_account_changeset(entries, block)?;
161 }
162
163 writer.commit()?;
164
165 info!(target: "reth::cli", count, "AccountChangeSets migrated");
166 Ok(())
167 }
168
169 fn migrate_storage_changesets<N: ProviderNodeTypes>(
170 factory: &ProviderFactory<N>,
171 tip: u64,
172 ) -> eyre::Result<()> {
173 info!(target: "reth::cli", "Migrating StorageChangeSets → static files");
174 let provider = factory.provider()?.disable_long_read_transaction_safety();
175 let sf_provider = factory.static_file_provider();
176
177 let mut cursor = provider.tx_ref().cursor_read::<tables::StorageChangeSets>()?;
178
179 let first_block = provider
180 .get_prune_checkpoint(PruneSegment::StorageHistory)?
181 .and_then(|cp| cp.block_number)
182 .map_or(0, |b| b + 1);
183
184 let mut writer = sf_provider.latest_writer(StaticFileSegment::StorageChangeSets)?;
187 if first_block > 0 {
188 writer.ensure_at_block(first_block - 1)?;
189 }
190
191 let mut count = 0u64;
192 let mut walker = cursor.walk(Some((first_block, Address::ZERO).into()))?.peekable();
193
194 for block in first_block..=tip {
195 let mut entries = Vec::new();
196
197 while let Some(Ok((key, _))) = walker.peek() {
198 if key.block_number() != block {
199 break;
200 }
201 let (key, entry) = walker.next().expect("peeked")?;
202 entries.push(StorageBeforeTx {
203 address: key.address(),
204 key: entry.key,
205 value: entry.value,
206 });
207 }
208
209 count += entries.len() as u64;
210 writer.append_storage_changeset(entries, block)?;
211 }
212
213 writer.commit()?;
214
215 info!(target: "reth::cli", count, "StorageChangeSets migrated");
216 Ok(())
217 }
218
219 fn migrate_receipts<N: ProviderNodeTypes>(
220 factory: &ProviderFactory<N>,
221 tip: u64,
222 ) -> eyre::Result<()>
223 where
224 N::Primitives: reth_primitives_traits::NodePrimitives<
225 Receipt: reth_db_api::table::Value + reth_codecs::Compact,
226 >,
227 {
228 let provider = factory.provider()?;
229 if !provider.prune_modes_ref().receipts_log_filter.is_empty() {
230 info!(target: "reth::cli", "Receipt log filter pruning is enabled, keeping receipts in MDBX");
231 return Ok(());
232 }
233 drop(provider);
234
235 let sf_provider = factory.static_file_provider();
236 let existing = sf_provider.get_highest_static_file_block(StaticFileSegment::Receipts);
237
238 if existing.is_some_and(|b| b >= tip) {
239 info!(target: "reth::cli", "Receipts already in static files, skipping");
240 return Ok(());
241 }
242
243 info!(target: "reth::cli", "Migrating Receipts → static files");
244
245 let provider = factory.provider()?.disable_long_read_transaction_safety();
246 let prune_start = provider
247 .get_prune_checkpoint(PruneSegment::Receipts)?
248 .and_then(|cp| cp.block_number)
249 .map_or(0, |b| b + 1);
250 let first_block = prune_start.max(existing.map_or(0, |b| b + 1));
251
252 if first_block > 0 {
255 let mut writer = sf_provider.latest_writer(StaticFileSegment::Receipts)?;
256 writer.ensure_at_block(first_block - 1)?;
257 writer.commit()?;
258 }
259
260 let before = sf_provider
261 .get_highest_static_file_tx(StaticFileSegment::Receipts)
262 .map_or(0, |tx| tx + 1);
263
264 let block_range = first_block..=tip;
265
266 let segment = reth_static_file::segments::Receipts;
267 reth_static_file::segments::Segment::copy_to_static_files(&segment, provider, block_range)?;
268
269 sf_provider.commit()?;
270
271 let after = sf_provider
272 .get_highest_static_file_tx(StaticFileSegment::Receipts)
273 .map_or(0, |tx| tx + 1);
274 let count = after - before;
275 info!(target: "reth::cli", count, "Receipts migrated");
276 Ok(())
277 }
278
279 fn migrate_to_rocksdb<N: ProviderNodeTypes, T: Table>(
280 factory: &ProviderFactory<N>,
281 ) -> eyre::Result<()> {
282 info!(target: "reth::cli", table = T::NAME, "Migrating MDBX table → RocksDB");
283
284 let provider = factory.provider()?.disable_long_read_transaction_safety();
285 let mut cursor = provider.tx_ref().cursor_read::<T>()?;
286
287 let rocksdb = factory.rocksdb_provider();
288 rocksdb.clear::<T>()?;
289 let mut batch = rocksdb.batch_with_auto_commit();
290
291 let mut count = 0u64;
292 for entry in cursor.walk(None)? {
293 let (key, value) = entry?;
294 batch.put::<T>(key, &value)?;
295 count += 1;
296 }
297
298 batch.commit()?;
299 rocksdb.flush(&[T::NAME])?;
300
301 info!(target: "reth::cli", table = T::NAME, count, "MDBX table migrated to RocksDB");
302 Ok(())
303 }
304
305 fn clear_migrated_and_recomputable_tables<N: ProviderNodeTypes>(
308 factory: &ProviderFactory<N>,
309 ) -> eyre::Result<()> {
310 info!(target: "reth::cli", "Clearing migrated and recomputable MDBX tables");
311 let db = factory.db_ref();
312
313 macro_rules! clear_table {
314 ($table:ty) => {{
315 let tx = db.tx_mut()?;
316 tx.clear::<$table>()?;
317 tx.commit()?;
318 info!(target: "reth::cli", table = <$table as Table>::NAME, "Cleared");
319 }};
320 }
321
322 clear_table!(tables::AccountChangeSets);
324 clear_table!(tables::StorageChangeSets);
325
326 clear_table!(tables::TransactionSenders);
328
329 clear_table!(tables::TransactionHashNumbers);
331 clear_table!(tables::AccountsHistory);
332 clear_table!(tables::StoragesHistory);
333
334 clear_table!(tables::PlainAccountState);
336 clear_table!(tables::PlainStorageState);
337
338 clear_table!(tables::AccountsTrie);
340 clear_table!(tables::StoragesTrie);
341
342 info!(target: "reth::cli", "Resetting stage checkpoints");
344 let provider_rw = factory.database_provider_rw()?;
345 for stage in [StageId::SenderRecovery, StageId::MerkleExecute, StageId::MerkleUnwind] {
346 provider_rw.save_stage_checkpoint(stage, StageCheckpoint::new(0))?;
347 info!(target: "reth::cli", %stage, "Checkpoint reset to 0");
348 }
349 provider_rw.save_stage_checkpoint_progress(StageId::MerkleExecute, vec![])?;
350
351 if provider_rw.last_block_number()? > 0 {
352 let first_indices_entry = provider_rw
353 .tx_ref()
354 .cursor_read::<tables::BlockBodyIndices>()?
355 .seek(1)?
356 .map(|(block, _)| block)
357 .ok_or_else(|| eyre::eyre!("no block body indices found"))?;
358
359 if first_indices_entry > 1 {
366 provider_rw.save_stage_checkpoint(
367 StageId::SenderRecovery,
368 StageCheckpoint::new(first_indices_entry - 1),
369 )?;
370
371 let static_file_provider = provider_rw.static_file_provider();
373 let mut senders_writer =
374 static_file_provider.latest_writer(StaticFileSegment::TransactionSenders)?;
375 senders_writer.ensure_at_block(first_indices_entry - 1)?;
376 senders_writer.commit()?;
377
378 warn!(
379 target: "reth::cli",
380 "Missing block body indices data for first {first_indices_entry} blocks, initializing sender recovery with the first block that has a corresponding block body indices entry"
381 );
382 }
383 }
384 provider_rw.commit()?;
385
386 info!(target: "reth::cli", "Recomputable tables cleared");
387 Ok(())
388 }
389
390 fn compact_mdbx(db: &mdbx::DatabaseEnv) -> eyre::Result<()> {
392 let db_path = db.path();
393 let compact_path = db_path.with_file_name("db_compact");
394
395 reth_fs_util::create_dir_all(&compact_path)?;
396
397 info!(target: "reth::cli", ?db_path, ?compact_path, "Compacting MDBX database");
398
399 let compact_dest = compact_path.join("mdbx.dat");
400 let dest_cstr = std::ffi::CString::new(
401 compact_dest.to_str().ok_or_else(|| eyre::eyre!("compact path must be valid UTF-8"))?,
402 )?;
403
404 let flags = ffi::MDBX_CP_COMPACT | ffi::MDBX_CP_FORCE_DYNAMIC_SIZE;
405
406 let rc = db.with_raw_env_ptr(|env_ptr| unsafe {
407 ffi::mdbx_env_copy(env_ptr, dest_cstr.as_ptr(), flags)
408 });
409
410 if rc != 0 {
411 eyre::bail!("mdbx_env_copy failed with error code {rc}: {}", unsafe {
412 std::ffi::CStr::from_ptr(ffi::mdbx_strerror(rc)).to_string_lossy()
413 });
414 }
415
416 info!(target: "reth::cli", "MDBX compaction complete");
417 Ok(())
418 }
419
420 fn swap_compacted_db(
422 db_path: &std::path::Path,
423 compact_path: &std::path::Path,
424 ) -> eyre::Result<()> {
425 let backup_path = db_path.with_file_name("db_pre_compact");
426
427 info!(target: "reth::cli", ?db_path, ?compact_path, "Swapping compacted database");
428
429 std::fs::rename(db_path, &backup_path)?;
430
431 if let Err(e) = std::fs::rename(compact_path, db_path) {
432 let _ = std::fs::rename(&backup_path, db_path);
433 return Err(e.into());
434 }
435
436 std::fs::remove_dir_all(&backup_path)?;
437
438 info!(target: "reth::cli", "Database compaction swap complete");
439 Ok(())
440 }
441}