Skip to main content

reth_cli_commands/db/
migrate_v2.rs

1//! `reth db migrate-v2` command for migrating v1 storage layout to v2.
2
3use crate::common::CliNodeTypes;
4use alloy_primitives::Address;
5use clap::Parser;
6use reth_db::{
7    mdbx::{self, ffi},
8    models::StorageBeforeTx,
9    DatabaseEnv,
10};
11use reth_db_api::{
12    cursor::DbCursorRO,
13    database::Database,
14    table::Table,
15    tables,
16    transaction::{DbTx, DbTxMut},
17};
18use reth_node_builder::NodeTypesWithDBAdapter;
19use reth_provider::{
20    providers::ProviderNodeTypes, BlockNumReader, DBProvider, DatabaseProviderFactory,
21    MetadataProvider, MetadataWriter, ProviderFactory, PruneCheckpointReader,
22    RocksDBProviderFactory, StageCheckpointWriter, StaticFileProviderFactory, StaticFileWriter,
23    StorageSettings,
24};
25use reth_prune_types::PruneSegment;
26use reth_stages_types::{StageCheckpoint, StageId};
27use reth_static_file_types::StaticFileSegment;
28use reth_storage_api::StageCheckpointReader;
29use tracing::{info, warn};
30
// Unit struct: the subcommand declares no arguments of its own; `execute` receives everything it
// needs through the `ProviderFactory` handed in by the caller.
// NOTE(review): clap's derive presumably uses the doc comment below as the command's help text,
// so it is left as-is — confirm before rewording it.
/// `reth db migrate-v2` command
#[derive(Debug, Parser)]
pub struct Command;
34
35impl Command {
36    /// Execute the full v1 → v2 migration:
37    ///
38    /// 1. Migrate changesets + receipts to static files
39    /// 2. Flip `StorageSettings` to v2
40    /// 3. Clear recomputable MDBX tables + reset stage checkpoints
41    /// 4. Compact MDBX
42    pub async fn execute<N: CliNodeTypes>(
43        self,
44        provider_factory: ProviderFactory<NodeTypesWithDBAdapter<N, DatabaseEnv>>,
45    ) -> eyre::Result<()>
46    where
47        N::Primitives: reth_primitives_traits::NodePrimitives<
48            Receipt: reth_db_api::table::Value + reth_codecs::Compact,
49        >,
50    {
51        // === Phase 0: Preflight ===
52        info!(target: "reth::cli", "Starting v1 → v2 storage migration");
53
54        let provider = provider_factory.provider()?;
55        let current_settings = provider.storage_settings()?;
56
57        if current_settings.is_some_and(|s| s.is_v2()) {
58            info!(target: "reth::cli", "Storage is already v2, nothing to do");
59            return Ok(());
60        }
61
62        let tip =
63            provider.get_stage_checkpoint(StageId::Execution)?.map(|c| c.block_number).unwrap_or(0);
64
65        info!(target: "reth::cli", tip, "Chain tip block number");
66
67        let sf_provider = provider_factory.static_file_provider();
68
69        for segment in [StaticFileSegment::AccountChangeSets, StaticFileSegment::StorageChangeSets]
70        {
71            if sf_provider.get_highest_static_file_block(segment).is_some() {
72                eyre::bail!(
73                    "Static file segment {segment:?} already contains data. \
74                     Cannot migrate — target must be empty."
75                );
76            }
77        }
78
79        drop(provider);
80
81        // === Phase 1: Migrate changesets → static files ===
82        Self::migrate_account_changesets(&provider_factory, tip)?;
83        Self::migrate_storage_changesets(&provider_factory, tip)?;
84
85        // === Phase 2: Migrate receipts → static files ===
86        Self::migrate_receipts::<NodeTypesWithDBAdapter<N, DatabaseEnv>>(&provider_factory, tip)?;
87
88        // === Phase 3: Migrate indices → RocksDB ===
89        Self::migrate_to_rocksdb::<_, tables::TransactionHashNumbers>(&provider_factory)?;
90        Self::migrate_to_rocksdb::<_, tables::AccountsHistory>(&provider_factory)?;
91        Self::migrate_to_rocksdb::<_, tables::StoragesHistory>(&provider_factory)?;
92
93        // === Phase 4: Flip metadata to v2 ===
94        info!(target: "reth::cli", "Writing StorageSettings v2 metadata");
95        {
96            let provider_rw = provider_factory.database_provider_rw()?;
97            provider_rw.write_storage_settings(StorageSettings::v2())?;
98            provider_rw.commit()?;
99        }
100        info!(target: "reth::cli", "Storage settings updated to v2");
101
102        // === Phase 5: Clear migrated and recomputable MDBX tables ===
103        Self::clear_migrated_and_recomputable_tables(&provider_factory)?;
104
105        // === Phase 6: Compact MDBX (before pipeline, so it runs on a smaller DB) ===
106        let db_path = provider_factory.db_ref().path();
107        Self::compact_mdbx(provider_factory.db_ref())?;
108
109        // Drop to release DB handle for swap
110        drop(provider_factory);
111
112        let compact_path = db_path.with_file_name("db_compact");
113        Self::swap_compacted_db(&db_path, &compact_path)?;
114
115        // === Phase 7: Reopen DB and run pipeline ===
116        // The caller will reopen the environment and run the pipeline.
117        // We return here — the pipeline step is handled in mod.rs after
118        // reopening the database with the compacted copy.
119        info!(target: "reth::cli", "Migration complete. You should now restart the node and let it run the pipeline to rebuild the remaining data.");
120        Ok(())
121    }
122
123    fn migrate_account_changesets<N: ProviderNodeTypes>(
124        factory: &ProviderFactory<N>,
125        tip: u64,
126    ) -> eyre::Result<()> {
127        info!(target: "reth::cli", "Migrating AccountChangeSets → static files");
128        let provider = factory.provider()?.disable_long_read_transaction_safety();
129        let sf_provider = factory.static_file_provider();
130
131        let mut cursor = provider.tx_ref().cursor_read::<tables::AccountChangeSets>()?;
132
133        let first_block = provider
134            .get_prune_checkpoint(PruneSegment::AccountHistory)?
135            .and_then(|cp| cp.block_number)
136            .map_or(0, |b| b + 1);
137
138        // The writer always starts at the fixed range boundary (e.g. 2500000) which may be
139        // earlier than first_block (e.g. 2603897 from prune checkpoint).
140        let mut writer = sf_provider.latest_writer(StaticFileSegment::AccountChangeSets)?;
141        if first_block > 0 {
142            writer.ensure_at_block(first_block - 1)?;
143        }
144
145        let mut count = 0u64;
146        let mut walker = cursor.walk(Some(first_block))?.peekable();
147
148        for block in first_block..=tip {
149            let mut entries = Vec::new();
150
151            while let Some(Ok((block_number, _))) = walker.peek() {
152                if *block_number != block {
153                    break;
154                }
155                let (_, entry) = walker.next().expect("peeked")?;
156                entries.push(entry);
157            }
158
159            count += entries.len() as u64;
160            writer.append_account_changeset(entries, block)?;
161        }
162
163        writer.commit()?;
164
165        info!(target: "reth::cli", count, "AccountChangeSets migrated");
166        Ok(())
167    }
168
169    fn migrate_storage_changesets<N: ProviderNodeTypes>(
170        factory: &ProviderFactory<N>,
171        tip: u64,
172    ) -> eyre::Result<()> {
173        info!(target: "reth::cli", "Migrating StorageChangeSets → static files");
174        let provider = factory.provider()?.disable_long_read_transaction_safety();
175        let sf_provider = factory.static_file_provider();
176
177        let mut cursor = provider.tx_ref().cursor_read::<tables::StorageChangeSets>()?;
178
179        let first_block = provider
180            .get_prune_checkpoint(PruneSegment::StorageHistory)?
181            .and_then(|cp| cp.block_number)
182            .map_or(0, |b| b + 1);
183
184        // The writer always starts at the fixed range boundary (e.g. 2500000) which may be
185        // earlier than first_block (e.g. 2603897 from prune checkpoint).
186        let mut writer = sf_provider.latest_writer(StaticFileSegment::StorageChangeSets)?;
187        if first_block > 0 {
188            writer.ensure_at_block(first_block - 1)?;
189        }
190
191        let mut count = 0u64;
192        let mut walker = cursor.walk(Some((first_block, Address::ZERO).into()))?.peekable();
193
194        for block in first_block..=tip {
195            let mut entries = Vec::new();
196
197            while let Some(Ok((key, _))) = walker.peek() {
198                if key.block_number() != block {
199                    break;
200                }
201                let (key, entry) = walker.next().expect("peeked")?;
202                entries.push(StorageBeforeTx {
203                    address: key.address(),
204                    key: entry.key,
205                    value: entry.value,
206                });
207            }
208
209            count += entries.len() as u64;
210            writer.append_storage_changeset(entries, block)?;
211        }
212
213        writer.commit()?;
214
215        info!(target: "reth::cli", count, "StorageChangeSets migrated");
216        Ok(())
217    }
218
219    fn migrate_receipts<N: ProviderNodeTypes>(
220        factory: &ProviderFactory<N>,
221        tip: u64,
222    ) -> eyre::Result<()>
223    where
224        N::Primitives: reth_primitives_traits::NodePrimitives<
225            Receipt: reth_db_api::table::Value + reth_codecs::Compact,
226        >,
227    {
228        let provider = factory.provider()?;
229        if !provider.prune_modes_ref().receipts_log_filter.is_empty() {
230            info!(target: "reth::cli", "Receipt log filter pruning is enabled, keeping receipts in MDBX");
231            return Ok(());
232        }
233        drop(provider);
234
235        let sf_provider = factory.static_file_provider();
236        let existing = sf_provider.get_highest_static_file_block(StaticFileSegment::Receipts);
237
238        if existing.is_some_and(|b| b >= tip) {
239            info!(target: "reth::cli", "Receipts already in static files, skipping");
240            return Ok(());
241        }
242
243        info!(target: "reth::cli", "Migrating Receipts → static files");
244
245        let provider = factory.provider()?.disable_long_read_transaction_safety();
246        let prune_start = provider
247            .get_prune_checkpoint(PruneSegment::Receipts)?
248            .and_then(|cp| cp.block_number)
249            .map_or(0, |b| b + 1);
250        let first_block = prune_start.max(existing.map_or(0, |b| b + 1));
251
252        // The writer always starts at the fixed range boundary (e.g. 2500000) which may be
253        // earlier than first_block (e.g. 2603897 from prune checkpoint).
254        if first_block > 0 {
255            let mut writer = sf_provider.latest_writer(StaticFileSegment::Receipts)?;
256            writer.ensure_at_block(first_block - 1)?;
257            writer.commit()?;
258        }
259
260        let before = sf_provider
261            .get_highest_static_file_tx(StaticFileSegment::Receipts)
262            .map_or(0, |tx| tx + 1);
263
264        let block_range = first_block..=tip;
265
266        let segment = reth_static_file::segments::Receipts;
267        reth_static_file::segments::Segment::copy_to_static_files(&segment, provider, block_range)?;
268
269        sf_provider.commit()?;
270
271        let after = sf_provider
272            .get_highest_static_file_tx(StaticFileSegment::Receipts)
273            .map_or(0, |tx| tx + 1);
274        let count = after - before;
275        info!(target: "reth::cli", count, "Receipts migrated");
276        Ok(())
277    }
278
279    fn migrate_to_rocksdb<N: ProviderNodeTypes, T: Table>(
280        factory: &ProviderFactory<N>,
281    ) -> eyre::Result<()> {
282        info!(target: "reth::cli", table = T::NAME, "Migrating MDBX table → RocksDB");
283
284        let provider = factory.provider()?.disable_long_read_transaction_safety();
285        let mut cursor = provider.tx_ref().cursor_read::<T>()?;
286
287        let rocksdb = factory.rocksdb_provider();
288        rocksdb.clear::<T>()?;
289        let mut batch = rocksdb.batch_with_auto_commit();
290
291        let mut count = 0u64;
292        for entry in cursor.walk(None)? {
293            let (key, value) = entry?;
294            batch.put::<T>(key, &value)?;
295            count += 1;
296        }
297
298        batch.commit()?;
299        rocksdb.flush(&[T::NAME])?;
300
301        info!(target: "reth::cli", table = T::NAME, count, "MDBX table migrated to RocksDB");
302        Ok(())
303    }
304
305    /// Clears MDBX tables that were migrated to v2 backends or can be recomputed by the pipeline,
306    /// and resets only the recomputed stage checkpoints.
307    fn clear_migrated_and_recomputable_tables<N: ProviderNodeTypes>(
308        factory: &ProviderFactory<N>,
309    ) -> eyre::Result<()> {
310        info!(target: "reth::cli", "Clearing migrated and recomputable MDBX tables");
311        let db = factory.db_ref();
312
313        macro_rules! clear_table {
314            ($table:ty) => {{
315                let tx = db.tx_mut()?;
316                tx.clear::<$table>()?;
317                tx.commit()?;
318                info!(target: "reth::cli", table = <$table as Table>::NAME, "Cleared");
319            }};
320        }
321
322        // Migrated changeset tables (now in static files)
323        clear_table!(tables::AccountChangeSets);
324        clear_table!(tables::StorageChangeSets);
325
326        // Senders — rebuilt by SenderRecovery
327        clear_table!(tables::TransactionSenders);
328
329        // Indices — migrated to RocksDB
330        clear_table!(tables::TransactionHashNumbers);
331        clear_table!(tables::AccountsHistory);
332        clear_table!(tables::StoragesHistory);
333
334        // Plain state — superseded by hashed state in v2
335        clear_table!(tables::PlainAccountState);
336        clear_table!(tables::PlainStorageState);
337
338        // Trie — rebuilt by MerkleExecute
339        clear_table!(tables::AccountsTrie);
340        clear_table!(tables::StoragesTrie);
341
342        // Reset stage checkpoints so the pipeline rebuilds everything
343        info!(target: "reth::cli", "Resetting stage checkpoints");
344        let provider_rw = factory.database_provider_rw()?;
345        for stage in [StageId::SenderRecovery, StageId::MerkleExecute, StageId::MerkleUnwind] {
346            provider_rw.save_stage_checkpoint(stage, StageCheckpoint::new(0))?;
347            info!(target: "reth::cli", %stage, "Checkpoint reset to 0");
348        }
349        provider_rw.save_stage_checkpoint_progress(StageId::MerkleExecute, vec![])?;
350
351        if provider_rw.last_block_number()? > 0 {
352            let first_indices_entry = provider_rw
353                .tx_ref()
354                .cursor_read::<tables::BlockBodyIndices>()?
355                .seek(1)?
356                .map(|(block, _)| block)
357                .ok_or_else(|| eyre::eyre!("no block body indices found"))?;
358
359            // If the first block body indices entry is not block 1, it means that the v1 database
360            // was likely initialized with dummy blocks coming from a dummy chain generated by
361            // `setup_without_evm`.
362            //
363            // In that case, sender recovery starts from the first block that has a corresponding
364            // block body indices entry.
365            if first_indices_entry > 1 {
366                provider_rw.save_stage_checkpoint(
367                    StageId::SenderRecovery,
368                    StageCheckpoint::new(first_indices_entry - 1),
369                )?;
370
371                // Make sure that senders static files segment is at the correct height.
372                let static_file_provider = provider_rw.static_file_provider();
373                let mut senders_writer =
374                    static_file_provider.latest_writer(StaticFileSegment::TransactionSenders)?;
375                senders_writer.ensure_at_block(first_indices_entry - 1)?;
376                senders_writer.commit()?;
377
378                warn!(
379                    target: "reth::cli",
380                    "Missing block body indices data for first {first_indices_entry} blocks, initializing sender recovery with the first block that has a corresponding block body indices entry"
381                );
382            }
383        }
384        provider_rw.commit()?;
385
386        info!(target: "reth::cli", "Recomputable tables cleared");
387        Ok(())
388    }
389
390    /// Creates a compacted copy of the MDBX database.
391    fn compact_mdbx(db: &mdbx::DatabaseEnv) -> eyre::Result<()> {
392        let db_path = db.path();
393        let compact_path = db_path.with_file_name("db_compact");
394
395        reth_fs_util::create_dir_all(&compact_path)?;
396
397        info!(target: "reth::cli", ?db_path, ?compact_path, "Compacting MDBX database");
398
399        let compact_dest = compact_path.join("mdbx.dat");
400        let dest_cstr = std::ffi::CString::new(
401            compact_dest.to_str().ok_or_else(|| eyre::eyre!("compact path must be valid UTF-8"))?,
402        )?;
403
404        let flags = ffi::MDBX_CP_COMPACT | ffi::MDBX_CP_FORCE_DYNAMIC_SIZE;
405
406        let rc = db.with_raw_env_ptr(|env_ptr| unsafe {
407            ffi::mdbx_env_copy(env_ptr, dest_cstr.as_ptr(), flags)
408        });
409
410        if rc != 0 {
411            eyre::bail!("mdbx_env_copy failed with error code {rc}: {}", unsafe {
412                std::ffi::CStr::from_ptr(ffi::mdbx_strerror(rc)).to_string_lossy()
413            });
414        }
415
416        info!(target: "reth::cli", "MDBX compaction complete");
417        Ok(())
418    }
419
420    /// Swaps the original MDBX database with a compacted copy.
421    fn swap_compacted_db(
422        db_path: &std::path::Path,
423        compact_path: &std::path::Path,
424    ) -> eyre::Result<()> {
425        let backup_path = db_path.with_file_name("db_pre_compact");
426
427        info!(target: "reth::cli", ?db_path, ?compact_path, "Swapping compacted database");
428
429        std::fs::rename(db_path, &backup_path)?;
430
431        if let Err(e) = std::fs::rename(compact_path, db_path) {
432            let _ = std::fs::rename(&backup_path, db_path);
433            return Err(e.into());
434        }
435
436        std::fs::remove_dir_all(&backup_path)?;
437
438        info!(target: "reth::cli", "Database compaction swap complete");
439        Ok(())
440    }
441}