Skip to main content

reth_cli_commands/db/
repair_trie.rs

1use alloy_consensus::BlockHeader as AlloyBlockHeader;
2use clap::Parser;
3use metrics::{self, Counter};
4use reth_chainspec::EthChainSpec;
5use reth_cli_util::parse_socket_address;
6use reth_db_api::{
7    cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO},
8    database::Database,
9    transaction::{DbTx, DbTxMut},
10};
11use reth_db_common::DbTool;
12use reth_node_core::{
13    dirs::{ChainPath, DataDirPath},
14    version::version_metadata,
15};
16use reth_node_metrics::{
17    chain::ChainSpecInfo,
18    hooks::Hooks,
19    server::{MetricServer, MetricServerConfig},
20    version::VersionInfo,
21};
22use reth_provider::{
23    providers::ProviderNodeTypes, ChainSpecProvider, HeaderProvider, StageCheckpointReader,
24};
25use reth_stages::StageId;
26use reth_storage_api::StorageSettingsCache;
27use reth_tasks::TaskExecutor;
28use reth_trie::{
29    verify::{Output, Verifier},
30    Nibbles,
31};
32use reth_trie_db::{
33    DatabaseHashedCursorFactory, DatabaseStateRoot, DatabaseTrieCursorFactory,
34    StorageTrieEntryLike, TrieTableAdapter,
35};
36use std::{
37    net::SocketAddr,
38    time::{Duration, Instant},
39};
40use tracing::{info, warn};
41
/// Minimum interval between two consecutive progress log lines while walking the trie.
const PROGRESS_PERIOD: Duration = Duration::from_millis(5_000);
43
44/// The arguments for the `reth db repair-trie` command
45#[derive(Parser, Debug)]
46pub struct Command {
47    /// Only show inconsistencies without making any repairs
48    #[arg(long)]
49    pub(crate) dry_run: bool,
50
51    /// Enable Prometheus metrics.
52    ///
53    /// The metrics will be served at the given interface and port.
54    #[arg(long = "metrics", value_name = "ADDR:PORT", value_parser = parse_socket_address)]
55    pub(crate) metrics: Option<SocketAddr>,
56}
57
58impl Command {
59    /// Execute `db repair-trie` command
60    pub fn execute<N: ProviderNodeTypes>(
61        self,
62        tool: &DbTool<N>,
63        task_executor: TaskExecutor,
64        data_dir: &ChainPath<DataDirPath>,
65    ) -> eyre::Result<()> {
66        // Set up metrics server if requested
67        let _metrics_handle = if let Some(listen_addr) = self.metrics {
68            let chain_name = tool.provider_factory.chain_spec().chain().to_string();
69            let executor = task_executor.clone();
70            let pprof_dump_dir = data_dir.pprof_dumps();
71
72            let handle = task_executor.spawn_critical_task("metrics server", async move {
73                let config = MetricServerConfig::new(
74                    listen_addr,
75                    VersionInfo {
76                        version: version_metadata().cargo_pkg_version.as_ref(),
77                        build_timestamp: version_metadata().vergen_build_timestamp.as_ref(),
78                        cargo_features: version_metadata().vergen_cargo_features.as_ref(),
79                        git_sha: version_metadata().vergen_git_sha.as_ref(),
80                        target_triple: version_metadata().vergen_cargo_target_triple.as_ref(),
81                        build_profile: version_metadata().build_profile_name.as_ref(),
82                    },
83                    ChainSpecInfo { name: chain_name },
84                    executor,
85                    Hooks::builder().build(),
86                    pprof_dump_dir,
87                );
88
89                // Spawn the metrics server
90                if let Err(e) = MetricServer::new(config).serve().await {
91                    tracing::error!("Metrics server error: {}", e);
92                }
93            });
94
95            Some(handle)
96        } else {
97            None
98        };
99
100        if self.dry_run {
101            verify_only(tool)?
102        } else {
103            verify_and_repair(tool)?
104        }
105
106        Ok(())
107    }
108}
109
110fn verify_only<N: ProviderNodeTypes>(tool: &DbTool<N>) -> eyre::Result<()> {
111    // Log the database block tip from Finish stage checkpoint
112    let finish_checkpoint = tool
113        .provider_factory
114        .provider()?
115        .get_stage_checkpoint(StageId::Finish)?
116        .unwrap_or_default();
117    info!("Database block tip: {}", finish_checkpoint.block_number);
118
119    // Get a database transaction directly from the database
120    let db = tool.provider_factory.db_ref();
121    let mut tx = db.tx()?;
122    tx.disable_long_read_transaction_safety();
123
124    reth_trie_db::with_adapter!(tool.provider_factory, |A| do_verify_only::<_, A>(&tx))
125}
126
127fn do_verify_only<TX: DbTx, A: TrieTableAdapter>(tx: &TX) -> eyre::Result<()> {
128    // Create the verifier
129    let hashed_cursor_factory = DatabaseHashedCursorFactory::new(tx);
130    let trie_cursor_factory = DatabaseTrieCursorFactory::<_, A>::new(tx);
131    let verifier = Verifier::new(&trie_cursor_factory, hashed_cursor_factory)?;
132
133    let metrics = RepairTrieMetrics::new();
134
135    let mut inconsistent_nodes = 0;
136    let start_time = Instant::now();
137    let mut last_progress_time = Instant::now();
138
139    // Iterate over the verifier and repair inconsistencies
140    for output_result in verifier {
141        let output = output_result?;
142
143        if let Output::Progress(path) = output {
144            if last_progress_time.elapsed() > PROGRESS_PERIOD {
145                output_progress(path, start_time, inconsistent_nodes);
146                last_progress_time = Instant::now();
147            }
148        } else {
149            warn!("Inconsistency found: {output:?}");
150            inconsistent_nodes += 1;
151
152            // Record metrics based on output type
153            match output {
154                Output::AccountExtra(_, _) |
155                Output::AccountWrong { .. } |
156                Output::AccountMissing(_, _) => {
157                    metrics.account_inconsistencies.increment(1);
158                }
159                Output::StorageExtra(_, _, _) |
160                Output::StorageWrong { .. } |
161                Output::StorageMissing(_, _, _) => {
162                    metrics.storage_inconsistencies.increment(1);
163                }
164                Output::Progress(_) => unreachable!(),
165            }
166        }
167    }
168
169    info!("Found {} inconsistencies (dry run - no changes made)", inconsistent_nodes);
170
171    Ok(())
172}
173
174/// Checks that the merkle stage has completed running up to the account and storage hashing stages.
175fn verify_checkpoints(provider: impl StageCheckpointReader) -> eyre::Result<()> {
176    let account_hashing_checkpoint =
177        provider.get_stage_checkpoint(StageId::AccountHashing)?.unwrap_or_default();
178    let storage_hashing_checkpoint =
179        provider.get_stage_checkpoint(StageId::StorageHashing)?.unwrap_or_default();
180    let merkle_checkpoint =
181        provider.get_stage_checkpoint(StageId::MerkleExecute)?.unwrap_or_default();
182
183    if account_hashing_checkpoint.block_number != merkle_checkpoint.block_number {
184        return Err(eyre::eyre!(
185            "MerkleExecute stage checkpoint ({}) != AccountHashing stage checkpoint ({}), you must first complete the pipeline sync by running `reth node`",
186            merkle_checkpoint.block_number,
187            account_hashing_checkpoint.block_number,
188        ))
189    }
190
191    if storage_hashing_checkpoint.block_number != merkle_checkpoint.block_number {
192        return Err(eyre::eyre!(
193            "MerkleExecute stage checkpoint ({}) != StorageHashing stage checkpoint ({}), you must first complete the pipeline sync by running `reth node`",
194            merkle_checkpoint.block_number,
195            storage_hashing_checkpoint.block_number,
196        ))
197    }
198
199    let merkle_checkpoint_progress =
200        provider.get_stage_checkpoint_progress(StageId::MerkleExecute)?;
201    if merkle_checkpoint_progress.is_some_and(|progress| !progress.is_empty()) {
202        return Err(eyre::eyre!(
203            "MerkleExecute sync stage in-progress, you must first complete the pipeline sync by running `reth node`",
204        ))
205    }
206
207    Ok(())
208}
209
210fn verify_and_repair<N: ProviderNodeTypes>(tool: &DbTool<N>) -> eyre::Result<()> {
211    // Get a read-write database provider
212    let mut provider_rw = tool.provider_factory.provider_rw()?;
213
214    // Log the database block tip from Finish stage checkpoint
215    let finish_checkpoint = provider_rw.get_stage_checkpoint(StageId::Finish)?.unwrap_or_default();
216    info!("Database block tip: {}", finish_checkpoint.block_number);
217
218    // Check that a pipeline sync isn't in progress.
219    verify_checkpoints(provider_rw.as_ref())?;
220
221    let inconsistent_nodes = reth_trie_db::with_adapter!(tool.provider_factory, |A| {
222        do_verify_and_repair::<_, A>(&mut provider_rw, finish_checkpoint.block_number)?
223    });
224
225    if inconsistent_nodes == 0 {
226        info!("No inconsistencies found");
227    } else {
228        provider_rw.commit()?;
229        info!("Repaired {} inconsistencies and committed changes", inconsistent_nodes);
230    }
231
232    Ok(())
233}
234
235fn do_verify_and_repair<N: ProviderNodeTypes, A: TrieTableAdapter>(
236    provider_rw: &mut reth_provider::DatabaseProviderRW<N::DB, N>,
237    block_number: u64,
238) -> eyre::Result<usize>
239where
240    <N::DB as reth_db_api::database::Database>::TXMut: DbTxMut + DbTx,
241{
242    // Create cursors for making modifications with
243    let tx = provider_rw.tx_mut();
244    tx.disable_long_read_transaction_safety();
245    let mut account_trie_cursor = tx.cursor_write::<A::AccountTrieTable>()?;
246    let mut storage_trie_cursor = tx.cursor_dup_write::<A::StorageTrieTable>()?;
247
248    // Create the cursor factories. These cannot accept the `&mut` tx above because they
249    // require it to be AsRef.
250    let tx = provider_rw.tx_ref();
251    let hashed_cursor_factory = DatabaseHashedCursorFactory::new(tx);
252    let trie_cursor_factory = DatabaseTrieCursorFactory::<_, A>::new(tx);
253
254    // Create the verifier
255    let verifier = Verifier::new(&trie_cursor_factory, hashed_cursor_factory)?;
256
257    let metrics = RepairTrieMetrics::new();
258
259    let mut inconsistent_nodes = 0;
260    let start_time = Instant::now();
261    let mut last_progress_time = Instant::now();
262
263    // Iterate over the verifier and repair inconsistencies
264    for output_result in verifier {
265        let output = output_result?;
266
267        if !matches!(output, Output::Progress(_)) {
268            warn!("Inconsistency found, will repair: {output:?}");
269            inconsistent_nodes += 1;
270
271            // Record metrics based on output type
272            match &output {
273                Output::AccountExtra(_, _) |
274                Output::AccountWrong { .. } |
275                Output::AccountMissing(_, _) => {
276                    metrics.account_inconsistencies.increment(1);
277                }
278                Output::StorageExtra(_, _, _) |
279                Output::StorageWrong { .. } |
280                Output::StorageMissing(_, _, _) => {
281                    metrics.storage_inconsistencies.increment(1);
282                }
283                Output::Progress(_) => {}
284            }
285        }
286
287        match output {
288            Output::AccountExtra(path, _node) => {
289                // Extra account node in trie, remove it
290                let key: A::AccountKey = path.into();
291                if account_trie_cursor.seek_exact(key)?.is_some() {
292                    account_trie_cursor.delete_current()?;
293                }
294            }
295            Output::StorageExtra(account, path, _node) => {
296                // Extra storage node in trie, remove it
297                let subkey: A::StorageSubKey = path.into();
298                if storage_trie_cursor
299                    .seek_by_key_subkey(account, subkey.clone())?
300                    .filter(|e| *e.nibbles() == subkey)
301                    .is_some()
302                {
303                    storage_trie_cursor.delete_current()?;
304                }
305            }
306            Output::AccountWrong { path, expected: node, .. } |
307            Output::AccountMissing(path, node) => {
308                // Wrong/missing account node value, upsert it
309                let key: A::AccountKey = path.into();
310                account_trie_cursor.upsert(key, &node)?;
311            }
312            Output::StorageWrong { account, path, expected: node, .. } |
313            Output::StorageMissing(account, path, node) => {
314                // Wrong/missing storage node value, upsert it
315                // (We can't just use `upsert` method with a dup cursor, it's not properly
316                // supported)
317                let subkey: A::StorageSubKey = path.into();
318                let entry = A::StorageValue::new(subkey.clone(), node);
319                if storage_trie_cursor
320                    .seek_by_key_subkey(account, subkey.clone())?
321                    .filter(|v| *v.nibbles() == subkey)
322                    .is_some()
323                {
324                    storage_trie_cursor.delete_current()?;
325                }
326                storage_trie_cursor.upsert(account, &entry)?;
327            }
328            Output::Progress(path) => {
329                if last_progress_time.elapsed() > PROGRESS_PERIOD {
330                    output_progress(path, start_time, inconsistent_nodes);
331                    last_progress_time = Instant::now();
332                }
333            }
334        }
335    }
336
337    drop(account_trie_cursor);
338    drop(storage_trie_cursor);
339
340    if inconsistent_nodes > 0 {
341        // Refuse to commit repaired trie tables unless they reproduce the canonical tip state
342        // root.
343        verify_repaired_state_root::<_, A>(provider_rw, block_number)?;
344    }
345
346    Ok(inconsistent_nodes as usize)
347}
348
349fn verify_repaired_state_root<N: ProviderNodeTypes, A: TrieTableAdapter>(
350    provider_rw: &reth_provider::DatabaseProviderRW<N::DB, N>,
351    block_number: u64,
352) -> eyre::Result<()>
353where
354    <N::DB as reth_db_api::database::Database>::TXMut: DbTxMut + DbTx,
355{
356    type DbStateRoot<'a, TX, A> = reth_trie::StateRoot<
357        DatabaseTrieCursorFactory<&'a TX, A>,
358        DatabaseHashedCursorFactory<&'a TX>,
359    >;
360
361    let expected_state_root = provider_rw
362        .header_by_number(block_number)?
363        .ok_or_else(|| {
364            eyre::eyre!("Missing canonical header at database block tip {block_number}")
365        })?
366        .state_root();
367
368    let computed_state_root = DbStateRoot::<_, A>::from_tx(provider_rw.tx_ref()).root()?;
369
370    if computed_state_root != expected_state_root {
371        return Err(eyre::eyre!(
372            "Repaired trie state root mismatch at block {block_number}: computed {computed_state_root}, header {expected_state_root}",
373        ))
374    }
375
376    Ok(())
377}
378
379/// Output progress information based on the last seen account path.
380fn output_progress(last_account: Nibbles, start_time: Instant, inconsistent_nodes: u64) {
381    // Calculate percentage based on position in the trie path space
382    // For progress estimation, we'll use the first few nibbles as an approximation
383
384    // Convert the first 16 nibbles (8 bytes) to a u64 for progress calculation
385    let mut current_value: u64 = 0;
386    let nibbles_to_use = last_account.len().min(16);
387
388    for i in 0..nibbles_to_use {
389        current_value = (current_value << 4) | (last_account.get(i).unwrap_or(0) as u64);
390    }
391    // Shift left to fill remaining bits if we have fewer than 16 nibbles
392    if nibbles_to_use < 16 {
393        current_value <<= (16 - nibbles_to_use) * 4;
394    }
395
396    let progress_percent = current_value as f64 / u64::MAX as f64 * 100.0;
397    let progress_percent_str = format!("{progress_percent:.2}");
398
399    // Calculate ETA based on current speed
400    let elapsed = start_time.elapsed();
401    let elapsed_secs = elapsed.as_secs_f64();
402
403    let estimated_total_time =
404        if progress_percent > 0.0 { elapsed_secs / (progress_percent / 100.0) } else { 0.0 };
405    let remaining_time = estimated_total_time - elapsed_secs;
406    let eta_duration = Duration::from_secs(remaining_time as u64);
407
408    info!(
409        progress_percent = progress_percent_str,
410        eta = %humantime::format_duration(eta_duration),
411        inconsistent_nodes,
412        "Repairing trie tables",
413    );
414}
415
416/// Metrics for tracking trie repair inconsistencies
417#[derive(Debug)]
418struct RepairTrieMetrics {
419    account_inconsistencies: Counter,
420    storage_inconsistencies: Counter,
421}
422
423impl RepairTrieMetrics {
424    fn new() -> Self {
425        Self {
426            account_inconsistencies: metrics::counter!(
427                "db.repair_trie.inconsistencies_found",
428                "type" => "account"
429            ),
430            storage_inconsistencies: metrics::counter!(
431                "db.repair_trie.inconsistencies_found",
432                "type" => "storage"
433            ),
434        }
435    }
436}