Skip to main content

reth_cli_commands/stage/dump/
execution.rs

1use super::setup;
2use reth_consensus::{noop::NoopConsensus, FullConsensus};
3use reth_db::DatabaseEnv;
4use reth_db_api::{
5    cursor::DbCursorRO, database::Database, table::TableImporter, tables, transaction::DbTx,
6};
7use reth_db_common::DbTool;
8use reth_evm::ConfigureEvm;
9use reth_node_builder::NodeTypesWithDB;
10use reth_node_core::dirs::{ChainPath, DataDirPath};
11use reth_provider::{
12    providers::{ProviderNodeTypes, RocksDBProvider, StaticFileProvider},
13    DatabaseProviderFactory, ProviderFactory,
14};
15use reth_stages::{stages::ExecutionStage, Stage, StageCheckpoint, UnwindInput};
16use std::sync::Arc;
17use tracing::info;
18
19pub(crate) async fn dump_execution_stage<N, E, C>(
20    db_tool: &DbTool<N>,
21    from: u64,
22    to: u64,
23    output_datadir: ChainPath<DataDirPath>,
24    should_run: bool,
25    evm_config: E,
26    consensus: C,
27) -> eyre::Result<()>
28where
29    N: ProviderNodeTypes<DB = DatabaseEnv>,
30    E: ConfigureEvm<Primitives = N::Primitives> + 'static,
31    C: FullConsensus<E::Primitives> + 'static,
32{
33    let (output_db, tip_block_number) = setup(from, to, &output_datadir.db(), db_tool)?;
34
35    import_tables_with_range(&output_db, db_tool, from, to)?;
36
37    unwind_and_copy(db_tool, from, tip_block_number, &output_db, evm_config.clone())?;
38
39    if should_run {
40        let runtime = reth_tasks::Runtime::with_existing_handle(tokio::runtime::Handle::current())?;
41        dry_run(
42            ProviderFactory::<N>::new(
43                output_db,
44                db_tool.chain(),
45                StaticFileProvider::read_write(output_datadir.static_files())?,
46                RocksDBProvider::builder(output_datadir.rocksdb()).build()?,
47                runtime,
48            )?,
49            to,
50            from,
51            evm_config,
52            consensus,
53        )?;
54    }
55
56    Ok(())
57}
58
59/// Imports all the tables that can be copied over a range.
60fn import_tables_with_range<N: NodeTypesWithDB>(
61    output_db: &DatabaseEnv,
62    db_tool: &DbTool<N>,
63    from: u64,
64    to: u64,
65) -> eyre::Result<()> {
66    //  We're not sharing the transaction in case the memory grows too much.
67
68    output_db.update(|tx| {
69        tx.import_table_with_range::<tables::CanonicalHeaders, _>(
70            &db_tool.provider_factory.db_ref().tx()?,
71            Some(from),
72            to,
73        )
74    })??;
75    output_db.update(|tx| {
76        tx.import_table_with_range::<tables::Headers, _>(
77            &db_tool.provider_factory.db_ref().tx()?,
78            Some(from),
79            to,
80        )
81    })??;
82    output_db.update(|tx| {
83        tx.import_table_with_range::<tables::BlockBodyIndices, _>(
84            &db_tool.provider_factory.db_ref().tx()?,
85            Some(from),
86            to,
87        )
88    })??;
89    output_db.update(|tx| {
90        tx.import_table_with_range::<tables::BlockOmmers, _>(
91            &db_tool.provider_factory.db_ref().tx()?,
92            Some(from),
93            to,
94        )
95    })??;
96
97    // Find range of transactions that need to be copied over
98    let (from_tx, to_tx) = db_tool.provider_factory.db_ref().view(|read_tx| {
99        let mut read_cursor = read_tx.cursor_read::<tables::BlockBodyIndices>()?;
100        let (_, from_block) =
101            read_cursor.seek(from)?.ok_or(eyre::eyre!("BlockBody {from} does not exist."))?;
102        let (_, to_block) =
103            read_cursor.seek(to)?.ok_or(eyre::eyre!("BlockBody {to} does not exist."))?;
104
105        Ok::<(u64, u64), eyre::ErrReport>((
106            from_block.first_tx_num,
107            to_block.first_tx_num + to_block.tx_count,
108        ))
109    })??;
110
111    output_db.update(|tx| {
112        tx.import_table_with_range::<tables::Transactions, _>(
113            &db_tool.provider_factory.db_ref().tx()?,
114            Some(from_tx),
115            to_tx,
116        )
117    })??;
118
119    output_db.update(|tx| {
120        tx.import_table_with_range::<tables::TransactionSenders, _>(
121            &db_tool.provider_factory.db_ref().tx()?,
122            Some(from_tx),
123            to_tx,
124        )
125    })??;
126
127    Ok(())
128}
129
130/// Dry-run an unwind to FROM block, so we can get the `PlainStorageState` and
131/// `PlainAccountState` safely. There might be some state dependency from an address
132/// which hasn't been changed in the given range.
133fn unwind_and_copy<N: ProviderNodeTypes>(
134    db_tool: &DbTool<N>,
135    from: u64,
136    tip_block_number: u64,
137    output_db: &DatabaseEnv,
138    evm_config: impl ConfigureEvm<Primitives = N::Primitives>,
139) -> eyre::Result<()> {
140    let provider = db_tool.provider_factory.database_provider_rw()?;
141
142    let mut exec_stage = ExecutionStage::new_with_executor(evm_config, NoopConsensus::arc());
143
144    exec_stage.unwind(
145        &provider,
146        UnwindInput {
147            unwind_to: from,
148            checkpoint: StageCheckpoint::new(tip_block_number),
149            bad_block: None,
150        },
151    )?;
152
153    let unwind_inner_tx = provider.into_tx();
154
155    output_db
156        .update(|tx| tx.import_dupsort::<tables::PlainStorageState, _>(&unwind_inner_tx))??;
157    output_db.update(|tx| tx.import_table::<tables::PlainAccountState, _>(&unwind_inner_tx))??;
158    output_db.update(|tx| tx.import_table::<tables::Bytecodes, _>(&unwind_inner_tx))??;
159
160    Ok(())
161}
162
163/// Try to re-execute the stage without committing
164fn dry_run<N, E, C>(
165    output_provider_factory: ProviderFactory<N>,
166    to: u64,
167    from: u64,
168    evm_config: E,
169    consensus: C,
170) -> eyre::Result<()>
171where
172    N: ProviderNodeTypes,
173    E: ConfigureEvm<Primitives = N::Primitives> + 'static,
174    C: FullConsensus<E::Primitives> + 'static,
175{
176    info!(target: "reth::cli", "Executing stage. [dry-run]");
177
178    let mut exec_stage = ExecutionStage::new_with_executor(evm_config, Arc::new(consensus));
179
180    let input =
181        reth_stages::ExecInput { target: Some(to), checkpoint: Some(StageCheckpoint::new(from)) };
182    exec_stage.execute(&output_provider_factory.database_provider_rw()?, input)?;
183
184    info!(target: "reth::cli", "Success");
185
186    Ok(())
187}