1use crate::common::CliNodeTypes;
8use alloy_primitives::Address;
9use clap::Parser;
10use reth_db::{
11 mdbx::{self, ffi},
12 models::StorageBeforeTx,
13 DatabaseEnv,
14};
15use reth_db_api::{
16 cursor::DbCursorRO,
17 database::Database,
18 table::Table,
19 tables,
20 transaction::{DbTx, DbTxMut},
21};
22use reth_node_builder::NodeTypesWithDBAdapter;
23use reth_provider::{
24 providers::ProviderNodeTypes, DBProvider, DatabaseProviderFactory, MetadataProvider,
25 MetadataWriter, ProviderFactory, PruneCheckpointReader, StageCheckpointWriter,
26 StaticFileProviderFactory, StaticFileWriter, StorageSettings,
27};
28use reth_prune_types::PruneSegment;
29use reth_stages_types::{StageCheckpoint, StageId};
30use reth_static_file_types::StaticFileSegment;
31use reth_storage_api::StageCheckpointReader;
32use tracing::info;
33
/// CLI command (takes no arguments) that performs the one-shot v1 → v2 storage
/// migration; all inputs are supplied via the [`ProviderFactory`] passed to
/// [`Command::execute`].
#[derive(Debug, Parser)]
pub struct Command;
37
38impl Command {
39 pub async fn execute<N: CliNodeTypes>(
46 self,
47 provider_factory: ProviderFactory<NodeTypesWithDBAdapter<N, DatabaseEnv>>,
48 ) -> eyre::Result<()>
49 where
50 N::Primitives: reth_primitives_traits::NodePrimitives<
51 Receipt: reth_db_api::table::Value + reth_codecs::Compact,
52 >,
53 {
54 info!(target: "reth::cli", "Starting v1 → v2 storage migration");
56
57 let provider = provider_factory.provider()?;
58 let current_settings = provider.storage_settings()?;
59
60 if current_settings.is_some_and(|s| s.is_v2()) {
61 info!(target: "reth::cli", "Storage is already v2, nothing to do");
62 return Ok(());
63 }
64
65 let tip =
66 provider.get_stage_checkpoint(StageId::Execution)?.map(|c| c.block_number).unwrap_or(0);
67
68 info!(target: "reth::cli", tip, "Chain tip block number");
69
70 let sf_provider = provider_factory.static_file_provider();
71
72 for segment in [StaticFileSegment::AccountChangeSets, StaticFileSegment::StorageChangeSets]
73 {
74 if sf_provider.get_highest_static_file_block(segment).is_some() {
75 eyre::bail!(
76 "Static file segment {segment:?} already contains data. \
77 Cannot migrate — target must be empty."
78 );
79 }
80 }
81
82 drop(provider);
83
84 Self::migrate_account_changesets(&provider_factory, tip)?;
86 Self::migrate_storage_changesets(&provider_factory, tip)?;
87
88 Self::migrate_receipts::<NodeTypesWithDBAdapter<N, DatabaseEnv>>(&provider_factory, tip)?;
90
91 info!(target: "reth::cli", "Writing StorageSettings v2 metadata");
93 {
94 let provider_rw = provider_factory.database_provider_rw()?;
95 provider_rw.write_storage_settings(StorageSettings::v2())?;
96 provider_rw.commit()?;
97 }
98 info!(target: "reth::cli", "Storage settings updated to v2");
99
100 Self::clear_recomputable_tables(&provider_factory)?;
102
103 let db_path = provider_factory.db_ref().path();
105 Self::compact_mdbx(provider_factory.db_ref())?;
106
107 drop(provider_factory);
109
110 let compact_path = db_path.with_file_name("db_compact");
111 Self::swap_compacted_db(&db_path, &compact_path)?;
112
113 info!(target: "reth::cli", "Migration complete. You should now restart the node and let it run the pipeline to rebuild the remaining data.");
118 Ok(())
119 }
120
121 fn migrate_account_changesets<N: ProviderNodeTypes>(
122 factory: &ProviderFactory<N>,
123 tip: u64,
124 ) -> eyre::Result<()> {
125 info!(target: "reth::cli", "Migrating AccountChangeSets → static files");
126 let provider = factory.provider()?.disable_long_read_transaction_safety();
127 let sf_provider = factory.static_file_provider();
128
129 let mut cursor = provider.tx_ref().cursor_read::<tables::AccountChangeSets>()?;
130
131 let first_block = provider
132 .get_prune_checkpoint(PruneSegment::AccountHistory)?
133 .and_then(|cp| cp.block_number)
134 .map_or(0, |b| b + 1);
135
136 let mut writer = sf_provider.latest_writer(StaticFileSegment::AccountChangeSets)?;
139 if first_block > 0 {
140 writer.ensure_at_block(first_block - 1)?;
141 }
142
143 let mut count = 0u64;
144 let mut walker = cursor.walk(Some(first_block))?.peekable();
145
146 for block in first_block..=tip {
147 let mut entries = Vec::new();
148
149 while let Some(Ok((block_number, _))) = walker.peek() {
150 if *block_number != block {
151 break;
152 }
153 let (_, entry) = walker.next().expect("peeked")?;
154 entries.push(entry);
155 }
156
157 count += entries.len() as u64;
158 writer.append_account_changeset(entries, block)?;
159 }
160
161 writer.commit()?;
162
163 info!(target: "reth::cli", count, "AccountChangeSets migrated");
164 Ok(())
165 }
166
167 fn migrate_storage_changesets<N: ProviderNodeTypes>(
168 factory: &ProviderFactory<N>,
169 tip: u64,
170 ) -> eyre::Result<()> {
171 info!(target: "reth::cli", "Migrating StorageChangeSets → static files");
172 let provider = factory.provider()?.disable_long_read_transaction_safety();
173 let sf_provider = factory.static_file_provider();
174
175 let mut cursor = provider.tx_ref().cursor_read::<tables::StorageChangeSets>()?;
176
177 let first_block = provider
178 .get_prune_checkpoint(PruneSegment::StorageHistory)?
179 .and_then(|cp| cp.block_number)
180 .map_or(0, |b| b + 1);
181
182 let mut writer = sf_provider.latest_writer(StaticFileSegment::StorageChangeSets)?;
185 if first_block > 0 {
186 writer.ensure_at_block(first_block - 1)?;
187 }
188
189 let mut count = 0u64;
190 let mut walker = cursor.walk(Some((first_block, Address::ZERO).into()))?.peekable();
191
192 for block in first_block..=tip {
193 let mut entries = Vec::new();
194
195 while let Some(Ok((key, _))) = walker.peek() {
196 if key.block_number() != block {
197 break;
198 }
199 let (key, entry) = walker.next().expect("peeked")?;
200 entries.push(StorageBeforeTx {
201 address: key.address(),
202 key: entry.key,
203 value: entry.value,
204 });
205 }
206
207 count += entries.len() as u64;
208 writer.append_storage_changeset(entries, block)?;
209 }
210
211 writer.commit()?;
212
213 info!(target: "reth::cli", count, "StorageChangeSets migrated");
214 Ok(())
215 }
216
217 fn migrate_receipts<N: ProviderNodeTypes>(
218 factory: &ProviderFactory<N>,
219 tip: u64,
220 ) -> eyre::Result<()>
221 where
222 N::Primitives: reth_primitives_traits::NodePrimitives<
223 Receipt: reth_db_api::table::Value + reth_codecs::Compact,
224 >,
225 {
226 let provider = factory.provider()?;
227 if !provider.prune_modes_ref().receipts_log_filter.is_empty() {
228 info!(target: "reth::cli", "Receipt log filter pruning is enabled, keeping receipts in MDBX");
229 return Ok(());
230 }
231 drop(provider);
232
233 let sf_provider = factory.static_file_provider();
234 let existing = sf_provider.get_highest_static_file_block(StaticFileSegment::Receipts);
235
236 if existing.is_some_and(|b| b >= tip) {
237 info!(target: "reth::cli", "Receipts already in static files, skipping");
238 return Ok(());
239 }
240
241 info!(target: "reth::cli", "Migrating Receipts → static files");
242
243 let provider = factory.provider()?.disable_long_read_transaction_safety();
244 let prune_start = provider
245 .get_prune_checkpoint(PruneSegment::Receipts)?
246 .and_then(|cp| cp.block_number)
247 .map_or(0, |b| b + 1);
248 let first_block = prune_start.max(existing.map_or(0, |b| b + 1));
249
250 if first_block > 0 {
253 let mut writer = sf_provider.latest_writer(StaticFileSegment::Receipts)?;
254 writer.ensure_at_block(first_block - 1)?;
255 writer.commit()?;
256 }
257
258 let before = sf_provider
259 .get_highest_static_file_tx(StaticFileSegment::Receipts)
260 .map_or(0, |tx| tx + 1);
261
262 let block_range = first_block..=tip;
263
264 let segment = reth_static_file::segments::Receipts;
265 reth_static_file::segments::Segment::copy_to_static_files(&segment, provider, block_range)?;
266
267 sf_provider.commit()?;
268
269 let after = sf_provider
270 .get_highest_static_file_tx(StaticFileSegment::Receipts)
271 .map_or(0, |tx| tx + 1);
272 let count = after - before;
273 info!(target: "reth::cli", count, "Receipts migrated");
274 Ok(())
275 }
276
277 fn clear_recomputable_tables<N: ProviderNodeTypes>(
280 factory: &ProviderFactory<N>,
281 ) -> eyre::Result<()> {
282 info!(target: "reth::cli", "Clearing recomputable MDBX tables");
283 let db = factory.db_ref();
284
285 macro_rules! clear_table {
286 ($table:ty) => {{
287 let tx = db.tx_mut()?;
288 tx.clear::<$table>()?;
289 tx.commit()?;
290 info!(target: "reth::cli", table = <$table as Table>::NAME, "Cleared");
291 }};
292 }
293
294 clear_table!(tables::AccountChangeSets);
296 clear_table!(tables::StorageChangeSets);
297
298 clear_table!(tables::TransactionSenders);
300
301 clear_table!(tables::TransactionHashNumbers);
303 clear_table!(tables::AccountsHistory);
304 clear_table!(tables::StoragesHistory);
305
306 clear_table!(tables::PlainAccountState);
308 clear_table!(tables::PlainStorageState);
309
310 clear_table!(tables::AccountsTrie);
312 clear_table!(tables::StoragesTrie);
313
314 info!(target: "reth::cli", "Resetting stage checkpoints");
316 let provider_rw = factory.database_provider_rw()?;
317 for stage in [
318 StageId::SenderRecovery,
319 StageId::TransactionLookup,
320 StageId::IndexAccountHistory,
321 StageId::IndexStorageHistory,
322 StageId::MerkleExecute,
323 StageId::MerkleUnwind,
324 ] {
325 provider_rw.save_stage_checkpoint(stage, StageCheckpoint::new(0))?;
326 info!(target: "reth::cli", %stage, "Checkpoint reset to 0");
327 }
328 provider_rw.save_stage_checkpoint_progress(StageId::MerkleExecute, vec![])?;
329 provider_rw.commit()?;
330
331 info!(target: "reth::cli", "Recomputable tables cleared");
332 Ok(())
333 }
334
335 fn compact_mdbx(db: &mdbx::DatabaseEnv) -> eyre::Result<()> {
337 let db_path = db.path();
338 let compact_path = db_path.with_file_name("db_compact");
339
340 reth_fs_util::create_dir_all(&compact_path)?;
341
342 info!(target: "reth::cli", ?db_path, ?compact_path, "Compacting MDBX database");
343
344 let compact_dest = compact_path.join("mdbx.dat");
345 let dest_cstr = std::ffi::CString::new(
346 compact_dest.to_str().ok_or_else(|| eyre::eyre!("compact path must be valid UTF-8"))?,
347 )?;
348
349 let flags = ffi::MDBX_CP_COMPACT | ffi::MDBX_CP_FORCE_DYNAMIC_SIZE;
350
351 let rc = db.with_raw_env_ptr(|env_ptr| unsafe {
352 ffi::mdbx_env_copy(env_ptr, dest_cstr.as_ptr(), flags)
353 });
354
355 if rc != 0 {
356 eyre::bail!("mdbx_env_copy failed with error code {rc}: {}", unsafe {
357 std::ffi::CStr::from_ptr(ffi::mdbx_strerror(rc)).to_string_lossy()
358 });
359 }
360
361 info!(target: "reth::cli", "MDBX compaction complete");
362 Ok(())
363 }
364
365 fn swap_compacted_db(
367 db_path: &std::path::Path,
368 compact_path: &std::path::Path,
369 ) -> eyre::Result<()> {
370 let backup_path = db_path.with_file_name("db_pre_compact");
371
372 info!(target: "reth::cli", ?db_path, ?compact_path, "Swapping compacted database");
373
374 std::fs::rename(db_path, &backup_path)?;
375
376 if let Err(e) = std::fs::rename(compact_path, db_path) {
377 let _ = std::fs::rename(&backup_path, db_path);
378 return Err(e.into());
379 }
380
381 std::fs::remove_dir_all(&backup_path)?;
382
383 info!(target: "reth::cli", "Database compaction swap complete");
384 Ok(())
385 }
386}