Skip to main content

reth_cli_commands/db/
repair_trie.rs

1use clap::Parser;
2use metrics::{self, Counter};
3use reth_chainspec::EthChainSpec;
4use reth_cli_util::parse_socket_address;
5use reth_db_api::{
6    cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO},
7    database::Database,
8    transaction::{DbTx, DbTxMut},
9};
10use reth_db_common::DbTool;
11use reth_node_core::{
12    dirs::{ChainPath, DataDirPath},
13    version::version_metadata,
14};
15use reth_node_metrics::{
16    chain::ChainSpecInfo,
17    hooks::Hooks,
18    server::{MetricServer, MetricServerConfig},
19    version::VersionInfo,
20};
21use reth_provider::{providers::ProviderNodeTypes, ChainSpecProvider, StageCheckpointReader};
22use reth_stages::StageId;
23use reth_storage_api::StorageSettingsCache;
24use reth_tasks::TaskExecutor;
25use reth_trie::{
26    verify::{Output, Verifier},
27    Nibbles,
28};
29use reth_trie_db::{
30    DatabaseHashedCursorFactory, DatabaseTrieCursorFactory, StorageTrieEntryLike, TrieTableAdapter,
31};
32use std::{
33    net::SocketAddr,
34    time::{Duration, Instant},
35};
36use tracing::{info, warn};
37
/// Minimum interval between progress log lines while walking the trie.
const PROGRESS_PERIOD: Duration = Duration::from_secs(5);
39
/// The arguments for the `reth db repair-trie` command
// NOTE: the `///` doc comments on the fields below are rendered by clap as the `--help`
// text for each flag, so they are user-facing strings — do not edit them casually.
#[derive(Parser, Debug)]
pub struct Command {
    /// Only show inconsistencies without making any repairs
    #[arg(long)]
    pub(crate) dry_run: bool,

    /// Enable Prometheus metrics.
    ///
    /// The metrics will be served at the given interface and port.
    #[arg(long = "metrics", value_name = "ADDR:PORT", value_parser = parse_socket_address)]
    pub(crate) metrics: Option<SocketAddr>,
}
53
54impl Command {
55    /// Execute `db repair-trie` command
56    pub fn execute<N: ProviderNodeTypes>(
57        self,
58        tool: &DbTool<N>,
59        task_executor: TaskExecutor,
60        data_dir: &ChainPath<DataDirPath>,
61    ) -> eyre::Result<()> {
62        // Set up metrics server if requested
63        let _metrics_handle = if let Some(listen_addr) = self.metrics {
64            let chain_name = tool.provider_factory.chain_spec().chain().to_string();
65            let executor = task_executor.clone();
66            let pprof_dump_dir = data_dir.pprof_dumps();
67
68            let handle = task_executor.spawn_critical_task("metrics server", async move {
69                let config = MetricServerConfig::new(
70                    listen_addr,
71                    VersionInfo {
72                        version: version_metadata().cargo_pkg_version.as_ref(),
73                        build_timestamp: version_metadata().vergen_build_timestamp.as_ref(),
74                        cargo_features: version_metadata().vergen_cargo_features.as_ref(),
75                        git_sha: version_metadata().vergen_git_sha.as_ref(),
76                        target_triple: version_metadata().vergen_cargo_target_triple.as_ref(),
77                        build_profile: version_metadata().build_profile_name.as_ref(),
78                    },
79                    ChainSpecInfo { name: chain_name },
80                    executor,
81                    Hooks::builder().build(),
82                    pprof_dump_dir,
83                );
84
85                // Spawn the metrics server
86                if let Err(e) = MetricServer::new(config).serve().await {
87                    tracing::error!("Metrics server error: {}", e);
88                }
89            });
90
91            Some(handle)
92        } else {
93            None
94        };
95
96        if self.dry_run {
97            verify_only(tool)?
98        } else {
99            verify_and_repair(tool)?
100        }
101
102        Ok(())
103    }
104}
105
/// Verifies the trie tables without making any repairs (dry run), logging findings only.
fn verify_only<N: ProviderNodeTypes>(tool: &DbTool<N>) -> eyre::Result<()> {
    // Log the database block tip from Finish stage checkpoint
    let finish_checkpoint = tool
        .provider_factory
        .provider()?
        .get_stage_checkpoint(StageId::Finish)?
        .unwrap_or_default();
    info!("Database block tip: {}", finish_checkpoint.block_number);

    // Get a database transaction directly from the database
    let db = tool.provider_factory.db_ref();
    let mut tx = db.tx()?;
    // NOTE(review): presumably disabled so the long full-trie scan doesn't trip the
    // backend's long-lived read-transaction guard — confirm against the DB backend.
    tx.disable_long_read_transaction_safety();

    // Dispatch to the concrete trie table adapter selected by the provider factory, and
    // run the read-only verification against this transaction.
    reth_trie_db::with_adapter!(tool.provider_factory, |A| do_verify_only::<_, A>(&tx))
}
122
123fn do_verify_only<TX: DbTx, A: TrieTableAdapter>(tx: &TX) -> eyre::Result<()> {
124    // Create the verifier
125    let hashed_cursor_factory = DatabaseHashedCursorFactory::new(tx);
126    let trie_cursor_factory = DatabaseTrieCursorFactory::<_, A>::new(tx);
127    let verifier = Verifier::new(&trie_cursor_factory, hashed_cursor_factory)?;
128
129    let metrics = RepairTrieMetrics::new();
130
131    let mut inconsistent_nodes = 0;
132    let start_time = Instant::now();
133    let mut last_progress_time = Instant::now();
134
135    // Iterate over the verifier and repair inconsistencies
136    for output_result in verifier {
137        let output = output_result?;
138
139        if let Output::Progress(path) = output {
140            if last_progress_time.elapsed() > PROGRESS_PERIOD {
141                output_progress(path, start_time, inconsistent_nodes);
142                last_progress_time = Instant::now();
143            }
144        } else {
145            warn!("Inconsistency found: {output:?}");
146            inconsistent_nodes += 1;
147
148            // Record metrics based on output type
149            match output {
150                Output::AccountExtra(_, _) |
151                Output::AccountWrong { .. } |
152                Output::AccountMissing(_, _) => {
153                    metrics.account_inconsistencies.increment(1);
154                }
155                Output::StorageExtra(_, _, _) |
156                Output::StorageWrong { .. } |
157                Output::StorageMissing(_, _, _) => {
158                    metrics.storage_inconsistencies.increment(1);
159                }
160                Output::Progress(_) => unreachable!(),
161            }
162        }
163    }
164
165    info!("Found {} inconsistencies (dry run - no changes made)", inconsistent_nodes);
166
167    Ok(())
168}
169
170/// Checks that the merkle stage has completed running up to the account and storage hashing stages.
171fn verify_checkpoints(provider: impl StageCheckpointReader) -> eyre::Result<()> {
172    let account_hashing_checkpoint =
173        provider.get_stage_checkpoint(StageId::AccountHashing)?.unwrap_or_default();
174    let storage_hashing_checkpoint =
175        provider.get_stage_checkpoint(StageId::StorageHashing)?.unwrap_or_default();
176    let merkle_checkpoint =
177        provider.get_stage_checkpoint(StageId::MerkleExecute)?.unwrap_or_default();
178
179    if account_hashing_checkpoint.block_number != merkle_checkpoint.block_number {
180        return Err(eyre::eyre!(
181            "MerkleExecute stage checkpoint ({}) != AccountHashing stage checkpoint ({}), you must first complete the pipeline sync by running `reth node`",
182            merkle_checkpoint.block_number,
183            account_hashing_checkpoint.block_number,
184        ))
185    }
186
187    if storage_hashing_checkpoint.block_number != merkle_checkpoint.block_number {
188        return Err(eyre::eyre!(
189            "MerkleExecute stage checkpoint ({}) != StorageHashing stage checkpoint ({}), you must first complete the pipeline sync by running `reth node`",
190            merkle_checkpoint.block_number,
191            storage_hashing_checkpoint.block_number,
192        ))
193    }
194
195    let merkle_checkpoint_progress =
196        provider.get_stage_checkpoint_progress(StageId::MerkleExecute)?;
197    if merkle_checkpoint_progress.is_some_and(|progress| !progress.is_empty()) {
198        return Err(eyre::eyre!(
199            "MerkleExecute sync stage in-progress, you must first complete the pipeline sync by running `reth node`",
200        ))
201    }
202
203    Ok(())
204}
205
/// Verifies the trie tables and repairs any inconsistencies found, committing the
/// changes on success.
///
/// Refuses to run if the pipeline stage checkpoints are out of sync (a sync is in
/// progress). If no inconsistencies are found, the read-write transaction is dropped
/// without committing, leaving the database untouched.
fn verify_and_repair<N: ProviderNodeTypes>(tool: &DbTool<N>) -> eyre::Result<()> {
    // Get a read-write database provider
    let mut provider_rw = tool.provider_factory.provider_rw()?;

    // Log the database block tip from Finish stage checkpoint
    let finish_checkpoint = provider_rw.get_stage_checkpoint(StageId::Finish)?.unwrap_or_default();
    info!("Database block tip: {}", finish_checkpoint.block_number);

    // Check that a pipeline sync isn't in progress.
    verify_checkpoints(provider_rw.as_ref())?;

    // Dispatch to the concrete trie table adapter selected by the provider factory.
    let inconsistent_nodes = reth_trie_db::with_adapter!(tool.provider_factory, |A| {
        do_verify_and_repair::<_, A>(&mut provider_rw)?
    });

    if inconsistent_nodes == 0 {
        // Nothing was written; the transaction is dropped (not committed) on return.
        info!("No inconsistencies found");
    } else {
        provider_rw.commit()?;
        info!("Repaired {} inconsistencies and committed changes", inconsistent_nodes);
    }

    Ok(())
}
230
/// Walks the trie verifier over the given read-write provider and repairs every
/// inconsistency it reports in place, returning the number of inconsistent nodes found.
///
/// The caller is responsible for committing (or discarding) the transaction.
fn do_verify_and_repair<N: ProviderNodeTypes, A: TrieTableAdapter>(
    provider_rw: &mut reth_provider::DatabaseProviderRW<N::DB, N>,
) -> eyre::Result<usize>
where
    <N::DB as reth_db_api::database::Database>::TXMut: DbTxMut + DbTx,
{
    // Create cursors for making modifications with
    let tx = provider_rw.tx_mut();
    // NOTE(review): presumably disabled so the long full-trie scan doesn't trip the
    // backend's long-lived read-transaction guard — confirm against the DB backend.
    tx.disable_long_read_transaction_safety();
    let mut account_trie_cursor = tx.cursor_write::<A::AccountTrieTable>()?;
    let mut storage_trie_cursor = tx.cursor_dup_write::<A::StorageTrieTable>()?;

    // Create the cursor factories. These cannot accept the `&mut` tx above because they
    // require it to be AsRef.
    let tx = provider_rw.tx_ref();
    let hashed_cursor_factory = DatabaseHashedCursorFactory::new(tx);
    let trie_cursor_factory = DatabaseTrieCursorFactory::<_, A>::new(tx);

    // Create the verifier
    let verifier = Verifier::new(&trie_cursor_factory, hashed_cursor_factory)?;

    let metrics = RepairTrieMetrics::new();

    let mut inconsistent_nodes = 0;
    let start_time = Instant::now();
    let mut last_progress_time = Instant::now();

    // Iterate over the verifier and repair inconsistencies
    for output_result in verifier {
        let output = output_result?;

        // First pass (by reference): log and count the inconsistency, bump metrics.
        if !matches!(output, Output::Progress(_)) {
            warn!("Inconsistency found, will repair: {output:?}");
            inconsistent_nodes += 1;

            // Record metrics based on output type
            match &output {
                Output::AccountExtra(_, _) |
                Output::AccountWrong { .. } |
                Output::AccountMissing(_, _) => {
                    metrics.account_inconsistencies.increment(1);
                }
                Output::StorageExtra(_, _, _) |
                Output::StorageWrong { .. } |
                Output::StorageMissing(_, _, _) => {
                    metrics.storage_inconsistencies.increment(1);
                }
                // Excluded by the `matches!` guard above.
                Output::Progress(_) => {}
            }
        }

        // Second pass (by value): apply the repair for this output.
        match output {
            Output::AccountExtra(path, _node) => {
                // Extra account node in trie, remove it
                let key: A::AccountKey = path.into();
                if account_trie_cursor.seek_exact(key)?.is_some() {
                    account_trie_cursor.delete_current()?;
                }
            }
            Output::StorageExtra(account, path, _node) => {
                // Extra storage node in trie, remove it
                // (seek_by_key_subkey may land on a later subkey under the same account,
                // so verify the exact subkey matched before deleting)
                let subkey: A::StorageSubKey = path.into();
                if storage_trie_cursor
                    .seek_by_key_subkey(account, subkey.clone())?
                    .filter(|e| *e.nibbles() == subkey)
                    .is_some()
                {
                    storage_trie_cursor.delete_current()?;
                }
            }
            Output::AccountWrong { path, expected: node, .. } |
            Output::AccountMissing(path, node) => {
                // Wrong/missing account node value, upsert it
                let key: A::AccountKey = path.into();
                account_trie_cursor.upsert(key, &node)?;
            }
            Output::StorageWrong { account, path, expected: node, .. } |
            Output::StorageMissing(account, path, node) => {
                // Wrong/missing storage node value, upsert it
                // (We can't just use `upsert` method with a dup cursor, it's not properly
                // supported)
                // Delete any existing entry with this exact subkey first, then insert the
                // corrected entry; the order matters for dup-sorted tables.
                let subkey: A::StorageSubKey = path.into();
                let entry = A::StorageValue::new(subkey.clone(), node);
                if storage_trie_cursor
                    .seek_by_key_subkey(account, subkey.clone())?
                    .filter(|v| *v.nibbles() == subkey)
                    .is_some()
                {
                    storage_trie_cursor.delete_current()?;
                }
                storage_trie_cursor.upsert(account, &entry)?;
            }
            Output::Progress(path) => {
                // Throttle progress logging to at most once per PROGRESS_PERIOD.
                if last_progress_time.elapsed() > PROGRESS_PERIOD {
                    output_progress(path, start_time, inconsistent_nodes);
                    last_progress_time = Instant::now();
                }
            }
        }
    }

    Ok(inconsistent_nodes as usize)
}
334
335/// Output progress information based on the last seen account path.
336fn output_progress(last_account: Nibbles, start_time: Instant, inconsistent_nodes: u64) {
337    // Calculate percentage based on position in the trie path space
338    // For progress estimation, we'll use the first few nibbles as an approximation
339
340    // Convert the first 16 nibbles (8 bytes) to a u64 for progress calculation
341    let mut current_value: u64 = 0;
342    let nibbles_to_use = last_account.len().min(16);
343
344    for i in 0..nibbles_to_use {
345        current_value = (current_value << 4) | (last_account.get(i).unwrap_or(0) as u64);
346    }
347    // Shift left to fill remaining bits if we have fewer than 16 nibbles
348    if nibbles_to_use < 16 {
349        current_value <<= (16 - nibbles_to_use) * 4;
350    }
351
352    let progress_percent = current_value as f64 / u64::MAX as f64 * 100.0;
353    let progress_percent_str = format!("{progress_percent:.2}");
354
355    // Calculate ETA based on current speed
356    let elapsed = start_time.elapsed();
357    let elapsed_secs = elapsed.as_secs_f64();
358
359    let estimated_total_time =
360        if progress_percent > 0.0 { elapsed_secs / (progress_percent / 100.0) } else { 0.0 };
361    let remaining_time = estimated_total_time - elapsed_secs;
362    let eta_duration = Duration::from_secs(remaining_time as u64);
363
364    info!(
365        progress_percent = progress_percent_str,
366        eta = %humantime::format_duration(eta_duration),
367        inconsistent_nodes,
368        "Repairing trie tables",
369    );
370}
371
372/// Metrics for tracking trie repair inconsistencies
373#[derive(Debug)]
374struct RepairTrieMetrics {
375    account_inconsistencies: Counter,
376    storage_inconsistencies: Counter,
377}
378
379impl RepairTrieMetrics {
380    fn new() -> Self {
381        Self {
382            account_inconsistencies: metrics::counter!(
383                "db.repair_trie.inconsistencies_found",
384                "type" => "account"
385            ),
386            storage_inconsistencies: metrics::counter!(
387                "db.repair_trie.inconsistencies_found",
388                "type" => "storage"
389            ),
390        }
391    }
392}