// reth_cli_commands/db/checksum/mod.rs
1use crate::{
2    common::CliNodeTypes,
3    db::get::{maybe_json_value_parser, table_key},
4};
5use alloy_primitives::map::foldhash::fast::FixedState;
6use clap::Parser;
7use itertools::Itertools;
8use reth_chainspec::EthereumHardforks;
9use reth_db::{static_file::iter_static_files, DatabaseEnv};
10use reth_db_api::{
11    cursor::DbCursorRO, table::Table, transaction::DbTx, RawKey, RawTable, RawValue, TableViewer,
12    Tables,
13};
14use reth_db_common::DbTool;
15use reth_node_builder::{NodeTypesWithDB, NodeTypesWithDBAdapter};
16use reth_provider::{providers::ProviderNodeTypes, DBProvider, StaticFileProviderFactory};
17use reth_static_file_types::{ChangesetOffset, StaticFileSegment};
18use std::{
19    hash::{BuildHasher, Hasher},
20    time::{Duration, Instant},
21};
22use tracing::{info, warn};
23
/// Checksum support for RocksDB tables (Unix-only, behind the `rocksdb` feature).
#[cfg(all(unix, feature = "rocksdb"))]
mod rocksdb;

/// Interval for logging progress during checksum computation.
const PROGRESS_LOG_INTERVAL: usize = 100_000;
29
/// The arguments for the `reth db checksum` command
#[derive(Parser, Debug)]
pub struct Command {
    // Which checksum backend to run; plain `//` comment so clap help output is unchanged.
    #[command(subcommand)]
    subcommand: Subcommand,
}
36
// The available checksum backends. The `///` docs on variants and fields below are
// rendered as CLI help text by clap, so they are left byte-identical.
#[derive(clap::Subcommand, Debug)]
enum Subcommand {
    /// Calculates the checksum of a database table
    Mdbx {
        /// The table name
        table: Tables,

        /// The start of the range to checksum.
        #[arg(long, value_parser = maybe_json_value_parser)]
        start_key: Option<String>,

        /// The end of the range to checksum.
        #[arg(long, value_parser = maybe_json_value_parser)]
        end_key: Option<String>,

        /// The maximum number of records that are queried and used to compute the
        /// checksum.
        #[arg(long)]
        limit: Option<usize>,
    },
    /// Calculates the checksum of a static file segment
    StaticFile {
        /// The static file segment
        #[arg(value_enum)]
        segment: StaticFileSegment,

        /// The block number to start from (inclusive).
        #[arg(long)]
        start_block: Option<u64>,

        /// The block number to end at (inclusive).
        #[arg(long)]
        end_block: Option<u64>,

        /// The maximum number of rows to checksum.
        #[arg(long)]
        limit: Option<usize>,
    },
    /// Calculates the checksum of a RocksDB table
    #[cfg(all(unix, feature = "rocksdb"))]
    Rocksdb {
        /// The RocksDB table
        #[arg(value_enum)]
        table: rocksdb::RocksDbTable,

        /// The maximum number of records to checksum.
        #[arg(long)]
        limit: Option<usize>,
    },
}
87
impl Command {
    /// Execute `db checksum` command
    ///
    /// Dispatches to the selected backend — MDBX table, static file segment,
    /// or (when compiled in) RocksDB table — and computes its checksum.
    pub fn execute<N: CliNodeTypes<ChainSpec: EthereumHardforks>>(
        self,
        tool: &DbTool<NodeTypesWithDBAdapter<N, DatabaseEnv>>,
    ) -> eyre::Result<()> {
        // A concurrently running node could mutate the data mid-walk, making
        // the resulting checksum meaningless.
        warn!("This command should be run without the node running!");

        match self.subcommand {
            // `Tables::view` dispatches to `ChecksumViewer::view::<T>` for the
            // concrete table type.
            Subcommand::Mdbx { table, start_key, end_key, limit } => {
                table.view(&ChecksumViewer { tool, start_key, end_key, limit })?;
            }
            Subcommand::StaticFile { segment, start_block, end_block, limit } => {
                checksum_static_file(tool, segment, start_block, end_block, limit)?;
            }
            #[cfg(all(unix, feature = "rocksdb"))]
            Subcommand::Rocksdb { table, limit } => {
                rocksdb::checksum_rocksdb(tool, table, limit)?;
            }
        }

        Ok(())
    }
}
112
113/// Creates a new hasher with the standard seed used for checksum computation.
114fn checksum_hasher() -> impl Hasher {
115    FixedState::with_seed(u64::from_be_bytes(*b"RETHRETH")).build_hasher()
116}
117
/// Computes the checksum of a static file `segment`, optionally restricted to
/// an inclusive `[start_block, end_block]` range and capped at `limit` rows.
///
/// For change-based segments the MDBX key/value write boundaries are
/// reconstructed (see [`checksum_change_based_segment`]) so the result matches
/// MDBX raw entry semantics; other segments hash each row's columns directly.
///
/// # Errors
///
/// Returns an error if no static files exist for `segment`, if a segment
/// provider cannot be opened, if a changeset offsets sidecar is missing, or if
/// reading rows fails.
fn checksum_static_file<N: CliNodeTypes<ChainSpec: EthereumHardforks>>(
    tool: &DbTool<NodeTypesWithDBAdapter<N, DatabaseEnv>>,
    segment: StaticFileSegment,
    start_block: Option<u64>,
    end_block: Option<u64>,
    limit: Option<usize>,
) -> eyre::Result<()> {
    let static_file_provider = tool.provider_factory.static_file_provider();
    // Consistency problems are reported but do not abort the checksum run.
    if let Err(err) = static_file_provider.check_consistency(&tool.provider_factory.provider()?) {
        warn!("Error checking consistency of static files: {err}");
    }

    let static_files = iter_static_files(static_file_provider.directory())?;

    let ranges = static_files
        .get(segment)
        .ok_or_else(|| eyre::eyre!("No static files found for segment: {}", segment))?;

    let start_time = Instant::now();
    // Missing bounds default to "everything".
    let limit = limit.unwrap_or(usize::MAX);
    let mut checksummer = Checksummer::new(checksum_hasher(), limit);

    let start_block = start_block.unwrap_or(0);
    let end_block = end_block.unwrap_or(u64::MAX);
    let is_change_based = segment.is_change_based();

    info!(
        "Computing checksum for {} static files, start_block={}, end_block={}, limit={:?}",
        segment,
        start_block,
        end_block,
        if limit == usize::MAX { None } else { Some(limit) }
    );

    let mut reached_limit = false;
    // Walk jars in ascending block order so the checksum is deterministic.
    for (block_range, _header) in ranges.iter().sorted_by_key(|(range, _)| range.start()) {
        // Skip jars that do not intersect the requested block range.
        if block_range.end() < start_block || block_range.start() > end_block {
            continue;
        }

        let fixed_block_range = static_file_provider.find_fixed_range(segment, block_range.start());
        let jar_provider = static_file_provider
            .get_segment_provider_for_range(segment, || Some(fixed_block_range), None)?
            .ok_or_else(|| {
                eyre::eyre!(
                    "Failed to get segment provider for segment {} at range {}",
                    segment,
                    block_range
                )
            })?;

        let mut cursor = jar_provider.cursor()?;

        if is_change_based {
            // Changesets need the per-block offsets sidecar to map rows back to
            // block numbers and rebuild MDBX key/value boundaries.
            let offsets = jar_provider.read_changeset_offsets()?.ok_or_else(|| {
                eyre::eyre!(
                    "Missing changeset offsets sidecar for segment {} at range {}",
                    segment,
                    block_range
                )
            })?;
            let input = ChangeBasedChecksumInput {
                segment,
                block_range_start: block_range.start(),
                start_block,
                end_block,
                offsets: &offsets,
            };

            reached_limit = checksum_change_based_segment(&mut checksummer, input, &mut cursor)?;
        } else {
            // Plain segments: hash every column of every row, in file order.
            while let Some(row) = cursor.next_row()? {
                if checksummer.write_row(&row) {
                    reached_limit = true;
                    break;
                }
            }
        }

        // Explicitly drop provider before removing from cache to avoid deadlock
        drop(jar_provider);
        static_file_provider.remove_cached_provider(segment, fixed_block_range.end());

        if reached_limit {
            break;
        }
    }

    let (checksum, total) = checksummer.finish();
    let elapsed = start_time.elapsed();

    info!(
        "Checksum for static file segment `{}`: {:#x} ({} entries, elapsed: {:?})",
        segment, checksum, total, elapsed
    );

    Ok(())
}
216
/// [`TableViewer`] that computes the checksum of an MDBX table, optionally
/// restricted to a key range and/or an entry limit.
pub(crate) struct ChecksumViewer<'a, N: NodeTypesWithDB> {
    /// Handle to the database the table is read from.
    tool: &'a DbTool<N>,
    /// JSON-encoded start key (inclusive); `None` walks from the first entry.
    start_key: Option<String>,
    /// JSON-encoded end key (inclusive); `None` walks to the last entry.
    end_key: Option<String>,
    /// Maximum number of entries to hash; `None` means unlimited.
    limit: Option<usize>,
}
223
impl<N: NodeTypesWithDB> ChecksumViewer<'_, N> {
    /// Creates a viewer that checksums the entire table: no key bounds, no limit.
    pub(crate) const fn new(tool: &'_ DbTool<N>) -> ChecksumViewer<'_, N> {
        ChecksumViewer { tool, start_key: None, end_key: None, limit: None }
    }
}
229
impl<N: ProviderNodeTypes> TableViewer<(u64, Duration)> for ChecksumViewer<'_, N> {
    type Error = eyre::Report;

    /// Walks the raw table (optionally bounded by `start_key`/`end_key`, capped
    /// at `limit` entries), hashing every raw key and value, and returns the
    /// checksum together with the elapsed time.
    fn view<T: Table>(&self) -> Result<(u64, Duration), Self::Error> {
        // A full-table walk is a long-lived read, so the provider's long-read
        // transaction safety is disabled up front.
        let provider =
            self.tool.provider_factory.provider()?.disable_long_read_transaction_safety();
        let tx = provider.tx_ref();
        info!(
            "Start computing checksum, start={:?}, end={:?}, limit={:?}",
            self.start_key, self.end_key, self.limit
        );

        let mut cursor = tx.cursor_read::<RawTable<T>>()?;
        // Parse the optional JSON-encoded bounds into raw keys and build the
        // matching inclusive range walker.
        let walker = match (self.start_key.as_deref(), self.end_key.as_deref()) {
            (Some(start), Some(end)) => {
                let start_key = table_key::<T>(start).map(RawKey::new)?;
                let end_key = table_key::<T>(end).map(RawKey::new)?;
                cursor.walk_range(start_key..=end_key)?
            }
            (None, Some(end)) => {
                let end_key = table_key::<T>(end).map(RawKey::new)?;

                cursor.walk_range(..=end_key)?
            }
            (Some(start), None) => {
                let start_key = table_key::<T>(start).map(RawKey::new)?;
                cursor.walk_range(start_key..)?
            }
            (None, None) => cursor.walk_range(..)?,
        };

        let start_time = Instant::now();
        let mut hasher = checksum_hasher();
        let mut total = 0;

        let limit = self.limit.unwrap_or(usize::MAX);
        // First and last keys actually hashed, reported at the end so the range
        // can be cross-checked or resumed.
        let mut enumerate_start_key = None;
        let mut enumerate_end_key = None;
        for (index, entry) in walker.enumerate() {
            let (k, v): (RawKey<T::Key>, RawValue<T::Value>) = entry?;

            // NOTE(review): `0.is_multiple_of(n)` is true, so this logs
            // "Hashed 0 entries." on the very first entry, unlike
            // `Checksummer::advance` which counts from 1 — consider aligning.
            if index.is_multiple_of(PROGRESS_LOG_INTERVAL) {
                info!("Hashed {index} entries.");
            }

            // Key and value are hashed as two separate writes; the write
            // boundaries are part of the checksum format (foldhash folds the
            // length of each write into its state).
            hasher.write(k.raw_key());
            hasher.write(v.raw_value());

            if enumerate_start_key.is_none() {
                enumerate_start_key = Some(k.clone());
            }
            enumerate_end_key = Some(k);

            total = index + 1;
            if total >= limit {
                break;
            }
        }

        info!("Hashed {total} entries.");
        if let (Some(s), Some(e)) = (enumerate_start_key, enumerate_end_key) {
            info!("start-key: {}", serde_json::to_string(&s.key()?).unwrap_or_default());
            info!("end-key: {}", serde_json::to_string(&e.key()?).unwrap_or_default());
        }

        let checksum = hasher.finish();
        let elapsed = start_time.elapsed();

        info!("Checksum for table `{}`: {:#x} (elapsed: {:?})", T::NAME, checksum, elapsed);

        Ok((checksum, elapsed))
    }
}
303
/// Accumulates a checksum over key-value entries, tracking count and limit.
struct Checksummer<H> {
    /// Underlying hash state; every entry is folded into it.
    hasher: H,
    /// Number of entries hashed so far.
    total: usize,
    /// Maximum number of entries to hash before reporting the limit reached.
    limit: usize,
}
310
311impl<H: Hasher> Checksummer<H> {
312    fn new(hasher: H, limit: usize) -> Self {
313        Self { hasher, total: 0, limit }
314    }
315
316    /// Hash a row's columns (non-changeset segments). Returns `true` if the limit is reached.
317    fn write_row(&mut self, row: &[&[u8]]) -> bool {
318        for col in row {
319            self.hasher.write(col);
320        }
321        self.advance()
322    }
323
324    /// Hash a key + value as two separate writes, matching MDBX raw entry semantics.
325    /// Write boundaries matter: foldhash rotates its accumulator by `len` on each `write`.
326    fn write_entry(&mut self, key: &[u8], value: &[u8]) -> bool {
327        self.hasher.write(key);
328        self.hasher.write(value);
329        self.advance()
330    }
331
332    fn advance(&mut self) -> bool {
333        self.total += 1;
334        if self.total.is_multiple_of(PROGRESS_LOG_INTERVAL) {
335            info!("Hashed {} entries.", self.total);
336        }
337        self.total >= self.limit
338    }
339
340    fn finish(self) -> (u64, usize) {
341        (self.hasher.finish(), self.total)
342    }
343}
344
345/// Reconstruct MDBX `StorageChangeSets` key/value boundaries from a static-file row.
346///
347/// MDBX layout:
348/// - key: `BlockNumberAddress` => `[8B block_number][20B address]`
349/// - value: `StorageEntry` => `[32B storage_key][compact U256 value]`
350///
351/// Static-file row layout for `StorageBeforeTx`:
352/// - `[20B address][32B storage_key][compact U256 value]`
353fn split_storage_changeset_row(block_number: u64, row: &[u8]) -> eyre::Result<([u8; 28], &[u8])> {
354    if row.len() < 20 {
355        return Err(eyre::eyre!(
356            "Storage changeset row too short: expected at least 20 bytes, got {}",
357            row.len()
358        ));
359    }
360
361    let mut key_buf = [0u8; 28];
362    key_buf[..8].copy_from_slice(&block_number.to_be_bytes());
363    key_buf[8..].copy_from_slice(&row[..20]);
364    Ok((key_buf, &row[20..]))
365}
366
/// Input parameters for [`checksum_change_based_segment`].
struct ChangeBasedChecksumInput<'a> {
    /// The change-based segment being checksummed.
    segment: StaticFileSegment,
    /// First block number covered by the static file jar being read.
    block_range_start: u64,
    /// Inclusive lower bound of the user-requested block range.
    start_block: u64,
    /// Inclusive upper bound of the user-requested block range.
    end_block: u64,
    /// Per-block changeset offsets; entry `i` describes block `block_range_start + i`.
    offsets: &'a [ChangesetOffset],
}
374
/// Checksums a change-based static file segment (account or storage changesets),
/// reconstructing the MDBX key/value write boundaries for each row.
///
/// Rows outside `[start_block, end_block]` are still read — the cursor must stay
/// in sync with the offsets — but are not hashed. Returns `Ok(true)` if the
/// entry limit was reached, `Ok(false)` otherwise.
///
/// # Errors
///
/// Returns an error if the file ends before all offsets are consumed, if the
/// offsets do not cover every row, or if a storage row is malformed.
fn checksum_change_based_segment<H: Hasher>(
    checksummer: &mut Checksummer<H>,
    input: ChangeBasedChecksumInput<'_>,
    cursor: &mut reth_db::static_file::StaticFileCursor<'_>,
) -> eyre::Result<bool> {
    let ChangeBasedChecksumInput { segment, block_range_start, start_block, end_block, offsets } =
        input;
    let is_storage = segment.is_storage_change_sets();
    let mut reached_limit = false;

    for (offset_index, offset) in offsets.iter().enumerate() {
        // Offsets are positional: entry `i` belongs to block `block_range_start + i`.
        let block_number = block_range_start + offset_index as u64;
        let include = block_number >= start_block && block_number <= end_block;

        for _ in 0..offset.num_changes() {
            let row = cursor.next_row()?.ok_or_else(|| {
                eyre::eyre!(
                    "Unexpected EOF while checksumming {} static file at range starting {}",
                    segment,
                    block_range_start
                )
            })?;

            // Row consumed to keep the cursor aligned, but out of range: skip hashing.
            if !include {
                continue;
            }

            // Reconstruct MDBX key/value write boundaries. foldhash rotates
            // its accumulator by `len` on each write(), so boundaries must
            // match exactly.
            let done = if is_storage {
                // StorageChangeSets: MDBX key = BlockNumberAddress (28B),
                // value = compact StorageEntry. Column 0 is compact
                // StorageBeforeTx = [20B address][32B key][compact U256].
                let col = row[0];
                let (key, value) = split_storage_changeset_row(block_number, col)?;
                checksummer.write_entry(&key, value)
            } else {
                // AccountChangeSets: MDBX key = BlockNumber (8B),
                // value = compact AccountBeforeTx (= column 0).
                checksummer.write_entry(&block_number.to_be_bytes(), row[0])
            };

            if done {
                reached_limit = true;
                break;
            }
        }

        if reached_limit {
            break;
        }
    }

    // Sanity check: when not cut short by the limit, the offsets must account
    // for every row in the jar — a leftover row means the sidecar is corrupt.
    if !reached_limit && cursor.next_row()?.is_some() {
        return Err(eyre::eyre!(
            "Changeset offsets do not cover all rows for {} at range starting {}",
            segment,
            block_range_start
        ));
    }

    Ok(reached_limit)
}