reth_cli_commands/stage/dump/merkle.rs
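
//! Support for the `stage dump merkle` CLI subcommand: copies the tables the
//! Merkle stage reads into a standalone database so the stage can be re-run in
//! isolation.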

use std::sync::Arc;

use super::setup;
use alloy_primitives::BlockNumber;
use eyre::Result;
use reth_config::config::EtlConfig;
use reth_consensus::{ConsensusError, FullConsensus};
use reth_db::DatabaseEnv;
use reth_db_api::{database::Database, table::TableImporter, tables};
use reth_db_common::DbTool;
use reth_evm::ConfigureEvm;
use reth_exex::ExExManagerHandle;
use reth_node_core::dirs::{ChainPath, DataDirPath};
use reth_provider::{
    providers::{ProviderNodeTypes, StaticFileProvider},
    DatabaseProviderFactory, ProviderFactory,
};
use reth_stages::{
    stages::{
        AccountHashingStage, ExecutionStage, MerkleStage, StorageHashingStage,
        MERKLE_STAGE_DEFAULT_REBUILD_THRESHOLD,
    },
    ExecutionStageThresholds, Stage, StageCheckpoint, UnwindInput,
};
use tracing::info;
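
/// Dumps the tables needed to run the Merkle stage for the given block range
/// into a new database under `output_datadir`, optionally dry-running the
/// stage against it afterwards.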
pub(crate) async fn dump_merkle_stage<N>(
    db_tool: &DbTool<N>,
    from: BlockNumber,
    to: BlockNumber,
    output_datadir: ChainPath<DataDirPath>,
    should_run: bool,
    evm_config: impl ConfigureEvm<Primitives = N::Primitives>,
    consensus: impl FullConsensus<N::Primitives, Error = ConsensusError> + 'static,
) -> Result<()>
where
    N: ProviderNodeTypes<DB = Arc<DatabaseEnv>>,
{
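    // Create the output database and resolve the current tip; the unwind
    // below starts from this tip block.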
    let (output_db, tip_block_number) = setup(from, to, &output_datadir.db(), db_tool)?;
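
    // Copy the headers for the range; the Merkle stage compares the state root
    // it computes against the state root recorded in each header.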
    output_db.update(|tx| {
        tx.import_table_with_range::<tables::Headers, _>(
            &db_tool.provider_factory.db_ref().tx()?,
            Some(from),
            to,
        )
    })??;
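
    // Copy the account changesets for the range; the hashing stages read them
    // when unwinding and when re-executing incrementally.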
    output_db.update(|tx| {
        tx.import_table_with_range::<tables::AccountChangeSets, _>(
            &db_tool.provider_factory.db_ref().tx()?,
            Some(from),
            to,
        )
    })??;
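
    // Unwind the source database down to FROM and copy over the remaining
    // tables the stage will read.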
    unwind_and_copy(db_tool, (from, to), tip_block_number, &output_db, evm_config, consensus)?;
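
    // Optionally re-run the stage against the freshly dumped database as a
    // sanity check.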
    if should_run {
        dry_run(
            ProviderFactory::<N>::new(
                Arc::new(output_db),
                db_tool.chain(),
                StaticFileProvider::read_write(output_datadir.static_files())?,
            ),
            to,
            from,
        )?;
    }

    Ok(())
}

/// Dry-run an unwind to FROM block and copy the necessary table data to the new database.
fn unwind_and_copy<N: ProviderNodeTypes>(
    db_tool: &DbTool<N>,
    range: (u64, u64),
    tip_block_number: u64,
    output_db: &DatabaseEnv,
    evm_config: impl ConfigureEvm<Primitives = N::Primitives>,
    consensus: impl FullConsensus<N::Primitives, Error = ConsensusError> + 'static,
) -> eyre::Result<()> {
    let (from, to) = range;
    let provider = db_tool.provider_factory.database_provider_rw()?;
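
    // Unwind from the current tip all the way down to FROM; the hashing
    // stages are then executed forward again to TO below.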
    let unwind = UnwindInput {
        unwind_to: from,
        checkpoint: StageCheckpoint::new(tip_block_number),
        bad_block: None,
    };
    let execute_input =
        reth_stages::ExecInput { target: Some(to), checkpoint: Some(StageCheckpoint::new(from)) };

    // Unwind hashes all the way to FROM
    StorageHashingStage::default().unwind(&provider, unwind)?;
    AccountHashingStage::default().unwind(&provider, unwind)?;

    MerkleStage::default_unwind().unwind(&provider, unwind)?;

    // Bring the plain state back to TO (hashing stage execution requires it)
    let mut exec_stage = ExecutionStage::new(
        evm_config, // Not necessary for unwinding.
        Arc::new(consensus),
        ExecutionStageThresholds {
            max_blocks: Some(u64::MAX),
            max_changes: None,
            max_cumulative_gas: None,
            max_duration: None,
        },
        MERKLE_STAGE_DEFAULT_REBUILD_THRESHOLD,
        ExExManagerHandle::empty(),
    );

    exec_stage.unwind(
        &provider,
        UnwindInput {
            unwind_to: to,
            checkpoint: StageCheckpoint::new(tip_block_number),
            bad_block: None,
        },
    )?;

    // Bring hashes to TO
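    // (The u64::MAX thresholds keep both stages on the incremental path with
    // no intermediate commits.)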
    AccountHashingStage {
        clean_threshold: u64::MAX,
        commit_threshold: u64::MAX,
        etl_config: EtlConfig::default(),
    }
    .execute(&provider, execute_input)?;
    StorageHashingStage {
        clean_threshold: u64::MAX,
        commit_threshold: u64::MAX,
        etl_config: EtlConfig::default(),
    }
    .execute(&provider, execute_input)?;
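
    // Take the still-open transaction: the copies below read the unwound
    // state, and since the transaction is never committed, the source
    // database is left untouched.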
    let unwind_inner_tx = provider.into_tx();

    // TODO: optimize; we only need the entries within the range, not the
    // entire tables.
    output_db
        .update(|tx| tx.import_dupsort::<tables::StorageChangeSets, _>(&unwind_inner_tx))??;

    output_db.update(|tx| tx.import_table::<tables::HashedAccounts, _>(&unwind_inner_tx))??;
    output_db.update(|tx| tx.import_dupsort::<tables::HashedStorages, _>(&unwind_inner_tx))??;
    output_db.update(|tx| tx.import_table::<tables::AccountsTrie, _>(&unwind_inner_tx))??;
    output_db.update(|tx| tx.import_dupsort::<tables::StoragesTrie, _>(&unwind_inner_tx))??;

    Ok(())
}

/// Try to re-execute the stage straight away.
fn dry_run<N>(output_provider_factory: ProviderFactory<N>, to: u64, from: u64) -> eyre::Result<()>
where
    N: ProviderNodeTypes,
{
    info!(target: "reth::cli", "Executing stage.");
    let provider = output_provider_factory.database_provider_rw()?;

    let mut stage = MerkleStage::Execution {
        // Forces updating the root instead of calculating from scratch
        rebuild_threshold: u64::MAX,
        incremental_threshold: u64::MAX,
    };
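
    // Re-run the stage until it reports completion for the whole range.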
    loop {
        let input = reth_stages::ExecInput {
            target: Some(to),
            checkpoint: Some(StageCheckpoint::new(from)),
        };
        if stage.execute(&provider, input)?.done {
            break
        }
    }

    info!(target: "reth::cli", "Success");

    Ok(())
}