// reth_cli_commands/db/checksum/mod.rs
use crate::{
2 common::CliNodeTypes,
3 db::get::{maybe_json_value_parser, table_key},
4};
5use alloy_primitives::map::foldhash::fast::FixedState;
6use clap::Parser;
7use itertools::Itertools;
8use reth_chainspec::EthereumHardforks;
9use reth_db::{static_file::iter_static_files, DatabaseEnv};
10use reth_db_api::{
11 cursor::DbCursorRO, table::Table, transaction::DbTx, RawKey, RawTable, RawValue, TableViewer,
12 Tables,
13};
14use reth_db_common::DbTool;
15use reth_node_builder::{NodeTypesWithDB, NodeTypesWithDBAdapter};
16use reth_provider::{providers::ProviderNodeTypes, DBProvider, StaticFileProviderFactory};
17use reth_static_file_types::{ChangesetOffset, StaticFileSegment};
18use std::{
19 hash::{BuildHasher, Hasher},
20 time::{Duration, Instant},
21};
22use tracing::{info, warn};
23
/// RocksDB-specific checksum support (table enum and checksum routine).
mod rocksdb;

/// Number of hashed entries between progress log lines.
const PROGRESS_LOG_INTERVAL: usize = 100_000;
28
// `reth db checksum` command: computes a checksum over an MDBX table, a
// static-file segment, or a RocksDB table.
//
// NOTE: plain `//` comments are used on clap-derived items because `///` doc
// comments would become user-visible help text.
#[derive(Parser, Debug)]
pub struct Command {
    #[command(subcommand)]
    subcommand: Subcommand,
}
35
// Selects which storage backend to checksum. `//` comments (not `///`) are
// deliberate: clap turns doc comments into CLI help text.
#[derive(clap::Subcommand, Debug)]
enum Subcommand {
    // Checksum an MDBX table, optionally bounded by a key range and capped at
    // `limit` entries.
    Mdbx {
        // The table to checksum.
        table: Tables,

        // Inclusive start key; parsed via `maybe_json_value_parser`.
        #[arg(long, value_parser = maybe_json_value_parser)]
        start_key: Option<String>,

        // Inclusive end key; parsed via `maybe_json_value_parser`.
        #[arg(long, value_parser = maybe_json_value_parser)]
        end_key: Option<String>,

        // Maximum number of entries to hash; unbounded when omitted.
        #[arg(long)]
        limit: Option<usize>,
    },
    // Checksum a static-file segment, optionally bounded by a block range and
    // capped at `limit` entries.
    StaticFile {
        // The static-file segment to checksum.
        #[arg(value_enum)]
        segment: StaticFileSegment,

        // Inclusive start block; defaults to 0 when omitted.
        #[arg(long)]
        start_block: Option<u64>,

        // Inclusive end block; defaults to u64::MAX when omitted.
        #[arg(long)]
        end_block: Option<u64>,

        // Maximum number of entries to hash; unbounded when omitted.
        #[arg(long)]
        limit: Option<usize>,
    },
    // Checksum a RocksDB table, capped at `limit` entries.
    Rocksdb {
        // The RocksDB table to checksum.
        #[arg(value_enum)]
        table: rocksdb::RocksDbTable,

        // Maximum number of entries to hash; unbounded when omitted.
        #[arg(long)]
        limit: Option<usize>,
    },
}
85
impl Command {
    /// Executes the `db checksum` command, dispatching to the MDBX,
    /// static-file, or RocksDB checksum routine depending on the subcommand.
    ///
    /// # Errors
    /// Propagates any error from the selected checksum routine.
    pub fn execute<N: CliNodeTypes<ChainSpec: EthereumHardforks>>(
        self,
        tool: &DbTool<NodeTypesWithDBAdapter<N, DatabaseEnv>>,
    ) -> eyre::Result<()> {
        // NOTE(review): presumably warns because a concurrently-running node
        // could mutate the data mid-walk — confirm.
        warn!("This command should be run without the node running!");

        match self.subcommand {
            Subcommand::Mdbx { table, start_key, end_key, limit } => {
                table.view(&ChecksumViewer { tool, start_key, end_key, limit })?;
            }
            Subcommand::StaticFile { segment, start_block, end_block, limit } => {
                checksum_static_file(tool, segment, start_block, end_block, limit)?;
            }
            Subcommand::Rocksdb { table, limit } => {
                rocksdb::checksum_rocksdb(tool, table, limit)?;
            }
        }

        Ok(())
    }
}
109
110fn checksum_hasher() -> impl Hasher {
112 FixedState::with_seed(u64::from_be_bytes(*b"RETHRETH")).build_hasher()
113}
114
/// Computes and logs a checksum over one static-file segment.
///
/// Iterates the segment's static-file jars in ascending block order, hashes the
/// rows, and logs the final checksum together with the number of hashed entries
/// and the elapsed time.
///
/// `start_block`/`end_block` bound the range inclusively (defaulting to the
/// full range); `limit` caps the number of hashed entries.
fn checksum_static_file<N: CliNodeTypes<ChainSpec: EthereumHardforks>>(
    tool: &DbTool<NodeTypesWithDBAdapter<N, DatabaseEnv>>,
    segment: StaticFileSegment,
    start_block: Option<u64>,
    end_block: Option<u64>,
    limit: Option<usize>,
) -> eyre::Result<()> {
    let static_file_provider = tool.provider_factory.static_file_provider();
    // Consistency problems are reported but not fatal: we still checksum
    // whatever data is on disk.
    if let Err(err) = static_file_provider.check_consistency(&tool.provider_factory.provider()?) {
        warn!("Error checking consistency of static files: {err}");
    }

    let static_files = iter_static_files(static_file_provider.directory())?;

    let ranges = static_files
        .get(segment)
        .ok_or_else(|| eyre::eyre!("No static files found for segment: {}", segment))?;

    let start_time = Instant::now();
    // `None` means "no limit"; `usize::MAX` is the internal sentinel for that.
    let limit = limit.unwrap_or(usize::MAX);
    let mut checksummer = Checksummer::new(checksum_hasher(), limit);

    let start_block = start_block.unwrap_or(0);
    let end_block = end_block.unwrap_or(u64::MAX);
    // Change-based segments carry a changeset-offsets sidecar that attributes
    // rows to block numbers; plain segments are hashed row by row.
    let is_change_based = segment.is_change_based();

    info!(
        "Computing checksum for {} static files, start_block={}, end_block={}, limit={:?}",
        segment,
        start_block,
        end_block,
        if limit == usize::MAX { None } else { Some(limit) }
    );

    let mut reached_limit = false;
    // Visit jars in ascending block order so the checksum is deterministic.
    for (block_range, _header) in ranges.iter().sorted_by_key(|(range, _)| range.start()) {
        // Skip jars that lie entirely outside the requested block range.
        // NOTE(review): for non-change-based segments the block bounds filter
        // whole jars only; every row of a partially overlapping jar is hashed.
        if block_range.end() < start_block || block_range.start() > end_block {
            continue;
        }

        let fixed_block_range = static_file_provider.find_fixed_range(segment, block_range.start());
        let jar_provider = static_file_provider
            .get_segment_provider_for_range(segment, || Some(fixed_block_range), None)?
            .ok_or_else(|| {
                eyre::eyre!(
                    "Failed to get segment provider for segment {} at range {}",
                    segment,
                    block_range
                )
            })?;

        let mut cursor = jar_provider.cursor()?;

        if is_change_based {
            // Without the sidecar, rows cannot be attributed to block numbers,
            // so a missing sidecar is an error for change-based segments.
            let offsets = jar_provider.read_changeset_offsets()?.ok_or_else(|| {
                eyre::eyre!(
                    "Missing changeset offsets sidecar for segment {} at range {}",
                    segment,
                    block_range
                )
            })?;
            let input = ChangeBasedChecksumInput {
                segment,
                block_range_start: block_range.start(),
                start_block,
                end_block,
                offsets: &offsets,
            };

            reached_limit = checksum_change_based_segment(&mut checksummer, input, &mut cursor)?;
        } else {
            // Hash every row (all columns) until the jar or the limit is
            // exhausted.
            while let Some(row) = cursor.next_row()? {
                if checksummer.write_row(&row) {
                    reached_limit = true;
                    break;
                }
            }
        }

        // Drop the provider before evicting it from the provider cache.
        drop(jar_provider);
        static_file_provider.remove_cached_provider(segment, fixed_block_range.end());

        if reached_limit {
            break;
        }
    }

    let (checksum, total) = checksummer.finish();
    let elapsed = start_time.elapsed();

    info!(
        "Checksum for static file segment `{}`: {:#x} ({} entries, elapsed: {:?})",
        segment, checksum, total, elapsed
    );

    Ok(())
}
213
/// [`TableViewer`] implementation that checksums a single MDBX table,
/// optionally restricted to a key range and capped at `limit` entries.
pub(crate) struct ChecksumViewer<'a, N: NodeTypesWithDB> {
    /// Database tool providing access to the provider factory.
    tool: &'a DbTool<N>,
    /// Inclusive lower bound of the walk, as the raw user-supplied string.
    start_key: Option<String>,
    /// Inclusive upper bound of the walk, as the raw user-supplied string.
    end_key: Option<String>,
    /// Maximum number of entries to hash; `None` means unbounded.
    limit: Option<usize>,
}
220
impl<N: NodeTypesWithDB> ChecksumViewer<'_, N> {
    /// Creates a viewer over the full table: no key bounds, no entry limit.
    pub(crate) const fn new(tool: &'_ DbTool<N>) -> ChecksumViewer<'_, N> {
        ChecksumViewer { tool, start_key: None, end_key: None, limit: None }
    }
}
226
impl<N: ProviderNodeTypes> TableViewer<(u64, Duration)> for ChecksumViewer<'_, N> {
    type Error = eyre::Report;

    /// Walks table `T` (optionally bounded by `start_key`/`end_key`, capped at
    /// `limit` entries), hashing each raw key and value, and returns the final
    /// checksum together with the elapsed time.
    fn view<T: Table>(&self) -> Result<(u64, Duration), Self::Error> {
        // The walk can easily outlive the usual long-read-transaction guard, so
        // disable it for this read transaction.
        let provider =
            self.tool.provider_factory.provider()?.disable_long_read_transaction_safety();
        let tx = provider.tx_ref();
        info!(
            "Start computing checksum, start={:?}, end={:?}, limit={:?}",
            self.start_key, self.end_key, self.limit
        );

        // Walk the raw table so keys/values are hashed exactly as stored,
        // without decoding.
        let mut cursor = tx.cursor_read::<RawTable<T>>()?;
        // Build the walker for whichever combination of bounds was supplied.
        let walker = match (self.start_key.as_deref(), self.end_key.as_deref()) {
            (Some(start), Some(end)) => {
                let start_key = table_key::<T>(start).map(RawKey::new)?;
                let end_key = table_key::<T>(end).map(RawKey::new)?;
                cursor.walk_range(start_key..=end_key)?
            }
            (None, Some(end)) => {
                let end_key = table_key::<T>(end).map(RawKey::new)?;

                cursor.walk_range(..=end_key)?
            }
            (Some(start), None) => {
                let start_key = table_key::<T>(start).map(RawKey::new)?;
                cursor.walk_range(start_key..)?
            }
            (None, None) => cursor.walk_range(..)?,
        };

        let start_time = Instant::now();
        let mut hasher = checksum_hasher();
        let mut total = 0;

        let limit = self.limit.unwrap_or(usize::MAX);
        // Track the first and last keys actually visited so the walked range
        // can be logged at the end.
        let mut enumerate_start_key = None;
        let mut enumerate_end_key = None;
        for (index, entry) in walker.enumerate() {
            let (k, v): (RawKey<T::Key>, RawValue<T::Value>) = entry?;

            // Periodic progress logging (also fires at index 0).
            if index.is_multiple_of(PROGRESS_LOG_INTERVAL) {
                info!("Hashed {index} entries.");
            }

            hasher.write(k.raw_key());
            hasher.write(v.raw_value());

            if enumerate_start_key.is_none() {
                enumerate_start_key = Some(k.clone());
            }
            enumerate_end_key = Some(k);

            total = index + 1;
            if total >= limit {
                break;
            }
        }

        info!("Hashed {total} entries.");
        if let (Some(s), Some(e)) = (enumerate_start_key, enumerate_end_key) {
            info!("start-key: {}", serde_json::to_string(&s.key()?).unwrap_or_default());
            info!("end-key: {}", serde_json::to_string(&e.key()?).unwrap_or_default());
        }

        let checksum = hasher.finish();
        let elapsed = start_time.elapsed();

        info!("Checksum for table `{}`: {:#x} (elapsed: {:?})", T::NAME, checksum, elapsed);

        Ok((checksum, elapsed))
    }
}
300
/// Incrementally hashes raw entries, tracking how many have been written and
/// signalling when the configured entry limit is reached.
struct Checksummer<H> {
    /// Running hash state.
    hasher: H,
    /// Number of entries written so far.
    total: usize,
    /// Maximum number of entries to hash.
    limit: usize,
}
307
308impl<H: Hasher> Checksummer<H> {
309 fn new(hasher: H, limit: usize) -> Self {
310 Self { hasher, total: 0, limit }
311 }
312
313 fn write_row(&mut self, row: &[&[u8]]) -> bool {
315 for col in row {
316 self.hasher.write(col);
317 }
318 self.advance()
319 }
320
321 fn write_entry(&mut self, key: &[u8], value: &[u8]) -> bool {
324 self.hasher.write(key);
325 self.hasher.write(value);
326 self.advance()
327 }
328
329 fn advance(&mut self) -> bool {
330 self.total += 1;
331 if self.total.is_multiple_of(PROGRESS_LOG_INTERVAL) {
332 info!("Hashed {} entries.", self.total);
333 }
334 self.total >= self.limit
335 }
336
337 fn finish(self) -> (u64, usize) {
338 (self.hasher.finish(), self.total)
339 }
340}
341
342fn split_storage_changeset_row(block_number: u64, row: &[u8]) -> eyre::Result<([u8; 28], &[u8])> {
351 if row.len() < 20 {
352 return Err(eyre::eyre!(
353 "Storage changeset row too short: expected at least 20 bytes, got {}",
354 row.len()
355 ));
356 }
357
358 let mut key_buf = [0u8; 28];
359 key_buf[..8].copy_from_slice(&block_number.to_be_bytes());
360 key_buf[8..].copy_from_slice(&row[..20]);
361 Ok((key_buf, &row[20..]))
362}
363
/// Parameters for checksumming one jar of a change-based static-file segment.
struct ChangeBasedChecksumInput<'a> {
    /// The changeset segment being checksummed.
    segment: StaticFileSegment,
    /// First block covered by the current jar.
    block_range_start: u64,
    /// Inclusive lower bound of blocks whose changes are hashed.
    start_block: u64,
    /// Inclusive upper bound of blocks whose changes are hashed.
    end_block: u64,
    /// Per-block change counts for the jar; entry `i` corresponds to block
    /// `block_range_start + i`.
    offsets: &'a [ChangesetOffset],
}
371
/// Checksums the rows of a change-based static-file jar, attributing each row
/// to a block number via the changeset-offsets sidecar.
///
/// Rows of blocks outside `start_block..=end_block` are still read from the
/// cursor (to keep it aligned with the offsets) but are not hashed. For storage
/// changesets each row is split into a `(block_number, address-prefix)`
/// composite key and a value; for other changesets the big-endian block number
/// is the key and the row's first column is the value.
///
/// Returns `Ok(true)` if the checksummer's entry limit was reached.
///
/// # Errors
/// Fails if the cursor runs out of rows before the offsets are exhausted, or —
/// when the limit was not reached — if rows remain after all offsets were
/// consumed (i.e. the jar and its sidecar disagree).
fn checksum_change_based_segment<H: Hasher>(
    checksummer: &mut Checksummer<H>,
    input: ChangeBasedChecksumInput<'_>,
    cursor: &mut reth_db::static_file::StaticFileCursor<'_>,
) -> eyre::Result<bool> {
    let ChangeBasedChecksumInput { segment, block_range_start, start_block, end_block, offsets } =
        input;
    let is_storage = segment.is_storage_change_sets();
    let mut reached_limit = false;

    // Offset entry `i` describes the changes of block `block_range_start + i`.
    for (offset_index, offset) in offsets.iter().enumerate() {
        let block_number = block_range_start + offset_index as u64;
        let include = block_number >= start_block && block_number <= end_block;

        for _ in 0..offset.num_changes() {
            // A row must exist for every recorded change; EOF here means the
            // jar and its sidecar disagree.
            let row = cursor.next_row()?.ok_or_else(|| {
                eyre::eyre!(
                    "Unexpected EOF while checksumming {} static file at range starting {}",
                    segment,
                    block_range_start
                )
            })?;

            // Consume but skip rows of blocks outside the requested range, so
            // the cursor stays aligned with the offsets.
            if !include {
                continue;
            }

            let done = if is_storage {
                // NOTE(review): splitting out the address presumably mirrors
                // the MDBX storage-changeset key layout — confirm.
                let col = row[0];
                let (key, value) = split_storage_changeset_row(block_number, col)?;
                checksummer.write_entry(&key, value)
            } else {
                checksummer.write_entry(&block_number.to_be_bytes(), row[0])
            };

            if done {
                reached_limit = true;
                break;
            }
        }

        if reached_limit {
            break;
        }
    }

    // Unless we stopped early at the limit, the offsets must account for every
    // row in the jar.
    if !reached_limit && cursor.next_row()?.is_some() {
        return Err(eyre::eyre!(
            "Changeset offsets do not cover all rows for {} at range starting {}",
            segment,
            block_range_start
        ));
    }

    Ok(reached_limit)
}