reth_cli_commands/stage/
drop.rs

1//! Database debugging tool
2use crate::common::{AccessRights, CliNodeTypes, Environment, EnvironmentArgs};
3use clap::Parser;
4use reth_chainspec::EthChainSpec;
5use reth_cli::chainspec::ChainSpecParser;
6use reth_db::{mdbx::tx::Tx, DatabaseError};
7use reth_db_api::{
8    tables,
9    transaction::{DbTx, DbTxMut},
10};
11use reth_db_common::{
12    init::{insert_genesis_header, insert_genesis_history, insert_genesis_state},
13    DbTool,
14};
15use reth_node_api::{HeaderTy, ReceiptTy, TxTy};
16use reth_node_core::args::StageEnum;
17use reth_provider::{
18    DBProvider, DatabaseProviderFactory, StaticFileProviderFactory, StaticFileWriter, TrieWriter,
19};
20use reth_prune::PruneSegment;
21use reth_stages::StageId;
22use reth_static_file_types::StaticFileSegment;
23use std::sync::Arc;
24
25/// `reth drop-stage` command
26#[derive(Debug, Parser)]
27pub struct Command<C: ChainSpecParser> {
28    #[command(flatten)]
29    env: EnvironmentArgs<C>,
30
31    stage: StageEnum,
32}
33
34impl<C: ChainSpecParser> Command<C> {
35    /// Execute `db` command
36    pub async fn execute<N: CliNodeTypes>(self) -> eyre::Result<()>
37    where
38        C: ChainSpecParser<ChainSpec = N::ChainSpec>,
39    {
40        let Environment { provider_factory, .. } = self.env.init::<N>(AccessRights::RW)?;
41
42        let tool = DbTool::new(provider_factory)?;
43
44        let static_file_segment = match self.stage {
45            StageEnum::Headers => Some(StaticFileSegment::Headers),
46            StageEnum::Bodies => Some(StaticFileSegment::Transactions),
47            StageEnum::Execution => Some(StaticFileSegment::Receipts),
48            StageEnum::Senders => Some(StaticFileSegment::TransactionSenders),
49            _ => None,
50        };
51
52        // Calling `StaticFileProviderRW::prune_*` will instruct the writer to prune rows only
53        // when `StaticFileProviderRW::commit` is called. We need to do that instead of
54        // deleting the jar files, otherwise if the task were to be interrupted after we
55        // have deleted them, BUT before we have committed the checkpoints to the database, we'd
56        // lose essential data.
57        if let Some(static_file_segment) = static_file_segment {
58            let static_file_provider = tool.provider_factory.static_file_provider();
59            if let Some(highest_block) =
60                static_file_provider.get_highest_static_file_block(static_file_segment)
61            {
62                let mut writer = static_file_provider.latest_writer(static_file_segment)?;
63
64                match static_file_segment {
65                    StaticFileSegment::Headers => {
66                        // Prune all headers leaving genesis intact.
67                        writer.prune_headers(highest_block)?;
68                    }
69                    StaticFileSegment::Transactions => {
70                        let to_delete = static_file_provider
71                            .get_highest_static_file_tx(static_file_segment)
72                            .map(|tx_num| tx_num + 1)
73                            .unwrap_or_default();
74                        writer.prune_transactions(to_delete, 0)?;
75                    }
76                    StaticFileSegment::Receipts => {
77                        let to_delete = static_file_provider
78                            .get_highest_static_file_tx(static_file_segment)
79                            .map(|tx_num| tx_num + 1)
80                            .unwrap_or_default();
81                        writer.prune_receipts(to_delete, 0)?;
82                    }
83                    StaticFileSegment::TransactionSenders => {
84                        let to_delete = static_file_provider
85                            .get_highest_static_file_tx(static_file_segment)
86                            .map(|tx_num| tx_num + 1)
87                            .unwrap_or_default();
88                        writer.prune_transaction_senders(to_delete, 0)?;
89                    }
90                }
91            }
92        }
93
94        let provider_rw = tool.provider_factory.database_provider_rw()?;
95        let tx = provider_rw.tx_ref();
96
97        match self.stage {
98            StageEnum::Headers => {
99                tx.clear::<tables::CanonicalHeaders>()?;
100                tx.clear::<tables::Headers<HeaderTy<N>>>()?;
101                tx.clear::<tables::HeaderNumbers>()?;
102                reset_stage_checkpoint(tx, StageId::Headers)?;
103
104                insert_genesis_header(&provider_rw, &self.env.chain)?;
105            }
106            StageEnum::Bodies => {
107                tx.clear::<tables::BlockBodyIndices>()?;
108                tx.clear::<tables::Transactions<TxTy<N>>>()?;
109
110                tx.clear::<tables::TransactionBlocks>()?;
111                tx.clear::<tables::BlockOmmers<HeaderTy<N>>>()?;
112                tx.clear::<tables::BlockWithdrawals>()?;
113                reset_stage_checkpoint(tx, StageId::Bodies)?;
114
115                insert_genesis_header(&provider_rw, &self.env.chain)?;
116            }
117            StageEnum::Senders => {
118                tx.clear::<tables::TransactionSenders>()?;
119                // Reset pruned numbers to not count them in the next rerun's stage progress
120                reset_prune_checkpoint(tx, PruneSegment::SenderRecovery)?;
121                reset_stage_checkpoint(tx, StageId::SenderRecovery)?;
122            }
123            StageEnum::Execution => {
124                tx.clear::<tables::PlainAccountState>()?;
125                tx.clear::<tables::PlainStorageState>()?;
126                tx.clear::<tables::AccountChangeSets>()?;
127                tx.clear::<tables::StorageChangeSets>()?;
128                tx.clear::<tables::Bytecodes>()?;
129                tx.clear::<tables::Receipts<ReceiptTy<N>>>()?;
130
131                reset_prune_checkpoint(tx, PruneSegment::Receipts)?;
132                reset_prune_checkpoint(tx, PruneSegment::ContractLogs)?;
133                reset_stage_checkpoint(tx, StageId::Execution)?;
134
135                let alloc = &self.env.chain.genesis().alloc;
136                insert_genesis_state(&provider_rw, alloc.iter())?;
137            }
138            StageEnum::AccountHashing => {
139                tx.clear::<tables::HashedAccounts>()?;
140                reset_stage_checkpoint(tx, StageId::AccountHashing)?;
141            }
142            StageEnum::StorageHashing => {
143                tx.clear::<tables::HashedStorages>()?;
144                reset_stage_checkpoint(tx, StageId::StorageHashing)?;
145            }
146            StageEnum::Hashing => {
147                // Clear hashed accounts
148                tx.clear::<tables::HashedAccounts>()?;
149                reset_stage_checkpoint(tx, StageId::AccountHashing)?;
150
151                // Clear hashed storages
152                tx.clear::<tables::HashedStorages>()?;
153                reset_stage_checkpoint(tx, StageId::StorageHashing)?;
154            }
155            StageEnum::Merkle => {
156                tx.clear::<tables::AccountsTrie>()?;
157                tx.clear::<tables::StoragesTrie>()?;
158
159                reset_stage_checkpoint(tx, StageId::MerkleExecute)?;
160                reset_stage_checkpoint(tx, StageId::MerkleUnwind)?;
161
162                tx.delete::<tables::StageCheckpointProgresses>(
163                    StageId::MerkleExecute.to_string(),
164                    None,
165                )?;
166            }
167            StageEnum::MerkleChangeSets => {
168                provider_rw.clear_trie_changesets()?;
169                reset_stage_checkpoint(tx, StageId::MerkleChangeSets)?;
170            }
171            StageEnum::AccountHistory | StageEnum::StorageHistory => {
172                tx.clear::<tables::AccountsHistory>()?;
173                tx.clear::<tables::StoragesHistory>()?;
174
175                reset_stage_checkpoint(tx, StageId::IndexAccountHistory)?;
176                reset_stage_checkpoint(tx, StageId::IndexStorageHistory)?;
177
178                insert_genesis_history(&provider_rw, self.env.chain.genesis().alloc.iter())?;
179            }
180            StageEnum::TxLookup => {
181                tx.clear::<tables::TransactionHashNumbers>()?;
182                reset_prune_checkpoint(tx, PruneSegment::TransactionLookup)?;
183
184                reset_stage_checkpoint(tx, StageId::TransactionLookup)?;
185                insert_genesis_header(&provider_rw, &self.env.chain)?;
186            }
187        }
188
189        tx.put::<tables::StageCheckpoints>(StageId::Finish.to_string(), Default::default())?;
190
191        provider_rw.commit()?;
192
193        Ok(())
194    }
195    /// Returns the underlying chain being used to run this command
196    pub fn chain_spec(&self) -> Option<&Arc<C::ChainSpec>> {
197        Some(&self.env.chain)
198    }
199}
200
201fn reset_prune_checkpoint(
202    tx: &Tx<reth_db::mdbx::RW>,
203    prune_segment: PruneSegment,
204) -> Result<(), DatabaseError> {
205    if let Some(mut prune_checkpoint) = tx.get::<tables::PruneCheckpoints>(prune_segment)? {
206        prune_checkpoint.block_number = None;
207        prune_checkpoint.tx_number = None;
208        tx.put::<tables::PruneCheckpoints>(prune_segment, prune_checkpoint)?;
209    }
210
211    Ok(())
212}
213
214fn reset_stage_checkpoint(
215    tx: &Tx<reth_db::mdbx::RW>,
216    stage_id: StageId,
217) -> Result<(), DatabaseError> {
218    tx.put::<tables::StageCheckpoints>(stage_id.to_string(), Default::default())?;
219
220    Ok(())
221}