reth_cli_commands/db/checksum/
mod.rs

1use crate::{
2    common::CliNodeTypes,
3    db::get::{maybe_json_value_parser, table_key},
4};
5use alloy_primitives::map::foldhash::fast::FixedState;
6use clap::Parser;
7use itertools::Itertools;
8use reth_chainspec::EthereumHardforks;
9use reth_db::{static_file::iter_static_files, DatabaseEnv};
10use reth_db_api::{
11    cursor::DbCursorRO, table::Table, transaction::DbTx, RawKey, RawTable, RawValue, TableViewer,
12    Tables,
13};
14use reth_db_common::DbTool;
15use reth_node_builder::{NodeTypesWithDB, NodeTypesWithDBAdapter};
16use reth_provider::{providers::ProviderNodeTypes, DBProvider, StaticFileProviderFactory};
17use reth_static_file_types::StaticFileSegment;
18use std::{
19    hash::{BuildHasher, Hasher},
20    sync::Arc,
21    time::{Duration, Instant},
22};
23use tracing::{info, warn};
24
25#[cfg(all(unix, feature = "edge"))]
26mod rocksdb;
27
28/// Interval for logging progress during checksum computation.
29const PROGRESS_LOG_INTERVAL: usize = 100_000;
30
31#[derive(Parser, Debug)]
32/// The arguments for the `reth db checksum` command
33pub struct Command {
34    #[command(subcommand)]
35    subcommand: Subcommand,
36}
37
38#[derive(clap::Subcommand, Debug)]
39enum Subcommand {
40    /// Calculates the checksum of a database table
41    Mdbx {
42        /// The table name
43        table: Tables,
44
45        /// The start of the range to checksum.
46        #[arg(long, value_parser = maybe_json_value_parser)]
47        start_key: Option<String>,
48
49        /// The end of the range to checksum.
50        #[arg(long, value_parser = maybe_json_value_parser)]
51        end_key: Option<String>,
52
53        /// The maximum number of records that are queried and used to compute the
54        /// checksum.
55        #[arg(long)]
56        limit: Option<usize>,
57    },
58    /// Calculates the checksum of a static file segment
59    StaticFile {
60        /// The static file segment
61        #[arg(value_enum)]
62        segment: StaticFileSegment,
63
64        /// The block number to start from (inclusive).
65        #[arg(long)]
66        start_block: Option<u64>,
67
68        /// The block number to end at (inclusive).
69        #[arg(long)]
70        end_block: Option<u64>,
71
72        /// The maximum number of rows to checksum.
73        #[arg(long)]
74        limit: Option<usize>,
75    },
76    /// Calculates the checksum of a RocksDB table
77    #[cfg(all(unix, feature = "edge"))]
78    Rocksdb {
79        /// The RocksDB table
80        #[arg(value_enum)]
81        table: rocksdb::RocksDbTable,
82
83        /// The maximum number of records to checksum.
84        #[arg(long)]
85        limit: Option<usize>,
86    },
87}
88
89impl Command {
90    /// Execute `db checksum` command
91    pub fn execute<N: CliNodeTypes<ChainSpec: EthereumHardforks>>(
92        self,
93        tool: &DbTool<NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>,
94    ) -> eyre::Result<()> {
95        warn!("This command should be run without the node running!");
96
97        match self.subcommand {
98            Subcommand::Mdbx { table, start_key, end_key, limit } => {
99                table.view(&ChecksumViewer { tool, start_key, end_key, limit })?;
100            }
101            Subcommand::StaticFile { segment, start_block, end_block, limit } => {
102                checksum_static_file(tool, segment, start_block, end_block, limit)?;
103            }
104            #[cfg(all(unix, feature = "edge"))]
105            Subcommand::Rocksdb { table, limit } => {
106                rocksdb::checksum_rocksdb(tool, table, limit)?;
107            }
108        }
109
110        Ok(())
111    }
112}
113
114/// Creates a new hasher with the standard seed used for checksum computation.
115fn checksum_hasher() -> impl Hasher {
116    FixedState::with_seed(u64::from_be_bytes(*b"RETHRETH")).build_hasher()
117}
118
119fn checksum_static_file<N: CliNodeTypes<ChainSpec: EthereumHardforks>>(
120    tool: &DbTool<NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>,
121    segment: StaticFileSegment,
122    start_block: Option<u64>,
123    end_block: Option<u64>,
124    limit: Option<usize>,
125) -> eyre::Result<()> {
126    let static_file_provider = tool.provider_factory.static_file_provider();
127    if let Err(err) = static_file_provider.check_consistency(&tool.provider_factory.provider()?) {
128        warn!("Error checking consistency of static files: {err}");
129    }
130
131    let static_files = iter_static_files(static_file_provider.directory())?;
132
133    let ranges = static_files
134        .get(segment)
135        .ok_or_else(|| eyre::eyre!("No static files found for segment: {}", segment))?;
136
137    let start_time = Instant::now();
138    let mut hasher = checksum_hasher();
139    let mut total = 0usize;
140    let limit = limit.unwrap_or(usize::MAX);
141
142    let start_block = start_block.unwrap_or(0);
143    let end_block = end_block.unwrap_or(u64::MAX);
144
145    info!(
146        "Computing checksum for {} static files, start_block={}, end_block={}, limit={:?}",
147        segment,
148        start_block,
149        end_block,
150        if limit == usize::MAX { None } else { Some(limit) }
151    );
152
153    'outer: for (block_range, _header) in ranges.iter().sorted_by_key(|(range, _)| range.start()) {
154        if block_range.end() < start_block || block_range.start() > end_block {
155            continue;
156        }
157
158        let fixed_block_range = static_file_provider.find_fixed_range(segment, block_range.start());
159        let jar_provider = static_file_provider
160            .get_segment_provider_for_range(segment, || Some(fixed_block_range), None)?
161            .ok_or_else(|| {
162                eyre::eyre!(
163                    "Failed to get segment provider for segment {} at range {}",
164                    segment,
165                    block_range
166                )
167            })?;
168
169        let mut cursor = jar_provider.cursor()?;
170
171        while let Ok(Some(row)) = cursor.next_row() {
172            for col_data in row.iter() {
173                hasher.write(col_data);
174            }
175
176            total += 1;
177
178            if total.is_multiple_of(PROGRESS_LOG_INTERVAL) {
179                info!("Hashed {total} entries.");
180            }
181
182            if total >= limit {
183                break 'outer;
184            }
185        }
186
187        // Explicitly drop provider before removing from cache to avoid deadlock
188        drop(jar_provider);
189        static_file_provider.remove_cached_provider(segment, fixed_block_range.end());
190    }
191
192    let checksum = hasher.finish();
193    let elapsed = start_time.elapsed();
194
195    info!(
196        "Checksum for static file segment `{}`: {:#x} ({} entries, elapsed: {:?})",
197        segment, checksum, total, elapsed
198    );
199
200    Ok(())
201}
202
203pub(crate) struct ChecksumViewer<'a, N: NodeTypesWithDB> {
204    tool: &'a DbTool<N>,
205    start_key: Option<String>,
206    end_key: Option<String>,
207    limit: Option<usize>,
208}
209
210impl<N: NodeTypesWithDB> ChecksumViewer<'_, N> {
211    pub(crate) const fn new(tool: &'_ DbTool<N>) -> ChecksumViewer<'_, N> {
212        ChecksumViewer { tool, start_key: None, end_key: None, limit: None }
213    }
214}
215
216impl<N: ProviderNodeTypes> TableViewer<(u64, Duration)> for ChecksumViewer<'_, N> {
217    type Error = eyre::Report;
218
219    fn view<T: Table>(&self) -> Result<(u64, Duration), Self::Error> {
220        let provider =
221            self.tool.provider_factory.provider()?.disable_long_read_transaction_safety();
222        let tx = provider.tx_ref();
223        info!(
224            "Start computing checksum, start={:?}, end={:?}, limit={:?}",
225            self.start_key, self.end_key, self.limit
226        );
227
228        let mut cursor = tx.cursor_read::<RawTable<T>>()?;
229        let walker = match (self.start_key.as_deref(), self.end_key.as_deref()) {
230            (Some(start), Some(end)) => {
231                let start_key = table_key::<T>(start).map(RawKey::new)?;
232                let end_key = table_key::<T>(end).map(RawKey::new)?;
233                cursor.walk_range(start_key..=end_key)?
234            }
235            (None, Some(end)) => {
236                let end_key = table_key::<T>(end).map(RawKey::new)?;
237
238                cursor.walk_range(..=end_key)?
239            }
240            (Some(start), None) => {
241                let start_key = table_key::<T>(start).map(RawKey::new)?;
242                cursor.walk_range(start_key..)?
243            }
244            (None, None) => cursor.walk_range(..)?,
245        };
246
247        let start_time = Instant::now();
248        let mut hasher = checksum_hasher();
249        let mut total = 0;
250
251        let limit = self.limit.unwrap_or(usize::MAX);
252        let mut enumerate_start_key = None;
253        let mut enumerate_end_key = None;
254        for (index, entry) in walker.enumerate() {
255            let (k, v): (RawKey<T::Key>, RawValue<T::Value>) = entry?;
256
257            if index.is_multiple_of(PROGRESS_LOG_INTERVAL) {
258                info!("Hashed {index} entries.");
259            }
260
261            hasher.write(k.raw_key());
262            hasher.write(v.raw_value());
263
264            if enumerate_start_key.is_none() {
265                enumerate_start_key = Some(k.clone());
266            }
267            enumerate_end_key = Some(k);
268
269            total = index + 1;
270            if total >= limit {
271                break
272            }
273        }
274
275        info!("Hashed {total} entries.");
276        if let (Some(s), Some(e)) = (enumerate_start_key, enumerate_end_key) {
277            info!("start-key: {}", serde_json::to_string(&s.key()?).unwrap_or_default());
278            info!("end-key: {}", serde_json::to_string(&e.key()?).unwrap_or_default());
279        }
280
281        let checksum = hasher.finish();
282        let elapsed = start_time.elapsed();
283
284        info!("Checksum for table `{}`: {:#x} (elapsed: {:?})", T::NAME, checksum, elapsed);
285
286        Ok((checksum, elapsed))
287    }
288}