1use alloy_consensus::BlockHeader as AlloyBlockHeader;
2use clap::Parser;
3use metrics::{self, Counter};
4use reth_chainspec::EthChainSpec;
5use reth_cli_util::parse_socket_address;
6use reth_db_api::{
7 cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO},
8 database::Database,
9 transaction::{DbTx, DbTxMut},
10};
11use reth_db_common::DbTool;
12use reth_node_core::{
13 dirs::{ChainPath, DataDirPath},
14 version::version_metadata,
15};
16use reth_node_metrics::{
17 chain::ChainSpecInfo,
18 hooks::Hooks,
19 server::{MetricServer, MetricServerConfig},
20 version::VersionInfo,
21};
22use reth_provider::{
23 providers::ProviderNodeTypes, ChainSpecProvider, HeaderProvider, StageCheckpointReader,
24};
25use reth_stages::StageId;
26use reth_storage_api::StorageSettingsCache;
27use reth_tasks::TaskExecutor;
28use reth_trie::{
29 verify::{Output, Verifier},
30 Nibbles,
31};
32use reth_trie_db::{
33 DatabaseHashedCursorFactory, DatabaseStateRoot, DatabaseTrieCursorFactory,
34 StorageTrieEntryLike, TrieTableAdapter,
35};
36use std::{
37 net::SocketAddr,
38 time::{Duration, Instant},
39};
40use tracing::{info, warn};
41
/// Minimum time that must elapse between two consecutive progress log lines emitted while
/// iterating over the verifier's output.
const PROGRESS_PERIOD: Duration = Duration::from_secs(5);
43
44#[derive(Parser, Debug)]
46pub struct Command {
47 #[arg(long)]
49 pub(crate) dry_run: bool,
50
51 #[arg(long = "metrics", value_name = "ADDR:PORT", value_parser = parse_socket_address)]
55 pub(crate) metrics: Option<SocketAddr>,
56}
57
58impl Command {
59 pub fn execute<N: ProviderNodeTypes>(
61 self,
62 tool: &DbTool<N>,
63 task_executor: TaskExecutor,
64 data_dir: &ChainPath<DataDirPath>,
65 ) -> eyre::Result<()> {
66 let _metrics_handle = if let Some(listen_addr) = self.metrics {
68 let chain_name = tool.provider_factory.chain_spec().chain().to_string();
69 let executor = task_executor.clone();
70 let pprof_dump_dir = data_dir.pprof_dumps();
71
72 let handle = task_executor.spawn_critical_task("metrics server", async move {
73 let config = MetricServerConfig::new(
74 listen_addr,
75 VersionInfo {
76 version: version_metadata().cargo_pkg_version.as_ref(),
77 build_timestamp: version_metadata().vergen_build_timestamp.as_ref(),
78 cargo_features: version_metadata().vergen_cargo_features.as_ref(),
79 git_sha: version_metadata().vergen_git_sha.as_ref(),
80 target_triple: version_metadata().vergen_cargo_target_triple.as_ref(),
81 build_profile: version_metadata().build_profile_name.as_ref(),
82 },
83 ChainSpecInfo { name: chain_name },
84 executor,
85 Hooks::builder().build(),
86 pprof_dump_dir,
87 );
88
89 if let Err(e) = MetricServer::new(config).serve().await {
91 tracing::error!("Metrics server error: {}", e);
92 }
93 });
94
95 Some(handle)
96 } else {
97 None
98 };
99
100 if self.dry_run {
101 verify_only(tool)?
102 } else {
103 verify_and_repair(tool)?
104 }
105
106 Ok(())
107 }
108}
109
110fn verify_only<N: ProviderNodeTypes>(tool: &DbTool<N>) -> eyre::Result<()> {
111 let finish_checkpoint = tool
113 .provider_factory
114 .provider()?
115 .get_stage_checkpoint(StageId::Finish)?
116 .unwrap_or_default();
117 info!("Database block tip: {}", finish_checkpoint.block_number);
118
119 let db = tool.provider_factory.db_ref();
121 let mut tx = db.tx()?;
122 tx.disable_long_read_transaction_safety();
123
124 reth_trie_db::with_adapter!(tool.provider_factory, |A| do_verify_only::<_, A>(&tx))
125}
126
127fn do_verify_only<TX: DbTx, A: TrieTableAdapter>(tx: &TX) -> eyre::Result<()> {
128 let hashed_cursor_factory = DatabaseHashedCursorFactory::new(tx);
130 let trie_cursor_factory = DatabaseTrieCursorFactory::<_, A>::new(tx);
131 let verifier = Verifier::new(&trie_cursor_factory, hashed_cursor_factory)?;
132
133 let metrics = RepairTrieMetrics::new();
134
135 let mut inconsistent_nodes = 0;
136 let start_time = Instant::now();
137 let mut last_progress_time = Instant::now();
138
139 for output_result in verifier {
141 let output = output_result?;
142
143 if let Output::Progress(path) = output {
144 if last_progress_time.elapsed() > PROGRESS_PERIOD {
145 output_progress(path, start_time, inconsistent_nodes);
146 last_progress_time = Instant::now();
147 }
148 } else {
149 warn!("Inconsistency found: {output:?}");
150 inconsistent_nodes += 1;
151
152 match output {
154 Output::AccountExtra(_, _) |
155 Output::AccountWrong { .. } |
156 Output::AccountMissing(_, _) => {
157 metrics.account_inconsistencies.increment(1);
158 }
159 Output::StorageExtra(_, _, _) |
160 Output::StorageWrong { .. } |
161 Output::StorageMissing(_, _, _) => {
162 metrics.storage_inconsistencies.increment(1);
163 }
164 Output::Progress(_) => unreachable!(),
165 }
166 }
167 }
168
169 info!("Found {} inconsistencies (dry run - no changes made)", inconsistent_nodes);
170
171 Ok(())
172}
173
174fn verify_checkpoints(provider: impl StageCheckpointReader) -> eyre::Result<()> {
176 let account_hashing_checkpoint =
177 provider.get_stage_checkpoint(StageId::AccountHashing)?.unwrap_or_default();
178 let storage_hashing_checkpoint =
179 provider.get_stage_checkpoint(StageId::StorageHashing)?.unwrap_or_default();
180 let merkle_checkpoint =
181 provider.get_stage_checkpoint(StageId::MerkleExecute)?.unwrap_or_default();
182
183 if account_hashing_checkpoint.block_number != merkle_checkpoint.block_number {
184 return Err(eyre::eyre!(
185 "MerkleExecute stage checkpoint ({}) != AccountHashing stage checkpoint ({}), you must first complete the pipeline sync by running `reth node`",
186 merkle_checkpoint.block_number,
187 account_hashing_checkpoint.block_number,
188 ))
189 }
190
191 if storage_hashing_checkpoint.block_number != merkle_checkpoint.block_number {
192 return Err(eyre::eyre!(
193 "MerkleExecute stage checkpoint ({}) != StorageHashing stage checkpoint ({}), you must first complete the pipeline sync by running `reth node`",
194 merkle_checkpoint.block_number,
195 storage_hashing_checkpoint.block_number,
196 ))
197 }
198
199 let merkle_checkpoint_progress =
200 provider.get_stage_checkpoint_progress(StageId::MerkleExecute)?;
201 if merkle_checkpoint_progress.is_some_and(|progress| !progress.is_empty()) {
202 return Err(eyre::eyre!(
203 "MerkleExecute sync stage in-progress, you must first complete the pipeline sync by running `reth node`",
204 ))
205 }
206
207 Ok(())
208}
209
210fn verify_and_repair<N: ProviderNodeTypes>(tool: &DbTool<N>) -> eyre::Result<()> {
211 let mut provider_rw = tool.provider_factory.provider_rw()?;
213
214 let finish_checkpoint = provider_rw.get_stage_checkpoint(StageId::Finish)?.unwrap_or_default();
216 info!("Database block tip: {}", finish_checkpoint.block_number);
217
218 verify_checkpoints(provider_rw.as_ref())?;
220
221 let inconsistent_nodes = reth_trie_db::with_adapter!(tool.provider_factory, |A| {
222 do_verify_and_repair::<_, A>(&mut provider_rw, finish_checkpoint.block_number)?
223 });
224
225 if inconsistent_nodes == 0 {
226 info!("No inconsistencies found");
227 } else {
228 provider_rw.commit()?;
229 info!("Repaired {} inconsistencies and committed changes", inconsistent_nodes);
230 }
231
232 Ok(())
233}
234
235fn do_verify_and_repair<N: ProviderNodeTypes, A: TrieTableAdapter>(
236 provider_rw: &mut reth_provider::DatabaseProviderRW<N::DB, N>,
237 block_number: u64,
238) -> eyre::Result<usize>
239where
240 <N::DB as reth_db_api::database::Database>::TXMut: DbTxMut + DbTx,
241{
242 let tx = provider_rw.tx_mut();
244 tx.disable_long_read_transaction_safety();
245 let mut account_trie_cursor = tx.cursor_write::<A::AccountTrieTable>()?;
246 let mut storage_trie_cursor = tx.cursor_dup_write::<A::StorageTrieTable>()?;
247
248 let tx = provider_rw.tx_ref();
251 let hashed_cursor_factory = DatabaseHashedCursorFactory::new(tx);
252 let trie_cursor_factory = DatabaseTrieCursorFactory::<_, A>::new(tx);
253
254 let verifier = Verifier::new(&trie_cursor_factory, hashed_cursor_factory)?;
256
257 let metrics = RepairTrieMetrics::new();
258
259 let mut inconsistent_nodes = 0;
260 let start_time = Instant::now();
261 let mut last_progress_time = Instant::now();
262
263 for output_result in verifier {
265 let output = output_result?;
266
267 if !matches!(output, Output::Progress(_)) {
268 warn!("Inconsistency found, will repair: {output:?}");
269 inconsistent_nodes += 1;
270
271 match &output {
273 Output::AccountExtra(_, _) |
274 Output::AccountWrong { .. } |
275 Output::AccountMissing(_, _) => {
276 metrics.account_inconsistencies.increment(1);
277 }
278 Output::StorageExtra(_, _, _) |
279 Output::StorageWrong { .. } |
280 Output::StorageMissing(_, _, _) => {
281 metrics.storage_inconsistencies.increment(1);
282 }
283 Output::Progress(_) => {}
284 }
285 }
286
287 match output {
288 Output::AccountExtra(path, _node) => {
289 let key: A::AccountKey = path.into();
291 if account_trie_cursor.seek_exact(key)?.is_some() {
292 account_trie_cursor.delete_current()?;
293 }
294 }
295 Output::StorageExtra(account, path, _node) => {
296 let subkey: A::StorageSubKey = path.into();
298 if storage_trie_cursor
299 .seek_by_key_subkey(account, subkey.clone())?
300 .filter(|e| *e.nibbles() == subkey)
301 .is_some()
302 {
303 storage_trie_cursor.delete_current()?;
304 }
305 }
306 Output::AccountWrong { path, expected: node, .. } |
307 Output::AccountMissing(path, node) => {
308 let key: A::AccountKey = path.into();
310 account_trie_cursor.upsert(key, &node)?;
311 }
312 Output::StorageWrong { account, path, expected: node, .. } |
313 Output::StorageMissing(account, path, node) => {
314 let subkey: A::StorageSubKey = path.into();
318 let entry = A::StorageValue::new(subkey.clone(), node);
319 if storage_trie_cursor
320 .seek_by_key_subkey(account, subkey.clone())?
321 .filter(|v| *v.nibbles() == subkey)
322 .is_some()
323 {
324 storage_trie_cursor.delete_current()?;
325 }
326 storage_trie_cursor.upsert(account, &entry)?;
327 }
328 Output::Progress(path) => {
329 if last_progress_time.elapsed() > PROGRESS_PERIOD {
330 output_progress(path, start_time, inconsistent_nodes);
331 last_progress_time = Instant::now();
332 }
333 }
334 }
335 }
336
337 drop(account_trie_cursor);
338 drop(storage_trie_cursor);
339
340 if inconsistent_nodes > 0 {
341 verify_repaired_state_root::<_, A>(provider_rw, block_number)?;
344 }
345
346 Ok(inconsistent_nodes as usize)
347}
348
349fn verify_repaired_state_root<N: ProviderNodeTypes, A: TrieTableAdapter>(
350 provider_rw: &reth_provider::DatabaseProviderRW<N::DB, N>,
351 block_number: u64,
352) -> eyre::Result<()>
353where
354 <N::DB as reth_db_api::database::Database>::TXMut: DbTxMut + DbTx,
355{
356 type DbStateRoot<'a, TX, A> = reth_trie::StateRoot<
357 DatabaseTrieCursorFactory<&'a TX, A>,
358 DatabaseHashedCursorFactory<&'a TX>,
359 >;
360
361 let expected_state_root = provider_rw
362 .header_by_number(block_number)?
363 .ok_or_else(|| {
364 eyre::eyre!("Missing canonical header at database block tip {block_number}")
365 })?
366 .state_root();
367
368 let computed_state_root = DbStateRoot::<_, A>::from_tx(provider_rw.tx_ref()).root()?;
369
370 if computed_state_root != expected_state_root {
371 return Err(eyre::eyre!(
372 "Repaired trie state root mismatch at block {block_number}: computed {computed_state_root}, header {expected_state_root}",
373 ))
374 }
375
376 Ok(())
377}
378
379fn output_progress(last_account: Nibbles, start_time: Instant, inconsistent_nodes: u64) {
381 let mut current_value: u64 = 0;
386 let nibbles_to_use = last_account.len().min(16);
387
388 for i in 0..nibbles_to_use {
389 current_value = (current_value << 4) | (last_account.get(i).unwrap_or(0) as u64);
390 }
391 if nibbles_to_use < 16 {
393 current_value <<= (16 - nibbles_to_use) * 4;
394 }
395
396 let progress_percent = current_value as f64 / u64::MAX as f64 * 100.0;
397 let progress_percent_str = format!("{progress_percent:.2}");
398
399 let elapsed = start_time.elapsed();
401 let elapsed_secs = elapsed.as_secs_f64();
402
403 let estimated_total_time =
404 if progress_percent > 0.0 { elapsed_secs / (progress_percent / 100.0) } else { 0.0 };
405 let remaining_time = estimated_total_time - elapsed_secs;
406 let eta_duration = Duration::from_secs(remaining_time as u64);
407
408 info!(
409 progress_percent = progress_percent_str,
410 eta = %humantime::format_duration(eta_duration),
411 inconsistent_nodes,
412 "Repairing trie tables",
413 );
414}
415
/// Counters for the inconsistencies found while verifying the trie tables.
#[derive(Debug)]
struct RepairTrieMetrics {
    /// Incremented once for every account trie inconsistency (extra, wrong or missing node).
    account_inconsistencies: Counter,
    /// Incremented once for every storage trie inconsistency (extra, wrong or missing node).
    storage_inconsistencies: Counter,
}
422
423impl RepairTrieMetrics {
424 fn new() -> Self {
425 Self {
426 account_inconsistencies: metrics::counter!(
427 "db.repair_trie.inconsistencies_found",
428 "type" => "account"
429 ),
430 storage_inconsistencies: metrics::counter!(
431 "db.repair_trie.inconsistencies_found",
432 "type" => "storage"
433 ),
434 }
435 }
436}