reth_cli_commands/db/
repair_trie.rs

1use clap::Parser;
2use metrics::{self, Counter};
3use reth_chainspec::EthChainSpec;
4use reth_cli_util::parse_socket_address;
5use reth_db_api::{
6    cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO},
7    database::Database,
8    tables,
9    transaction::{DbTx, DbTxMut},
10};
11use reth_db_common::DbTool;
12use reth_node_core::version::version_metadata;
13use reth_node_metrics::{
14    chain::ChainSpecInfo,
15    hooks::Hooks,
16    server::{MetricServer, MetricServerConfig},
17    version::VersionInfo,
18};
19use reth_provider::{providers::ProviderNodeTypes, ChainSpecProvider, StageCheckpointReader};
20use reth_stages::StageId;
21use reth_tasks::TaskExecutor;
22use reth_trie::{
23    verify::{Output, Verifier},
24    Nibbles,
25};
26use reth_trie_common::{StorageTrieEntry, StoredNibbles, StoredNibblesSubKey};
27use reth_trie_db::{DatabaseHashedCursorFactory, DatabaseTrieCursorFactory};
28use std::{
29    net::SocketAddr,
30    time::{Duration, Instant},
31};
32use tracing::{info, warn};
33
/// Minimum time that must elapse between two progress reports while iterating the verifier.
const PROGRESS_PERIOD: Duration = Duration::from_secs(5);
35
/// The arguments for the `reth db repair-trie` command
// NOTE(review): the `///` comments on this struct and its fields are consumed by the clap
// `Parser` derive (presumably surfaced as CLI help text), so treat their wording as user-facing.
#[derive(Parser, Debug)]
pub struct Command {
    /// Only show inconsistencies without making any repairs
    // When set, `execute` takes the read-only verification path instead of the repair path.
    #[arg(long)]
    pub(crate) dry_run: bool,

    /// Enable Prometheus metrics.
    ///
    /// The metrics will be served at the given interface and port.
    // When present, `execute` spawns a metrics server listening on this address.
    #[arg(long = "metrics", value_name = "ADDR:PORT", value_parser = parse_socket_address)]
    pub(crate) metrics: Option<SocketAddr>,
}
49
50impl Command {
51    /// Execute `db repair-trie` command
52    pub fn execute<N: ProviderNodeTypes>(
53        self,
54        tool: &DbTool<N>,
55        task_executor: TaskExecutor,
56    ) -> eyre::Result<()> {
57        // Set up metrics server if requested
58        let _metrics_handle = if let Some(listen_addr) = self.metrics {
59            let chain_name = tool.provider_factory.chain_spec().chain().to_string();
60            let executor = task_executor.clone();
61
62            let handle = task_executor.spawn_critical("metrics server", async move {
63                let config = MetricServerConfig::new(
64                    listen_addr,
65                    VersionInfo {
66                        version: version_metadata().cargo_pkg_version.as_ref(),
67                        build_timestamp: version_metadata().vergen_build_timestamp.as_ref(),
68                        cargo_features: version_metadata().vergen_cargo_features.as_ref(),
69                        git_sha: version_metadata().vergen_git_sha.as_ref(),
70                        target_triple: version_metadata().vergen_cargo_target_triple.as_ref(),
71                        build_profile: version_metadata().build_profile_name.as_ref(),
72                    },
73                    ChainSpecInfo { name: chain_name },
74                    executor,
75                    Hooks::builder().build(),
76                );
77
78                // Spawn the metrics server
79                if let Err(e) = MetricServer::new(config).serve().await {
80                    tracing::error!("Metrics server error: {}", e);
81                }
82            });
83
84            Some(handle)
85        } else {
86            None
87        };
88
89        if self.dry_run {
90            verify_only(tool)?
91        } else {
92            verify_and_repair(tool)?
93        }
94
95        Ok(())
96    }
97}
98
99fn verify_only<N: ProviderNodeTypes>(tool: &DbTool<N>) -> eyre::Result<()> {
100    // Log the database block tip from Finish stage checkpoint
101    let finish_checkpoint = tool
102        .provider_factory
103        .provider()?
104        .get_stage_checkpoint(StageId::Finish)?
105        .unwrap_or_default();
106    info!("Database block tip: {}", finish_checkpoint.block_number);
107
108    // Get a database transaction directly from the database
109    let db = tool.provider_factory.db_ref();
110    let mut tx = db.tx()?;
111    tx.disable_long_read_transaction_safety();
112
113    // Create the verifier
114    let hashed_cursor_factory = DatabaseHashedCursorFactory::new(&tx);
115    let trie_cursor_factory = DatabaseTrieCursorFactory::new(&tx);
116    let verifier = Verifier::new(&trie_cursor_factory, hashed_cursor_factory)?;
117
118    let metrics = RepairTrieMetrics::new();
119
120    let mut inconsistent_nodes = 0;
121    let start_time = Instant::now();
122    let mut last_progress_time = Instant::now();
123
124    // Iterate over the verifier and repair inconsistencies
125    for output_result in verifier {
126        let output = output_result?;
127
128        if let Output::Progress(path) = output {
129            if last_progress_time.elapsed() > PROGRESS_PERIOD {
130                output_progress(path, start_time, inconsistent_nodes);
131                last_progress_time = Instant::now();
132            }
133        } else {
134            warn!("Inconsistency found: {output:?}");
135            inconsistent_nodes += 1;
136
137            // Record metrics based on output type
138            match output {
139                Output::AccountExtra(_, _) |
140                Output::AccountWrong { .. } |
141                Output::AccountMissing(_, _) => {
142                    metrics.account_inconsistencies.increment(1);
143                }
144                Output::StorageExtra(_, _, _) |
145                Output::StorageWrong { .. } |
146                Output::StorageMissing(_, _, _) => {
147                    metrics.storage_inconsistencies.increment(1);
148                }
149                Output::Progress(_) => unreachable!(),
150            }
151        }
152    }
153
154    info!("Found {} inconsistencies (dry run - no changes made)", inconsistent_nodes);
155
156    Ok(())
157}
158
159/// Checks that the merkle stage has completed running up to the account and storage hashing stages.
160fn verify_checkpoints(provider: impl StageCheckpointReader) -> eyre::Result<()> {
161    let account_hashing_checkpoint =
162        provider.get_stage_checkpoint(StageId::AccountHashing)?.unwrap_or_default();
163    let storage_hashing_checkpoint =
164        provider.get_stage_checkpoint(StageId::StorageHashing)?.unwrap_or_default();
165    let merkle_checkpoint =
166        provider.get_stage_checkpoint(StageId::MerkleExecute)?.unwrap_or_default();
167
168    if account_hashing_checkpoint.block_number != merkle_checkpoint.block_number {
169        return Err(eyre::eyre!(
170            "MerkleExecute stage checkpoint ({}) != AccountHashing stage checkpoint ({}), you must first complete the pipeline sync by running `reth node`",
171            merkle_checkpoint.block_number,
172            account_hashing_checkpoint.block_number,
173        ))
174    }
175
176    if storage_hashing_checkpoint.block_number != merkle_checkpoint.block_number {
177        return Err(eyre::eyre!(
178            "MerkleExecute stage checkpoint ({}) != StorageHashing stage checkpoint ({}), you must first complete the pipeline sync by running `reth node`",
179            merkle_checkpoint.block_number,
180            storage_hashing_checkpoint.block_number,
181        ))
182    }
183
184    let merkle_checkpoint_progress =
185        provider.get_stage_checkpoint_progress(StageId::MerkleExecute)?;
186    if merkle_checkpoint_progress.is_some_and(|progress| !progress.is_empty()) {
187        return Err(eyre::eyre!(
188            "MerkleExecute sync stage in-progress, you must first complete the pipeline sync by running `reth node`",
189        ))
190    }
191
192    Ok(())
193}
194
195fn verify_and_repair<N: ProviderNodeTypes>(tool: &DbTool<N>) -> eyre::Result<()> {
196    // Get a read-write database provider
197    let mut provider_rw = tool.provider_factory.provider_rw()?;
198
199    // Log the database block tip from Finish stage checkpoint
200    let finish_checkpoint = provider_rw.get_stage_checkpoint(StageId::Finish)?.unwrap_or_default();
201    info!("Database block tip: {}", finish_checkpoint.block_number);
202
203    // Check that a pipeline sync isn't in progress.
204    verify_checkpoints(provider_rw.as_ref())?;
205
206    // Create cursors for making modifications with
207    let tx = provider_rw.tx_mut();
208    tx.disable_long_read_transaction_safety();
209    let mut account_trie_cursor = tx.cursor_write::<tables::AccountsTrie>()?;
210    let mut storage_trie_cursor = tx.cursor_dup_write::<tables::StoragesTrie>()?;
211
212    // Create the cursor factories. These cannot accept the `&mut` tx above because they require it
213    // to be AsRef.
214    let tx = provider_rw.tx_ref();
215    let hashed_cursor_factory = DatabaseHashedCursorFactory::new(tx);
216    let trie_cursor_factory = DatabaseTrieCursorFactory::new(tx);
217
218    // Create the verifier
219    let verifier = Verifier::new(&trie_cursor_factory, hashed_cursor_factory)?;
220
221    let metrics = RepairTrieMetrics::new();
222
223    let mut inconsistent_nodes = 0;
224    let start_time = Instant::now();
225    let mut last_progress_time = Instant::now();
226
227    // Iterate over the verifier and repair inconsistencies
228    for output_result in verifier {
229        let output = output_result?;
230
231        if !matches!(output, Output::Progress(_)) {
232            warn!("Inconsistency found, will repair: {output:?}");
233            inconsistent_nodes += 1;
234
235            // Record metrics based on output type
236            match &output {
237                Output::AccountExtra(_, _) |
238                Output::AccountWrong { .. } |
239                Output::AccountMissing(_, _) => {
240                    metrics.account_inconsistencies.increment(1);
241                }
242                Output::StorageExtra(_, _, _) |
243                Output::StorageWrong { .. } |
244                Output::StorageMissing(_, _, _) => {
245                    metrics.storage_inconsistencies.increment(1);
246                }
247                Output::Progress(_) => {}
248            }
249        }
250
251        match output {
252            Output::AccountExtra(path, _node) => {
253                // Extra account node in trie, remove it
254                let nibbles = StoredNibbles(path);
255                if account_trie_cursor.seek_exact(nibbles)?.is_some() {
256                    account_trie_cursor.delete_current()?;
257                }
258            }
259            Output::StorageExtra(account, path, _node) => {
260                // Extra storage node in trie, remove it
261                let nibbles = StoredNibblesSubKey(path);
262                if storage_trie_cursor
263                    .seek_by_key_subkey(account, nibbles.clone())?
264                    .filter(|e| e.nibbles == nibbles)
265                    .is_some()
266                {
267                    storage_trie_cursor.delete_current()?;
268                }
269            }
270            Output::AccountWrong { path, expected: node, .. } |
271            Output::AccountMissing(path, node) => {
272                // Wrong/missing account node value, upsert it
273                let nibbles = StoredNibbles(path);
274                account_trie_cursor.upsert(nibbles, &node)?;
275            }
276            Output::StorageWrong { account, path, expected: node, .. } |
277            Output::StorageMissing(account, path, node) => {
278                // Wrong/missing storage node value, upsert it
279                // (We can't just use `upsert` method with a dup cursor, it's not properly
280                // supported)
281                let nibbles = StoredNibblesSubKey(path);
282                let entry = StorageTrieEntry { nibbles: nibbles.clone(), node };
283                if storage_trie_cursor
284                    .seek_by_key_subkey(account, nibbles.clone())?
285                    .filter(|v| v.nibbles == nibbles)
286                    .is_some()
287                {
288                    storage_trie_cursor.delete_current()?;
289                }
290                storage_trie_cursor.upsert(account, &entry)?;
291            }
292            Output::Progress(path) => {
293                if last_progress_time.elapsed() > PROGRESS_PERIOD {
294                    output_progress(path, start_time, inconsistent_nodes);
295                    last_progress_time = Instant::now();
296                }
297            }
298        }
299    }
300
301    if inconsistent_nodes == 0 {
302        info!("No inconsistencies found");
303    } else {
304        provider_rw.commit()?;
305        info!("Repaired {} inconsistencies and committed changes", inconsistent_nodes);
306    }
307
308    Ok(())
309}
310
311/// Output progress information based on the last seen account path.
312fn output_progress(last_account: Nibbles, start_time: Instant, inconsistent_nodes: u64) {
313    // Calculate percentage based on position in the trie path space
314    // For progress estimation, we'll use the first few nibbles as an approximation
315
316    // Convert the first 16 nibbles (8 bytes) to a u64 for progress calculation
317    let mut current_value: u64 = 0;
318    let nibbles_to_use = last_account.len().min(16);
319
320    for i in 0..nibbles_to_use {
321        current_value = (current_value << 4) | (last_account.get(i).unwrap_or(0) as u64);
322    }
323    // Shift left to fill remaining bits if we have fewer than 16 nibbles
324    if nibbles_to_use < 16 {
325        current_value <<= (16 - nibbles_to_use) * 4;
326    }
327
328    let progress_percent = current_value as f64 / u64::MAX as f64 * 100.0;
329    let progress_percent_str = format!("{progress_percent:.2}");
330
331    // Calculate ETA based on current speed
332    let elapsed = start_time.elapsed();
333    let elapsed_secs = elapsed.as_secs_f64();
334
335    let estimated_total_time =
336        if progress_percent > 0.0 { elapsed_secs / (progress_percent / 100.0) } else { 0.0 };
337    let remaining_time = estimated_total_time - elapsed_secs;
338    let eta_duration = Duration::from_secs(remaining_time as u64);
339
340    info!(
341        progress_percent = progress_percent_str,
342        eta = %humantime::format_duration(eta_duration),
343        inconsistent_nodes,
344        "Repairing trie tables",
345    );
346}
347
/// Metrics for tracking trie repair inconsistencies
#[derive(Debug)]
struct RepairTrieMetrics {
    /// Counter incremented once for every account trie inconsistency reported by the verifier.
    account_inconsistencies: Counter,
    /// Counter incremented once for every storage trie inconsistency reported by the verifier.
    storage_inconsistencies: Counter,
}
354
355impl RepairTrieMetrics {
356    fn new() -> Self {
357        Self {
358            account_inconsistencies: metrics::counter!(
359                "db.repair_trie.inconsistencies_found",
360                "type" => "account"
361            ),
362            storage_inconsistencies: metrics::counter!(
363                "db.repair_trie.inconsistencies_found",
364                "type" => "storage"
365            ),
366        }
367    }
368}