1use clap::Parser;
2use metrics::{self, Counter};
3use reth_chainspec::EthChainSpec;
4use reth_cli_util::parse_socket_address;
5use reth_db_api::{
6 cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO},
7 database::Database,
8 tables,
9 transaction::{DbTx, DbTxMut},
10};
11use reth_db_common::DbTool;
12use reth_node_core::{
13 dirs::{ChainPath, DataDirPath},
14 version::version_metadata,
15};
16use reth_node_metrics::{
17 chain::ChainSpecInfo,
18 hooks::Hooks,
19 server::{MetricServer, MetricServerConfig},
20 version::VersionInfo,
21};
22use reth_provider::{providers::ProviderNodeTypes, ChainSpecProvider, StageCheckpointReader};
23use reth_stages::StageId;
24use reth_tasks::TaskExecutor;
25use reth_trie::{
26 verify::{Output, Verifier},
27 Nibbles,
28};
29use reth_trie_common::{StorageTrieEntry, StoredNibbles, StoredNibblesSubKey};
30use reth_trie_db::{DatabaseHashedCursorFactory, DatabaseTrieCursorFactory};
31use std::{
32 net::SocketAddr,
33 time::{Duration, Instant},
34};
35use tracing::{info, warn};
36
/// Minimum time that must have elapsed since the previous progress log line before the next one
/// is emitted by the verification loops.
const PROGRESS_PERIOD: Duration = Duration::from_secs(5);
38
39#[derive(Parser, Debug)]
41pub struct Command {
42 #[arg(long)]
44 pub(crate) dry_run: bool,
45
46 #[arg(long = "metrics", value_name = "ADDR:PORT", value_parser = parse_socket_address)]
50 pub(crate) metrics: Option<SocketAddr>,
51}
52
53impl Command {
54 pub fn execute<N: ProviderNodeTypes>(
56 self,
57 tool: &DbTool<N>,
58 task_executor: TaskExecutor,
59 data_dir: &ChainPath<DataDirPath>,
60 ) -> eyre::Result<()> {
61 let _metrics_handle = if let Some(listen_addr) = self.metrics {
63 let chain_name = tool.provider_factory.chain_spec().chain().to_string();
64 let executor = task_executor.clone();
65 let pprof_dump_dir = data_dir.pprof_dumps();
66
67 let handle = task_executor.spawn_critical_task("metrics server", async move {
68 let config = MetricServerConfig::new(
69 listen_addr,
70 VersionInfo {
71 version: version_metadata().cargo_pkg_version.as_ref(),
72 build_timestamp: version_metadata().vergen_build_timestamp.as_ref(),
73 cargo_features: version_metadata().vergen_cargo_features.as_ref(),
74 git_sha: version_metadata().vergen_git_sha.as_ref(),
75 target_triple: version_metadata().vergen_cargo_target_triple.as_ref(),
76 build_profile: version_metadata().build_profile_name.as_ref(),
77 },
78 ChainSpecInfo { name: chain_name },
79 executor,
80 Hooks::builder().build(),
81 pprof_dump_dir,
82 );
83
84 if let Err(e) = MetricServer::new(config).serve().await {
86 tracing::error!("Metrics server error: {}", e);
87 }
88 });
89
90 Some(handle)
91 } else {
92 None
93 };
94
95 if self.dry_run {
96 verify_only(tool)?
97 } else {
98 verify_and_repair(tool)?
99 }
100
101 Ok(())
102 }
103}
104
105fn verify_only<N: ProviderNodeTypes>(tool: &DbTool<N>) -> eyre::Result<()> {
106 let finish_checkpoint = tool
108 .provider_factory
109 .provider()?
110 .get_stage_checkpoint(StageId::Finish)?
111 .unwrap_or_default();
112 info!("Database block tip: {}", finish_checkpoint.block_number);
113
114 let db = tool.provider_factory.db_ref();
116 let mut tx = db.tx()?;
117 tx.disable_long_read_transaction_safety();
118
119 let hashed_cursor_factory = DatabaseHashedCursorFactory::new(&tx);
121 let trie_cursor_factory = DatabaseTrieCursorFactory::new(&tx);
122 let verifier = Verifier::new(&trie_cursor_factory, hashed_cursor_factory)?;
123
124 let metrics = RepairTrieMetrics::new();
125
126 let mut inconsistent_nodes = 0;
127 let start_time = Instant::now();
128 let mut last_progress_time = Instant::now();
129
130 for output_result in verifier {
132 let output = output_result?;
133
134 if let Output::Progress(path) = output {
135 if last_progress_time.elapsed() > PROGRESS_PERIOD {
136 output_progress(path, start_time, inconsistent_nodes);
137 last_progress_time = Instant::now();
138 }
139 } else {
140 warn!("Inconsistency found: {output:?}");
141 inconsistent_nodes += 1;
142
143 match output {
145 Output::AccountExtra(_, _) |
146 Output::AccountWrong { .. } |
147 Output::AccountMissing(_, _) => {
148 metrics.account_inconsistencies.increment(1);
149 }
150 Output::StorageExtra(_, _, _) |
151 Output::StorageWrong { .. } |
152 Output::StorageMissing(_, _, _) => {
153 metrics.storage_inconsistencies.increment(1);
154 }
155 Output::Progress(_) => unreachable!(),
156 }
157 }
158 }
159
160 info!("Found {} inconsistencies (dry run - no changes made)", inconsistent_nodes);
161
162 Ok(())
163}
164
165fn verify_checkpoints(provider: impl StageCheckpointReader) -> eyre::Result<()> {
167 let account_hashing_checkpoint =
168 provider.get_stage_checkpoint(StageId::AccountHashing)?.unwrap_or_default();
169 let storage_hashing_checkpoint =
170 provider.get_stage_checkpoint(StageId::StorageHashing)?.unwrap_or_default();
171 let merkle_checkpoint =
172 provider.get_stage_checkpoint(StageId::MerkleExecute)?.unwrap_or_default();
173
174 if account_hashing_checkpoint.block_number != merkle_checkpoint.block_number {
175 return Err(eyre::eyre!(
176 "MerkleExecute stage checkpoint ({}) != AccountHashing stage checkpoint ({}), you must first complete the pipeline sync by running `reth node`",
177 merkle_checkpoint.block_number,
178 account_hashing_checkpoint.block_number,
179 ))
180 }
181
182 if storage_hashing_checkpoint.block_number != merkle_checkpoint.block_number {
183 return Err(eyre::eyre!(
184 "MerkleExecute stage checkpoint ({}) != StorageHashing stage checkpoint ({}), you must first complete the pipeline sync by running `reth node`",
185 merkle_checkpoint.block_number,
186 storage_hashing_checkpoint.block_number,
187 ))
188 }
189
190 let merkle_checkpoint_progress =
191 provider.get_stage_checkpoint_progress(StageId::MerkleExecute)?;
192 if merkle_checkpoint_progress.is_some_and(|progress| !progress.is_empty()) {
193 return Err(eyre::eyre!(
194 "MerkleExecute sync stage in-progress, you must first complete the pipeline sync by running `reth node`",
195 ))
196 }
197
198 Ok(())
199}
200
201fn verify_and_repair<N: ProviderNodeTypes>(tool: &DbTool<N>) -> eyre::Result<()> {
202 let mut provider_rw = tool.provider_factory.provider_rw()?;
204
205 let finish_checkpoint = provider_rw.get_stage_checkpoint(StageId::Finish)?.unwrap_or_default();
207 info!("Database block tip: {}", finish_checkpoint.block_number);
208
209 verify_checkpoints(provider_rw.as_ref())?;
211
212 let tx = provider_rw.tx_mut();
214 tx.disable_long_read_transaction_safety();
215 let mut account_trie_cursor = tx.cursor_write::<tables::AccountsTrie>()?;
216 let mut storage_trie_cursor = tx.cursor_dup_write::<tables::StoragesTrie>()?;
217
218 let tx = provider_rw.tx_ref();
221 let hashed_cursor_factory = DatabaseHashedCursorFactory::new(tx);
222 let trie_cursor_factory = DatabaseTrieCursorFactory::new(tx);
223
224 let verifier = Verifier::new(&trie_cursor_factory, hashed_cursor_factory)?;
226
227 let metrics = RepairTrieMetrics::new();
228
229 let mut inconsistent_nodes = 0;
230 let start_time = Instant::now();
231 let mut last_progress_time = Instant::now();
232
233 for output_result in verifier {
235 let output = output_result?;
236
237 if !matches!(output, Output::Progress(_)) {
238 warn!("Inconsistency found, will repair: {output:?}");
239 inconsistent_nodes += 1;
240
241 match &output {
243 Output::AccountExtra(_, _) |
244 Output::AccountWrong { .. } |
245 Output::AccountMissing(_, _) => {
246 metrics.account_inconsistencies.increment(1);
247 }
248 Output::StorageExtra(_, _, _) |
249 Output::StorageWrong { .. } |
250 Output::StorageMissing(_, _, _) => {
251 metrics.storage_inconsistencies.increment(1);
252 }
253 Output::Progress(_) => {}
254 }
255 }
256
257 match output {
258 Output::AccountExtra(path, _node) => {
259 let nibbles = StoredNibbles(path);
261 if account_trie_cursor.seek_exact(nibbles)?.is_some() {
262 account_trie_cursor.delete_current()?;
263 }
264 }
265 Output::StorageExtra(account, path, _node) => {
266 let nibbles = StoredNibblesSubKey(path);
268 if storage_trie_cursor
269 .seek_by_key_subkey(account, nibbles.clone())?
270 .filter(|e| e.nibbles == nibbles)
271 .is_some()
272 {
273 storage_trie_cursor.delete_current()?;
274 }
275 }
276 Output::AccountWrong { path, expected: node, .. } |
277 Output::AccountMissing(path, node) => {
278 let nibbles = StoredNibbles(path);
280 account_trie_cursor.upsert(nibbles, &node)?;
281 }
282 Output::StorageWrong { account, path, expected: node, .. } |
283 Output::StorageMissing(account, path, node) => {
284 let nibbles = StoredNibblesSubKey(path);
288 let entry = StorageTrieEntry { nibbles: nibbles.clone(), node };
289 if storage_trie_cursor
290 .seek_by_key_subkey(account, nibbles.clone())?
291 .filter(|v| v.nibbles == nibbles)
292 .is_some()
293 {
294 storage_trie_cursor.delete_current()?;
295 }
296 storage_trie_cursor.upsert(account, &entry)?;
297 }
298 Output::Progress(path) => {
299 if last_progress_time.elapsed() > PROGRESS_PERIOD {
300 output_progress(path, start_time, inconsistent_nodes);
301 last_progress_time = Instant::now();
302 }
303 }
304 }
305 }
306
307 if inconsistent_nodes == 0 {
308 info!("No inconsistencies found");
309 } else {
310 provider_rw.commit()?;
311 info!("Repaired {} inconsistencies and committed changes", inconsistent_nodes);
312 }
313
314 Ok(())
315}
316
317fn output_progress(last_account: Nibbles, start_time: Instant, inconsistent_nodes: u64) {
319 let mut current_value: u64 = 0;
324 let nibbles_to_use = last_account.len().min(16);
325
326 for i in 0..nibbles_to_use {
327 current_value = (current_value << 4) | (last_account.get(i).unwrap_or(0) as u64);
328 }
329 if nibbles_to_use < 16 {
331 current_value <<= (16 - nibbles_to_use) * 4;
332 }
333
334 let progress_percent = current_value as f64 / u64::MAX as f64 * 100.0;
335 let progress_percent_str = format!("{progress_percent:.2}");
336
337 let elapsed = start_time.elapsed();
339 let elapsed_secs = elapsed.as_secs_f64();
340
341 let estimated_total_time =
342 if progress_percent > 0.0 { elapsed_secs / (progress_percent / 100.0) } else { 0.0 };
343 let remaining_time = estimated_total_time - elapsed_secs;
344 let eta_duration = Duration::from_secs(remaining_time as u64);
345
346 info!(
347 progress_percent = progress_percent_str,
348 eta = %humantime::format_duration(eta_duration),
349 inconsistent_nodes,
350 "Repairing trie tables",
351 );
352}
353
354#[derive(Debug)]
356struct RepairTrieMetrics {
357 account_inconsistencies: Counter,
358 storage_inconsistencies: Counter,
359}
360
361impl RepairTrieMetrics {
362 fn new() -> Self {
363 Self {
364 account_inconsistencies: metrics::counter!(
365 "db.repair_trie.inconsistencies_found",
366 "type" => "account"
367 ),
368 storage_inconsistencies: metrics::counter!(
369 "db.repair_trie.inconsistencies_found",
370 "type" => "storage"
371 ),
372 }
373 }
374}