// reth_cli_commands/db/repair_trie.rs

1use clap::Parser;
2use metrics::{self, Counter};
3use reth_chainspec::EthChainSpec;
4use reth_cli_util::parse_socket_address;
5use reth_db_api::{
6    cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO},
7    database::Database,
8    tables,
9    transaction::{DbTx, DbTxMut},
10};
11use reth_db_common::DbTool;
12use reth_node_core::version::version_metadata;
13use reth_node_metrics::{
14    chain::ChainSpecInfo,
15    hooks::Hooks,
16    server::{MetricServer, MetricServerConfig},
17    version::VersionInfo,
18};
19use reth_provider::{providers::ProviderNodeTypes, ChainSpecProvider, StageCheckpointReader};
20use reth_stages::StageId;
21use reth_trie::{
22    verify::{Output, Verifier},
23    Nibbles,
24};
25use reth_trie_common::{StorageTrieEntry, StoredNibbles, StoredNibblesSubKey};
26use reth_trie_db::{DatabaseHashedCursorFactory, DatabaseTrieCursorFactory};
27use std::{
28    net::SocketAddr,
29    time::{Duration, Instant},
30};
31use tracing::{info, warn};
32
/// Minimum interval between progress log messages emitted while the verifier runs.
const PROGRESS_PERIOD: Duration = Duration::from_secs(5);
34
/// The arguments for the `reth db repair-trie` command
// NOTE: the `///` comments on the fields below are rendered by clap as `--help`
// text, so they are user-facing strings and must not be reworded casually.
#[derive(Parser, Debug)]
pub struct Command {
    /// Only show inconsistencies without making any repairs
    #[arg(long)]
    pub(crate) dry_run: bool,

    /// Enable Prometheus metrics.
    ///
    /// The metrics will be served at the given interface and port.
    #[arg(long = "metrics", value_name = "ADDR:PORT", value_parser = parse_socket_address)]
    pub(crate) metrics: Option<SocketAddr>,
}
48
impl Command {
    /// Execute `db repair-trie` command.
    ///
    /// Optionally starts a Prometheus metrics server on a dedicated OS thread,
    /// then runs the trie verifier in dry-run (`verify_only`) or repair
    /// (`verify_and_repair`) mode depending on `--dry-run`.
    pub fn execute<N: ProviderNodeTypes>(self, tool: &DbTool<N>) -> eyre::Result<()> {
        // Set up metrics server if requested. The join handle is bound so the
        // thread is kept associated with this scope for the duration of the run.
        let _metrics_handle = if let Some(listen_addr) = self.metrics {
            // Captured before spawning: the thread closure must be 'static, so
            // it cannot borrow from `tool`.
            let chain_name = tool.provider_factory.chain_spec().chain().to_string();

            // Spawn an OS thread with a single-threaded tokio runtime for the metrics server.
            // This command is otherwise synchronous and owns no async runtime of its own.
            let handle = std::thread::Builder::new().name("metrics-server".to_string()).spawn(
                move || {
                    // Create a single-threaded tokio runtime
                    let runtime = tokio::runtime::Builder::new_current_thread()
                        .enable_all()
                        .build()
                        .expect("Failed to create tokio runtime for metrics server");

                    let handle = runtime.handle().clone();
                    runtime.block_on(async move {
                        let task_manager = reth_tasks::TaskManager::new(handle.clone());
                        let task_executor = task_manager.executor();

                        // Version/build metadata is exposed as metric labels.
                        let config = MetricServerConfig::new(
                            listen_addr,
                            VersionInfo {
                                version: version_metadata().cargo_pkg_version.as_ref(),
                                build_timestamp: version_metadata().vergen_build_timestamp.as_ref(),
                                cargo_features: version_metadata().vergen_cargo_features.as_ref(),
                                git_sha: version_metadata().vergen_git_sha.as_ref(),
                                target_triple: version_metadata()
                                    .vergen_cargo_target_triple
                                    .as_ref(),
                                build_profile: version_metadata().build_profile_name.as_ref(),
                            },
                            ChainSpecInfo { name: chain_name },
                            task_executor,
                            Hooks::builder().build(),
                        );

                        // Spawn the metrics server
                        if let Err(e) = MetricServer::new(config).serve().await {
                            tracing::error!("Metrics server error: {}", e);
                        }

                        // Block forever to keep the runtime alive
                        std::future::pending::<()>().await
                    });
                },
            )?;

            Some(handle)
        } else {
            None
        };

        if self.dry_run {
            verify_only(tool)?
        } else {
            verify_and_repair(tool)?
        }

        Ok(())
    }
}
112
113fn verify_only<N: ProviderNodeTypes>(tool: &DbTool<N>) -> eyre::Result<()> {
114    // Log the database block tip from Finish stage checkpoint
115    let finish_checkpoint = tool
116        .provider_factory
117        .provider()?
118        .get_stage_checkpoint(StageId::Finish)?
119        .unwrap_or_default();
120    info!("Database block tip: {}", finish_checkpoint.block_number);
121
122    // Get a database transaction directly from the database
123    let db = tool.provider_factory.db_ref();
124    let mut tx = db.tx()?;
125    tx.disable_long_read_transaction_safety();
126
127    // Create the verifier
128    let hashed_cursor_factory = DatabaseHashedCursorFactory::new(&tx);
129    let trie_cursor_factory = DatabaseTrieCursorFactory::new(&tx);
130    let verifier = Verifier::new(&trie_cursor_factory, hashed_cursor_factory)?;
131
132    let metrics = RepairTrieMetrics::new();
133
134    let mut inconsistent_nodes = 0;
135    let start_time = Instant::now();
136    let mut last_progress_time = Instant::now();
137
138    // Iterate over the verifier and repair inconsistencies
139    for output_result in verifier {
140        let output = output_result?;
141
142        if let Output::Progress(path) = output {
143            if last_progress_time.elapsed() > PROGRESS_PERIOD {
144                output_progress(path, start_time, inconsistent_nodes);
145                last_progress_time = Instant::now();
146            }
147        } else {
148            warn!("Inconsistency found: {output:?}");
149            inconsistent_nodes += 1;
150
151            // Record metrics based on output type
152            match output {
153                Output::AccountExtra(_, _) |
154                Output::AccountWrong { .. } |
155                Output::AccountMissing(_, _) => {
156                    metrics.account_inconsistencies.increment(1);
157                }
158                Output::StorageExtra(_, _, _) |
159                Output::StorageWrong { .. } |
160                Output::StorageMissing(_, _, _) => {
161                    metrics.storage_inconsistencies.increment(1);
162                }
163                Output::Progress(_) => unreachable!(),
164            }
165        }
166    }
167
168    info!("Found {} inconsistencies (dry run - no changes made)", inconsistent_nodes);
169
170    Ok(())
171}
172
173/// Checks that the merkle stage has completed running up to the account and storage hashing stages.
174fn verify_checkpoints(provider: impl StageCheckpointReader) -> eyre::Result<()> {
175    let account_hashing_checkpoint =
176        provider.get_stage_checkpoint(StageId::AccountHashing)?.unwrap_or_default();
177    let storage_hashing_checkpoint =
178        provider.get_stage_checkpoint(StageId::StorageHashing)?.unwrap_or_default();
179    let merkle_checkpoint =
180        provider.get_stage_checkpoint(StageId::MerkleExecute)?.unwrap_or_default();
181
182    if account_hashing_checkpoint.block_number != merkle_checkpoint.block_number {
183        return Err(eyre::eyre!(
184            "MerkleExecute stage checkpoint ({}) != AccountHashing stage checkpoint ({}), you must first complete the pipeline sync by running `reth node`",
185            merkle_checkpoint.block_number,
186            account_hashing_checkpoint.block_number,
187        ))
188    }
189
190    if storage_hashing_checkpoint.block_number != merkle_checkpoint.block_number {
191        return Err(eyre::eyre!(
192            "MerkleExecute stage checkpoint ({}) != StorageHashing stage checkpoint ({}), you must first complete the pipeline sync by running `reth node`",
193            merkle_checkpoint.block_number,
194            storage_hashing_checkpoint.block_number,
195        ))
196    }
197
198    let merkle_checkpoint_progress =
199        provider.get_stage_checkpoint_progress(StageId::MerkleExecute)?;
200    if merkle_checkpoint_progress.is_some_and(|progress| !progress.is_empty()) {
201        return Err(eyre::eyre!(
202            "MerkleExecute sync stage in-progress, you must first complete the pipeline sync by running `reth node`",
203        ))
204    }
205
206    Ok(())
207}
208
/// Walks the trie with the verifier and repairs every inconsistency found,
/// committing the transaction only if at least one repair was made.
fn verify_and_repair<N: ProviderNodeTypes>(tool: &DbTool<N>) -> eyre::Result<()> {
    // Get a read-write database provider
    let mut provider_rw = tool.provider_factory.provider_rw()?;

    // Log the database block tip from Finish stage checkpoint
    let finish_checkpoint = provider_rw.get_stage_checkpoint(StageId::Finish)?.unwrap_or_default();
    info!("Database block tip: {}", finish_checkpoint.block_number);

    // Check that a pipeline sync isn't in progress.
    verify_checkpoints(provider_rw.as_ref())?;

    // Create cursors for making modifications with
    let tx = provider_rw.tx_mut();
    // The full walk can outlive the default long-read-transaction timeout, so
    // disable the safety check for this transaction.
    tx.disable_long_read_transaction_safety();
    let mut account_trie_cursor = tx.cursor_write::<tables::AccountsTrie>()?;
    let mut storage_trie_cursor = tx.cursor_dup_write::<tables::StoragesTrie>()?;

    // Create the cursor factories. These cannot accept the `&mut` tx above because they require it
    // to be AsRef.
    let tx = provider_rw.tx_ref();
    let hashed_cursor_factory = DatabaseHashedCursorFactory::new(tx);
    let trie_cursor_factory = DatabaseTrieCursorFactory::new(tx);

    // Create the verifier
    let verifier = Verifier::new(&trie_cursor_factory, hashed_cursor_factory)?;

    let metrics = RepairTrieMetrics::new();

    let mut inconsistent_nodes = 0;
    let start_time = Instant::now();
    let mut last_progress_time = Instant::now();

    // Iterate over the verifier and repair inconsistencies
    for output_result in verifier {
        let output = output_result?;

        // First, log and count the inconsistency. This borrows `output` only,
        // because the repair match below consumes it.
        if !matches!(output, Output::Progress(_)) {
            warn!("Inconsistency found, will repair: {output:?}");
            inconsistent_nodes += 1;

            // Record metrics based on output type
            match &output {
                Output::AccountExtra(_, _) |
                Output::AccountWrong { .. } |
                Output::AccountMissing(_, _) => {
                    metrics.account_inconsistencies.increment(1);
                }
                Output::StorageExtra(_, _, _) |
                Output::StorageWrong { .. } |
                Output::StorageMissing(_, _, _) => {
                    metrics.storage_inconsistencies.increment(1);
                }
                // Excluded by the `matches!` guard above.
                Output::Progress(_) => {}
            }
        }

        match output {
            Output::AccountExtra(path, _node) => {
                // Extra account node in trie, remove it
                let nibbles = StoredNibbles(path);
                if account_trie_cursor.seek_exact(nibbles)?.is_some() {
                    account_trie_cursor.delete_current()?;
                }
            }
            Output::StorageExtra(account, path, _node) => {
                // Extra storage node in trie, remove it.
                // `seek_by_key_subkey` can land on a different subkey under the
                // same account, so confirm the nibbles match before deleting.
                let nibbles = StoredNibblesSubKey(path);
                if storage_trie_cursor
                    .seek_by_key_subkey(account, nibbles.clone())?
                    .filter(|e| e.nibbles == nibbles)
                    .is_some()
                {
                    storage_trie_cursor.delete_current()?;
                }
            }
            Output::AccountWrong { path, expected: node, .. } |
            Output::AccountMissing(path, node) => {
                // Wrong/missing account node value, upsert it
                let nibbles = StoredNibbles(path);
                account_trie_cursor.upsert(nibbles, &node)?;
            }
            Output::StorageWrong { account, path, expected: node, .. } |
            Output::StorageMissing(account, path, node) => {
                // Wrong/missing storage node value, upsert it
                // (We can't just use `upsert` method with a dup cursor, it's not properly
                // supported)
                let nibbles = StoredNibblesSubKey(path);
                let entry = StorageTrieEntry { nibbles: nibbles.clone(), node };
                // Delete any existing entry for this exact subkey first, then
                // write the corrected one.
                if storage_trie_cursor
                    .seek_by_key_subkey(account, nibbles.clone())?
                    .filter(|v| v.nibbles == nibbles)
                    .is_some()
                {
                    storage_trie_cursor.delete_current()?;
                }
                storage_trie_cursor.upsert(account, &entry)?;
            }
            Output::Progress(path) => {
                // Rate-limit progress output.
                if last_progress_time.elapsed() > PROGRESS_PERIOD {
                    output_progress(path, start_time, inconsistent_nodes);
                    last_progress_time = Instant::now();
                }
            }
        }
    }

    if inconsistent_nodes == 0 {
        // Nothing was changed, so the transaction is simply not committed.
        info!("No inconsistencies found");
    } else {
        info!("Repaired {} inconsistencies, committing changes", inconsistent_nodes);
        provider_rw.commit()?;
    }

    Ok(())
}
324
325/// Output progress information based on the last seen account path.
326fn output_progress(last_account: Nibbles, start_time: Instant, inconsistent_nodes: u64) {
327    // Calculate percentage based on position in the trie path space
328    // For progress estimation, we'll use the first few nibbles as an approximation
329
330    // Convert the first 16 nibbles (8 bytes) to a u64 for progress calculation
331    let mut current_value: u64 = 0;
332    let nibbles_to_use = last_account.len().min(16);
333
334    for i in 0..nibbles_to_use {
335        current_value = (current_value << 4) | (last_account.get(i).unwrap_or(0) as u64);
336    }
337    // Shift left to fill remaining bits if we have fewer than 16 nibbles
338    if nibbles_to_use < 16 {
339        current_value <<= (16 - nibbles_to_use) * 4;
340    }
341
342    let progress_percent = current_value as f64 / u64::MAX as f64 * 100.0;
343    let progress_percent_str = format!("{progress_percent:.2}");
344
345    // Calculate ETA based on current speed
346    let elapsed = start_time.elapsed();
347    let elapsed_secs = elapsed.as_secs_f64();
348
349    let estimated_total_time =
350        if progress_percent > 0.0 { elapsed_secs / (progress_percent / 100.0) } else { 0.0 };
351    let remaining_time = estimated_total_time - elapsed_secs;
352    let eta_duration = Duration::from_secs(remaining_time as u64);
353
354    info!(
355        progress_percent = progress_percent_str,
356        eta = %humantime::format_duration(eta_duration),
357        inconsistent_nodes,
358        "Repairing trie tables",
359    );
360}
361
/// Metrics for tracking trie repair inconsistencies
#[derive(Debug)]
struct RepairTrieMetrics {
    // Count of account-trie inconsistencies found (labeled `type="account"`).
    account_inconsistencies: Counter,
    // Count of storage-trie inconsistencies found (labeled `type="storage"`).
    storage_inconsistencies: Counter,
}
368
369impl RepairTrieMetrics {
370    fn new() -> Self {
371        Self {
372            account_inconsistencies: metrics::counter!(
373                "db.repair_trie.inconsistencies_found",
374                "type" => "account"
375            ),
376            storage_inconsistencies: metrics::counter!(
377                "db.repair_trie.inconsistencies_found",
378                "type" => "storage"
379            ),
380        }
381    }
382}