reth_cli_commands/stage/dump/
merkle.rs
1use std::sync::Arc;
2
3use super::setup;
4use alloy_primitives::BlockNumber;
5use eyre::Result;
6use reth_config::config::EtlConfig;
7use reth_consensus::noop::NoopConsensus;
8use reth_db::DatabaseEnv;
9use reth_db_api::{database::Database, table::TableImporter, tables};
10use reth_db_common::DbTool;
11use reth_evm::noop::NoopBlockExecutorProvider;
12use reth_exex::ExExManagerHandle;
13use reth_node_core::dirs::{ChainPath, DataDirPath};
14use reth_provider::{
15 providers::{ProviderNodeTypes, StaticFileProvider},
16 DatabaseProviderFactory, ProviderFactory,
17};
18use reth_stages::{
19 stages::{
20 AccountHashingStage, ExecutionStage, MerkleStage, StorageHashingStage,
21 MERKLE_STAGE_DEFAULT_CLEAN_THRESHOLD,
22 },
23 ExecutionStageThresholds, Stage, StageCheckpoint, UnwindInput,
24};
25use tracing::info;
26
27pub(crate) async fn dump_merkle_stage<N>(
28 db_tool: &DbTool<N>,
29 from: BlockNumber,
30 to: BlockNumber,
31 output_datadir: ChainPath<DataDirPath>,
32 should_run: bool,
33) -> Result<()>
34where
35 N: ProviderNodeTypes<DB = Arc<DatabaseEnv>>,
36{
37 let (output_db, tip_block_number) = setup(from, to, &output_datadir.db(), db_tool)?;
38
39 output_db.update(|tx| {
40 tx.import_table_with_range::<tables::Headers, _>(
41 &db_tool.provider_factory.db_ref().tx()?,
42 Some(from),
43 to,
44 )
45 })??;
46
47 output_db.update(|tx| {
48 tx.import_table_with_range::<tables::AccountChangeSets, _>(
49 &db_tool.provider_factory.db_ref().tx()?,
50 Some(from),
51 to,
52 )
53 })??;
54
55 unwind_and_copy(db_tool, (from, to), tip_block_number, &output_db)?;
56
57 if should_run {
58 dry_run(
59 ProviderFactory::<N>::new(
60 Arc::new(output_db),
61 db_tool.chain(),
62 StaticFileProvider::read_write(output_datadir.static_files())?,
63 ),
64 to,
65 from,
66 )?;
67 }
68
69 Ok(())
70}
71
72fn unwind_and_copy<N: ProviderNodeTypes>(
74 db_tool: &DbTool<N>,
75 range: (u64, u64),
76 tip_block_number: u64,
77 output_db: &DatabaseEnv,
78) -> eyre::Result<()> {
79 let (from, to) = range;
80 let provider = db_tool.provider_factory.database_provider_rw()?;
81
82 let unwind = UnwindInput {
83 unwind_to: from,
84 checkpoint: StageCheckpoint::new(tip_block_number),
85 bad_block: None,
86 };
87 let execute_input =
88 reth_stages::ExecInput { target: Some(to), checkpoint: Some(StageCheckpoint::new(from)) };
89
90 StorageHashingStage::default().unwind(&provider, unwind).unwrap();
93 AccountHashingStage::default().unwind(&provider, unwind).unwrap();
94
95 MerkleStage::default_unwind().unwind(&provider, unwind)?;
96
97 let mut exec_stage = ExecutionStage::new(
99 NoopBlockExecutorProvider::<N::Primitives>::default(), NoopConsensus::arc(),
101 ExecutionStageThresholds {
102 max_blocks: Some(u64::MAX),
103 max_changes: None,
104 max_cumulative_gas: None,
105 max_duration: None,
106 },
107 MERKLE_STAGE_DEFAULT_CLEAN_THRESHOLD,
108 ExExManagerHandle::empty(),
109 );
110
111 exec_stage.unwind(
112 &provider,
113 UnwindInput {
114 unwind_to: to,
115 checkpoint: StageCheckpoint::new(tip_block_number),
116 bad_block: None,
117 },
118 )?;
119
120 AccountHashingStage {
122 clean_threshold: u64::MAX,
123 commit_threshold: u64::MAX,
124 etl_config: EtlConfig::default(),
125 }
126 .execute(&provider, execute_input)
127 .unwrap();
128 StorageHashingStage {
129 clean_threshold: u64::MAX,
130 commit_threshold: u64::MAX,
131 etl_config: EtlConfig::default(),
132 }
133 .execute(&provider, execute_input)
134 .unwrap();
135
136 let unwind_inner_tx = provider.into_tx();
137
138 output_db
140 .update(|tx| tx.import_dupsort::<tables::StorageChangeSets, _>(&unwind_inner_tx))??;
141
142 output_db.update(|tx| tx.import_table::<tables::HashedAccounts, _>(&unwind_inner_tx))??;
143 output_db.update(|tx| tx.import_dupsort::<tables::HashedStorages, _>(&unwind_inner_tx))??;
144 output_db.update(|tx| tx.import_table::<tables::AccountsTrie, _>(&unwind_inner_tx))??;
145 output_db.update(|tx| tx.import_dupsort::<tables::StoragesTrie, _>(&unwind_inner_tx))??;
146
147 Ok(())
148}
149
150fn dry_run<N>(output_provider_factory: ProviderFactory<N>, to: u64, from: u64) -> eyre::Result<()>
152where
153 N: ProviderNodeTypes,
154{
155 info!(target: "reth::cli", "Executing stage.");
156 let provider = output_provider_factory.database_provider_rw()?;
157
158 let mut stage = MerkleStage::Execution {
159 clean_threshold: u64::MAX,
161 };
162
163 loop {
164 let input = reth_stages::ExecInput {
165 target: Some(to),
166 checkpoint: Some(StageCheckpoint::new(from)),
167 };
168 if stage.execute(&provider, input)?.done {
169 break
170 }
171 }
172
173 info!(target: "reth::cli", "Success");
174
175 Ok(())
176}