reth_cli_commands/db/
repair_trie.rs1use clap::Parser;
2use reth_db_api::{
3 cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO},
4 database::Database,
5 tables,
6 transaction::{DbTx, DbTxMut},
7};
8use reth_node_builder::NodeTypesWithDB;
9use reth_provider::{providers::ProviderNodeTypes, ProviderFactory, StageCheckpointReader};
10use reth_stages::StageId;
11use reth_trie::{
12 verify::{Output, Verifier},
13 Nibbles,
14};
15use reth_trie_common::{StorageTrieEntry, StoredNibbles, StoredNibblesSubKey};
16use reth_trie_db::{DatabaseHashedCursorFactory, DatabaseTrieCursorFactory};
17use std::time::{Duration, Instant};
18use tracing::{info, warn};
19
/// Minimum time that must elapse between two progress log lines emitted while
/// iterating over the verifier output.
const PROGRESS_PERIOD: Duration = Duration::from_secs(5);
21
22#[derive(Parser, Debug)]
24pub struct Command {
25 #[arg(long)]
27 pub(crate) dry_run: bool,
28}
29
30impl Command {
31 pub fn execute<N: ProviderNodeTypes>(
33 self,
34 provider_factory: ProviderFactory<N>,
35 ) -> eyre::Result<()> {
36 if self.dry_run {
37 verify_only(provider_factory)?
38 } else {
39 verify_and_repair(provider_factory)?
40 }
41
42 Ok(())
43 }
44}
45
46fn verify_only<N: NodeTypesWithDB>(provider_factory: ProviderFactory<N>) -> eyre::Result<()> {
47 let db = provider_factory.db_ref();
49 let mut tx = db.tx()?;
50 tx.disable_long_read_transaction_safety();
51
52 let hashed_cursor_factory = DatabaseHashedCursorFactory::new(&tx);
54 let trie_cursor_factory = DatabaseTrieCursorFactory::new(&tx);
55 let verifier = Verifier::new(trie_cursor_factory, hashed_cursor_factory)?;
56
57 let mut inconsistent_nodes = 0;
58 let start_time = Instant::now();
59 let mut last_progress_time = Instant::now();
60
61 for output_result in verifier {
63 let output = output_result?;
64
65 if let Output::Progress(path) = output {
66 if last_progress_time.elapsed() > PROGRESS_PERIOD {
67 output_progress(path, start_time, inconsistent_nodes);
68 last_progress_time = Instant::now();
69 }
70 } else {
71 warn!("Inconsistency found: {output:?}");
72 inconsistent_nodes += 1;
73 }
74 }
75
76 info!("Found {} inconsistencies (dry run - no changes made)", inconsistent_nodes);
77
78 Ok(())
79}
80
81fn verify_checkpoints(provider: impl StageCheckpointReader) -> eyre::Result<()> {
83 let account_hashing_checkpoint =
84 provider.get_stage_checkpoint(StageId::AccountHashing)?.unwrap_or_default();
85 let storage_hashing_checkpoint =
86 provider.get_stage_checkpoint(StageId::StorageHashing)?.unwrap_or_default();
87 let merkle_checkpoint =
88 provider.get_stage_checkpoint(StageId::MerkleExecute)?.unwrap_or_default();
89
90 if account_hashing_checkpoint.block_number != merkle_checkpoint.block_number {
91 return Err(eyre::eyre!(
92 "MerkleExecute stage checkpoint ({}) != AccountHashing stage checkpoint ({}), you must first complete the pipeline sync by running `reth node`",
93 merkle_checkpoint.block_number,
94 account_hashing_checkpoint.block_number,
95 ))
96 }
97
98 if storage_hashing_checkpoint.block_number != merkle_checkpoint.block_number {
99 return Err(eyre::eyre!(
100 "MerkleExecute stage checkpoint ({}) != StorageHashing stage checkpoint ({}), you must first complete the pipeline sync by running `reth node`",
101 merkle_checkpoint.block_number,
102 storage_hashing_checkpoint.block_number,
103 ))
104 }
105
106 let merkle_checkpoint_progress =
107 provider.get_stage_checkpoint_progress(StageId::MerkleExecute)?;
108 if merkle_checkpoint_progress.is_some_and(|progress| !progress.is_empty()) {
109 return Err(eyre::eyre!(
110 "MerkleExecute sync stage in-progress, you must first complete the pipeline sync by running `reth node`",
111 ))
112 }
113
114 Ok(())
115}
116
117fn verify_and_repair<N: ProviderNodeTypes>(
118 provider_factory: ProviderFactory<N>,
119) -> eyre::Result<()> {
120 let mut provider_rw = provider_factory.provider_rw()?;
122
123 verify_checkpoints(provider_rw.as_ref())?;
125
126 let tx = provider_rw.tx_mut();
128 tx.disable_long_read_transaction_safety();
129 let mut account_trie_cursor = tx.cursor_write::<tables::AccountsTrie>()?;
130 let mut storage_trie_cursor = tx.cursor_dup_write::<tables::StoragesTrie>()?;
131
132 let tx = provider_rw.tx_ref();
135 let hashed_cursor_factory = DatabaseHashedCursorFactory::new(tx);
136 let trie_cursor_factory = DatabaseTrieCursorFactory::new(tx);
137
138 let verifier = Verifier::new(trie_cursor_factory, hashed_cursor_factory)?;
140
141 let mut inconsistent_nodes = 0;
142 let start_time = Instant::now();
143 let mut last_progress_time = Instant::now();
144
145 for output_result in verifier {
147 let output = output_result?;
148
149 if !matches!(output, Output::Progress(_)) {
150 warn!("Inconsistency found, will repair: {output:?}");
151 inconsistent_nodes += 1;
152 }
153
154 match output {
155 Output::AccountExtra(path, _node) => {
156 let nibbles = StoredNibbles(path);
158 if account_trie_cursor.seek_exact(nibbles)?.is_some() {
159 account_trie_cursor.delete_current()?;
160 }
161 }
162 Output::StorageExtra(account, path, _node) => {
163 let nibbles = StoredNibblesSubKey(path);
165 if storage_trie_cursor
166 .seek_by_key_subkey(account, nibbles.clone())?
167 .filter(|e| e.nibbles == nibbles)
168 .is_some()
169 {
170 storage_trie_cursor.delete_current()?;
171 }
172 }
173 Output::AccountWrong { path, expected: node, .. } |
174 Output::AccountMissing(path, node) => {
175 let nibbles = StoredNibbles(path);
177 account_trie_cursor.upsert(nibbles, &node)?;
178 }
179 Output::StorageWrong { account, path, expected: node, .. } |
180 Output::StorageMissing(account, path, node) => {
181 let nibbles = StoredNibblesSubKey(path);
183 let entry = StorageTrieEntry { nibbles, node };
184 storage_trie_cursor.upsert(account, &entry)?;
185 }
186 Output::Progress(path) => {
187 if last_progress_time.elapsed() > PROGRESS_PERIOD {
188 output_progress(path, start_time, inconsistent_nodes);
189 last_progress_time = Instant::now();
190 }
191 }
192 }
193 }
194
195 if inconsistent_nodes == 0 {
196 info!("No inconsistencies found");
197 } else {
198 info!("Repaired {} inconsistencies, committing changes", inconsistent_nodes);
199 provider_rw.commit()?;
200 }
201
202 Ok(())
203}
204
205fn output_progress(last_account: Nibbles, start_time: Instant, inconsistent_nodes: u64) {
207 let mut current_value: u64 = 0;
212 let nibbles_to_use = last_account.len().min(16);
213
214 for i in 0..nibbles_to_use {
215 current_value = (current_value << 4) | (last_account.get(i).unwrap_or(0) as u64);
216 }
217 if nibbles_to_use < 16 {
219 current_value <<= (16 - nibbles_to_use) * 4;
220 }
221
222 let progress_percent = current_value as f64 / u64::MAX as f64 * 100.0;
223 let progress_percent_str = format!("{progress_percent:.2}");
224
225 let elapsed = start_time.elapsed();
227 let elapsed_secs = elapsed.as_secs_f64();
228
229 let estimated_total_time =
230 if progress_percent > 0.0 { elapsed_secs / (progress_percent / 100.0) } else { 0.0 };
231 let remaining_time = estimated_total_time - elapsed_secs;
232 let eta_duration = Duration::from_secs(remaining_time as u64);
233
234 info!(
235 progress_percent = progress_percent_str,
236 eta = %humantime::format_duration(eta_duration),
237 inconsistent_nodes,
238 "Repairing trie tables",
239 );
240}