// reth_cli_commands/db/migrate_v2.rs

1//! `reth db migrate-v2` command for migrating v1 storage layout to v2.
2//!
3//! Migrates data that cannot be recomputed (changesets + receipts) from MDBX to
4//! static files, clears recomputable tables (senders, indices, trie, plain
5//! state), compacts MDBX, then runs the pipeline to rebuild them.
6
7use crate::common::CliNodeTypes;
8use alloy_primitives::Address;
9use clap::Parser;
10use reth_db::{
11    mdbx::{self, ffi},
12    models::StorageBeforeTx,
13    DatabaseEnv,
14};
15use reth_db_api::{
16    cursor::DbCursorRO,
17    database::Database,
18    table::Table,
19    tables,
20    transaction::{DbTx, DbTxMut},
21};
22use reth_node_builder::NodeTypesWithDBAdapter;
23use reth_provider::{
24    providers::ProviderNodeTypes, DBProvider, DatabaseProviderFactory, MetadataProvider,
25    MetadataWriter, ProviderFactory, PruneCheckpointReader, StageCheckpointWriter,
26    StaticFileProviderFactory, StaticFileWriter, StorageSettings,
27};
28use reth_prune_types::PruneSegment;
29use reth_stages_types::{StageCheckpoint, StageId};
30use reth_static_file_types::StaticFileSegment;
31use reth_storage_api::StageCheckpointReader;
32use tracing::info;
33
/// `reth db migrate-v2` command
///
/// Takes no CLI arguments of its own — the database and static-file locations
/// come from the surrounding CLI environment.
#[derive(Debug, Parser)]
pub struct Command;
37
38impl Command {
    /// Execute the full v1 → v2 migration:
    ///
    /// 1. Migrate changesets + receipts to static files
    /// 2. Flip `StorageSettings` to v2
    /// 3. Clear recomputable MDBX tables + reset stage checkpoints
    /// 4. Compact MDBX
    ///
    /// # Errors
    ///
    /// Fails if the target changeset static-file segments already contain
    /// data, or if any copy/clear/compaction step fails. The migration is not
    /// atomic: a failure after Phase 3 can leave the store partially migrated.
    pub async fn execute<N: CliNodeTypes>(
        self,
        provider_factory: ProviderFactory<NodeTypesWithDBAdapter<N, DatabaseEnv>>,
    ) -> eyre::Result<()>
    where
        N::Primitives: reth_primitives_traits::NodePrimitives<
            Receipt: reth_db_api::table::Value + reth_codecs::Compact,
        >,
    {
        // === Phase 0: Preflight ===
        info!(target: "reth::cli", "Starting v1 → v2 storage migration");

        let provider = provider_factory.provider()?;
        let current_settings = provider.storage_settings()?;

        // Idempotency guard: rerunning on an already-migrated store is a no-op.
        if current_settings.is_some_and(|s| s.is_v2()) {
            info!(target: "reth::cli", "Storage is already v2, nothing to do");
            return Ok(());
        }

        // The Execution stage checkpoint serves as the chain tip; a missing
        // checkpoint (fresh database) migrates from block 0.
        let tip =
            provider.get_stage_checkpoint(StageId::Execution)?.map(|c| c.block_number).unwrap_or(0);

        info!(target: "reth::cli", tip, "Chain tip block number");

        let sf_provider = provider_factory.static_file_provider();

        // Refuse non-empty changeset targets — appending on top of a previous
        // partial run would duplicate data.
        for segment in [StaticFileSegment::AccountChangeSets, StaticFileSegment::StorageChangeSets]
        {
            if sf_provider.get_highest_static_file_block(segment).is_some() {
                eyre::bail!(
                    "Static file segment {segment:?} already contains data. \
                     Cannot migrate — target must be empty."
                );
            }
        }

        // Release the preflight read transaction before the long copy phases.
        drop(provider);

        // === Phase 1: Migrate changesets → static files ===
        Self::migrate_account_changesets(&provider_factory, tip)?;
        Self::migrate_storage_changesets(&provider_factory, tip)?;

        // === Phase 2: Migrate receipts → static files ===
        Self::migrate_receipts::<NodeTypesWithDBAdapter<N, DatabaseEnv>>(&provider_factory, tip)?;

        // === Phase 3: Flip metadata to v2 ===
        // Only flipped after all copies succeeded, so a crash before this
        // point leaves the store still readable as v1.
        info!(target: "reth::cli", "Writing StorageSettings v2 metadata");
        {
            let provider_rw = provider_factory.database_provider_rw()?;
            provider_rw.write_storage_settings(StorageSettings::v2())?;
            provider_rw.commit()?;
        }
        info!(target: "reth::cli", "Storage settings updated to v2");

        // === Phase 4: Clear recomputable tables ===
        Self::clear_recomputable_tables(&provider_factory)?;

        // === Phase 5: Compact MDBX (before pipeline, so it runs on a smaller DB) ===
        let db_path = provider_factory.db_ref().path();
        Self::compact_mdbx(provider_factory.db_ref())?;

        // Drop to release DB handle for swap
        drop(provider_factory);

        // NOTE(review): this name must stay in sync with the directory that
        // `compact_mdbx` writes to ("db_compact").
        let compact_path = db_path.with_file_name("db_compact");
        Self::swap_compacted_db(&db_path, &compact_path)?;

        // === Phase 6: Reopen DB and run pipeline ===
        // The caller will reopen the environment and run the pipeline.
        // We return here — the pipeline step is handled in mod.rs after
        // reopening the database with the compacted copy.
        info!(target: "reth::cli", "Migration complete. You should now restart the node and let it run the pipeline to rebuild the remaining data.");
        Ok(())
    }
120
121    fn migrate_account_changesets<N: ProviderNodeTypes>(
122        factory: &ProviderFactory<N>,
123        tip: u64,
124    ) -> eyre::Result<()> {
125        info!(target: "reth::cli", "Migrating AccountChangeSets → static files");
126        let provider = factory.provider()?.disable_long_read_transaction_safety();
127        let sf_provider = factory.static_file_provider();
128
129        let mut cursor = provider.tx_ref().cursor_read::<tables::AccountChangeSets>()?;
130
131        let first_block = provider
132            .get_prune_checkpoint(PruneSegment::AccountHistory)?
133            .and_then(|cp| cp.block_number)
134            .map_or(0, |b| b + 1);
135
136        // The writer always starts at the fixed range boundary (e.g. 2500000) which may be
137        // earlier than first_block (e.g. 2603897 from prune checkpoint).
138        let mut writer = sf_provider.latest_writer(StaticFileSegment::AccountChangeSets)?;
139        if first_block > 0 {
140            writer.ensure_at_block(first_block - 1)?;
141        }
142
143        let mut count = 0u64;
144        let mut walker = cursor.walk(Some(first_block))?.peekable();
145
146        for block in first_block..=tip {
147            let mut entries = Vec::new();
148
149            while let Some(Ok((block_number, _))) = walker.peek() {
150                if *block_number != block {
151                    break;
152                }
153                let (_, entry) = walker.next().expect("peeked")?;
154                entries.push(entry);
155            }
156
157            count += entries.len() as u64;
158            writer.append_account_changeset(entries, block)?;
159        }
160
161        writer.commit()?;
162
163        info!(target: "reth::cli", count, "AccountChangeSets migrated");
164        Ok(())
165    }
166
167    fn migrate_storage_changesets<N: ProviderNodeTypes>(
168        factory: &ProviderFactory<N>,
169        tip: u64,
170    ) -> eyre::Result<()> {
171        info!(target: "reth::cli", "Migrating StorageChangeSets → static files");
172        let provider = factory.provider()?.disable_long_read_transaction_safety();
173        let sf_provider = factory.static_file_provider();
174
175        let mut cursor = provider.tx_ref().cursor_read::<tables::StorageChangeSets>()?;
176
177        let first_block = provider
178            .get_prune_checkpoint(PruneSegment::StorageHistory)?
179            .and_then(|cp| cp.block_number)
180            .map_or(0, |b| b + 1);
181
182        // The writer always starts at the fixed range boundary (e.g. 2500000) which may be
183        // earlier than first_block (e.g. 2603897 from prune checkpoint).
184        let mut writer = sf_provider.latest_writer(StaticFileSegment::StorageChangeSets)?;
185        if first_block > 0 {
186            writer.ensure_at_block(first_block - 1)?;
187        }
188
189        let mut count = 0u64;
190        let mut walker = cursor.walk(Some((first_block, Address::ZERO).into()))?.peekable();
191
192        for block in first_block..=tip {
193            let mut entries = Vec::new();
194
195            while let Some(Ok((key, _))) = walker.peek() {
196                if key.block_number() != block {
197                    break;
198                }
199                let (key, entry) = walker.next().expect("peeked")?;
200                entries.push(StorageBeforeTx {
201                    address: key.address(),
202                    key: entry.key,
203                    value: entry.value,
204                });
205            }
206
207            count += entries.len() as u64;
208            writer.append_storage_changeset(entries, block)?;
209        }
210
211        writer.commit()?;
212
213        info!(target: "reth::cli", count, "StorageChangeSets migrated");
214        Ok(())
215    }
216
    /// Copies receipts from MDBX into the `Receipts` static-file segment.
    ///
    /// Skipped entirely when receipt log-filter pruning is configured (those
    /// receipts stay in MDBX) or when static files already cover the tip.
    /// Resumes from whichever is higher: the receipts prune checkpoint or the
    /// highest receipt block already present in static files.
    fn migrate_receipts<N: ProviderNodeTypes>(
        factory: &ProviderFactory<N>,
        tip: u64,
    ) -> eyre::Result<()>
    where
        N::Primitives: reth_primitives_traits::NodePrimitives<
            Receipt: reth_db_api::table::Value + reth_codecs::Compact,
        >,
    {
        let provider = factory.provider()?;
        // With a log filter, only a subset of receipts is retained — that
        // sparse set stays in MDBX rather than being copied.
        if !provider.prune_modes_ref().receipts_log_filter.is_empty() {
            info!(target: "reth::cli", "Receipt log filter pruning is enabled, keeping receipts in MDBX");
            return Ok(());
        }
        drop(provider);

        let sf_provider = factory.static_file_provider();
        // Highest receipt block already in static files, if any.
        let existing = sf_provider.get_highest_static_file_block(StaticFileSegment::Receipts);

        if existing.is_some_and(|b| b >= tip) {
            info!(target: "reth::cli", "Receipts already in static files, skipping");
            return Ok(());
        }

        info!(target: "reth::cli", "Migrating Receipts → static files");

        // Long-read safety disabled: this read transaction stays open for the
        // whole copy.
        let provider = factory.provider()?.disable_long_read_transaction_safety();
        let prune_start = provider
            .get_prune_checkpoint(PruneSegment::Receipts)?
            .and_then(|cp| cp.block_number)
            .map_or(0, |b| b + 1);
        // Resume point: skip both pruned blocks and blocks already migrated.
        let first_block = prune_start.max(existing.map_or(0, |b| b + 1));

        // The writer always starts at the fixed range boundary (e.g. 2500000) which may be
        // earlier than first_block (e.g. 2603897 from prune checkpoint).
        if first_block > 0 {
            let mut writer = sf_provider.latest_writer(StaticFileSegment::Receipts)?;
            writer.ensure_at_block(first_block - 1)?;
            writer.commit()?;
        }

        // Transaction-number watermark before the copy; only used for the
        // migrated-count log below.
        let before = sf_provider
            .get_highest_static_file_tx(StaticFileSegment::Receipts)
            .map_or(0, |tx| tx + 1);

        let block_range = first_block..=tip;

        // Reuse the static-file producer's Receipts segment for the copy
        // itself; it consumes the provider (and its read transaction).
        let segment = reth_static_file::segments::Receipts;
        reth_static_file::segments::Segment::copy_to_static_files(&segment, provider, block_range)?;

        sf_provider.commit()?;

        let after = sf_provider
            .get_highest_static_file_tx(StaticFileSegment::Receipts)
            .map_or(0, |tx| tx + 1);
        let count = after - before;
        info!(target: "reth::cli", count, "Receipts migrated");
        Ok(())
    }
276
277    /// Clears tables that can be recomputed by the pipeline and resets their
278    /// stage checkpoints.
    /// Clears tables that can be recomputed by the pipeline and resets their
    /// stage checkpoints.
    ///
    /// Each table is cleared in its own write transaction so individual
    /// commits stay small; the checkpoint resets are committed together at
    /// the end so the pipeline re-runs the affected stages from block 0.
    fn clear_recomputable_tables<N: ProviderNodeTypes>(
        factory: &ProviderFactory<N>,
    ) -> eyre::Result<()> {
        info!(target: "reth::cli", "Clearing recomputable MDBX tables");
        let db = factory.db_ref();

        // One transaction per table: clear, commit, then log the table name.
        macro_rules! clear_table {
            ($table:ty) => {{
                let tx = db.tx_mut()?;
                tx.clear::<$table>()?;
                tx.commit()?;
                info!(target: "reth::cli", table = <$table as Table>::NAME, "Cleared");
            }};
        }

        // Migrated changeset tables (now in static files)
        clear_table!(tables::AccountChangeSets);
        clear_table!(tables::StorageChangeSets);

        // Senders — rebuilt by SenderRecovery
        clear_table!(tables::TransactionSenders);

        // Indices — rebuilt by TransactionLookup / IndexAccountHistory / IndexStorageHistory
        clear_table!(tables::TransactionHashNumbers);
        clear_table!(tables::AccountsHistory);
        clear_table!(tables::StoragesHistory);

        // Plain state — superseded by hashed state in v2
        clear_table!(tables::PlainAccountState);
        clear_table!(tables::PlainStorageState);

        // Trie — rebuilt by MerkleExecute
        clear_table!(tables::AccountsTrie);
        clear_table!(tables::StoragesTrie);

        // Reset stage checkpoints so the pipeline rebuilds everything
        info!(target: "reth::cli", "Resetting stage checkpoints");
        let provider_rw = factory.database_provider_rw()?;
        for stage in [
            StageId::SenderRecovery,
            StageId::TransactionLookup,
            StageId::IndexAccountHistory,
            StageId::IndexStorageHistory,
            StageId::MerkleExecute,
            StageId::MerkleUnwind,
        ] {
            provider_rw.save_stage_checkpoint(stage, StageCheckpoint::new(0))?;
            info!(target: "reth::cli", %stage, "Checkpoint reset to 0");
        }
        // Also wipe any intermediate Merkle progress so the stage starts clean.
        provider_rw.save_stage_checkpoint_progress(StageId::MerkleExecute, vec![])?;
        provider_rw.commit()?;

        info!(target: "reth::cli", "Recomputable tables cleared");
        Ok(())
    }
334
335    /// Creates a compacted copy of the MDBX database.
336    fn compact_mdbx(db: &mdbx::DatabaseEnv) -> eyre::Result<()> {
337        let db_path = db.path();
338        let compact_path = db_path.with_file_name("db_compact");
339
340        reth_fs_util::create_dir_all(&compact_path)?;
341
342        info!(target: "reth::cli", ?db_path, ?compact_path, "Compacting MDBX database");
343
344        let compact_dest = compact_path.join("mdbx.dat");
345        let dest_cstr = std::ffi::CString::new(
346            compact_dest.to_str().ok_or_else(|| eyre::eyre!("compact path must be valid UTF-8"))?,
347        )?;
348
349        let flags = ffi::MDBX_CP_COMPACT | ffi::MDBX_CP_FORCE_DYNAMIC_SIZE;
350
351        let rc = db.with_raw_env_ptr(|env_ptr| unsafe {
352            ffi::mdbx_env_copy(env_ptr, dest_cstr.as_ptr(), flags)
353        });
354
355        if rc != 0 {
356            eyre::bail!("mdbx_env_copy failed with error code {rc}: {}", unsafe {
357                std::ffi::CStr::from_ptr(ffi::mdbx_strerror(rc)).to_string_lossy()
358            });
359        }
360
361        info!(target: "reth::cli", "MDBX compaction complete");
362        Ok(())
363    }
364
365    /// Swaps the original MDBX database with a compacted copy.
366    fn swap_compacted_db(
367        db_path: &std::path::Path,
368        compact_path: &std::path::Path,
369    ) -> eyre::Result<()> {
370        let backup_path = db_path.with_file_name("db_pre_compact");
371
372        info!(target: "reth::cli", ?db_path, ?compact_path, "Swapping compacted database");
373
374        std::fs::rename(db_path, &backup_path)?;
375
376        if let Err(e) = std::fs::rename(compact_path, db_path) {
377            let _ = std::fs::rename(&backup_path, db_path);
378            return Err(e.into());
379        }
380
381        std::fs::remove_dir_all(&backup_path)?;
382
383        info!(target: "reth::cli", "Database compaction swap complete");
384        Ok(())
385    }
386}