reth_cli_commands/stage/dump/merkle.rs

use std::sync::Arc;

use super::setup;
use alloy_primitives::BlockNumber;
use eyre::Result;
use reth_config::config::EtlConfig;
use reth_consensus::noop::NoopConsensus;
use reth_db::DatabaseEnv;
use reth_db_api::{database::Database, table::TableImporter, tables};
use reth_db_common::DbTool;
use reth_evm::noop::NoopBlockExecutorProvider;
use reth_exex::ExExManagerHandle;
use reth_node_core::dirs::{ChainPath, DataDirPath};
use reth_provider::{
    providers::{ProviderNodeTypes, StaticFileProvider},
    DatabaseProviderFactory, ProviderFactory,
};
use reth_stages::{
    stages::{
        AccountHashingStage, ExecutionStage, MerkleStage, StorageHashingStage,
        MERKLE_STAGE_DEFAULT_CLEAN_THRESHOLD,
    },
    ExecutionStageThresholds, Stage, StageCheckpoint, UnwindInput,
};
use tracing::info;

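/// Dumps the data needed to run the [`MerkleStage`] for the given block range into a new
/// database at `output_datadir`, optionally re-executing the stage against the dumped tables.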
pub(crate) async fn dump_merkle_stage<N>(
    db_tool: &DbTool<N>,
    from: BlockNumber,
    to: BlockNumber,
    output_datadir: ChainPath<DataDirPath>,
    should_run: bool,
) -> Result<()>
where
    N: ProviderNodeTypes<DB = Arc<DatabaseEnv>>,
{
    let (output_db, tip_block_number) = setup(from, to, &output_datadir.db(), db_tool)?;

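    // Copy the block headers for the dump range into the output database.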
    output_db.update(|tx| {
        tx.import_table_with_range::<tables::Headers, _>(
            &db_tool.provider_factory.db_ref().tx()?,
            Some(from),
            to,
        )
    })??;

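    // Copy the account changesets for the dump range; the incremental Merkle root computation
    // reads changesets to find which accounts changed.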
    output_db.update(|tx| {
        tx.import_table_with_range::<tables::AccountChangeSets, _>(
            &db_tool.provider_factory.db_ref().tx()?,
            Some(from),
            to,
        )
    })??;

    unwind_and_copy(db_tool, (from, to), tip_block_number, &output_db)?;

    if should_run {
        dry_run(
            ProviderFactory::<N>::new(
                Arc::new(output_db),
                db_tool.chain(),
                StaticFileProvider::read_write(output_datadir.static_files())?,
            ),
            to,
            from,
        )?;
    }

    Ok(())
}

/// Dry-run an unwind to FROM block and copy the necessary table data to the new database.
fn unwind_and_copy<N: ProviderNodeTypes>(
    db_tool: &DbTool<N>,
    range: (u64, u64),
    tip_block_number: u64,
    output_db: &DatabaseEnv,
) -> eyre::Result<()> {
    let (from, to) = range;
    let provider = db_tool.provider_factory.database_provider_rw()?;

    let unwind = UnwindInput {
        unwind_to: from,
        checkpoint: StageCheckpoint::new(tip_block_number),
        bad_block: None,
    };
    let execute_input =
        reth_stages::ExecInput { target: Some(to), checkpoint: Some(StageCheckpoint::new(from)) };

    // Unwind hashes all the way to FROM

    StorageHashingStage::default().unwind(&provider, unwind).unwrap();
    AccountHashingStage::default().unwind(&provider, unwind).unwrap();

    MerkleStage::default_unwind().unwind(&provider, unwind)?;

    // Bring Plainstate to TO (hashing stage execution requires it)
    let mut exec_stage = ExecutionStage::new(
        NoopBlockExecutorProvider::<N::Primitives>::default(), // Not necessary for unwinding.
        NoopConsensus::arc(),
        ExecutionStageThresholds {
            max_blocks: Some(u64::MAX),
            max_changes: None,
            max_cumulative_gas: None,
            max_duration: None,
        },
        MERKLE_STAGE_DEFAULT_CLEAN_THRESHOLD,
        ExExManagerHandle::empty(),
    );

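    // Unwinding from the tip down to TO reverts plain state using the stored changesets,
    // which is why a no-op executor is sufficient here.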
    exec_stage.unwind(
        &provider,
        UnwindInput {
            unwind_to: to,
            checkpoint: StageCheckpoint::new(tip_block_number),
            bad_block: None,
        },
    )?;

    // Bring hashes to TO
    AccountHashingStage {
        clean_threshold: u64::MAX,
        commit_threshold: u64::MAX,
        etl_config: EtlConfig::default(),
    }
    .execute(&provider, execute_input)
    .unwrap();
    StorageHashingStage {
        clean_threshold: u64::MAX,
        commit_threshold: u64::MAX,
        etl_config: EtlConfig::default(),
    }
    .execute(&provider, execute_input)
    .unwrap();

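    // Take over the open read-write transaction: the unwound state is never committed to the
    // source database, but it can still be read through this transaction and copied below.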
    let unwind_inner_tx = provider.into_tx();

    // TODO optimize we can actually just get the entries we need
    output_db
        .update(|tx| tx.import_dupsort::<tables::StorageChangeSets, _>(&unwind_inner_tx))??;

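    // Copy the hashed state and trie tables produced by the unwind and re-execution above.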
    output_db.update(|tx| tx.import_table::<tables::HashedAccounts, _>(&unwind_inner_tx))??;
    output_db.update(|tx| tx.import_dupsort::<tables::HashedStorages, _>(&unwind_inner_tx))??;
    output_db.update(|tx| tx.import_table::<tables::AccountsTrie, _>(&unwind_inner_tx))??;
    output_db.update(|tx| tx.import_dupsort::<tables::StoragesTrie, _>(&unwind_inner_tx))??;

    Ok(())
}

/// Try to re-execute the stage straight away
fn dry_run<N>(output_provider_factory: ProviderFactory<N>, to: u64, from: u64) -> eyre::Result<()>
where
    N: ProviderNodeTypes,
{
    info!(target: "reth::cli", "Executing stage.");
    let provider = output_provider_factory.database_provider_rw()?;

    let mut stage = MerkleStage::Execution {
        // Forces updating the root instead of calculating from scratch
        clean_threshold: u64::MAX,
    };

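    // Re-run the Merkle stage until it reports completion for the target block.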
    loop {
        let input = reth_stages::ExecInput {
            target: Some(to),
            checkpoint: Some(StageCheckpoint::new(from)),
        };
        if stage.execute(&provider, input)?.done {
            break
        }
    }

    info!(target: "reth::cli", "Success");

    Ok(())
}