// reth_cli_commands/db/checksum/mod.rs
1use crate::{
2    common::CliNodeTypes,
3    db::get::{maybe_json_value_parser, table_key},
4};
5use alloy_primitives::map::foldhash::fast::FixedState;
6use clap::Parser;
7use itertools::Itertools;
8use reth_chainspec::EthereumHardforks;
9use reth_db::{static_file::iter_static_files, DatabaseEnv};
10use reth_db_api::{
11    cursor::DbCursorRO, table::Table, transaction::DbTx, RawKey, RawTable, RawValue, TableViewer,
12    Tables,
13};
14use reth_db_common::DbTool;
15use reth_node_builder::{NodeTypesWithDB, NodeTypesWithDBAdapter};
16use reth_provider::{providers::ProviderNodeTypes, DBProvider, StaticFileProviderFactory};
17use reth_static_file_types::{ChangesetOffset, StaticFileSegment};
18use std::{
19    hash::{BuildHasher, Hasher},
20    time::{Duration, Instant},
21};
22use tracing::{info, warn};
23
// RocksDB-specific checksum support (table selection enum + checksum routine).
mod rocksdb;

/// Interval for logging progress during checksum computation.
const PROGRESS_LOG_INTERVAL: usize = 100_000;
28
#[derive(Parser, Debug)]
/// The arguments for the `reth db checksum` command
pub struct Command {
    // Selects which storage backend to checksum (MDBX table, static file
    // segment, or RocksDB table).
    #[command(subcommand)]
    subcommand: Subcommand,
}
35
#[derive(clap::Subcommand, Debug)]
// NOTE: clap derive turns the `///` doc comments on variants and fields below
// into the CLI help text, so editing them changes user-visible output.
enum Subcommand {
    /// Calculates the checksum of a database table
    Mdbx {
        /// The table name
        table: Tables,

        /// The start of the range to checksum.
        #[arg(long, value_parser = maybe_json_value_parser)]
        start_key: Option<String>,

        /// The end of the range to checksum.
        #[arg(long, value_parser = maybe_json_value_parser)]
        end_key: Option<String>,

        /// The maximum number of records that are queried and used to compute the
        /// checksum.
        #[arg(long)]
        limit: Option<usize>,
    },
    /// Calculates the checksum of a static file segment
    StaticFile {
        /// The static file segment
        #[arg(value_enum)]
        segment: StaticFileSegment,

        /// The block number to start from (inclusive).
        #[arg(long)]
        start_block: Option<u64>,

        /// The block number to end at (inclusive).
        #[arg(long)]
        end_block: Option<u64>,

        /// The maximum number of rows to checksum.
        #[arg(long)]
        limit: Option<usize>,
    },
    /// Calculates the checksum of a RocksDB table
    Rocksdb {
        /// The RocksDB table
        #[arg(value_enum)]
        table: rocksdb::RocksDbTable,

        /// The maximum number of records to checksum.
        #[arg(long)]
        limit: Option<usize>,
    },
}
85
impl Command {
    /// Execute `db checksum` command
    ///
    /// Emits a warning (the command is meant to be run without the node
    /// running), then dispatches to the checksum routine for the selected
    /// backend: MDBX table, static file segment, or RocksDB table.
    pub fn execute<N: CliNodeTypes<ChainSpec: EthereumHardforks>>(
        self,
        tool: &DbTool<NodeTypesWithDBAdapter<N, DatabaseEnv>>,
    ) -> eyre::Result<()> {
        warn!("This command should be run without the node running!");

        match self.subcommand {
            Subcommand::Mdbx { table, start_key, end_key, limit } => {
                // `table.view` resolves the concrete table type and invokes
                // `ChecksumViewer::view` for it.
                table.view(&ChecksumViewer { tool, start_key, end_key, limit })?;
            }
            Subcommand::StaticFile { segment, start_block, end_block, limit } => {
                checksum_static_file(tool, segment, start_block, end_block, limit)?;
            }
            Subcommand::Rocksdb { table, limit } => {
                rocksdb::checksum_rocksdb(tool, table, limit)?;
            }
        }

        Ok(())
    }
}
109
110/// Creates a new hasher with the standard seed used for checksum computation.
111fn checksum_hasher() -> impl Hasher {
112    FixedState::with_seed(u64::from_be_bytes(*b"RETHRETH")).build_hasher()
113}
114
/// Computes the checksum of a static file segment, optionally bounded by an
/// inclusive block range and an entry limit.
///
/// Jars are visited in ascending block order; changeset segments are re-framed
/// to MDBX key/value boundaries via [`checksum_change_based_segment`], all
/// other segments are hashed row-by-row. The final checksum, entry count and
/// elapsed time are logged.
fn checksum_static_file<N: CliNodeTypes<ChainSpec: EthereumHardforks>>(
    tool: &DbTool<NodeTypesWithDBAdapter<N, DatabaseEnv>>,
    segment: StaticFileSegment,
    start_block: Option<u64>,
    end_block: Option<u64>,
    limit: Option<usize>,
) -> eyre::Result<()> {
    let static_file_provider = tool.provider_factory.static_file_provider();
    // Consistency problems are only warned about; checksumming proceeds anyway.
    if let Err(err) = static_file_provider.check_consistency(&tool.provider_factory.provider()?) {
        warn!("Error checking consistency of static files: {err}");
    }

    let static_files = iter_static_files(static_file_provider.directory())?;

    let ranges = static_files
        .get(segment)
        .ok_or_else(|| eyre::eyre!("No static files found for segment: {}", segment))?;

    let start_time = Instant::now();
    // A missing limit means "hash everything".
    let limit = limit.unwrap_or(usize::MAX);
    let mut checksummer = Checksummer::new(checksum_hasher(), limit);

    // Default to the widest possible block window when bounds are not given.
    let start_block = start_block.unwrap_or(0);
    let end_block = end_block.unwrap_or(u64::MAX);
    let is_change_based = segment.is_change_based();

    info!(
        "Computing checksum for {} static files, start_block={}, end_block={}, limit={:?}",
        segment,
        start_block,
        end_block,
        if limit == usize::MAX { None } else { Some(limit) }
    );

    let mut reached_limit = false;
    // Sort by range start so jars are hashed in a stable, ascending order.
    for (block_range, _header) in ranges.iter().sorted_by_key(|(range, _)| range.start()) {
        // Skip jars that lie entirely outside the requested block window.
        // NOTE(review): for non-changeset segments the bounds only filter
        // whole jars; every row of a partially-overlapping jar is hashed —
        // confirm this coarse granularity is intended.
        if block_range.end() < start_block || block_range.start() > end_block {
            continue;
        }

        let fixed_block_range = static_file_provider.find_fixed_range(segment, block_range.start());
        let jar_provider = static_file_provider
            .get_segment_provider_for_range(segment, || Some(fixed_block_range), None)?
            .ok_or_else(|| {
                eyre::eyre!(
                    "Failed to get segment provider for segment {} at range {}",
                    segment,
                    block_range
                )
            })?;

        let mut cursor = jar_provider.cursor()?;

        if is_change_based {
            // Changeset segments require the per-block offsets sidecar to
            // reconstruct MDBX key/value boundaries.
            let offsets = jar_provider.read_changeset_offsets()?.ok_or_else(|| {
                eyre::eyre!(
                    "Missing changeset offsets sidecar for segment {} at range {}",
                    segment,
                    block_range
                )
            })?;
            let input = ChangeBasedChecksumInput {
                segment,
                block_range_start: block_range.start(),
                start_block,
                end_block,
                offsets: &offsets,
            };

            reached_limit = checksum_change_based_segment(&mut checksummer, input, &mut cursor)?;
        } else {
            // Non-changeset segments: hash every column of every row as-is.
            while let Some(row) = cursor.next_row()? {
                if checksummer.write_row(&row) {
                    reached_limit = true;
                    break;
                }
            }
        }

        // Explicitly drop provider before removing from cache to avoid deadlock
        drop(jar_provider);
        static_file_provider.remove_cached_provider(segment, fixed_block_range.end());

        if reached_limit {
            break;
        }
    }

    let (checksum, total) = checksummer.finish();
    let elapsed = start_time.elapsed();

    info!(
        "Checksum for static file segment `{}`: {:#x} ({} entries, elapsed: {:?})",
        segment, checksum, total, elapsed
    );

    Ok(())
}
213
/// Table viewer that computes the checksum of an MDBX table over an optional
/// key range, with an optional entry limit.
pub(crate) struct ChecksumViewer<'a, N: NodeTypesWithDB> {
    // Database access used to open a read transaction.
    tool: &'a DbTool<N>,
    // Optional start of the key range (inclusive); parsed via `table_key`.
    start_key: Option<String>,
    // Optional end of the key range (inclusive); parsed via `table_key`.
    end_key: Option<String>,
    // Maximum number of entries to hash; `None` means unlimited.
    limit: Option<usize>,
}
220
impl<N: NodeTypesWithDB> ChecksumViewer<'_, N> {
    /// Creates a viewer over the full table: no key bounds and no entry limit.
    pub(crate) const fn new(tool: &'_ DbTool<N>) -> ChecksumViewer<'_, N> {
        ChecksumViewer { tool, start_key: None, end_key: None, limit: None }
    }
}
226
227impl<N: ProviderNodeTypes> TableViewer<(u64, Duration)> for ChecksumViewer<'_, N> {
228    type Error = eyre::Report;
229
230    fn view<T: Table>(&self) -> Result<(u64, Duration), Self::Error> {
231        let provider =
232            self.tool.provider_factory.provider()?.disable_long_read_transaction_safety();
233        let tx = provider.tx_ref();
234        info!(
235            "Start computing checksum, start={:?}, end={:?}, limit={:?}",
236            self.start_key, self.end_key, self.limit
237        );
238
239        let mut cursor = tx.cursor_read::<RawTable<T>>()?;
240        let walker = match (self.start_key.as_deref(), self.end_key.as_deref()) {
241            (Some(start), Some(end)) => {
242                let start_key = table_key::<T>(start).map(RawKey::new)?;
243                let end_key = table_key::<T>(end).map(RawKey::new)?;
244                cursor.walk_range(start_key..=end_key)?
245            }
246            (None, Some(end)) => {
247                let end_key = table_key::<T>(end).map(RawKey::new)?;
248
249                cursor.walk_range(..=end_key)?
250            }
251            (Some(start), None) => {
252                let start_key = table_key::<T>(start).map(RawKey::new)?;
253                cursor.walk_range(start_key..)?
254            }
255            (None, None) => cursor.walk_range(..)?,
256        };
257
258        let start_time = Instant::now();
259        let mut hasher = checksum_hasher();
260        let mut total = 0;
261
262        let limit = self.limit.unwrap_or(usize::MAX);
263        let mut enumerate_start_key = None;
264        let mut enumerate_end_key = None;
265        for (index, entry) in walker.enumerate() {
266            let (k, v): (RawKey<T::Key>, RawValue<T::Value>) = entry?;
267
268            if index.is_multiple_of(PROGRESS_LOG_INTERVAL) {
269                info!("Hashed {index} entries.");
270            }
271
272            hasher.write(k.raw_key());
273            hasher.write(v.raw_value());
274
275            if enumerate_start_key.is_none() {
276                enumerate_start_key = Some(k.clone());
277            }
278            enumerate_end_key = Some(k);
279
280            total = index + 1;
281            if total >= limit {
282                break;
283            }
284        }
285
286        info!("Hashed {total} entries.");
287        if let (Some(s), Some(e)) = (enumerate_start_key, enumerate_end_key) {
288            info!("start-key: {}", serde_json::to_string(&s.key()?).unwrap_or_default());
289            info!("end-key: {}", serde_json::to_string(&e.key()?).unwrap_or_default());
290        }
291
292        let checksum = hasher.finish();
293        let elapsed = start_time.elapsed();
294
295        info!("Checksum for table `{}`: {:#x} (elapsed: {:?})", T::NAME, checksum, elapsed);
296
297        Ok((checksum, elapsed))
298    }
299}
300
/// Accumulates a checksum over key-value entries, tracking count and limit.
struct Checksummer<H> {
    // Underlying hasher the entries are folded into.
    hasher: H,
    // Number of entries hashed so far.
    total: usize,
    // Maximum number of entries to hash before reporting completion.
    limit: usize,
}
307
308impl<H: Hasher> Checksummer<H> {
309    fn new(hasher: H, limit: usize) -> Self {
310        Self { hasher, total: 0, limit }
311    }
312
313    /// Hash a row's columns (non-changeset segments). Returns `true` if the limit is reached.
314    fn write_row(&mut self, row: &[&[u8]]) -> bool {
315        for col in row {
316            self.hasher.write(col);
317        }
318        self.advance()
319    }
320
321    /// Hash a key + value as two separate writes, matching MDBX raw entry semantics.
322    /// Write boundaries matter: foldhash rotates its accumulator by `len` on each `write`.
323    fn write_entry(&mut self, key: &[u8], value: &[u8]) -> bool {
324        self.hasher.write(key);
325        self.hasher.write(value);
326        self.advance()
327    }
328
329    fn advance(&mut self) -> bool {
330        self.total += 1;
331        if self.total.is_multiple_of(PROGRESS_LOG_INTERVAL) {
332            info!("Hashed {} entries.", self.total);
333        }
334        self.total >= self.limit
335    }
336
337    fn finish(self) -> (u64, usize) {
338        (self.hasher.finish(), self.total)
339    }
340}
341
342/// Reconstruct MDBX `StorageChangeSets` key/value boundaries from a static-file row.
343///
344/// MDBX layout:
345/// - key: `BlockNumberAddress` => `[8B block_number][20B address]`
346/// - value: `StorageEntry` => `[32B storage_key][compact U256 value]`
347///
348/// Static-file row layout for `StorageBeforeTx`:
349/// - `[20B address][32B storage_key][compact U256 value]`
350fn split_storage_changeset_row(block_number: u64, row: &[u8]) -> eyre::Result<([u8; 28], &[u8])> {
351    if row.len() < 20 {
352        return Err(eyre::eyre!(
353            "Storage changeset row too short: expected at least 20 bytes, got {}",
354            row.len()
355        ));
356    }
357
358    let mut key_buf = [0u8; 28];
359    key_buf[..8].copy_from_slice(&block_number.to_be_bytes());
360    key_buf[8..].copy_from_slice(&row[..20]);
361    Ok((key_buf, &row[20..]))
362}
363
/// Parameters for checksumming one jar of a change-based (changeset) segment.
struct ChangeBasedChecksumInput<'a> {
    // The changeset segment being hashed (account or storage changesets).
    segment: StaticFileSegment,
    // First block number covered by the current jar.
    block_range_start: u64,
    // Inclusive lower bound of the user-requested block range.
    start_block: u64,
    // Inclusive upper bound of the user-requested block range.
    end_block: u64,
    // Per-block change counts; entry `i` describes block `block_range_start + i`.
    offsets: &'a [ChangesetOffset],
}
371
/// Checksums one jar of a change-based segment, re-framing every static-file
/// row into the MDBX key/value write boundaries so the result matches the
/// checksum of the corresponding MDBX changeset table.
///
/// Returns `Ok(true)` if the checksummer's entry limit was reached.
///
/// # Errors
///
/// Errors if the cursor runs out of rows before all offsets are consumed, or
/// if rows remain after the offsets are exhausted (offsets and rows disagree).
fn checksum_change_based_segment<H: Hasher>(
    checksummer: &mut Checksummer<H>,
    input: ChangeBasedChecksumInput<'_>,
    cursor: &mut reth_db::static_file::StaticFileCursor<'_>,
) -> eyre::Result<bool> {
    let ChangeBasedChecksumInput { segment, block_range_start, start_block, end_block, offsets } =
        input;
    let is_storage = segment.is_storage_change_sets();
    let mut reached_limit = false;

    for (offset_index, offset) in offsets.iter().enumerate() {
        // Offsets are per block: entry `i` belongs to `block_range_start + i`.
        let block_number = block_range_start + offset_index as u64;
        let include = block_number >= start_block && block_number <= end_block;

        for _ in 0..offset.num_changes() {
            let row = cursor.next_row()?.ok_or_else(|| {
                eyre::eyre!(
                    "Unexpected EOF while checksumming {} static file at range starting {}",
                    segment,
                    block_range_start
                )
            })?;

            // Rows of out-of-range blocks are still consumed from the cursor
            // (to keep it aligned with the offsets) but are not hashed.
            if !include {
                continue;
            }

            // Reconstruct MDBX key/value write boundaries. foldhash rotates
            // its accumulator by `len` on each write(), so boundaries must
            // match exactly.
            let done = if is_storage {
                // StorageChangeSets: MDBX key = BlockNumberAddress (28B),
                // value = compact StorageEntry. Column 0 is compact
                // StorageBeforeTx = [20B address][32B key][compact U256].
                let col = row[0];
                let (key, value) = split_storage_changeset_row(block_number, col)?;
                checksummer.write_entry(&key, value)
            } else {
                // AccountChangeSets: MDBX key = BlockNumber (8B),
                // value = compact AccountBeforeTx (= column 0).
                checksummer.write_entry(&block_number.to_be_bytes(), row[0])
            };

            if done {
                reached_limit = true;
                break;
            }
        }

        if reached_limit {
            break;
        }
    }

    // Unless the limit cut us short, the offsets must account for every row
    // in the jar; a leftover row means offsets and rows are out of sync.
    if !reached_limit && cursor.next_row()?.is_some() {
        return Err(eyre::eyre!(
            "Changeset offsets do not cover all rows for {} at range starting {}",
            segment,
            block_range_start
        ));
    }

    Ok(reached_limit)
}