Skip to main content

reth_cli_commands/db/checksum/
mod.rs

1use crate::{
2    common::CliNodeTypes,
3    db::get::{maybe_json_value_parser, table_key},
4};
5use alloy_primitives::map::foldhash::fast::FixedState;
6use clap::Parser;
7use itertools::Itertools;
8use reth_chainspec::EthereumHardforks;
9use reth_db::{static_file::iter_static_files, DatabaseEnv};
10use reth_db_api::{
11    cursor::DbCursorRO, table::Table, transaction::DbTx, RawKey, RawTable, RawValue, TableViewer,
12    Tables,
13};
14use reth_db_common::DbTool;
15use reth_node_builder::{NodeTypesWithDB, NodeTypesWithDBAdapter};
16use reth_provider::{providers::ProviderNodeTypes, DBProvider, StaticFileProviderFactory};
17use reth_static_file_types::StaticFileSegment;
18use std::{
19    hash::{BuildHasher, Hasher},
20    time::{Duration, Instant},
21};
22use tracing::{info, warn};
23
// RocksDB checksum support: supplies the `RocksDbTable` argument type and the `checksum_rocksdb`
// routine used by the `Rocksdb` subcommand. Only compiled on unix targets with the `rocksdb`
// feature enabled.
#[cfg(all(unix, feature = "rocksdb"))]
mod rocksdb;
26
/// Number of hashed entries between two consecutive progress log messages emitted while a
/// checksum is being computed.
const PROGRESS_LOG_INTERVAL: usize = 100_000;
29
// Entry point of `reth db checksum`: it carries no options of its own and only selects which
// storage backend to checksum via `Subcommand`.
//
// NOTE: the `///` comment below is consumed by clap's derive macro as the command description, so
// it is user-facing text and is kept verbatim.
#[derive(Parser, Debug)]
/// The arguments for the `reth db checksum` command
pub struct Command {
    // Backend-specific arguments; dispatched on in `Command::execute`.
    #[command(subcommand)]
    subcommand: Subcommand,
}
36
// One variant per storage backend that can be checksummed.
//
// NOTE: the `///` comments on the variants and fields are consumed by clap's derive macro as
// `--help` text, so they are user-facing output and are kept verbatim; implementation notes are
// written as plain `//` comments instead.
#[derive(clap::Subcommand, Debug)]
enum Subcommand {
    /// Calculates the checksum of a database table
    Mdbx {
        /// The table name
        table: Tables,

        // Kept as a string here; converted to the table's key type with `table_key` when the
        // checksum runs, and used as an inclusive lower bound.
        /// The start of the range to checksum.
        #[arg(long, value_parser = maybe_json_value_parser)]
        start_key: Option<String>,

        // Kept as a string here; converted to the table's key type with `table_key` when the
        // checksum runs, and used as an inclusive upper bound.
        /// The end of the range to checksum.
        #[arg(long, value_parser = maybe_json_value_parser)]
        end_key: Option<String>,

        // The limit is compared after an entry has been hashed, so `0` still hashes one entry.
        /// The maximum number of records that are queried and used to compute the
        /// checksum.
        #[arg(long)]
        limit: Option<usize>,
    },
    /// Calculates the checksum of a static file segment
    StaticFile {
        /// The static file segment
        #[arg(value_enum)]
        segment: StaticFileSegment,

        // Static files whose block range ends before this block are skipped as a whole.
        /// The block number to start from (inclusive).
        #[arg(long)]
        start_block: Option<u64>,

        // Static files whose block range starts after this block are skipped as a whole.
        /// The block number to end at (inclusive).
        #[arg(long)]
        end_block: Option<u64>,

        // Counted across all visited static files, not per file.
        /// The maximum number of rows to checksum.
        #[arg(long)]
        limit: Option<usize>,
    },
    /// Calculates the checksum of a RocksDB table
    #[cfg(all(unix, feature = "rocksdb"))]
    Rocksdb {
        /// The RocksDB table
        #[arg(value_enum)]
        table: rocksdb::RocksDbTable,

        /// The maximum number of records to checksum.
        #[arg(long)]
        limit: Option<usize>,
    },
}
87
88impl Command {
89    /// Execute `db checksum` command
90    pub fn execute<N: CliNodeTypes<ChainSpec: EthereumHardforks>>(
91        self,
92        tool: &DbTool<NodeTypesWithDBAdapter<N, DatabaseEnv>>,
93    ) -> eyre::Result<()> {
94        warn!("This command should be run without the node running!");
95
96        match self.subcommand {
97            Subcommand::Mdbx { table, start_key, end_key, limit } => {
98                table.view(&ChecksumViewer { tool, start_key, end_key, limit })?;
99            }
100            Subcommand::StaticFile { segment, start_block, end_block, limit } => {
101                checksum_static_file(tool, segment, start_block, end_block, limit)?;
102            }
103            #[cfg(all(unix, feature = "rocksdb"))]
104            Subcommand::Rocksdb { table, limit } => {
105                rocksdb::checksum_rocksdb(tool, table, limit)?;
106            }
107        }
108
109        Ok(())
110    }
111}
112
113/// Creates a new hasher with the standard seed used for checksum computation.
114fn checksum_hasher() -> impl Hasher {
115    FixedState::with_seed(u64::from_be_bytes(*b"RETHRETH")).build_hasher()
116}
117
118fn checksum_static_file<N: CliNodeTypes<ChainSpec: EthereumHardforks>>(
119    tool: &DbTool<NodeTypesWithDBAdapter<N, DatabaseEnv>>,
120    segment: StaticFileSegment,
121    start_block: Option<u64>,
122    end_block: Option<u64>,
123    limit: Option<usize>,
124) -> eyre::Result<()> {
125    let static_file_provider = tool.provider_factory.static_file_provider();
126    if let Err(err) = static_file_provider.check_consistency(&tool.provider_factory.provider()?) {
127        warn!("Error checking consistency of static files: {err}");
128    }
129
130    let static_files = iter_static_files(static_file_provider.directory())?;
131
132    let ranges = static_files
133        .get(segment)
134        .ok_or_else(|| eyre::eyre!("No static files found for segment: {}", segment))?;
135
136    let start_time = Instant::now();
137    let mut hasher = checksum_hasher();
138    let mut total = 0usize;
139    let limit = limit.unwrap_or(usize::MAX);
140
141    let start_block = start_block.unwrap_or(0);
142    let end_block = end_block.unwrap_or(u64::MAX);
143
144    info!(
145        "Computing checksum for {} static files, start_block={}, end_block={}, limit={:?}",
146        segment,
147        start_block,
148        end_block,
149        if limit == usize::MAX { None } else { Some(limit) }
150    );
151
152    'outer: for (block_range, _header) in ranges.iter().sorted_by_key(|(range, _)| range.start()) {
153        if block_range.end() < start_block || block_range.start() > end_block {
154            continue;
155        }
156
157        let fixed_block_range = static_file_provider.find_fixed_range(segment, block_range.start());
158        let jar_provider = static_file_provider
159            .get_segment_provider_for_range(segment, || Some(fixed_block_range), None)?
160            .ok_or_else(|| {
161                eyre::eyre!(
162                    "Failed to get segment provider for segment {} at range {}",
163                    segment,
164                    block_range
165                )
166            })?;
167
168        let mut cursor = jar_provider.cursor()?;
169
170        while let Ok(Some(row)) = cursor.next_row() {
171            for col_data in row.iter() {
172                hasher.write(col_data);
173            }
174
175            total += 1;
176
177            if total.is_multiple_of(PROGRESS_LOG_INTERVAL) {
178                info!("Hashed {total} entries.");
179            }
180
181            if total >= limit {
182                break 'outer;
183            }
184        }
185
186        // Explicitly drop provider before removing from cache to avoid deadlock
187        drop(jar_provider);
188        static_file_provider.remove_cached_provider(segment, fixed_block_range.end());
189    }
190
191    let checksum = hasher.finish();
192    let elapsed = start_time.elapsed();
193
194    info!(
195        "Checksum for static file segment `{}`: {:#x} ({} entries, elapsed: {:?})",
196        segment, checksum, total, elapsed
197    );
198
199    Ok(())
200}
201
/// [`TableViewer`] that computes a checksum over the raw key/value pairs of a database table,
/// optionally restricted to a key range and a maximum number of entries.
pub(crate) struct ChecksumViewer<'a, N: NodeTypesWithDB> {
    /// Database tool whose provider factory supplies the read-only provider used for hashing.
    tool: &'a DbTool<N>,
    /// Inclusive lower bound of the key range, as an unparsed string; converted to the table's
    /// key type with `table_key` when the table is viewed. `None` starts at the first entry.
    start_key: Option<String>,
    /// Inclusive upper bound of the key range, as an unparsed string; converted to the table's
    /// key type with `table_key` when the table is viewed. `None` runs to the last entry.
    end_key: Option<String>,
    /// Maximum number of entries to hash; `None` means no limit.
    limit: Option<usize>,
}
208
209impl<N: NodeTypesWithDB> ChecksumViewer<'_, N> {
210    pub(crate) const fn new(tool: &'_ DbTool<N>) -> ChecksumViewer<'_, N> {
211        ChecksumViewer { tool, start_key: None, end_key: None, limit: None }
212    }
213}
214
215impl<N: ProviderNodeTypes> TableViewer<(u64, Duration)> for ChecksumViewer<'_, N> {
216    type Error = eyre::Report;
217
218    fn view<T: Table>(&self) -> Result<(u64, Duration), Self::Error> {
219        let provider =
220            self.tool.provider_factory.provider()?.disable_long_read_transaction_safety();
221        let tx = provider.tx_ref();
222        info!(
223            "Start computing checksum, start={:?}, end={:?}, limit={:?}",
224            self.start_key, self.end_key, self.limit
225        );
226
227        let mut cursor = tx.cursor_read::<RawTable<T>>()?;
228        let walker = match (self.start_key.as_deref(), self.end_key.as_deref()) {
229            (Some(start), Some(end)) => {
230                let start_key = table_key::<T>(start).map(RawKey::new)?;
231                let end_key = table_key::<T>(end).map(RawKey::new)?;
232                cursor.walk_range(start_key..=end_key)?
233            }
234            (None, Some(end)) => {
235                let end_key = table_key::<T>(end).map(RawKey::new)?;
236
237                cursor.walk_range(..=end_key)?
238            }
239            (Some(start), None) => {
240                let start_key = table_key::<T>(start).map(RawKey::new)?;
241                cursor.walk_range(start_key..)?
242            }
243            (None, None) => cursor.walk_range(..)?,
244        };
245
246        let start_time = Instant::now();
247        let mut hasher = checksum_hasher();
248        let mut total = 0;
249
250        let limit = self.limit.unwrap_or(usize::MAX);
251        let mut enumerate_start_key = None;
252        let mut enumerate_end_key = None;
253        for (index, entry) in walker.enumerate() {
254            let (k, v): (RawKey<T::Key>, RawValue<T::Value>) = entry?;
255
256            if index.is_multiple_of(PROGRESS_LOG_INTERVAL) {
257                info!("Hashed {index} entries.");
258            }
259
260            hasher.write(k.raw_key());
261            hasher.write(v.raw_value());
262
263            if enumerate_start_key.is_none() {
264                enumerate_start_key = Some(k.clone());
265            }
266            enumerate_end_key = Some(k);
267
268            total = index + 1;
269            if total >= limit {
270                break
271            }
272        }
273
274        info!("Hashed {total} entries.");
275        if let (Some(s), Some(e)) = (enumerate_start_key, enumerate_end_key) {
276            info!("start-key: {}", serde_json::to_string(&s.key()?).unwrap_or_default());
277            info!("end-key: {}", serde_json::to_string(&e.key()?).unwrap_or_default());
278        }
279
280        let checksum = hasher.finish();
281        let elapsed = start_time.elapsed();
282
283        info!("Checksum for table `{}`: {:#x} (elapsed: {:?})", T::NAME, checksum, elapsed);
284
285        Ok((checksum, elapsed))
286    }
287}