1use clap::Parser;
2use metrics::{self, Counter};
3use reth_chainspec::EthChainSpec;
4use reth_cli_util::parse_socket_address;
5use reth_db_api::{
6 cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO},
7 database::Database,
8 transaction::{DbTx, DbTxMut},
9};
10use reth_db_common::DbTool;
11use reth_node_core::{
12 dirs::{ChainPath, DataDirPath},
13 version::version_metadata,
14};
15use reth_node_metrics::{
16 chain::ChainSpecInfo,
17 hooks::Hooks,
18 server::{MetricServer, MetricServerConfig},
19 version::VersionInfo,
20};
21use reth_provider::{providers::ProviderNodeTypes, ChainSpecProvider, StageCheckpointReader};
22use reth_stages::StageId;
23use reth_storage_api::StorageSettingsCache;
24use reth_tasks::TaskExecutor;
25use reth_trie::{
26 verify::{Output, Verifier},
27 Nibbles,
28};
29use reth_trie_db::{
30 DatabaseHashedCursorFactory, DatabaseTrieCursorFactory, StorageTrieEntryLike, TrieTableAdapter,
31};
32use std::{
33 net::SocketAddr,
34 time::{Duration, Instant},
35};
36use tracing::{info, warn};
37
/// Minimum interval between progress log lines emitted by the verify/repair loops.
const PROGRESS_PERIOD: Duration = Duration::from_secs(5);
39
// CLI arguments for the trie verify/repair command.
//
// NOTE(review): plain `//` comments are used deliberately here — `///` doc
// comments on clap-derived structs/fields are picked up as `--help` text and
// would change the CLI's user-facing output.
#[derive(Parser, Debug)]
pub struct Command {
    // When set, inconsistencies are only reported; nothing is written back.
    #[arg(long)]
    pub(crate) dry_run: bool,

    // Optional socket address on which to serve metrics during the run.
    #[arg(long = "metrics", value_name = "ADDR:PORT", value_parser = parse_socket_address)]
    pub(crate) metrics: Option<SocketAddr>,
}
53
54impl Command {
55 pub fn execute<N: ProviderNodeTypes>(
57 self,
58 tool: &DbTool<N>,
59 task_executor: TaskExecutor,
60 data_dir: &ChainPath<DataDirPath>,
61 ) -> eyre::Result<()> {
62 let _metrics_handle = if let Some(listen_addr) = self.metrics {
64 let chain_name = tool.provider_factory.chain_spec().chain().to_string();
65 let executor = task_executor.clone();
66 let pprof_dump_dir = data_dir.pprof_dumps();
67
68 let handle = task_executor.spawn_critical_task("metrics server", async move {
69 let config = MetricServerConfig::new(
70 listen_addr,
71 VersionInfo {
72 version: version_metadata().cargo_pkg_version.as_ref(),
73 build_timestamp: version_metadata().vergen_build_timestamp.as_ref(),
74 cargo_features: version_metadata().vergen_cargo_features.as_ref(),
75 git_sha: version_metadata().vergen_git_sha.as_ref(),
76 target_triple: version_metadata().vergen_cargo_target_triple.as_ref(),
77 build_profile: version_metadata().build_profile_name.as_ref(),
78 },
79 ChainSpecInfo { name: chain_name },
80 executor,
81 Hooks::builder().build(),
82 pprof_dump_dir,
83 );
84
85 if let Err(e) = MetricServer::new(config).serve().await {
87 tracing::error!("Metrics server error: {}", e);
88 }
89 });
90
91 Some(handle)
92 } else {
93 None
94 };
95
96 if self.dry_run {
97 verify_only(tool)?
98 } else {
99 verify_and_repair(tool)?
100 }
101
102 Ok(())
103 }
104}
105
106fn verify_only<N: ProviderNodeTypes>(tool: &DbTool<N>) -> eyre::Result<()> {
107 let finish_checkpoint = tool
109 .provider_factory
110 .provider()?
111 .get_stage_checkpoint(StageId::Finish)?
112 .unwrap_or_default();
113 info!("Database block tip: {}", finish_checkpoint.block_number);
114
115 let db = tool.provider_factory.db_ref();
117 let mut tx = db.tx()?;
118 tx.disable_long_read_transaction_safety();
119
120 reth_trie_db::with_adapter!(tool.provider_factory, |A| do_verify_only::<_, A>(&tx))
121}
122
123fn do_verify_only<TX: DbTx, A: TrieTableAdapter>(tx: &TX) -> eyre::Result<()> {
124 let hashed_cursor_factory = DatabaseHashedCursorFactory::new(tx);
126 let trie_cursor_factory = DatabaseTrieCursorFactory::<_, A>::new(tx);
127 let verifier = Verifier::new(&trie_cursor_factory, hashed_cursor_factory)?;
128
129 let metrics = RepairTrieMetrics::new();
130
131 let mut inconsistent_nodes = 0;
132 let start_time = Instant::now();
133 let mut last_progress_time = Instant::now();
134
135 for output_result in verifier {
137 let output = output_result?;
138
139 if let Output::Progress(path) = output {
140 if last_progress_time.elapsed() > PROGRESS_PERIOD {
141 output_progress(path, start_time, inconsistent_nodes);
142 last_progress_time = Instant::now();
143 }
144 } else {
145 warn!("Inconsistency found: {output:?}");
146 inconsistent_nodes += 1;
147
148 match output {
150 Output::AccountExtra(_, _) |
151 Output::AccountWrong { .. } |
152 Output::AccountMissing(_, _) => {
153 metrics.account_inconsistencies.increment(1);
154 }
155 Output::StorageExtra(_, _, _) |
156 Output::StorageWrong { .. } |
157 Output::StorageMissing(_, _, _) => {
158 metrics.storage_inconsistencies.increment(1);
159 }
160 Output::Progress(_) => unreachable!(),
161 }
162 }
163 }
164
165 info!("Found {} inconsistencies (dry run - no changes made)", inconsistent_nodes);
166
167 Ok(())
168}
169
170fn verify_checkpoints(provider: impl StageCheckpointReader) -> eyre::Result<()> {
172 let account_hashing_checkpoint =
173 provider.get_stage_checkpoint(StageId::AccountHashing)?.unwrap_or_default();
174 let storage_hashing_checkpoint =
175 provider.get_stage_checkpoint(StageId::StorageHashing)?.unwrap_or_default();
176 let merkle_checkpoint =
177 provider.get_stage_checkpoint(StageId::MerkleExecute)?.unwrap_or_default();
178
179 if account_hashing_checkpoint.block_number != merkle_checkpoint.block_number {
180 return Err(eyre::eyre!(
181 "MerkleExecute stage checkpoint ({}) != AccountHashing stage checkpoint ({}), you must first complete the pipeline sync by running `reth node`",
182 merkle_checkpoint.block_number,
183 account_hashing_checkpoint.block_number,
184 ))
185 }
186
187 if storage_hashing_checkpoint.block_number != merkle_checkpoint.block_number {
188 return Err(eyre::eyre!(
189 "MerkleExecute stage checkpoint ({}) != StorageHashing stage checkpoint ({}), you must first complete the pipeline sync by running `reth node`",
190 merkle_checkpoint.block_number,
191 storage_hashing_checkpoint.block_number,
192 ))
193 }
194
195 let merkle_checkpoint_progress =
196 provider.get_stage_checkpoint_progress(StageId::MerkleExecute)?;
197 if merkle_checkpoint_progress.is_some_and(|progress| !progress.is_empty()) {
198 return Err(eyre::eyre!(
199 "MerkleExecute sync stage in-progress, you must first complete the pipeline sync by running `reth node`",
200 ))
201 }
202
203 Ok(())
204}
205
206fn verify_and_repair<N: ProviderNodeTypes>(tool: &DbTool<N>) -> eyre::Result<()> {
207 let mut provider_rw = tool.provider_factory.provider_rw()?;
209
210 let finish_checkpoint = provider_rw.get_stage_checkpoint(StageId::Finish)?.unwrap_or_default();
212 info!("Database block tip: {}", finish_checkpoint.block_number);
213
214 verify_checkpoints(provider_rw.as_ref())?;
216
217 let inconsistent_nodes = reth_trie_db::with_adapter!(tool.provider_factory, |A| {
218 do_verify_and_repair::<_, A>(&mut provider_rw)?
219 });
220
221 if inconsistent_nodes == 0 {
222 info!("No inconsistencies found");
223 } else {
224 provider_rw.commit()?;
225 info!("Repaired {} inconsistencies and committed changes", inconsistent_nodes);
226 }
227
228 Ok(())
229}
230
/// Walks the account and storage tries via the [`Verifier`], comparing stored
/// nodes against the recomputed expectation, and applies a repair for every
/// divergence through write cursors on the same transaction. Returns the
/// number of inconsistent nodes found; committing is left to the caller.
fn do_verify_and_repair<N: ProviderNodeTypes, A: TrieTableAdapter>(
    provider_rw: &mut reth_provider::DatabaseProviderRW<N::DB, N>,
) -> eyre::Result<usize>
where
    <N::DB as reth_db_api::database::Database>::TXMut: DbTxMut + DbTx,
{
    // Write side: cursors used to delete/upsert trie entries as repairs.
    let tx = provider_rw.tx_mut();
    tx.disable_long_read_transaction_safety();
    let mut account_trie_cursor = tx.cursor_write::<A::AccountTrieTable>()?;
    let mut storage_trie_cursor = tx.cursor_dup_write::<A::StorageTrieTable>()?;

    // Read side: the verifier reads hashed state and trie tables through its
    // own cursor factories on the same transaction (shadowed `tx` binding).
    let tx = provider_rw.tx_ref();
    let hashed_cursor_factory = DatabaseHashedCursorFactory::new(tx);
    let trie_cursor_factory = DatabaseTrieCursorFactory::<_, A>::new(tx);

    let verifier = Verifier::new(&trie_cursor_factory, hashed_cursor_factory)?;

    let metrics = RepairTrieMetrics::new();

    let mut inconsistent_nodes = 0;
    let start_time = Instant::now();
    let mut last_progress_time = Instant::now();

    for output_result in verifier {
        let output = output_result?;

        // Logging + metrics for anything that is not a progress report.
        if !matches!(output, Output::Progress(_)) {
            warn!("Inconsistency found, will repair: {output:?}");
            inconsistent_nodes += 1;

            match &output {
                Output::AccountExtra(_, _) |
                Output::AccountWrong { .. } |
                Output::AccountMissing(_, _) => {
                    metrics.account_inconsistencies.increment(1);
                }
                Output::StorageExtra(_, _, _) |
                Output::StorageWrong { .. } |
                Output::StorageMissing(_, _, _) => {
                    metrics.storage_inconsistencies.increment(1);
                }
                // Excluded by the `matches!` guard above; nothing to do.
                Output::Progress(_) => {}
            }
        }

        // Apply the repair corresponding to this verifier output.
        match output {
            Output::AccountExtra(path, _node) => {
                // Stored account node has no counterpart in the recomputed
                // trie: delete it if it is still present.
                let key: A::AccountKey = path.into();
                if account_trie_cursor.seek_exact(key)?.is_some() {
                    account_trie_cursor.delete_current()?;
                }
            }
            Output::StorageExtra(account, path, _node) => {
                // Dup-sorted table: the subkey is re-checked after the seek
                // before deleting, so only an exact match is removed.
                let subkey: A::StorageSubKey = path.into();
                if storage_trie_cursor
                    .seek_by_key_subkey(account, subkey.clone())?
                    .filter(|e| *e.nibbles() == subkey)
                    .is_some()
                {
                    storage_trie_cursor.delete_current()?;
                }
            }
            Output::AccountWrong { path, expected: node, .. } |
            Output::AccountMissing(path, node) => {
                // Wrong or missing account node: write the expected value.
                let key: A::AccountKey = path.into();
                account_trie_cursor.upsert(key, &node)?;
            }
            Output::StorageWrong { account, path, expected: node, .. } |
            Output::StorageMissing(account, path, node) => {
                let subkey: A::StorageSubKey = path.into();
                let entry = A::StorageValue::new(subkey.clone(), node);
                // Delete any existing entry with this exact subkey before the
                // upsert — presumably because an upsert into a dup-sorted
                // table would otherwise leave a stale duplicate; TODO confirm
                // against the cursor implementation.
                if storage_trie_cursor
                    .seek_by_key_subkey(account, subkey.clone())?
                    .filter(|v| *v.nibbles() == subkey)
                    .is_some()
                {
                    storage_trie_cursor.delete_current()?;
                }
                storage_trie_cursor.upsert(account, &entry)?;
            }
            Output::Progress(path) => {
                // Rate-limited progress logging.
                if last_progress_time.elapsed() > PROGRESS_PERIOD {
                    output_progress(path, start_time, inconsistent_nodes);
                    last_progress_time = Instant::now();
                }
            }
        }
    }

    Ok(inconsistent_nodes as usize)
}
334
335fn output_progress(last_account: Nibbles, start_time: Instant, inconsistent_nodes: u64) {
337 let mut current_value: u64 = 0;
342 let nibbles_to_use = last_account.len().min(16);
343
344 for i in 0..nibbles_to_use {
345 current_value = (current_value << 4) | (last_account.get(i).unwrap_or(0) as u64);
346 }
347 if nibbles_to_use < 16 {
349 current_value <<= (16 - nibbles_to_use) * 4;
350 }
351
352 let progress_percent = current_value as f64 / u64::MAX as f64 * 100.0;
353 let progress_percent_str = format!("{progress_percent:.2}");
354
355 let elapsed = start_time.elapsed();
357 let elapsed_secs = elapsed.as_secs_f64();
358
359 let estimated_total_time =
360 if progress_percent > 0.0 { elapsed_secs / (progress_percent / 100.0) } else { 0.0 };
361 let remaining_time = estimated_total_time - elapsed_secs;
362 let eta_duration = Duration::from_secs(remaining_time as u64);
363
364 info!(
365 progress_percent = progress_percent_str,
366 eta = %humantime::format_duration(eta_duration),
367 inconsistent_nodes,
368 "Repairing trie tables",
369 );
370}
371
/// Counters recorded while verifying/repairing the trie tables.
#[derive(Debug)]
struct RepairTrieMetrics {
    /// Number of account-trie inconsistencies found.
    account_inconsistencies: Counter,
    /// Number of storage-trie inconsistencies found.
    storage_inconsistencies: Counter,
}
378
379impl RepairTrieMetrics {
380 fn new() -> Self {
381 Self {
382 account_inconsistencies: metrics::counter!(
383 "db.repair_trie.inconsistencies_found",
384 "type" => "account"
385 ),
386 storage_inconsistencies: metrics::counter!(
387 "db.repair_trie.inconsistencies_found",
388 "type" => "storage"
389 ),
390 }
391 }
392}