reth_cli_commands/db/
repair_trie.rs

1use clap::Parser;
2use reth_db_api::{
3    cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO},
4    database::Database,
5    tables,
6    transaction::{DbTx, DbTxMut},
7};
8use reth_node_builder::NodeTypesWithDB;
9use reth_provider::{providers::ProviderNodeTypes, ProviderFactory, StageCheckpointReader};
10use reth_stages::StageId;
11use reth_trie::{
12    verify::{Output, Verifier},
13    Nibbles,
14};
15use reth_trie_common::{StorageTrieEntry, StoredNibbles, StoredNibblesSubKey};
16use reth_trie_db::{DatabaseHashedCursorFactory, DatabaseTrieCursorFactory};
17use std::time::{Duration, Instant};
18use tracing::{info, warn};
19
20const PROGRESS_PERIOD: Duration = Duration::from_secs(5);
21
22/// The arguments for the `reth db repair-trie` command
23#[derive(Parser, Debug)]
24pub struct Command {
25    /// Only show inconsistencies without making any repairs
26    #[arg(long)]
27    pub(crate) dry_run: bool,
28}
29
30impl Command {
31    /// Execute `db repair-trie` command
32    pub fn execute<N: ProviderNodeTypes>(
33        self,
34        provider_factory: ProviderFactory<N>,
35    ) -> eyre::Result<()> {
36        if self.dry_run {
37            verify_only(provider_factory)?
38        } else {
39            verify_and_repair(provider_factory)?
40        }
41
42        Ok(())
43    }
44}
45
46fn verify_only<N: NodeTypesWithDB>(provider_factory: ProviderFactory<N>) -> eyre::Result<()> {
47    // Get a database transaction directly from the database
48    let db = provider_factory.db_ref();
49    let mut tx = db.tx()?;
50    tx.disable_long_read_transaction_safety();
51
52    // Create the verifier
53    let hashed_cursor_factory = DatabaseHashedCursorFactory::new(&tx);
54    let trie_cursor_factory = DatabaseTrieCursorFactory::new(&tx);
55    let verifier = Verifier::new(trie_cursor_factory, hashed_cursor_factory)?;
56
57    let mut inconsistent_nodes = 0;
58    let start_time = Instant::now();
59    let mut last_progress_time = Instant::now();
60
61    // Iterate over the verifier and repair inconsistencies
62    for output_result in verifier {
63        let output = output_result?;
64
65        if let Output::Progress(path) = output {
66            if last_progress_time.elapsed() > PROGRESS_PERIOD {
67                output_progress(path, start_time, inconsistent_nodes);
68                last_progress_time = Instant::now();
69            }
70        } else {
71            warn!("Inconsistency found: {output:?}");
72            inconsistent_nodes += 1;
73        }
74    }
75
76    info!("Found {} inconsistencies (dry run - no changes made)", inconsistent_nodes);
77
78    Ok(())
79}
80
81/// Checks that the merkle stage has completed running up to the account and storage hashing stages.
82fn verify_checkpoints(provider: impl StageCheckpointReader) -> eyre::Result<()> {
83    let account_hashing_checkpoint =
84        provider.get_stage_checkpoint(StageId::AccountHashing)?.unwrap_or_default();
85    let storage_hashing_checkpoint =
86        provider.get_stage_checkpoint(StageId::StorageHashing)?.unwrap_or_default();
87    let merkle_checkpoint =
88        provider.get_stage_checkpoint(StageId::MerkleExecute)?.unwrap_or_default();
89
90    if account_hashing_checkpoint.block_number != merkle_checkpoint.block_number {
91        return Err(eyre::eyre!(
92            "MerkleExecute stage checkpoint ({}) != AccountHashing stage checkpoint ({}), you must first complete the pipeline sync by running `reth node`",
93            merkle_checkpoint.block_number,
94            account_hashing_checkpoint.block_number,
95        ))
96    }
97
98    if storage_hashing_checkpoint.block_number != merkle_checkpoint.block_number {
99        return Err(eyre::eyre!(
100            "MerkleExecute stage checkpoint ({}) != StorageHashing stage checkpoint ({}), you must first complete the pipeline sync by running `reth node`",
101            merkle_checkpoint.block_number,
102            storage_hashing_checkpoint.block_number,
103        ))
104    }
105
106    let merkle_checkpoint_progress =
107        provider.get_stage_checkpoint_progress(StageId::MerkleExecute)?;
108    if merkle_checkpoint_progress.is_some_and(|progress| !progress.is_empty()) {
109        return Err(eyre::eyre!(
110            "MerkleExecute sync stage in-progress, you must first complete the pipeline sync by running `reth node`",
111        ))
112    }
113
114    Ok(())
115}
116
117fn verify_and_repair<N: ProviderNodeTypes>(
118    provider_factory: ProviderFactory<N>,
119) -> eyre::Result<()> {
120    // Get a read-write database provider
121    let mut provider_rw = provider_factory.provider_rw()?;
122
123    // Check that a pipeline sync isn't in progress.
124    verify_checkpoints(provider_rw.as_ref())?;
125
126    // Create cursors for making modifications with
127    let tx = provider_rw.tx_mut();
128    tx.disable_long_read_transaction_safety();
129    let mut account_trie_cursor = tx.cursor_write::<tables::AccountsTrie>()?;
130    let mut storage_trie_cursor = tx.cursor_dup_write::<tables::StoragesTrie>()?;
131
132    // Create the cursor factories. These cannot accept the `&mut` tx above because they require it
133    // to be AsRef.
134    let tx = provider_rw.tx_ref();
135    let hashed_cursor_factory = DatabaseHashedCursorFactory::new(tx);
136    let trie_cursor_factory = DatabaseTrieCursorFactory::new(tx);
137
138    // Create the verifier
139    let verifier = Verifier::new(trie_cursor_factory, hashed_cursor_factory)?;
140
141    let mut inconsistent_nodes = 0;
142    let start_time = Instant::now();
143    let mut last_progress_time = Instant::now();
144
145    // Iterate over the verifier and repair inconsistencies
146    for output_result in verifier {
147        let output = output_result?;
148
149        if !matches!(output, Output::Progress(_)) {
150            warn!("Inconsistency found, will repair: {output:?}");
151            inconsistent_nodes += 1;
152        }
153
154        match output {
155            Output::AccountExtra(path, _node) => {
156                // Extra account node in trie, remove it
157                let nibbles = StoredNibbles(path);
158                if account_trie_cursor.seek_exact(nibbles)?.is_some() {
159                    account_trie_cursor.delete_current()?;
160                }
161            }
162            Output::StorageExtra(account, path, _node) => {
163                // Extra storage node in trie, remove it
164                let nibbles = StoredNibblesSubKey(path);
165                if storage_trie_cursor
166                    .seek_by_key_subkey(account, nibbles.clone())?
167                    .filter(|e| e.nibbles == nibbles)
168                    .is_some()
169                {
170                    storage_trie_cursor.delete_current()?;
171                }
172            }
173            Output::AccountWrong { path, expected: node, .. } |
174            Output::AccountMissing(path, node) => {
175                // Wrong/missing account node value, upsert it
176                let nibbles = StoredNibbles(path);
177                account_trie_cursor.upsert(nibbles, &node)?;
178            }
179            Output::StorageWrong { account, path, expected: node, .. } |
180            Output::StorageMissing(account, path, node) => {
181                // Wrong/missing storage node value, upsert it
182                let nibbles = StoredNibblesSubKey(path);
183                let entry = StorageTrieEntry { nibbles, node };
184                storage_trie_cursor.upsert(account, &entry)?;
185            }
186            Output::Progress(path) => {
187                if last_progress_time.elapsed() > PROGRESS_PERIOD {
188                    output_progress(path, start_time, inconsistent_nodes);
189                    last_progress_time = Instant::now();
190                }
191            }
192        }
193    }
194
195    if inconsistent_nodes == 0 {
196        info!("No inconsistencies found");
197    } else {
198        info!("Repaired {} inconsistencies, committing changes", inconsistent_nodes);
199        provider_rw.commit()?;
200    }
201
202    Ok(())
203}
204
205/// Output progress information based on the last seen account path.
206fn output_progress(last_account: Nibbles, start_time: Instant, inconsistent_nodes: u64) {
207    // Calculate percentage based on position in the trie path space
208    // For progress estimation, we'll use the first few nibbles as an approximation
209
210    // Convert the first 16 nibbles (8 bytes) to a u64 for progress calculation
211    let mut current_value: u64 = 0;
212    let nibbles_to_use = last_account.len().min(16);
213
214    for i in 0..nibbles_to_use {
215        current_value = (current_value << 4) | (last_account.get(i).unwrap_or(0) as u64);
216    }
217    // Shift left to fill remaining bits if we have fewer than 16 nibbles
218    if nibbles_to_use < 16 {
219        current_value <<= (16 - nibbles_to_use) * 4;
220    }
221
222    let progress_percent = current_value as f64 / u64::MAX as f64 * 100.0;
223    let progress_percent_str = format!("{progress_percent:.2}");
224
225    // Calculate ETA based on current speed
226    let elapsed = start_time.elapsed();
227    let elapsed_secs = elapsed.as_secs_f64();
228
229    let estimated_total_time =
230        if progress_percent > 0.0 { elapsed_secs / (progress_percent / 100.0) } else { 0.0 };
231    let remaining_time = estimated_total_time - elapsed_secs;
232    let eta_duration = Duration::from_secs(remaining_time as u64);
233
234    info!(
235        progress_percent = progress_percent_str,
236        eta = %humantime::format_duration(eta_duration),
237        inconsistent_nodes,
238        "Repairing trie tables",
239    );
240}