reth_cli_commands/stage/dump/
merkle.rs1use std::sync::Arc;
2
3use super::setup;
4use alloy_primitives::BlockNumber;
5use eyre::Result;
6use reth_config::config::EtlConfig;
7use reth_consensus::{ConsensusError, FullConsensus};
8use reth_db::DatabaseEnv;
9use reth_db_api::{database::Database, table::TableImporter, tables};
10use reth_db_common::DbTool;
11use reth_evm::ConfigureEvm;
12use reth_exex::ExExManagerHandle;
13use reth_node_core::dirs::{ChainPath, DataDirPath};
14use reth_provider::{
15 providers::{ProviderNodeTypes, StaticFileProvider},
16 DatabaseProviderFactory, ProviderFactory,
17};
18use reth_stages::{
19 stages::{
20 AccountHashingStage, ExecutionStage, MerkleStage, StorageHashingStage,
21 MERKLE_STAGE_DEFAULT_REBUILD_THRESHOLD,
22 },
23 ExecutionStageThresholds, Stage, StageCheckpoint, UnwindInput,
24};
25use tracing::info;
26
27pub(crate) async fn dump_merkle_stage<N>(
28 db_tool: &DbTool<N>,
29 from: BlockNumber,
30 to: BlockNumber,
31 output_datadir: ChainPath<DataDirPath>,
32 should_run: bool,
33 evm_config: impl ConfigureEvm<Primitives = N::Primitives>,
34 consensus: impl FullConsensus<N::Primitives, Error = ConsensusError> + 'static,
35) -> Result<()>
36where
37 N: ProviderNodeTypes<DB = Arc<DatabaseEnv>>,
38{
39 let (output_db, tip_block_number) = setup(from, to, &output_datadir.db(), db_tool)?;
40
41 output_db.update(|tx| {
42 tx.import_table_with_range::<tables::Headers, _>(
43 &db_tool.provider_factory.db_ref().tx()?,
44 Some(from),
45 to,
46 )
47 })??;
48
49 output_db.update(|tx| {
50 tx.import_table_with_range::<tables::AccountChangeSets, _>(
51 &db_tool.provider_factory.db_ref().tx()?,
52 Some(from),
53 to,
54 )
55 })??;
56
57 unwind_and_copy(db_tool, (from, to), tip_block_number, &output_db, evm_config, consensus)?;
58
59 if should_run {
60 dry_run(
61 ProviderFactory::<N>::new(
62 Arc::new(output_db),
63 db_tool.chain(),
64 StaticFileProvider::read_write(output_datadir.static_files())?,
65 ),
66 to,
67 from,
68 )?;
69 }
70
71 Ok(())
72}
73
74fn unwind_and_copy<N: ProviderNodeTypes>(
76 db_tool: &DbTool<N>,
77 range: (u64, u64),
78 tip_block_number: u64,
79 output_db: &DatabaseEnv,
80 evm_config: impl ConfigureEvm<Primitives = N::Primitives>,
81 consensus: impl FullConsensus<N::Primitives, Error = ConsensusError> + 'static,
82) -> eyre::Result<()> {
83 let (from, to) = range;
84 let provider = db_tool.provider_factory.database_provider_rw()?;
85
86 let unwind = UnwindInput {
87 unwind_to: from,
88 checkpoint: StageCheckpoint::new(tip_block_number),
89 bad_block: None,
90 };
91 let execute_input =
92 reth_stages::ExecInput { target: Some(to), checkpoint: Some(StageCheckpoint::new(from)) };
93
94 StorageHashingStage::default().unwind(&provider, unwind).unwrap();
97 AccountHashingStage::default().unwind(&provider, unwind).unwrap();
98
99 MerkleStage::default_unwind().unwind(&provider, unwind)?;
100
101 let mut exec_stage = ExecutionStage::new(
103 evm_config, Arc::new(consensus),
105 ExecutionStageThresholds {
106 max_blocks: Some(u64::MAX),
107 max_changes: None,
108 max_cumulative_gas: None,
109 max_duration: None,
110 },
111 MERKLE_STAGE_DEFAULT_REBUILD_THRESHOLD,
112 ExExManagerHandle::empty(),
113 );
114
115 exec_stage.unwind(
116 &provider,
117 UnwindInput {
118 unwind_to: to,
119 checkpoint: StageCheckpoint::new(tip_block_number),
120 bad_block: None,
121 },
122 )?;
123
124 AccountHashingStage {
126 clean_threshold: u64::MAX,
127 commit_threshold: u64::MAX,
128 etl_config: EtlConfig::default(),
129 }
130 .execute(&provider, execute_input)
131 .unwrap();
132 StorageHashingStage {
133 clean_threshold: u64::MAX,
134 commit_threshold: u64::MAX,
135 etl_config: EtlConfig::default(),
136 }
137 .execute(&provider, execute_input)
138 .unwrap();
139
140 let unwind_inner_tx = provider.into_tx();
141
142 output_db
144 .update(|tx| tx.import_dupsort::<tables::StorageChangeSets, _>(&unwind_inner_tx))??;
145
146 output_db.update(|tx| tx.import_table::<tables::HashedAccounts, _>(&unwind_inner_tx))??;
147 output_db.update(|tx| tx.import_dupsort::<tables::HashedStorages, _>(&unwind_inner_tx))??;
148 output_db.update(|tx| tx.import_table::<tables::AccountsTrie, _>(&unwind_inner_tx))??;
149 output_db.update(|tx| tx.import_dupsort::<tables::StoragesTrie, _>(&unwind_inner_tx))??;
150
151 Ok(())
152}
153
154fn dry_run<N>(output_provider_factory: ProviderFactory<N>, to: u64, from: u64) -> eyre::Result<()>
156where
157 N: ProviderNodeTypes,
158{
159 info!(target: "reth::cli", "Executing stage.");
160 let provider = output_provider_factory.database_provider_rw()?;
161
162 let mut stage = MerkleStage::Execution {
163 rebuild_threshold: u64::MAX,
165 incremental_threshold: u64::MAX,
166 };
167
168 loop {
169 let input = reth_stages::ExecInput {
170 target: Some(to),
171 checkpoint: Some(StageCheckpoint::new(from)),
172 };
173 if stage.execute(&provider, input)?.done {
174 break
175 }
176 }
177
178 info!(target: "reth::cli", "Success");
179
180 Ok(())
181}