reth_cli_commands/stage/dump/
execution.rs

1use super::setup;
2use reth_consensus::{noop::NoopConsensus, ConsensusError, FullConsensus};
3use reth_db::DatabaseEnv;
4use reth_db_api::{
5    cursor::DbCursorRO, database::Database, table::TableImporter, tables, transaction::DbTx,
6};
7use reth_db_common::DbTool;
8use reth_evm::{execute::BlockExecutorProvider, noop::NoopBlockExecutorProvider};
9use reth_node_builder::NodeTypesWithDB;
10use reth_node_core::dirs::{ChainPath, DataDirPath};
11use reth_provider::{
12    providers::{ProviderNodeTypes, StaticFileProvider},
13    DatabaseProviderFactory, ProviderFactory,
14};
15use reth_stages::{stages::ExecutionStage, Stage, StageCheckpoint, UnwindInput};
16use std::sync::Arc;
17use tracing::info;
18
19pub(crate) async fn dump_execution_stage<N, E, C>(
20    db_tool: &DbTool<N>,
21    from: u64,
22    to: u64,
23    output_datadir: ChainPath<DataDirPath>,
24    should_run: bool,
25    executor: E,
26    consensus: C,
27) -> eyre::Result<()>
28where
29    N: ProviderNodeTypes<DB = Arc<DatabaseEnv>>,
30    E: BlockExecutorProvider<Primitives = N::Primitives>,
31    C: FullConsensus<E::Primitives, Error = ConsensusError> + 'static,
32{
33    let (output_db, tip_block_number) = setup(from, to, &output_datadir.db(), db_tool)?;
34
35    import_tables_with_range(&output_db, db_tool, from, to)?;
36
37    unwind_and_copy(db_tool, from, tip_block_number, &output_db)?;
38
39    if should_run {
40        dry_run(
41            ProviderFactory::<N>::new(
42                Arc::new(output_db),
43                db_tool.chain(),
44                StaticFileProvider::read_write(output_datadir.static_files())?,
45            ),
46            to,
47            from,
48            executor,
49            consensus,
50        )?;
51    }
52
53    Ok(())
54}
55
56/// Imports all the tables that can be copied over a range.
57fn import_tables_with_range<N: NodeTypesWithDB>(
58    output_db: &DatabaseEnv,
59    db_tool: &DbTool<N>,
60    from: u64,
61    to: u64,
62) -> eyre::Result<()> {
63    //  We're not sharing the transaction in case the memory grows too much.
64
65    output_db.update(|tx| {
66        tx.import_table_with_range::<tables::CanonicalHeaders, _>(
67            &db_tool.provider_factory.db_ref().tx()?,
68            Some(from),
69            to,
70        )
71    })??;
72    output_db.update(|tx| {
73        tx.import_table_with_range::<tables::HeaderTerminalDifficulties, _>(
74            &db_tool.provider_factory.db_ref().tx()?,
75            Some(from),
76            to,
77        )
78    })??;
79    output_db.update(|tx| {
80        tx.import_table_with_range::<tables::Headers, _>(
81            &db_tool.provider_factory.db_ref().tx()?,
82            Some(from),
83            to,
84        )
85    })??;
86    output_db.update(|tx| {
87        tx.import_table_with_range::<tables::BlockBodyIndices, _>(
88            &db_tool.provider_factory.db_ref().tx()?,
89            Some(from),
90            to,
91        )
92    })??;
93    output_db.update(|tx| {
94        tx.import_table_with_range::<tables::BlockOmmers, _>(
95            &db_tool.provider_factory.db_ref().tx()?,
96            Some(from),
97            to,
98        )
99    })??;
100
101    // Find range of transactions that need to be copied over
102    let (from_tx, to_tx) = db_tool.provider_factory.db_ref().view(|read_tx| {
103        let mut read_cursor = read_tx.cursor_read::<tables::BlockBodyIndices>()?;
104        let (_, from_block) =
105            read_cursor.seek(from)?.ok_or(eyre::eyre!("BlockBody {from} does not exist."))?;
106        let (_, to_block) =
107            read_cursor.seek(to)?.ok_or(eyre::eyre!("BlockBody {to} does not exist."))?;
108
109        Ok::<(u64, u64), eyre::ErrReport>((
110            from_block.first_tx_num,
111            to_block.first_tx_num + to_block.tx_count,
112        ))
113    })??;
114
115    output_db.update(|tx| {
116        tx.import_table_with_range::<tables::Transactions, _>(
117            &db_tool.provider_factory.db_ref().tx()?,
118            Some(from_tx),
119            to_tx,
120        )
121    })??;
122
123    output_db.update(|tx| {
124        tx.import_table_with_range::<tables::TransactionSenders, _>(
125            &db_tool.provider_factory.db_ref().tx()?,
126            Some(from_tx),
127            to_tx,
128        )
129    })??;
130
131    Ok(())
132}
133
134/// Dry-run an unwind to FROM block, so we can get the `PlainStorageState` and
135/// `PlainAccountState` safely. There might be some state dependency from an address
136/// which hasn't been changed in the given range.
137fn unwind_and_copy<N: ProviderNodeTypes>(
138    db_tool: &DbTool<N>,
139    from: u64,
140    tip_block_number: u64,
141    output_db: &DatabaseEnv,
142) -> eyre::Result<()> {
143    let provider = db_tool.provider_factory.database_provider_rw()?;
144
145    let mut exec_stage = ExecutionStage::new_with_executor(
146        NoopBlockExecutorProvider::<N::Primitives>::default(),
147        NoopConsensus::arc(),
148    );
149
150    exec_stage.unwind(
151        &provider,
152        UnwindInput {
153            unwind_to: from,
154            checkpoint: StageCheckpoint::new(tip_block_number),
155            bad_block: None,
156        },
157    )?;
158
159    let unwind_inner_tx = provider.into_tx();
160
161    output_db
162        .update(|tx| tx.import_dupsort::<tables::PlainStorageState, _>(&unwind_inner_tx))??;
163    output_db.update(|tx| tx.import_table::<tables::PlainAccountState, _>(&unwind_inner_tx))??;
164    output_db.update(|tx| tx.import_table::<tables::Bytecodes, _>(&unwind_inner_tx))??;
165
166    Ok(())
167}
168
169/// Try to re-execute the stage without committing
170fn dry_run<N, E, C>(
171    output_provider_factory: ProviderFactory<N>,
172    to: u64,
173    from: u64,
174    executor: E,
175    consensus: C,
176) -> eyre::Result<()>
177where
178    N: ProviderNodeTypes,
179    E: BlockExecutorProvider<Primitives = N::Primitives>,
180    C: FullConsensus<E::Primitives, Error = ConsensusError> + 'static,
181{
182    info!(target: "reth::cli", "Executing stage. [dry-run]");
183
184    let mut exec_stage = ExecutionStage::new_with_executor(executor, Arc::new(consensus));
185
186    let input =
187        reth_stages::ExecInput { target: Some(to), checkpoint: Some(StageCheckpoint::new(from)) };
188    exec_stage.execute(&output_provider_factory.database_provider_rw()?, input)?;
189
190    info!(target: "reth::cli", "Success");
191
192    Ok(())
193}