//! `reth_cli_commands/db/repair_trie.rs`
//!
//! Implements the `reth db repair-trie` command: verifies the account and
//! storage trie tables against the hashed state and optionally repairs
//! inconsistencies in place.
1use clap::Parser;
2use metrics::{self, Counter};
3use reth_chainspec::EthChainSpec;
4use reth_cli_util::parse_socket_address;
5use reth_db_api::{
6    cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO},
7    database::Database,
8    tables,
9    transaction::{DbTx, DbTxMut},
10};
11use reth_db_common::DbTool;
12use reth_node_core::{
13    dirs::{ChainPath, DataDirPath},
14    version::version_metadata,
15};
16use reth_node_metrics::{
17    chain::ChainSpecInfo,
18    hooks::Hooks,
19    server::{MetricServer, MetricServerConfig},
20    version::VersionInfo,
21};
22use reth_provider::{providers::ProviderNodeTypes, ChainSpecProvider, StageCheckpointReader};
23use reth_stages::StageId;
24use reth_tasks::TaskExecutor;
25use reth_trie::{
26    verify::{Output, Verifier},
27    Nibbles,
28};
29use reth_trie_common::{StorageTrieEntry, StoredNibbles, StoredNibblesSubKey};
30use reth_trie_db::{DatabaseHashedCursorFactory, DatabaseTrieCursorFactory};
31use std::{
32    net::SocketAddr,
33    time::{Duration, Instant},
34};
35use tracing::{info, warn};
36
/// Minimum interval between progress log lines while walking the trie.
const PROGRESS_PERIOD: Duration = Duration::from_secs(5);
38
/// The arguments for the `reth db repair-trie` command
// NOTE: the field doc comments below double as `--help` text via clap's
// derive, so they are user-facing output — change them deliberately.
#[derive(Parser, Debug)]
pub struct Command {
    /// Only show inconsistencies without making any repairs
    #[arg(long)]
    pub(crate) dry_run: bool,

    /// Enable Prometheus metrics.
    ///
    /// The metrics will be served at the given interface and port.
    #[arg(long = "metrics", value_name = "ADDR:PORT", value_parser = parse_socket_address)]
    pub(crate) metrics: Option<SocketAddr>,
}
52
impl Command {
    /// Execute `db repair-trie` command
    ///
    /// When `--metrics` was given, first spawns a Prometheus metrics server on
    /// the requested address. Then either verifies the trie tables read-only
    /// (`--dry-run`) or verifies and repairs them in place.
    pub fn execute<N: ProviderNodeTypes>(
        self,
        tool: &DbTool<N>,
        task_executor: TaskExecutor,
        data_dir: &ChainPath<DataDirPath>,
    ) -> eyre::Result<()> {
        // Set up metrics server if requested. The handle is bound to a local
        // so the server task stays alive for the duration of the command.
        let _metrics_handle = if let Some(listen_addr) = self.metrics {
            let chain_name = tool.provider_factory.chain_spec().chain().to_string();
            // Clone the executor for the async block; the original is still
            // used below to spawn the task itself.
            let executor = task_executor.clone();
            let pprof_dump_dir = data_dir.pprof_dumps();

            let handle = task_executor.spawn_critical_task("metrics server", async move {
                let config = MetricServerConfig::new(
                    listen_addr,
                    // Build/version metadata exposed on the metrics endpoint.
                    VersionInfo {
                        version: version_metadata().cargo_pkg_version.as_ref(),
                        build_timestamp: version_metadata().vergen_build_timestamp.as_ref(),
                        cargo_features: version_metadata().vergen_cargo_features.as_ref(),
                        git_sha: version_metadata().vergen_git_sha.as_ref(),
                        target_triple: version_metadata().vergen_cargo_target_triple.as_ref(),
                        build_profile: version_metadata().build_profile_name.as_ref(),
                    },
                    ChainSpecInfo { name: chain_name },
                    executor,
                    Hooks::builder().build(),
                    pprof_dump_dir,
                );

                // Spawn the metrics server
                if let Err(e) = MetricServer::new(config).serve().await {
                    tracing::error!("Metrics server error: {}", e);
                }
            });

            Some(handle)
        } else {
            None
        };

        // Dispatch to the read-only or read-write variant of the check.
        if self.dry_run {
            verify_only(tool)?
        } else {
            verify_and_repair(tool)?
        }

        Ok(())
    }
}
104
105fn verify_only<N: ProviderNodeTypes>(tool: &DbTool<N>) -> eyre::Result<()> {
106    // Log the database block tip from Finish stage checkpoint
107    let finish_checkpoint = tool
108        .provider_factory
109        .provider()?
110        .get_stage_checkpoint(StageId::Finish)?
111        .unwrap_or_default();
112    info!("Database block tip: {}", finish_checkpoint.block_number);
113
114    // Get a database transaction directly from the database
115    let db = tool.provider_factory.db_ref();
116    let mut tx = db.tx()?;
117    tx.disable_long_read_transaction_safety();
118
119    // Create the verifier
120    let hashed_cursor_factory = DatabaseHashedCursorFactory::new(&tx);
121    let trie_cursor_factory = DatabaseTrieCursorFactory::new(&tx);
122    let verifier = Verifier::new(&trie_cursor_factory, hashed_cursor_factory)?;
123
124    let metrics = RepairTrieMetrics::new();
125
126    let mut inconsistent_nodes = 0;
127    let start_time = Instant::now();
128    let mut last_progress_time = Instant::now();
129
130    // Iterate over the verifier and repair inconsistencies
131    for output_result in verifier {
132        let output = output_result?;
133
134        if let Output::Progress(path) = output {
135            if last_progress_time.elapsed() > PROGRESS_PERIOD {
136                output_progress(path, start_time, inconsistent_nodes);
137                last_progress_time = Instant::now();
138            }
139        } else {
140            warn!("Inconsistency found: {output:?}");
141            inconsistent_nodes += 1;
142
143            // Record metrics based on output type
144            match output {
145                Output::AccountExtra(_, _) |
146                Output::AccountWrong { .. } |
147                Output::AccountMissing(_, _) => {
148                    metrics.account_inconsistencies.increment(1);
149                }
150                Output::StorageExtra(_, _, _) |
151                Output::StorageWrong { .. } |
152                Output::StorageMissing(_, _, _) => {
153                    metrics.storage_inconsistencies.increment(1);
154                }
155                Output::Progress(_) => unreachable!(),
156            }
157        }
158    }
159
160    info!("Found {} inconsistencies (dry run - no changes made)", inconsistent_nodes);
161
162    Ok(())
163}
164
165/// Checks that the merkle stage has completed running up to the account and storage hashing stages.
166fn verify_checkpoints(provider: impl StageCheckpointReader) -> eyre::Result<()> {
167    let account_hashing_checkpoint =
168        provider.get_stage_checkpoint(StageId::AccountHashing)?.unwrap_or_default();
169    let storage_hashing_checkpoint =
170        provider.get_stage_checkpoint(StageId::StorageHashing)?.unwrap_or_default();
171    let merkle_checkpoint =
172        provider.get_stage_checkpoint(StageId::MerkleExecute)?.unwrap_or_default();
173
174    if account_hashing_checkpoint.block_number != merkle_checkpoint.block_number {
175        return Err(eyre::eyre!(
176            "MerkleExecute stage checkpoint ({}) != AccountHashing stage checkpoint ({}), you must first complete the pipeline sync by running `reth node`",
177            merkle_checkpoint.block_number,
178            account_hashing_checkpoint.block_number,
179        ))
180    }
181
182    if storage_hashing_checkpoint.block_number != merkle_checkpoint.block_number {
183        return Err(eyre::eyre!(
184            "MerkleExecute stage checkpoint ({}) != StorageHashing stage checkpoint ({}), you must first complete the pipeline sync by running `reth node`",
185            merkle_checkpoint.block_number,
186            storage_hashing_checkpoint.block_number,
187        ))
188    }
189
190    let merkle_checkpoint_progress =
191        provider.get_stage_checkpoint_progress(StageId::MerkleExecute)?;
192    if merkle_checkpoint_progress.is_some_and(|progress| !progress.is_empty()) {
193        return Err(eyre::eyre!(
194            "MerkleExecute sync stage in-progress, you must first complete the pipeline sync by running `reth node`",
195        ))
196    }
197
198    Ok(())
199}
200
/// Walks the trie tables, repairs every inconsistency found, and commits the
/// changes only when at least one repair was made.
fn verify_and_repair<N: ProviderNodeTypes>(tool: &DbTool<N>) -> eyre::Result<()> {
    // Get a read-write database provider
    let mut provider_rw = tool.provider_factory.provider_rw()?;

    // Log the database block tip from Finish stage checkpoint
    let finish_checkpoint = provider_rw.get_stage_checkpoint(StageId::Finish)?.unwrap_or_default();
    info!("Database block tip: {}", finish_checkpoint.block_number);

    // Check that a pipeline sync isn't in progress.
    verify_checkpoints(provider_rw.as_ref())?;

    // Create cursors for making modifications with
    let tx = provider_rw.tx_mut();
    tx.disable_long_read_transaction_safety();
    let mut account_trie_cursor = tx.cursor_write::<tables::AccountsTrie>()?;
    let mut storage_trie_cursor = tx.cursor_dup_write::<tables::StoragesTrie>()?;

    // Create the cursor factories. These cannot accept the `&mut` tx above because they require it
    // to be AsRef.
    let tx = provider_rw.tx_ref();
    let hashed_cursor_factory = DatabaseHashedCursorFactory::new(tx);
    let trie_cursor_factory = DatabaseTrieCursorFactory::new(tx);

    // Create the verifier
    let verifier = Verifier::new(&trie_cursor_factory, hashed_cursor_factory)?;

    let metrics = RepairTrieMetrics::new();

    let mut inconsistent_nodes = 0;
    let start_time = Instant::now();
    let mut last_progress_time = Instant::now();

    // Iterate over the verifier and repair inconsistencies
    for output_result in verifier {
        let output = output_result?;

        // First: log and count the inconsistency and bump the per-type metric.
        // Progress outputs are not inconsistencies and are skipped here.
        if !matches!(output, Output::Progress(_)) {
            warn!("Inconsistency found, will repair: {output:?}");
            inconsistent_nodes += 1;

            // Record metrics based on output type
            match &output {
                Output::AccountExtra(_, _) |
                Output::AccountWrong { .. } |
                Output::AccountMissing(_, _) => {
                    metrics.account_inconsistencies.increment(1);
                }
                Output::StorageExtra(_, _, _) |
                Output::StorageWrong { .. } |
                Output::StorageMissing(_, _, _) => {
                    metrics.storage_inconsistencies.increment(1);
                }
                // Excluded by the `matches!` guard above.
                Output::Progress(_) => {}
            }
        }

        // Second: apply the actual repair for this output.
        match output {
            Output::AccountExtra(path, _node) => {
                // Extra account node in trie, remove it
                let nibbles = StoredNibbles(path);
                if account_trie_cursor.seek_exact(nibbles)?.is_some() {
                    account_trie_cursor.delete_current()?;
                }
            }
            Output::StorageExtra(account, path, _node) => {
                // Extra storage node in trie, remove it
                // seek_by_key_subkey may land on a nearby subkey, so confirm the
                // entry's nibbles match before deleting.
                let nibbles = StoredNibblesSubKey(path);
                if storage_trie_cursor
                    .seek_by_key_subkey(account, nibbles.clone())?
                    .filter(|e| e.nibbles == nibbles)
                    .is_some()
                {
                    storage_trie_cursor.delete_current()?;
                }
            }
            Output::AccountWrong { path, expected: node, .. } |
            Output::AccountMissing(path, node) => {
                // Wrong/missing account node value, upsert it
                let nibbles = StoredNibbles(path);
                account_trie_cursor.upsert(nibbles, &node)?;
            }
            Output::StorageWrong { account, path, expected: node, .. } |
            Output::StorageMissing(account, path, node) => {
                // Wrong/missing storage node value, upsert it
                // (We can't just use `upsert` method with a dup cursor, it's not properly
                // supported)
                // Delete any stale entry with the same subkey first, then insert.
                let nibbles = StoredNibblesSubKey(path);
                let entry = StorageTrieEntry { nibbles: nibbles.clone(), node };
                if storage_trie_cursor
                    .seek_by_key_subkey(account, nibbles.clone())?
                    .filter(|v| v.nibbles == nibbles)
                    .is_some()
                {
                    storage_trie_cursor.delete_current()?;
                }
                storage_trie_cursor.upsert(account, &entry)?;
            }
            Output::Progress(path) => {
                // Throttle progress logging to once per PROGRESS_PERIOD.
                if last_progress_time.elapsed() > PROGRESS_PERIOD {
                    output_progress(path, start_time, inconsistent_nodes);
                    last_progress_time = Instant::now();
                }
            }
        }
    }

    if inconsistent_nodes == 0 {
        // Nothing was repaired; dropping provider_rw discards the (empty)
        // write transaction.
        info!("No inconsistencies found");
    } else {
        provider_rw.commit()?;
        info!("Repaired {} inconsistencies and committed changes", inconsistent_nodes);
    }

    Ok(())
}
316
317/// Output progress information based on the last seen account path.
318fn output_progress(last_account: Nibbles, start_time: Instant, inconsistent_nodes: u64) {
319    // Calculate percentage based on position in the trie path space
320    // For progress estimation, we'll use the first few nibbles as an approximation
321
322    // Convert the first 16 nibbles (8 bytes) to a u64 for progress calculation
323    let mut current_value: u64 = 0;
324    let nibbles_to_use = last_account.len().min(16);
325
326    for i in 0..nibbles_to_use {
327        current_value = (current_value << 4) | (last_account.get(i).unwrap_or(0) as u64);
328    }
329    // Shift left to fill remaining bits if we have fewer than 16 nibbles
330    if nibbles_to_use < 16 {
331        current_value <<= (16 - nibbles_to_use) * 4;
332    }
333
334    let progress_percent = current_value as f64 / u64::MAX as f64 * 100.0;
335    let progress_percent_str = format!("{progress_percent:.2}");
336
337    // Calculate ETA based on current speed
338    let elapsed = start_time.elapsed();
339    let elapsed_secs = elapsed.as_secs_f64();
340
341    let estimated_total_time =
342        if progress_percent > 0.0 { elapsed_secs / (progress_percent / 100.0) } else { 0.0 };
343    let remaining_time = estimated_total_time - elapsed_secs;
344    let eta_duration = Duration::from_secs(remaining_time as u64);
345
346    info!(
347        progress_percent = progress_percent_str,
348        eta = %humantime::format_duration(eta_duration),
349        inconsistent_nodes,
350        "Repairing trie tables",
351    );
352}
353
/// Metrics for tracking trie repair inconsistencies
#[derive(Debug)]
struct RepairTrieMetrics {
    // Count of inconsistencies found in the accounts trie table.
    account_inconsistencies: Counter,
    // Count of inconsistencies found in the storages trie table.
    storage_inconsistencies: Counter,
}
360
361impl RepairTrieMetrics {
362    fn new() -> Self {
363        Self {
364            account_inconsistencies: metrics::counter!(
365                "db.repair_trie.inconsistencies_found",
366                "type" => "account"
367            ),
368            storage_inconsistencies: metrics::counter!(
369                "db.repair_trie.inconsistencies_found",
370                "type" => "storage"
371            ),
372        }
373    }
374}