reth_cli_commands/stage/dump/merkle.rs

use std::sync::Arc;

use super::setup;
use alloy_primitives::BlockNumber;
use eyre::Result;
use reth_config::config::EtlConfig;
use reth_consensus::{ConsensusError, FullConsensus};
use reth_db::DatabaseEnv;
use reth_db_api::{database::Database, table::TableImporter, tables};
use reth_db_common::DbTool;
use reth_evm::ConfigureEvm;
use reth_exex::ExExManagerHandle;
use reth_node_core::dirs::{ChainPath, DataDirPath};
use reth_provider::{
    providers::{ProviderNodeTypes, StaticFileProvider},
    DatabaseProviderFactory, ProviderFactory,
};
use reth_stages::{
    stages::{
        AccountHashingStage, ExecutionStage, MerkleStage, StorageHashingStage,
        MERKLE_STAGE_DEFAULT_REBUILD_THRESHOLD,
    },
    ExecutionStageThresholds, Stage, StageCheckpoint, UnwindInput,
};
use tracing::info;

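/// Dumps the data needed to re-run the Merkle stage for the given block range into a new
/// database at `output_datadir`, optionally dry-running the stage against the dumped data.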
pub(crate) async fn dump_merkle_stage<N>(
    db_tool: &DbTool<N>,
    from: BlockNumber,
    to: BlockNumber,
    output_datadir: ChainPath<DataDirPath>,
    should_run: bool,
    evm_config: impl ConfigureEvm<Primitives = N::Primitives>,
    consensus: impl FullConsensus<N::Primitives, Error = ConsensusError> + 'static,
) -> Result<()>
where
    N: ProviderNodeTypes<DB = Arc<DatabaseEnv>>,
{
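    // Create the output database and determine the tip block to unwind from.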
    let (output_db, tip_block_number) = setup(from, to, &output_datadir.db(), db_tool)?;

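    // Copy the headers for the range; the Merkle stage validates the state root it
    // computes against the header.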
    output_db.update(|tx| {
        tx.import_table_with_range::<tables::Headers, _>(
            &db_tool.provider_factory.db_ref().tx()?,
            Some(from),
            to,
        )
    })??;

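    // Copy the account changesets; the hashing stages walk them to find which accounts
    // changed within the range.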
    output_db.update(|tx| {
        tx.import_table_with_range::<tables::AccountChangeSets, _>(
            &db_tool.provider_factory.db_ref().tx()?,
            Some(from),
            to,
        )
    })??;

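    // Unwind the source database to FROM and copy the unwound hashing and trie tables
    // into the output database.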
    unwind_and_copy(db_tool, (from, to), tip_block_number, &output_db, evm_config, consensus)?;

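    // Optionally re-execute the Merkle stage against the freshly dumped data.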
    if should_run {
        dry_run(
            ProviderFactory::<N>::new(
                Arc::new(output_db),
                db_tool.chain(),
                StaticFileProvider::read_write(output_datadir.static_files())?,
            ),
            to,
            from,
        )?;
    }

    Ok(())
}

/// Dry-run an unwind to the FROM block and copy the necessary table data to the new database.
fn unwind_and_copy<N: ProviderNodeTypes>(
    db_tool: &DbTool<N>,
    range: (u64, u64),
    tip_block_number: u64,
    output_db: &DatabaseEnv,
    evm_config: impl ConfigureEvm<Primitives = N::Primitives>,
    consensus: impl FullConsensus<N::Primitives, Error = ConsensusError> + 'static,
) -> eyre::Result<()> {
    let (from, to) = range;
    let provider = db_tool.provider_factory.database_provider_rw()?;

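    // Unwind input: from the current tip all the way down to FROM. The execute input
    // later replays FROM..=TO through the hashing stages on top of the unwound state.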
    let unwind = UnwindInput {
        unwind_to: from,
        checkpoint: StageCheckpoint::new(tip_block_number),
        bad_block: None,
    };
    let execute_input =
        reth_stages::ExecInput { target: Some(to), checkpoint: Some(StageCheckpoint::new(from)) };

    // Unwind the hashing and Merkle stages all the way to FROM
    StorageHashingStage::default().unwind(&provider, unwind)?;
    AccountHashingStage::default().unwind(&provider, unwind)?;
    MerkleStage::default_unwind().unwind(&provider, unwind)?;
    // Bring the plain state back to TO (hashing stage execution requires it)
    let mut exec_stage = ExecutionStage::new(
        evm_config, // Not necessary for unwinding.
        Arc::new(consensus),
        ExecutionStageThresholds {
            max_blocks: Some(u64::MAX),
            max_changes: None,
            max_cumulative_gas: None,
            max_duration: None,
        },
        MERKLE_STAGE_DEFAULT_REBUILD_THRESHOLD,
        ExExManagerHandle::empty(),
    );

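    // Unwinding the execution stage reverts the plain state tables from the tip down to TO.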
    exec_stage.unwind(
        &provider,
        UnwindInput {
            unwind_to: to,
            checkpoint: StageCheckpoint::new(tip_block_number),
            bad_block: None,
        },
    )?;

    // Bring hashes to TO. The `u64::MAX` thresholds keep both stages on the incremental
    // path and avoid intermediate commits.
    AccountHashingStage {
        clean_threshold: u64::MAX,
        commit_threshold: u64::MAX,
        etl_config: EtlConfig::default(),
    }
    .execute(&provider, execute_input)?;
    StorageHashingStage {
        clean_threshold: u64::MAX,
        commit_threshold: u64::MAX,
        etl_config: EtlConfig::default(),
    }
    .execute(&provider, execute_input)?;

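    // Take the open, uncommitted transaction so the unwound state can be read out;
    // dropping it afterwards leaves the source database untouched.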
    let unwind_inner_tx = provider.into_tx();

    // TODO: optimize; we can import just the entries we need
    output_db
        .update(|tx| tx.import_dupsort::<tables::StorageChangeSets, _>(&unwind_inner_tx))??;

    output_db.update(|tx| tx.import_table::<tables::HashedAccounts, _>(&unwind_inner_tx))??;
    output_db.update(|tx| tx.import_dupsort::<tables::HashedStorages, _>(&unwind_inner_tx))??;
    output_db.update(|tx| tx.import_table::<tables::AccountsTrie, _>(&unwind_inner_tx))??;
    output_db.update(|tx| tx.import_dupsort::<tables::StoragesTrie, _>(&unwind_inner_tx))??;

    Ok(())
}

/// Try to re-execute the stage straight away.
fn dry_run<N>(output_provider_factory: ProviderFactory<N>, to: u64, from: u64) -> eyre::Result<()>
where
    N: ProviderNodeTypes,
{
    info!(target: "reth::cli", "Executing stage.");
    let provider = output_provider_factory.database_provider_rw()?;

    let mut stage = MerkleStage::Execution {
        // Forces updating the root instead of calculating from scratch
        rebuild_threshold: u64::MAX,
        incremental_threshold: u64::MAX,
    };

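    // Keep executing until the stage reports that the whole range is done.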
    loop {
        let input = reth_stages::ExecInput {
            target: Some(to),
            checkpoint: Some(StageCheckpoint::new(from)),
        };
        if stage.execute(&provider, input)?.done {
            break
        }
    }

    info!(target: "reth::cli", "Success");

    Ok(())
}