Skip to main content

reth_cli_commands/stage/
drop.rs

1//! Database debugging tool
2use crate::common::{AccessRights, CliNodeTypes, Environment, EnvironmentArgs};
3use clap::Parser;
4use reth_chainspec::EthChainSpec;
5use reth_cli::chainspec::ChainSpecParser;
6use reth_db::{mdbx::tx::Tx, DatabaseError};
7use reth_db_api::{
8    tables,
9    transaction::{DbTx, DbTxMut},
10};
11use reth_db_common::{
12    init::{
13        insert_genesis_account_history, insert_genesis_header, insert_genesis_state,
14        insert_genesis_storage_history,
15    },
16    DbTool,
17};
18use reth_node_api::{HeaderTy, ReceiptTy, TxTy};
19use reth_node_core::args::StageEnum;
20use reth_provider::{
21    DBProvider, RocksDBProviderFactory, StaticFileProviderFactory, StaticFileWriter,
22    StorageSettingsCache,
23};
24use reth_prune::PruneSegment;
25use reth_stages::StageId;
26use reth_static_file_types::StaticFileSegment;
27use std::sync::Arc;
28
29/// `reth drop-stage` command
30#[derive(Debug, Parser)]
31pub struct Command<C: ChainSpecParser> {
32    #[command(flatten)]
33    env: EnvironmentArgs<C>,
34
35    stage: StageEnum,
36}
37
38impl<C: ChainSpecParser> Command<C> {
39    /// Execute `db` command
40    pub async fn execute<N: CliNodeTypes>(self) -> eyre::Result<()>
41    where
42        C: ChainSpecParser<ChainSpec = N::ChainSpec>,
43    {
44        let Environment { provider_factory, .. } = self.env.init::<N>(AccessRights::RW)?;
45
46        let tool = DbTool::new(provider_factory)?;
47
48        let static_file_segments = match self.stage {
49            StageEnum::Headers => vec![StaticFileSegment::Headers],
50            StageEnum::Bodies => vec![StaticFileSegment::Transactions],
51            StageEnum::Execution => vec![
52                StaticFileSegment::Receipts,
53                StaticFileSegment::AccountChangeSets,
54                StaticFileSegment::StorageChangeSets,
55            ],
56            StageEnum::Senders => vec![StaticFileSegment::TransactionSenders],
57            _ => vec![],
58        };
59
60        // Calling `StaticFileProviderRW::prune_*` will instruct the writer to prune rows only
61        // when `StaticFileProviderRW::commit` is called. We need to do that instead of
62        // deleting the jar files, otherwise if the task were to be interrupted after we
63        // have deleted them, BUT before we have committed the checkpoints to the database, we'd
64        // lose essential data.
65        let static_file_provider = tool.provider_factory.static_file_provider();
66        for segment in static_file_segments {
67            if let Some(highest_block) = static_file_provider.get_highest_static_file_block(segment)
68            {
69                let mut writer = static_file_provider.latest_writer(segment)?;
70
71                match segment {
72                    StaticFileSegment::Headers => {
73                        writer.prune_headers(highest_block)?;
74                    }
75                    StaticFileSegment::Transactions => {
76                        let to_delete = static_file_provider
77                            .get_highest_static_file_tx(segment)
78                            .map(|tx_num| tx_num + 1)
79                            .unwrap_or_default();
80                        writer.prune_transactions(to_delete, 0)?;
81                    }
82                    StaticFileSegment::Receipts => {
83                        let to_delete = static_file_provider
84                            .get_highest_static_file_tx(segment)
85                            .map(|tx_num| tx_num + 1)
86                            .unwrap_or_default();
87                        writer.prune_receipts(to_delete, 0)?;
88                    }
89                    StaticFileSegment::TransactionSenders => {
90                        let to_delete = static_file_provider
91                            .get_highest_static_file_tx(segment)
92                            .map(|tx_num| tx_num + 1)
93                            .unwrap_or_default();
94                        writer.prune_transaction_senders(to_delete, 0)?;
95                    }
96                    StaticFileSegment::AccountChangeSets => {
97                        writer.prune_account_changesets(highest_block)?;
98                    }
99                    StaticFileSegment::StorageChangeSets => {
100                        writer.prune_storage_changesets(highest_block)?;
101                    }
102                }
103            }
104        }
105
106        let provider_rw = tool.provider_factory.unwind_provider_rw()?;
107        let tx = provider_rw.tx_ref();
108
109        match self.stage {
110            StageEnum::Headers => {
111                tx.clear::<tables::CanonicalHeaders>()?;
112                tx.clear::<tables::Headers<HeaderTy<N>>>()?;
113                tx.clear::<tables::HeaderNumbers>()?;
114                reset_stage_checkpoint(tx, StageId::Headers)?;
115
116                insert_genesis_header(&provider_rw, &self.env.chain)?;
117            }
118            StageEnum::Bodies => {
119                tx.clear::<tables::BlockBodyIndices>()?;
120                tx.clear::<tables::Transactions<TxTy<N>>>()?;
121
122                tx.clear::<tables::TransactionBlocks>()?;
123                tx.clear::<tables::BlockOmmers<HeaderTy<N>>>()?;
124                tx.clear::<tables::BlockWithdrawals>()?;
125                reset_stage_checkpoint(tx, StageId::Bodies)?;
126
127                insert_genesis_header(&provider_rw, &self.env.chain)?;
128            }
129            StageEnum::Senders => {
130                tx.clear::<tables::TransactionSenders>()?;
131                // Reset pruned numbers to not count them in the next rerun's stage progress
132                reset_prune_checkpoint(tx, PruneSegment::SenderRecovery)?;
133                reset_stage_checkpoint(tx, StageId::SenderRecovery)?;
134            }
135            StageEnum::Execution => {
136                if provider_rw.cached_storage_settings().use_hashed_state() {
137                    tx.clear::<tables::HashedAccounts>()?;
138                    tx.clear::<tables::HashedStorages>()?;
139                    reset_stage_checkpoint(tx, StageId::AccountHashing)?;
140                    reset_stage_checkpoint(tx, StageId::StorageHashing)?;
141                } else {
142                    tx.clear::<tables::PlainAccountState>()?;
143                    tx.clear::<tables::PlainStorageState>()?;
144                }
145                tx.clear::<tables::AccountChangeSets>()?;
146                tx.clear::<tables::StorageChangeSets>()?;
147                tx.clear::<tables::Bytecodes>()?;
148                tx.clear::<tables::Receipts<ReceiptTy<N>>>()?;
149
150                reset_prune_checkpoint(tx, PruneSegment::Receipts)?;
151                reset_prune_checkpoint(tx, PruneSegment::ContractLogs)?;
152                reset_stage_checkpoint(tx, StageId::Execution)?;
153
154                let alloc = &self.env.chain.genesis().alloc;
155                insert_genesis_state(&provider_rw, alloc.iter())?;
156            }
157            StageEnum::AccountHashing => {
158                tx.clear::<tables::HashedAccounts>()?;
159                reset_stage_checkpoint(tx, StageId::AccountHashing)?;
160            }
161            StageEnum::StorageHashing => {
162                tx.clear::<tables::HashedStorages>()?;
163                reset_stage_checkpoint(tx, StageId::StorageHashing)?;
164            }
165            StageEnum::Hashing => {
166                // Clear hashed accounts
167                tx.clear::<tables::HashedAccounts>()?;
168                reset_stage_checkpoint(tx, StageId::AccountHashing)?;
169
170                // Clear hashed storages
171                tx.clear::<tables::HashedStorages>()?;
172                reset_stage_checkpoint(tx, StageId::StorageHashing)?;
173            }
174            StageEnum::Merkle => {
175                tx.clear::<tables::AccountsTrie>()?;
176                tx.clear::<tables::StoragesTrie>()?;
177
178                reset_stage_checkpoint(tx, StageId::MerkleExecute)?;
179                reset_stage_checkpoint(tx, StageId::MerkleUnwind)?;
180
181                tx.delete::<tables::StageCheckpointProgresses>(
182                    StageId::MerkleExecute.to_string(),
183                    None,
184                )?;
185            }
186            StageEnum::AccountHistory => {
187                let settings = provider_rw.cached_storage_settings();
188                let rocksdb = tool.provider_factory.rocksdb_provider();
189
190                if settings.storage_v2 {
191                    rocksdb.clear::<tables::AccountsHistory>()?;
192                } else {
193                    tx.clear::<tables::AccountsHistory>()?;
194                }
195
196                reset_stage_checkpoint(tx, StageId::IndexAccountHistory)?;
197
198                insert_genesis_account_history(
199                    &provider_rw,
200                    self.env.chain.genesis().alloc.iter(),
201                )?;
202            }
203            StageEnum::StorageHistory => {
204                let settings = provider_rw.cached_storage_settings();
205                let rocksdb = tool.provider_factory.rocksdb_provider();
206
207                if settings.storage_v2 {
208                    rocksdb.clear::<tables::StoragesHistory>()?;
209                } else {
210                    tx.clear::<tables::StoragesHistory>()?;
211                }
212
213                reset_stage_checkpoint(tx, StageId::IndexStorageHistory)?;
214
215                insert_genesis_storage_history(
216                    &provider_rw,
217                    self.env.chain.genesis().alloc.iter(),
218                )?;
219            }
220            StageEnum::TxLookup => {
221                if provider_rw.cached_storage_settings().storage_v2 {
222                    tool.provider_factory
223                        .rocksdb_provider()
224                        .clear::<tables::TransactionHashNumbers>()?;
225                } else {
226                    tx.clear::<tables::TransactionHashNumbers>()?;
227                }
228
229                reset_prune_checkpoint(tx, PruneSegment::TransactionLookup)?;
230
231                reset_stage_checkpoint(tx, StageId::TransactionLookup)?;
232                insert_genesis_header(&provider_rw, &self.env.chain)?;
233            }
234        }
235
236        tx.put::<tables::StageCheckpoints>(StageId::Finish.to_string(), Default::default())?;
237
238        provider_rw.commit()?;
239
240        Ok(())
241    }
242    /// Returns the underlying chain being used to run this command
243    pub fn chain_spec(&self) -> Option<&Arc<C::ChainSpec>> {
244        Some(&self.env.chain)
245    }
246}
247
248fn reset_prune_checkpoint(
249    tx: &Tx<reth_db::mdbx::RW>,
250    prune_segment: PruneSegment,
251) -> Result<(), DatabaseError> {
252    if let Some(mut prune_checkpoint) = tx.get::<tables::PruneCheckpoints>(prune_segment)? {
253        prune_checkpoint.block_number = None;
254        prune_checkpoint.tx_number = None;
255        tx.put::<tables::PruneCheckpoints>(prune_segment, prune_checkpoint)?;
256    }
257
258    Ok(())
259}
260
261fn reset_stage_checkpoint(
262    tx: &Tx<reth_db::mdbx::RW>,
263    stage_id: StageId,
264) -> Result<(), DatabaseError> {
265    tx.put::<tables::StageCheckpoints>(stage_id.to_string(), Default::default())?;
266
267    Ok(())
268}