1use clap::Parser;
2use metrics::{self, Counter};
3use reth_chainspec::EthChainSpec;
4use reth_cli_util::parse_socket_address;
5use reth_db_api::{
6 cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO},
7 database::Database,
8 tables,
9 transaction::{DbTx, DbTxMut},
10};
11use reth_db_common::DbTool;
12use reth_node_core::version::version_metadata;
13use reth_node_metrics::{
14 chain::ChainSpecInfo,
15 hooks::Hooks,
16 server::{MetricServer, MetricServerConfig},
17 version::VersionInfo,
18};
19use reth_provider::{providers::ProviderNodeTypes, ChainSpecProvider, StageCheckpointReader};
20use reth_stages::StageId;
21use reth_trie::{
22 verify::{Output, Verifier},
23 Nibbles,
24};
25use reth_trie_common::{StorageTrieEntry, StoredNibbles, StoredNibblesSubKey};
26use reth_trie_db::{DatabaseHashedCursorFactory, DatabaseTrieCursorFactory};
27use std::{
28 net::SocketAddr,
29 time::{Duration, Instant},
30};
31use tracing::{info, warn};
32
/// Minimum time that must elapse between two progress log lines while the verifier is running.
const PROGRESS_PERIOD: Duration = Duration::from_secs(5);
34
35#[derive(Parser, Debug)]
37pub struct Command {
38 #[arg(long)]
40 pub(crate) dry_run: bool,
41
42 #[arg(long = "metrics", value_name = "ADDR:PORT", value_parser = parse_socket_address)]
46 pub(crate) metrics: Option<SocketAddr>,
47}
48
49impl Command {
50 pub fn execute<N: ProviderNodeTypes>(self, tool: &DbTool<N>) -> eyre::Result<()> {
52 let _metrics_handle = if let Some(listen_addr) = self.metrics {
54 let chain_name = tool.provider_factory.chain_spec().chain().to_string();
56
57 let handle = std::thread::Builder::new().name("metrics-server".to_string()).spawn(
58 move || {
59 let runtime = tokio::runtime::Builder::new_current_thread()
61 .enable_all()
62 .build()
63 .expect("Failed to create tokio runtime for metrics server");
64
65 let handle = runtime.handle().clone();
66 runtime.block_on(async move {
67 let task_manager = reth_tasks::TaskManager::new(handle.clone());
68 let task_executor = task_manager.executor();
69
70 let config = MetricServerConfig::new(
71 listen_addr,
72 VersionInfo {
73 version: version_metadata().cargo_pkg_version.as_ref(),
74 build_timestamp: version_metadata().vergen_build_timestamp.as_ref(),
75 cargo_features: version_metadata().vergen_cargo_features.as_ref(),
76 git_sha: version_metadata().vergen_git_sha.as_ref(),
77 target_triple: version_metadata()
78 .vergen_cargo_target_triple
79 .as_ref(),
80 build_profile: version_metadata().build_profile_name.as_ref(),
81 },
82 ChainSpecInfo { name: chain_name },
83 task_executor,
84 Hooks::builder().build(),
85 );
86
87 if let Err(e) = MetricServer::new(config).serve().await {
89 tracing::error!("Metrics server error: {}", e);
90 }
91
92 std::future::pending::<()>().await
94 });
95 },
96 )?;
97
98 Some(handle)
99 } else {
100 None
101 };
102
103 if self.dry_run {
104 verify_only(tool)?
105 } else {
106 verify_and_repair(tool)?
107 }
108
109 Ok(())
110 }
111}
112
113fn verify_only<N: ProviderNodeTypes>(tool: &DbTool<N>) -> eyre::Result<()> {
114 let finish_checkpoint = tool
116 .provider_factory
117 .provider()?
118 .get_stage_checkpoint(StageId::Finish)?
119 .unwrap_or_default();
120 info!("Database block tip: {}", finish_checkpoint.block_number);
121
122 let db = tool.provider_factory.db_ref();
124 let mut tx = db.tx()?;
125 tx.disable_long_read_transaction_safety();
126
127 let hashed_cursor_factory = DatabaseHashedCursorFactory::new(&tx);
129 let trie_cursor_factory = DatabaseTrieCursorFactory::new(&tx);
130 let verifier = Verifier::new(&trie_cursor_factory, hashed_cursor_factory)?;
131
132 let metrics = RepairTrieMetrics::new();
133
134 let mut inconsistent_nodes = 0;
135 let start_time = Instant::now();
136 let mut last_progress_time = Instant::now();
137
138 for output_result in verifier {
140 let output = output_result?;
141
142 if let Output::Progress(path) = output {
143 if last_progress_time.elapsed() > PROGRESS_PERIOD {
144 output_progress(path, start_time, inconsistent_nodes);
145 last_progress_time = Instant::now();
146 }
147 } else {
148 warn!("Inconsistency found: {output:?}");
149 inconsistent_nodes += 1;
150
151 match output {
153 Output::AccountExtra(_, _) |
154 Output::AccountWrong { .. } |
155 Output::AccountMissing(_, _) => {
156 metrics.account_inconsistencies.increment(1);
157 }
158 Output::StorageExtra(_, _, _) |
159 Output::StorageWrong { .. } |
160 Output::StorageMissing(_, _, _) => {
161 metrics.storage_inconsistencies.increment(1);
162 }
163 Output::Progress(_) => unreachable!(),
164 }
165 }
166 }
167
168 info!("Found {} inconsistencies (dry run - no changes made)", inconsistent_nodes);
169
170 Ok(())
171}
172
173fn verify_checkpoints(provider: impl StageCheckpointReader) -> eyre::Result<()> {
175 let account_hashing_checkpoint =
176 provider.get_stage_checkpoint(StageId::AccountHashing)?.unwrap_or_default();
177 let storage_hashing_checkpoint =
178 provider.get_stage_checkpoint(StageId::StorageHashing)?.unwrap_or_default();
179 let merkle_checkpoint =
180 provider.get_stage_checkpoint(StageId::MerkleExecute)?.unwrap_or_default();
181
182 if account_hashing_checkpoint.block_number != merkle_checkpoint.block_number {
183 return Err(eyre::eyre!(
184 "MerkleExecute stage checkpoint ({}) != AccountHashing stage checkpoint ({}), you must first complete the pipeline sync by running `reth node`",
185 merkle_checkpoint.block_number,
186 account_hashing_checkpoint.block_number,
187 ))
188 }
189
190 if storage_hashing_checkpoint.block_number != merkle_checkpoint.block_number {
191 return Err(eyre::eyre!(
192 "MerkleExecute stage checkpoint ({}) != StorageHashing stage checkpoint ({}), you must first complete the pipeline sync by running `reth node`",
193 merkle_checkpoint.block_number,
194 storage_hashing_checkpoint.block_number,
195 ))
196 }
197
198 let merkle_checkpoint_progress =
199 provider.get_stage_checkpoint_progress(StageId::MerkleExecute)?;
200 if merkle_checkpoint_progress.is_some_and(|progress| !progress.is_empty()) {
201 return Err(eyre::eyre!(
202 "MerkleExecute sync stage in-progress, you must first complete the pipeline sync by running `reth node`",
203 ))
204 }
205
206 Ok(())
207}
208
209fn verify_and_repair<N: ProviderNodeTypes>(tool: &DbTool<N>) -> eyre::Result<()> {
210 let mut provider_rw = tool.provider_factory.provider_rw()?;
212
213 let finish_checkpoint = provider_rw.get_stage_checkpoint(StageId::Finish)?.unwrap_or_default();
215 info!("Database block tip: {}", finish_checkpoint.block_number);
216
217 verify_checkpoints(provider_rw.as_ref())?;
219
220 let tx = provider_rw.tx_mut();
222 tx.disable_long_read_transaction_safety();
223 let mut account_trie_cursor = tx.cursor_write::<tables::AccountsTrie>()?;
224 let mut storage_trie_cursor = tx.cursor_dup_write::<tables::StoragesTrie>()?;
225
226 let tx = provider_rw.tx_ref();
229 let hashed_cursor_factory = DatabaseHashedCursorFactory::new(tx);
230 let trie_cursor_factory = DatabaseTrieCursorFactory::new(tx);
231
232 let verifier = Verifier::new(&trie_cursor_factory, hashed_cursor_factory)?;
234
235 let metrics = RepairTrieMetrics::new();
236
237 let mut inconsistent_nodes = 0;
238 let start_time = Instant::now();
239 let mut last_progress_time = Instant::now();
240
241 for output_result in verifier {
243 let output = output_result?;
244
245 if !matches!(output, Output::Progress(_)) {
246 warn!("Inconsistency found, will repair: {output:?}");
247 inconsistent_nodes += 1;
248
249 match &output {
251 Output::AccountExtra(_, _) |
252 Output::AccountWrong { .. } |
253 Output::AccountMissing(_, _) => {
254 metrics.account_inconsistencies.increment(1);
255 }
256 Output::StorageExtra(_, _, _) |
257 Output::StorageWrong { .. } |
258 Output::StorageMissing(_, _, _) => {
259 metrics.storage_inconsistencies.increment(1);
260 }
261 Output::Progress(_) => {}
262 }
263 }
264
265 match output {
266 Output::AccountExtra(path, _node) => {
267 let nibbles = StoredNibbles(path);
269 if account_trie_cursor.seek_exact(nibbles)?.is_some() {
270 account_trie_cursor.delete_current()?;
271 }
272 }
273 Output::StorageExtra(account, path, _node) => {
274 let nibbles = StoredNibblesSubKey(path);
276 if storage_trie_cursor
277 .seek_by_key_subkey(account, nibbles.clone())?
278 .filter(|e| e.nibbles == nibbles)
279 .is_some()
280 {
281 storage_trie_cursor.delete_current()?;
282 }
283 }
284 Output::AccountWrong { path, expected: node, .. } |
285 Output::AccountMissing(path, node) => {
286 let nibbles = StoredNibbles(path);
288 account_trie_cursor.upsert(nibbles, &node)?;
289 }
290 Output::StorageWrong { account, path, expected: node, .. } |
291 Output::StorageMissing(account, path, node) => {
292 let nibbles = StoredNibblesSubKey(path);
296 let entry = StorageTrieEntry { nibbles: nibbles.clone(), node };
297 if storage_trie_cursor
298 .seek_by_key_subkey(account, nibbles.clone())?
299 .filter(|v| v.nibbles == nibbles)
300 .is_some()
301 {
302 storage_trie_cursor.delete_current()?;
303 }
304 storage_trie_cursor.upsert(account, &entry)?;
305 }
306 Output::Progress(path) => {
307 if last_progress_time.elapsed() > PROGRESS_PERIOD {
308 output_progress(path, start_time, inconsistent_nodes);
309 last_progress_time = Instant::now();
310 }
311 }
312 }
313 }
314
315 if inconsistent_nodes == 0 {
316 info!("No inconsistencies found");
317 } else {
318 info!("Repaired {} inconsistencies, committing changes", inconsistent_nodes);
319 provider_rw.commit()?;
320 }
321
322 Ok(())
323}
324
325fn output_progress(last_account: Nibbles, start_time: Instant, inconsistent_nodes: u64) {
327 let mut current_value: u64 = 0;
332 let nibbles_to_use = last_account.len().min(16);
333
334 for i in 0..nibbles_to_use {
335 current_value = (current_value << 4) | (last_account.get(i).unwrap_or(0) as u64);
336 }
337 if nibbles_to_use < 16 {
339 current_value <<= (16 - nibbles_to_use) * 4;
340 }
341
342 let progress_percent = current_value as f64 / u64::MAX as f64 * 100.0;
343 let progress_percent_str = format!("{progress_percent:.2}");
344
345 let elapsed = start_time.elapsed();
347 let elapsed_secs = elapsed.as_secs_f64();
348
349 let estimated_total_time =
350 if progress_percent > 0.0 { elapsed_secs / (progress_percent / 100.0) } else { 0.0 };
351 let remaining_time = estimated_total_time - elapsed_secs;
352 let eta_duration = Duration::from_secs(remaining_time as u64);
353
354 info!(
355 progress_percent = progress_percent_str,
356 eta = %humantime::format_duration(eta_duration),
357 inconsistent_nodes,
358 "Repairing trie tables",
359 );
360}
361
362#[derive(Debug)]
364struct RepairTrieMetrics {
365 account_inconsistencies: Counter,
366 storage_inconsistencies: Counter,
367}
368
369impl RepairTrieMetrics {
370 fn new() -> Self {
371 Self {
372 account_inconsistencies: metrics::counter!(
373 "db.repair_trie.inconsistencies_found",
374 "type" => "account"
375 ),
376 storage_inconsistencies: metrics::counter!(
377 "db.repair_trie.inconsistencies_found",
378 "type" => "storage"
379 ),
380 }
381 }
382}