1use clap::Parser;
2use metrics::{self, Counter};
3use reth_chainspec::EthChainSpec;
4use reth_cli_util::parse_socket_address;
5use reth_db_api::{
6 cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO},
7 database::Database,
8 tables,
9 transaction::{DbTx, DbTxMut},
10};
11use reth_db_common::DbTool;
12use reth_node_core::version::version_metadata;
13use reth_node_metrics::{
14 chain::ChainSpecInfo,
15 hooks::Hooks,
16 server::{MetricServer, MetricServerConfig},
17 version::VersionInfo,
18};
19use reth_provider::{providers::ProviderNodeTypes, ChainSpecProvider, StageCheckpointReader};
20use reth_stages::StageId;
21use reth_tasks::TaskExecutor;
22use reth_trie::{
23 verify::{Output, Verifier},
24 Nibbles,
25};
26use reth_trie_common::{StorageTrieEntry, StoredNibbles, StoredNibblesSubKey};
27use reth_trie_db::{DatabaseHashedCursorFactory, DatabaseTrieCursorFactory};
28use std::{
29 net::SocketAddr,
30 time::{Duration, Instant},
31};
32use tracing::{info, warn};
33
/// Minimum time between two consecutive progress log lines during a trie walk.
const PROGRESS_PERIOD: Duration = Duration::from_millis(5_000);
35
36#[derive(Parser, Debug)]
38pub struct Command {
39 #[arg(long)]
41 pub(crate) dry_run: bool,
42
43 #[arg(long = "metrics", value_name = "ADDR:PORT", value_parser = parse_socket_address)]
47 pub(crate) metrics: Option<SocketAddr>,
48}
49
50impl Command {
51 pub fn execute<N: ProviderNodeTypes>(
53 self,
54 tool: &DbTool<N>,
55 task_executor: TaskExecutor,
56 ) -> eyre::Result<()> {
57 let _metrics_handle = if let Some(listen_addr) = self.metrics {
59 let chain_name = tool.provider_factory.chain_spec().chain().to_string();
60 let executor = task_executor.clone();
61
62 let handle = task_executor.spawn_critical("metrics server", async move {
63 let config = MetricServerConfig::new(
64 listen_addr,
65 VersionInfo {
66 version: version_metadata().cargo_pkg_version.as_ref(),
67 build_timestamp: version_metadata().vergen_build_timestamp.as_ref(),
68 cargo_features: version_metadata().vergen_cargo_features.as_ref(),
69 git_sha: version_metadata().vergen_git_sha.as_ref(),
70 target_triple: version_metadata().vergen_cargo_target_triple.as_ref(),
71 build_profile: version_metadata().build_profile_name.as_ref(),
72 },
73 ChainSpecInfo { name: chain_name },
74 executor,
75 Hooks::builder().build(),
76 );
77
78 if let Err(e) = MetricServer::new(config).serve().await {
80 tracing::error!("Metrics server error: {}", e);
81 }
82 });
83
84 Some(handle)
85 } else {
86 None
87 };
88
89 if self.dry_run {
90 verify_only(tool)?
91 } else {
92 verify_and_repair(tool)?
93 }
94
95 Ok(())
96 }
97}
98
99fn verify_only<N: ProviderNodeTypes>(tool: &DbTool<N>) -> eyre::Result<()> {
100 let finish_checkpoint = tool
102 .provider_factory
103 .provider()?
104 .get_stage_checkpoint(StageId::Finish)?
105 .unwrap_or_default();
106 info!("Database block tip: {}", finish_checkpoint.block_number);
107
108 let db = tool.provider_factory.db_ref();
110 let mut tx = db.tx()?;
111 tx.disable_long_read_transaction_safety();
112
113 let hashed_cursor_factory = DatabaseHashedCursorFactory::new(&tx);
115 let trie_cursor_factory = DatabaseTrieCursorFactory::new(&tx);
116 let verifier = Verifier::new(&trie_cursor_factory, hashed_cursor_factory)?;
117
118 let metrics = RepairTrieMetrics::new();
119
120 let mut inconsistent_nodes = 0;
121 let start_time = Instant::now();
122 let mut last_progress_time = Instant::now();
123
124 for output_result in verifier {
126 let output = output_result?;
127
128 if let Output::Progress(path) = output {
129 if last_progress_time.elapsed() > PROGRESS_PERIOD {
130 output_progress(path, start_time, inconsistent_nodes);
131 last_progress_time = Instant::now();
132 }
133 } else {
134 warn!("Inconsistency found: {output:?}");
135 inconsistent_nodes += 1;
136
137 match output {
139 Output::AccountExtra(_, _) |
140 Output::AccountWrong { .. } |
141 Output::AccountMissing(_, _) => {
142 metrics.account_inconsistencies.increment(1);
143 }
144 Output::StorageExtra(_, _, _) |
145 Output::StorageWrong { .. } |
146 Output::StorageMissing(_, _, _) => {
147 metrics.storage_inconsistencies.increment(1);
148 }
149 Output::Progress(_) => unreachable!(),
150 }
151 }
152 }
153
154 info!("Found {} inconsistencies (dry run - no changes made)", inconsistent_nodes);
155
156 Ok(())
157}
158
159fn verify_checkpoints(provider: impl StageCheckpointReader) -> eyre::Result<()> {
161 let account_hashing_checkpoint =
162 provider.get_stage_checkpoint(StageId::AccountHashing)?.unwrap_or_default();
163 let storage_hashing_checkpoint =
164 provider.get_stage_checkpoint(StageId::StorageHashing)?.unwrap_or_default();
165 let merkle_checkpoint =
166 provider.get_stage_checkpoint(StageId::MerkleExecute)?.unwrap_or_default();
167
168 if account_hashing_checkpoint.block_number != merkle_checkpoint.block_number {
169 return Err(eyre::eyre!(
170 "MerkleExecute stage checkpoint ({}) != AccountHashing stage checkpoint ({}), you must first complete the pipeline sync by running `reth node`",
171 merkle_checkpoint.block_number,
172 account_hashing_checkpoint.block_number,
173 ))
174 }
175
176 if storage_hashing_checkpoint.block_number != merkle_checkpoint.block_number {
177 return Err(eyre::eyre!(
178 "MerkleExecute stage checkpoint ({}) != StorageHashing stage checkpoint ({}), you must first complete the pipeline sync by running `reth node`",
179 merkle_checkpoint.block_number,
180 storage_hashing_checkpoint.block_number,
181 ))
182 }
183
184 let merkle_checkpoint_progress =
185 provider.get_stage_checkpoint_progress(StageId::MerkleExecute)?;
186 if merkle_checkpoint_progress.is_some_and(|progress| !progress.is_empty()) {
187 return Err(eyre::eyre!(
188 "MerkleExecute sync stage in-progress, you must first complete the pipeline sync by running `reth node`",
189 ))
190 }
191
192 Ok(())
193}
194
195fn verify_and_repair<N: ProviderNodeTypes>(tool: &DbTool<N>) -> eyre::Result<()> {
196 let mut provider_rw = tool.provider_factory.provider_rw()?;
198
199 let finish_checkpoint = provider_rw.get_stage_checkpoint(StageId::Finish)?.unwrap_or_default();
201 info!("Database block tip: {}", finish_checkpoint.block_number);
202
203 verify_checkpoints(provider_rw.as_ref())?;
205
206 let tx = provider_rw.tx_mut();
208 tx.disable_long_read_transaction_safety();
209 let mut account_trie_cursor = tx.cursor_write::<tables::AccountsTrie>()?;
210 let mut storage_trie_cursor = tx.cursor_dup_write::<tables::StoragesTrie>()?;
211
212 let tx = provider_rw.tx_ref();
215 let hashed_cursor_factory = DatabaseHashedCursorFactory::new(tx);
216 let trie_cursor_factory = DatabaseTrieCursorFactory::new(tx);
217
218 let verifier = Verifier::new(&trie_cursor_factory, hashed_cursor_factory)?;
220
221 let metrics = RepairTrieMetrics::new();
222
223 let mut inconsistent_nodes = 0;
224 let start_time = Instant::now();
225 let mut last_progress_time = Instant::now();
226
227 for output_result in verifier {
229 let output = output_result?;
230
231 if !matches!(output, Output::Progress(_)) {
232 warn!("Inconsistency found, will repair: {output:?}");
233 inconsistent_nodes += 1;
234
235 match &output {
237 Output::AccountExtra(_, _) |
238 Output::AccountWrong { .. } |
239 Output::AccountMissing(_, _) => {
240 metrics.account_inconsistencies.increment(1);
241 }
242 Output::StorageExtra(_, _, _) |
243 Output::StorageWrong { .. } |
244 Output::StorageMissing(_, _, _) => {
245 metrics.storage_inconsistencies.increment(1);
246 }
247 Output::Progress(_) => {}
248 }
249 }
250
251 match output {
252 Output::AccountExtra(path, _node) => {
253 let nibbles = StoredNibbles(path);
255 if account_trie_cursor.seek_exact(nibbles)?.is_some() {
256 account_trie_cursor.delete_current()?;
257 }
258 }
259 Output::StorageExtra(account, path, _node) => {
260 let nibbles = StoredNibblesSubKey(path);
262 if storage_trie_cursor
263 .seek_by_key_subkey(account, nibbles.clone())?
264 .filter(|e| e.nibbles == nibbles)
265 .is_some()
266 {
267 storage_trie_cursor.delete_current()?;
268 }
269 }
270 Output::AccountWrong { path, expected: node, .. } |
271 Output::AccountMissing(path, node) => {
272 let nibbles = StoredNibbles(path);
274 account_trie_cursor.upsert(nibbles, &node)?;
275 }
276 Output::StorageWrong { account, path, expected: node, .. } |
277 Output::StorageMissing(account, path, node) => {
278 let nibbles = StoredNibblesSubKey(path);
282 let entry = StorageTrieEntry { nibbles: nibbles.clone(), node };
283 if storage_trie_cursor
284 .seek_by_key_subkey(account, nibbles.clone())?
285 .filter(|v| v.nibbles == nibbles)
286 .is_some()
287 {
288 storage_trie_cursor.delete_current()?;
289 }
290 storage_trie_cursor.upsert(account, &entry)?;
291 }
292 Output::Progress(path) => {
293 if last_progress_time.elapsed() > PROGRESS_PERIOD {
294 output_progress(path, start_time, inconsistent_nodes);
295 last_progress_time = Instant::now();
296 }
297 }
298 }
299 }
300
301 if inconsistent_nodes == 0 {
302 info!("No inconsistencies found");
303 } else {
304 provider_rw.commit()?;
305 info!("Repaired {} inconsistencies and committed changes", inconsistent_nodes);
306 }
307
308 Ok(())
309}
310
311fn output_progress(last_account: Nibbles, start_time: Instant, inconsistent_nodes: u64) {
313 let mut current_value: u64 = 0;
318 let nibbles_to_use = last_account.len().min(16);
319
320 for i in 0..nibbles_to_use {
321 current_value = (current_value << 4) | (last_account.get(i).unwrap_or(0) as u64);
322 }
323 if nibbles_to_use < 16 {
325 current_value <<= (16 - nibbles_to_use) * 4;
326 }
327
328 let progress_percent = current_value as f64 / u64::MAX as f64 * 100.0;
329 let progress_percent_str = format!("{progress_percent:.2}");
330
331 let elapsed = start_time.elapsed();
333 let elapsed_secs = elapsed.as_secs_f64();
334
335 let estimated_total_time =
336 if progress_percent > 0.0 { elapsed_secs / (progress_percent / 100.0) } else { 0.0 };
337 let remaining_time = estimated_total_time - elapsed_secs;
338 let eta_duration = Duration::from_secs(remaining_time as u64);
339
340 info!(
341 progress_percent = progress_percent_str,
342 eta = %humantime::format_duration(eta_duration),
343 inconsistent_nodes,
344 "Repairing trie tables",
345 );
346}
347
348#[derive(Debug)]
350struct RepairTrieMetrics {
351 account_inconsistencies: Counter,
352 storage_inconsistencies: Counter,
353}
354
355impl RepairTrieMetrics {
356 fn new() -> Self {
357 Self {
358 account_inconsistencies: metrics::counter!(
359 "db.repair_trie.inconsistencies_found",
360 "type" => "account"
361 ),
362 storage_inconsistencies: metrics::counter!(
363 "db.repair_trie.inconsistencies_found",
364 "type" => "storage"
365 ),
366 }
367 }
368}