reth_cli_commands/stage/dump/
execution.rs
1use super::setup;
2use reth_consensus::{noop::NoopConsensus, ConsensusError, FullConsensus};
3use reth_db::DatabaseEnv;
4use reth_db_api::{
5 cursor::DbCursorRO, database::Database, table::TableImporter, tables, transaction::DbTx,
6};
7use reth_db_common::DbTool;
8use reth_evm::{execute::BlockExecutorProvider, noop::NoopBlockExecutorProvider};
9use reth_node_builder::NodeTypesWithDB;
10use reth_node_core::dirs::{ChainPath, DataDirPath};
11use reth_provider::{
12 providers::{ProviderNodeTypes, StaticFileProvider},
13 DatabaseProviderFactory, ProviderFactory,
14};
15use reth_stages::{stages::ExecutionStage, Stage, StageCheckpoint, UnwindInput};
16use std::sync::Arc;
17use tracing::info;
18
19pub(crate) async fn dump_execution_stage<N, E, C>(
20 db_tool: &DbTool<N>,
21 from: u64,
22 to: u64,
23 output_datadir: ChainPath<DataDirPath>,
24 should_run: bool,
25 executor: E,
26 consensus: C,
27) -> eyre::Result<()>
28where
29 N: ProviderNodeTypes<DB = Arc<DatabaseEnv>>,
30 E: BlockExecutorProvider<Primitives = N::Primitives>,
31 C: FullConsensus<E::Primitives, Error = ConsensusError> + 'static,
32{
33 let (output_db, tip_block_number) = setup(from, to, &output_datadir.db(), db_tool)?;
34
35 import_tables_with_range(&output_db, db_tool, from, to)?;
36
37 unwind_and_copy(db_tool, from, tip_block_number, &output_db)?;
38
39 if should_run {
40 dry_run(
41 ProviderFactory::<N>::new(
42 Arc::new(output_db),
43 db_tool.chain(),
44 StaticFileProvider::read_write(output_datadir.static_files())?,
45 ),
46 to,
47 from,
48 executor,
49 consensus,
50 )?;
51 }
52
53 Ok(())
54}
55
56fn import_tables_with_range<N: NodeTypesWithDB>(
58 output_db: &DatabaseEnv,
59 db_tool: &DbTool<N>,
60 from: u64,
61 to: u64,
62) -> eyre::Result<()> {
63 output_db.update(|tx| {
66 tx.import_table_with_range::<tables::CanonicalHeaders, _>(
67 &db_tool.provider_factory.db_ref().tx()?,
68 Some(from),
69 to,
70 )
71 })??;
72 output_db.update(|tx| {
73 tx.import_table_with_range::<tables::HeaderTerminalDifficulties, _>(
74 &db_tool.provider_factory.db_ref().tx()?,
75 Some(from),
76 to,
77 )
78 })??;
79 output_db.update(|tx| {
80 tx.import_table_with_range::<tables::Headers, _>(
81 &db_tool.provider_factory.db_ref().tx()?,
82 Some(from),
83 to,
84 )
85 })??;
86 output_db.update(|tx| {
87 tx.import_table_with_range::<tables::BlockBodyIndices, _>(
88 &db_tool.provider_factory.db_ref().tx()?,
89 Some(from),
90 to,
91 )
92 })??;
93 output_db.update(|tx| {
94 tx.import_table_with_range::<tables::BlockOmmers, _>(
95 &db_tool.provider_factory.db_ref().tx()?,
96 Some(from),
97 to,
98 )
99 })??;
100
101 let (from_tx, to_tx) = db_tool.provider_factory.db_ref().view(|read_tx| {
103 let mut read_cursor = read_tx.cursor_read::<tables::BlockBodyIndices>()?;
104 let (_, from_block) =
105 read_cursor.seek(from)?.ok_or(eyre::eyre!("BlockBody {from} does not exist."))?;
106 let (_, to_block) =
107 read_cursor.seek(to)?.ok_or(eyre::eyre!("BlockBody {to} does not exist."))?;
108
109 Ok::<(u64, u64), eyre::ErrReport>((
110 from_block.first_tx_num,
111 to_block.first_tx_num + to_block.tx_count,
112 ))
113 })??;
114
115 output_db.update(|tx| {
116 tx.import_table_with_range::<tables::Transactions, _>(
117 &db_tool.provider_factory.db_ref().tx()?,
118 Some(from_tx),
119 to_tx,
120 )
121 })??;
122
123 output_db.update(|tx| {
124 tx.import_table_with_range::<tables::TransactionSenders, _>(
125 &db_tool.provider_factory.db_ref().tx()?,
126 Some(from_tx),
127 to_tx,
128 )
129 })??;
130
131 Ok(())
132}
133
134fn unwind_and_copy<N: ProviderNodeTypes>(
138 db_tool: &DbTool<N>,
139 from: u64,
140 tip_block_number: u64,
141 output_db: &DatabaseEnv,
142) -> eyre::Result<()> {
143 let provider = db_tool.provider_factory.database_provider_rw()?;
144
145 let mut exec_stage = ExecutionStage::new_with_executor(
146 NoopBlockExecutorProvider::<N::Primitives>::default(),
147 NoopConsensus::arc(),
148 );
149
150 exec_stage.unwind(
151 &provider,
152 UnwindInput {
153 unwind_to: from,
154 checkpoint: StageCheckpoint::new(tip_block_number),
155 bad_block: None,
156 },
157 )?;
158
159 let unwind_inner_tx = provider.into_tx();
160
161 output_db
162 .update(|tx| tx.import_dupsort::<tables::PlainStorageState, _>(&unwind_inner_tx))??;
163 output_db.update(|tx| tx.import_table::<tables::PlainAccountState, _>(&unwind_inner_tx))??;
164 output_db.update(|tx| tx.import_table::<tables::Bytecodes, _>(&unwind_inner_tx))??;
165
166 Ok(())
167}
168
169fn dry_run<N, E, C>(
171 output_provider_factory: ProviderFactory<N>,
172 to: u64,
173 from: u64,
174 executor: E,
175 consensus: C,
176) -> eyre::Result<()>
177where
178 N: ProviderNodeTypes,
179 E: BlockExecutorProvider<Primitives = N::Primitives>,
180 C: FullConsensus<E::Primitives, Error = ConsensusError> + 'static,
181{
182 info!(target: "reth::cli", "Executing stage. [dry-run]");
183
184 let mut exec_stage = ExecutionStage::new_with_executor(executor, Arc::new(consensus));
185
186 let input =
187 reth_stages::ExecInput { target: Some(to), checkpoint: Some(StageCheckpoint::new(from)) };
188 exec_stage.execute(&output_provider_factory.database_provider_rw()?, input)?;
189
190 info!(target: "reth::cli", "Success");
191
192 Ok(())
193}