Skip to main content

reth_cli_commands/stage/dump/
execution.rs

1use super::setup;
2use reth_consensus::{noop::NoopConsensus, FullConsensus};
3use reth_db::DatabaseEnv;
4use reth_db_api::{
5    cursor::DbCursorRO, database::Database, table::TableImporter, tables, transaction::DbTx,
6};
7use reth_db_common::DbTool;
8use reth_evm::ConfigureEvm;
9use reth_node_builder::NodeTypesWithDB;
10use reth_node_core::dirs::{ChainPath, DataDirPath};
11use reth_provider::{
12    providers::{ProviderNodeTypes, RocksDBProvider, StaticFileProvider},
13    DatabaseProviderFactory, ProviderFactory,
14};
15use reth_stages::{stages::ExecutionStage, Stage, StageCheckpoint, UnwindInput};
16use std::sync::Arc;
17use tracing::info;
18
19#[expect(clippy::too_many_arguments)]
20pub(crate) async fn dump_execution_stage<N, E, C>(
21    db_tool: &DbTool<N>,
22    from: u64,
23    to: u64,
24    output_datadir: ChainPath<DataDirPath>,
25    should_run: bool,
26    evm_config: E,
27    consensus: C,
28    runtime: reth_tasks::Runtime,
29) -> eyre::Result<()>
30where
31    N: ProviderNodeTypes<DB = DatabaseEnv>,
32    E: ConfigureEvm<Primitives = N::Primitives> + 'static,
33    C: FullConsensus<E::Primitives> + 'static,
34{
35    let (output_db, tip_block_number) = setup(from, to, &output_datadir.db(), db_tool)?;
36
37    import_tables_with_range(&output_db, db_tool, from, to)?;
38
39    unwind_and_copy(db_tool, from, tip_block_number, &output_db, evm_config.clone())?;
40
41    if should_run {
42        dry_run(
43            ProviderFactory::<N>::new(
44                output_db,
45                db_tool.chain(),
46                StaticFileProvider::read_write(output_datadir.static_files())?,
47                RocksDBProvider::builder(output_datadir.rocksdb()).build()?,
48                runtime,
49            )?,
50            to,
51            from,
52            evm_config,
53            consensus,
54        )?;
55    }
56
57    Ok(())
58}
59
60/// Imports all the tables that can be copied over a range.
61fn import_tables_with_range<N: NodeTypesWithDB>(
62    output_db: &DatabaseEnv,
63    db_tool: &DbTool<N>,
64    from: u64,
65    to: u64,
66) -> eyre::Result<()> {
67    //  We're not sharing the transaction in case the memory grows too much.
68
69    output_db.update(|tx| {
70        tx.import_table_with_range::<tables::CanonicalHeaders, _>(
71            &db_tool.provider_factory.db_ref().tx()?,
72            Some(from),
73            to,
74        )
75    })??;
76    output_db.update(|tx| {
77        tx.import_table_with_range::<tables::Headers, _>(
78            &db_tool.provider_factory.db_ref().tx()?,
79            Some(from),
80            to,
81        )
82    })??;
83    output_db.update(|tx| {
84        tx.import_table_with_range::<tables::BlockBodyIndices, _>(
85            &db_tool.provider_factory.db_ref().tx()?,
86            Some(from),
87            to,
88        )
89    })??;
90    output_db.update(|tx| {
91        tx.import_table_with_range::<tables::BlockOmmers, _>(
92            &db_tool.provider_factory.db_ref().tx()?,
93            Some(from),
94            to,
95        )
96    })??;
97
98    // Find range of transactions that need to be copied over
99    let (from_tx, to_tx) = db_tool.provider_factory.db_ref().view(|read_tx| {
100        let mut read_cursor = read_tx.cursor_read::<tables::BlockBodyIndices>()?;
101        let (_, from_block) =
102            read_cursor.seek(from)?.ok_or(eyre::eyre!("BlockBody {from} does not exist."))?;
103        let (_, to_block) =
104            read_cursor.seek(to)?.ok_or(eyre::eyre!("BlockBody {to} does not exist."))?;
105
106        Ok::<(u64, u64), eyre::ErrReport>((
107            from_block.first_tx_num,
108            to_block.first_tx_num + to_block.tx_count,
109        ))
110    })??;
111
112    output_db.update(|tx| {
113        tx.import_table_with_range::<tables::Transactions, _>(
114            &db_tool.provider_factory.db_ref().tx()?,
115            Some(from_tx),
116            to_tx,
117        )
118    })??;
119
120    output_db.update(|tx| {
121        tx.import_table_with_range::<tables::TransactionSenders, _>(
122            &db_tool.provider_factory.db_ref().tx()?,
123            Some(from_tx),
124            to_tx,
125        )
126    })??;
127
128    Ok(())
129}
130
131/// Dry-run an unwind to FROM block, so we can get the `PlainStorageState` and
132/// `PlainAccountState` safely. There might be some state dependency from an address
133/// which hasn't been changed in the given range.
134fn unwind_and_copy<N: ProviderNodeTypes>(
135    db_tool: &DbTool<N>,
136    from: u64,
137    tip_block_number: u64,
138    output_db: &DatabaseEnv,
139    evm_config: impl ConfigureEvm<Primitives = N::Primitives>,
140) -> eyre::Result<()> {
141    let provider = db_tool.provider_factory.database_provider_rw()?;
142
143    let mut exec_stage = ExecutionStage::new_with_executor(evm_config, NoopConsensus::arc());
144
145    exec_stage.unwind(
146        &provider,
147        UnwindInput {
148            unwind_to: from,
149            checkpoint: StageCheckpoint::new(tip_block_number),
150            bad_block: None,
151        },
152    )?;
153
154    let unwind_inner_tx = provider.into_tx();
155
156    output_db
157        .update(|tx| tx.import_dupsort::<tables::PlainStorageState, _>(&unwind_inner_tx))??;
158    output_db.update(|tx| tx.import_table::<tables::PlainAccountState, _>(&unwind_inner_tx))??;
159    output_db.update(|tx| tx.import_table::<tables::Bytecodes, _>(&unwind_inner_tx))??;
160
161    Ok(())
162}
163
164/// Try to re-execute the stage without committing
165fn dry_run<N, E, C>(
166    output_provider_factory: ProviderFactory<N>,
167    to: u64,
168    from: u64,
169    evm_config: E,
170    consensus: C,
171) -> eyre::Result<()>
172where
173    N: ProviderNodeTypes,
174    E: ConfigureEvm<Primitives = N::Primitives> + 'static,
175    C: FullConsensus<E::Primitives> + 'static,
176{
177    info!(target: "reth::cli", "Executing stage. [dry-run]");
178
179    let mut exec_stage = ExecutionStage::new_with_executor(evm_config, Arc::new(consensus));
180
181    let input =
182        reth_stages::ExecInput { target: Some(to), checkpoint: Some(StageCheckpoint::new(from)) };
183    exec_stage.execute(&output_provider_factory.database_provider_rw()?, input)?;
184
185    info!(target: "reth::cli", "Success");
186
187    Ok(())
188}