reth_cli_commands/stage/dump/
merkle.rs1use std::sync::Arc;
2
3use super::setup;
4use alloy_primitives::BlockNumber;
5use eyre::Result;
6use reth_config::config::EtlConfig;
7use reth_consensus::{ConsensusError, FullConsensus};
8use reth_db::DatabaseEnv;
9use reth_db_api::{database::Database, table::TableImporter, tables};
10use reth_db_common::DbTool;
11use reth_evm::ConfigureEvm;
12use reth_exex::ExExManagerHandle;
13use reth_node_core::dirs::{ChainPath, DataDirPath};
14use reth_provider::{
15 providers::{ProviderNodeTypes, StaticFileProvider},
16 DatabaseProviderFactory, ProviderFactory,
17};
18use reth_stages::{
19 stages::{
20 AccountHashingStage, ExecutionStage, MerkleStage, StorageHashingStage,
21 MERKLE_STAGE_DEFAULT_REBUILD_THRESHOLD,
22 },
23 ExecutionStageThresholds, Stage, StageCheckpoint, UnwindInput,
24};
25use tracing::info;
26
27pub(crate) async fn dump_merkle_stage<N>(
28 db_tool: &DbTool<N>,
29 from: BlockNumber,
30 to: BlockNumber,
31 output_datadir: ChainPath<DataDirPath>,
32 should_run: bool,
33 evm_config: impl ConfigureEvm<Primitives = N::Primitives>,
34 consensus: impl FullConsensus<N::Primitives, Error = ConsensusError> + 'static,
35) -> Result<()>
36where
37 N: ProviderNodeTypes<DB = Arc<DatabaseEnv>>,
38{
39 let (output_db, tip_block_number) = setup(from, to, &output_datadir.db(), db_tool)?;
40
41 output_db.update(|tx| {
42 tx.import_table_with_range::<tables::Headers, _>(
43 &db_tool.provider_factory.db_ref().tx()?,
44 Some(from),
45 to,
46 )
47 })??;
48
49 output_db.update(|tx| {
50 tx.import_table_with_range::<tables::AccountChangeSets, _>(
51 &db_tool.provider_factory.db_ref().tx()?,
52 Some(from),
53 to,
54 )
55 })??;
56
57 unwind_and_copy(db_tool, (from, to), tip_block_number, &output_db, evm_config, consensus)?;
58
59 if should_run {
60 dry_run(
61 ProviderFactory::<N>::new(
62 Arc::new(output_db),
63 db_tool.chain(),
64 StaticFileProvider::read_write(output_datadir.static_files())?,
65 ),
66 to,
67 from,
68 )?;
69 }
70
71 Ok(())
72}
73
74fn unwind_and_copy<N: ProviderNodeTypes>(
76 db_tool: &DbTool<N>,
77 range: (u64, u64),
78 tip_block_number: u64,
79 output_db: &DatabaseEnv,
80 evm_config: impl ConfigureEvm<Primitives = N::Primitives>,
81 consensus: impl FullConsensus<N::Primitives, Error = ConsensusError> + 'static,
82) -> eyre::Result<()> {
83 let (from, to) = range;
84 let provider = db_tool.provider_factory.database_provider_rw()?;
85
86 let unwind = UnwindInput {
87 unwind_to: from,
88 checkpoint: StageCheckpoint::new(tip_block_number),
89 bad_block: None,
90 };
91 let execute_input =
92 reth_stages::ExecInput { target: Some(to), checkpoint: Some(StageCheckpoint::new(from)) };
93
94 StorageHashingStage::default().unwind(&provider, unwind)?;
96 AccountHashingStage::default().unwind(&provider, unwind)?;
97 MerkleStage::default_unwind().unwind(&provider, unwind)?;
98
99 let mut exec_stage = ExecutionStage::new(
101 evm_config, Arc::new(consensus),
103 ExecutionStageThresholds {
104 max_blocks: Some(u64::MAX),
105 max_changes: None,
106 max_cumulative_gas: None,
107 max_duration: None,
108 },
109 MERKLE_STAGE_DEFAULT_REBUILD_THRESHOLD,
110 ExExManagerHandle::empty(),
111 );
112
113 exec_stage.unwind(
114 &provider,
115 UnwindInput {
116 unwind_to: to,
117 checkpoint: StageCheckpoint::new(tip_block_number),
118 bad_block: None,
119 },
120 )?;
121
122 AccountHashingStage {
124 clean_threshold: u64::MAX,
125 commit_threshold: u64::MAX,
126 etl_config: EtlConfig::default(),
127 }
128 .execute(&provider, execute_input)?;
129 StorageHashingStage {
130 clean_threshold: u64::MAX,
131 commit_threshold: u64::MAX,
132 etl_config: EtlConfig::default(),
133 }
134 .execute(&provider, execute_input)?;
135
136 let unwind_inner_tx = provider.into_tx();
137
138 output_db
140 .update(|tx| tx.import_dupsort::<tables::StorageChangeSets, _>(&unwind_inner_tx))??;
141
142 output_db.update(|tx| tx.import_table::<tables::HashedAccounts, _>(&unwind_inner_tx))??;
143 output_db.update(|tx| tx.import_dupsort::<tables::HashedStorages, _>(&unwind_inner_tx))??;
144 output_db.update(|tx| tx.import_table::<tables::AccountsTrie, _>(&unwind_inner_tx))??;
145 output_db.update(|tx| tx.import_dupsort::<tables::StoragesTrie, _>(&unwind_inner_tx))??;
146
147 Ok(())
148}
149
150fn dry_run<N>(output_provider_factory: ProviderFactory<N>, to: u64, from: u64) -> eyre::Result<()>
152where
153 N: ProviderNodeTypes,
154{
155 info!(target: "reth::cli", "Executing stage.");
156 let provider = output_provider_factory.database_provider_rw()?;
157
158 let mut stage = MerkleStage::Execution {
159 rebuild_threshold: u64::MAX,
161 incremental_threshold: u64::MAX,
162 };
163
164 loop {
165 let input = reth_stages::ExecInput {
166 target: Some(to),
167 checkpoint: Some(StageCheckpoint::new(from)),
168 };
169 if stage.execute(&provider, input)?.done {
170 break
171 }
172 }
173
174 info!(target: "reth::cli", "Success");
175
176 Ok(())
177}