// reth_cli_commands/db/checksum/mod.rs
use crate::{
2 common::CliNodeTypes,
3 db::get::{maybe_json_value_parser, table_key},
4};
5use alloy_primitives::map::foldhash::fast::FixedState;
6use clap::Parser;
7use itertools::Itertools;
8use reth_chainspec::EthereumHardforks;
9use reth_db::{static_file::iter_static_files, DatabaseEnv};
10use reth_db_api::{
11 cursor::DbCursorRO, table::Table, transaction::DbTx, RawKey, RawTable, RawValue, TableViewer,
12 Tables,
13};
14use reth_db_common::DbTool;
15use reth_node_builder::{NodeTypesWithDB, NodeTypesWithDBAdapter};
16use reth_provider::{providers::ProviderNodeTypes, DBProvider, StaticFileProviderFactory};
17use reth_static_file_types::{ChangesetOffset, StaticFileSegment};
18use std::{
19 hash::{BuildHasher, Hasher},
20 time::{Duration, Instant},
21};
22use tracing::{info, warn};
23
24#[cfg(all(unix, feature = "rocksdb"))]
25mod rocksdb;
26
/// Number of hashed entries between progress log lines.
const PROGRESS_LOG_INTERVAL: usize = 100_000;
29
// `reth db checksum` command: computes a checksum over database / static-file contents.
// NOTE(review): intentionally using `//` comments here — clap derives CLI help text
// from `///` doc comments, so adding them would change the command's `--help` output.
#[derive(Parser, Debug)]
pub struct Command {
    // Which data source to checksum (MDBX table, static file, or RocksDB table).
    #[command(subcommand)]
    subcommand: Subcommand,
}
36
// Data sources that can be checksummed. `//` comments are deliberate: clap turns
// `///` doc comments into user-visible help strings, which would alter CLI behavior.
#[derive(clap::Subcommand, Debug)]
enum Subcommand {
    // Checksum a range of an MDBX table.
    Mdbx {
        // Table to checksum.
        table: Tables,

        // Inclusive start key, parsed via `maybe_json_value_parser` (JSON-encoded key).
        #[arg(long, value_parser = maybe_json_value_parser)]
        start_key: Option<String>,

        // Inclusive end key, parsed via `maybe_json_value_parser` (JSON-encoded key).
        #[arg(long, value_parser = maybe_json_value_parser)]
        end_key: Option<String>,

        // Maximum number of entries to hash; unbounded when absent.
        #[arg(long)]
        limit: Option<usize>,
    },
    // Checksum a block range of a static-file segment.
    StaticFile {
        // Segment to checksum.
        #[arg(value_enum)]
        segment: StaticFileSegment,

        // First block to include; defaults to 0.
        #[arg(long)]
        start_block: Option<u64>,

        // Last block to include; defaults to `u64::MAX`.
        #[arg(long)]
        end_block: Option<u64>,

        // Maximum number of entries to hash; unbounded when absent.
        #[arg(long)]
        limit: Option<usize>,
    },
    // Checksum a RocksDB table (unix-only, behind the `rocksdb` feature).
    #[cfg(all(unix, feature = "rocksdb"))]
    Rocksdb {
        // Table to checksum.
        #[arg(value_enum)]
        table: rocksdb::RocksDbTable,

        // Maximum number of entries to hash; unbounded when absent.
        #[arg(long)]
        limit: Option<usize>,
    },
}
87
impl Command {
    /// Dispatches the selected `checksum` subcommand against the given database tool.
    ///
    /// Emits a warning first: the command opens the database directly, so running it
    /// concurrently with a live node is unsupported.
    pub fn execute<N: CliNodeTypes<ChainSpec: EthereumHardforks>>(
        self,
        tool: &DbTool<NodeTypesWithDBAdapter<N, DatabaseEnv>>,
    ) -> eyre::Result<()> {
        warn!("This command should be run without the node running!");

        match self.subcommand {
            Subcommand::Mdbx { table, start_key, end_key, limit } => {
                // `TableViewer` dispatch: `view` is invoked with the concrete table type.
                table.view(&ChecksumViewer { tool, start_key, end_key, limit })?;
            }
            Subcommand::StaticFile { segment, start_block, end_block, limit } => {
                checksum_static_file(tool, segment, start_block, end_block, limit)?;
            }
            #[cfg(all(unix, feature = "rocksdb"))]
            Subcommand::Rocksdb { table, limit } => {
                rocksdb::checksum_rocksdb(tool, table, limit)?;
            }
        }

        Ok(())
    }
}
112
113fn checksum_hasher() -> impl Hasher {
115 FixedState::with_seed(u64::from_be_bytes(*b"RETHRETH")).build_hasher()
116}
117
/// Computes a checksum over the rows of a static-file segment.
///
/// Walks every static-file range for `segment` in ascending block order, hashing each
/// row that falls inside `start_block..=end_block`, stopping early once `limit` entries
/// have been hashed. Logs the final checksum, entry count, and elapsed time.
fn checksum_static_file<N: CliNodeTypes<ChainSpec: EthereumHardforks>>(
    tool: &DbTool<NodeTypesWithDBAdapter<N, DatabaseEnv>>,
    segment: StaticFileSegment,
    start_block: Option<u64>,
    end_block: Option<u64>,
    limit: Option<usize>,
) -> eyre::Result<()> {
    let static_file_provider = tool.provider_factory.static_file_provider();
    // Consistency problems are warned about but do not abort the checksum.
    if let Err(err) = static_file_provider.check_consistency(&tool.provider_factory.provider()?) {
        warn!("Error checking consistency of static files: {err}");
    }

    let static_files = iter_static_files(static_file_provider.directory())?;

    let ranges = static_files
        .get(segment)
        .ok_or_else(|| eyre::eyre!("No static files found for segment: {}", segment))?;

    let start_time = Instant::now();
    // `usize::MAX` acts as "no limit".
    let limit = limit.unwrap_or(usize::MAX);
    let mut checksummer = Checksummer::new(checksum_hasher(), limit);

    // Missing bounds default to the full block range.
    let start_block = start_block.unwrap_or(0);
    let end_block = end_block.unwrap_or(u64::MAX);
    // Change-based segments (changesets) need per-block offset handling below.
    let is_change_based = segment.is_change_based();

    info!(
        "Computing checksum for {} static files, start_block={}, end_block={}, limit={:?}",
        segment,
        start_block,
        end_block,
        if limit == usize::MAX { None } else { Some(limit) }
    );

    let mut reached_limit = false;
    // Visit file ranges sorted by start block so the checksum is deterministic.
    for (block_range, _header) in ranges.iter().sorted_by_key(|(range, _)| range.start()) {
        // Skip file ranges that do not overlap the requested window.
        if block_range.end() < start_block || block_range.start() > end_block {
            continue;
        }

        let fixed_block_range = static_file_provider.find_fixed_range(segment, block_range.start());
        let jar_provider = static_file_provider
            .get_segment_provider_for_range(segment, || Some(fixed_block_range), None)?
            .ok_or_else(|| {
                eyre::eyre!(
                    "Failed to get segment provider for segment {} at range {}",
                    segment,
                    block_range
                )
            })?;

        let mut cursor = jar_provider.cursor()?;

        if is_change_based {
            // Change-based segments require the offsets sidecar to attribute
            // each row to its block number.
            let offsets = jar_provider.read_changeset_offsets()?.ok_or_else(|| {
                eyre::eyre!(
                    "Missing changeset offsets sidecar for segment {} at range {}",
                    segment,
                    block_range
                )
            })?;
            let input = ChangeBasedChecksumInput {
                segment,
                block_range_start: block_range.start(),
                start_block,
                end_block,
                offsets: &offsets,
            };

            reached_limit = checksum_change_based_segment(&mut checksummer, input, &mut cursor)?;
        } else {
            // Plain segments: hash every row until the file is exhausted or the
            // entry limit is hit.
            while let Some(row) = cursor.next_row()? {
                if checksummer.write_row(&row) {
                    reached_limit = true;
                    break;
                }
            }
        }

        // Drop the reader before evicting its cached provider entry for this range.
        drop(jar_provider);
        static_file_provider.remove_cached_provider(segment, fixed_block_range.end());

        if reached_limit {
            break;
        }
    }

    let (checksum, total) = checksummer.finish();
    let elapsed = start_time.elapsed();

    info!(
        "Checksum for static file segment `{}`: {:#x} ({} entries, elapsed: {:?})",
        segment, checksum, total, elapsed
    );

    Ok(())
}
216
/// [`TableViewer`] that checksums a single MDBX table, optionally bounded by a
/// key range and an entry limit.
pub(crate) struct ChecksumViewer<'a, N: NodeTypesWithDB> {
    // Database access handle.
    tool: &'a DbTool<N>,
    // Inclusive start key as a JSON-encoded string; `None` walks from the first key.
    start_key: Option<String>,
    // Inclusive end key as a JSON-encoded string; `None` walks to the last key.
    end_key: Option<String>,
    // Maximum number of entries to hash; `None` means unbounded.
    limit: Option<usize>,
}
223
224impl<N: NodeTypesWithDB> ChecksumViewer<'_, N> {
225 pub(crate) const fn new(tool: &'_ DbTool<N>) -> ChecksumViewer<'_, N> {
226 ChecksumViewer { tool, start_key: None, end_key: None, limit: None }
227 }
228}
229
impl<N: ProviderNodeTypes> TableViewer<(u64, Duration)> for ChecksumViewer<'_, N> {
    type Error = eyre::Report;

    /// Walks table `T` between the configured key bounds, hashing each entry's raw
    /// key and value bytes, and returns the checksum together with the elapsed time.
    fn view<T: Table>(&self) -> Result<(u64, Duration), Self::Error> {
        // Long-read transaction safety is disabled: this walk can legitimately run
        // for a long time over a large table.
        let provider =
            self.tool.provider_factory.provider()?.disable_long_read_transaction_safety();
        let tx = provider.tx_ref();
        info!(
            "Start computing checksum, start={:?}, end={:?}, limit={:?}",
            self.start_key, self.end_key, self.limit
        );

        let mut cursor = tx.cursor_read::<RawTable<T>>()?;
        // Pick the walk range from whichever bounds were provided; keys are parsed
        // from their JSON string form into the table's key type.
        let walker = match (self.start_key.as_deref(), self.end_key.as_deref()) {
            (Some(start), Some(end)) => {
                let start_key = table_key::<T>(start).map(RawKey::new)?;
                let end_key = table_key::<T>(end).map(RawKey::new)?;
                cursor.walk_range(start_key..=end_key)?
            }
            (None, Some(end)) => {
                let end_key = table_key::<T>(end).map(RawKey::new)?;

                cursor.walk_range(..=end_key)?
            }
            (Some(start), None) => {
                let start_key = table_key::<T>(start).map(RawKey::new)?;
                cursor.walk_range(start_key..)?
            }
            (None, None) => cursor.walk_range(..)?,
        };

        let start_time = Instant::now();
        let mut hasher = checksum_hasher();
        let mut total = 0;

        // `usize::MAX` acts as "no limit".
        let limit = self.limit.unwrap_or(usize::MAX);
        // First and last keys actually visited, reported at the end so a user can
        // resume or verify the walked range.
        let mut enumerate_start_key = None;
        let mut enumerate_end_key = None;
        for (index, entry) in walker.enumerate() {
            let (k, v): (RawKey<T::Key>, RawValue<T::Value>) = entry?;

            // Periodic progress log (also fires at index 0 as "Hashed 0 entries.").
            if index.is_multiple_of(PROGRESS_LOG_INTERVAL) {
                info!("Hashed {index} entries.");
            }

            // Hash the raw on-disk byte representation of key and value.
            hasher.write(k.raw_key());
            hasher.write(v.raw_value());

            if enumerate_start_key.is_none() {
                enumerate_start_key = Some(k.clone());
            }
            enumerate_end_key = Some(k);

            total = index + 1;
            if total >= limit {
                break;
            }
        }

        info!("Hashed {total} entries.");
        if let (Some(s), Some(e)) = (enumerate_start_key, enumerate_end_key) {
            info!("start-key: {}", serde_json::to_string(&s.key()?).unwrap_or_default());
            info!("end-key: {}", serde_json::to_string(&e.key()?).unwrap_or_default());
        }

        let checksum = hasher.finish();
        let elapsed = start_time.elapsed();

        info!("Checksum for table `{}`: {:#x} (elapsed: {:?})", T::NAME, checksum, elapsed);

        Ok((checksum, elapsed))
    }
}
303
/// Incremental checksum state: feeds entry bytes into `hasher`, counts entries,
/// and signals when the configured entry limit is reached.
struct Checksummer<H> {
    // Accumulating hasher; finalized by `finish`.
    hasher: H,
    // Number of entries hashed so far.
    total: usize,
    // Maximum number of entries to hash (`usize::MAX` means unbounded).
    limit: usize,
}
310
311impl<H: Hasher> Checksummer<H> {
312 fn new(hasher: H, limit: usize) -> Self {
313 Self { hasher, total: 0, limit }
314 }
315
316 fn write_row(&mut self, row: &[&[u8]]) -> bool {
318 for col in row {
319 self.hasher.write(col);
320 }
321 self.advance()
322 }
323
324 fn write_entry(&mut self, key: &[u8], value: &[u8]) -> bool {
327 self.hasher.write(key);
328 self.hasher.write(value);
329 self.advance()
330 }
331
332 fn advance(&mut self) -> bool {
333 self.total += 1;
334 if self.total.is_multiple_of(PROGRESS_LOG_INTERVAL) {
335 info!("Hashed {} entries.", self.total);
336 }
337 self.total >= self.limit
338 }
339
340 fn finish(self) -> (u64, usize) {
341 (self.hasher.finish(), self.total)
342 }
343}
344
345fn split_storage_changeset_row(block_number: u64, row: &[u8]) -> eyre::Result<([u8; 28], &[u8])> {
354 if row.len() < 20 {
355 return Err(eyre::eyre!(
356 "Storage changeset row too short: expected at least 20 bytes, got {}",
357 row.len()
358 ));
359 }
360
361 let mut key_buf = [0u8; 28];
362 key_buf[..8].copy_from_slice(&block_number.to_be_bytes());
363 key_buf[8..].copy_from_slice(&row[..20]);
364 Ok((key_buf, &row[20..]))
365}
366
/// Parameters for checksumming one change-based static-file range.
struct ChangeBasedChecksumInput<'a> {
    // Segment being checksummed (used for error messages and storage detection).
    segment: StaticFileSegment,
    // First block number covered by this file range; rows are attributed to
    // `block_range_start + offset_index`.
    block_range_start: u64,
    // Inclusive lower bound of blocks to include in the checksum.
    start_block: u64,
    // Inclusive upper bound of blocks to include in the checksum.
    end_block: u64,
    // Per-block changeset offsets: one entry per block, giving the row count.
    offsets: &'a [ChangesetOffset],
}
374
/// Checksums the rows of one change-based static-file range, attributing each row
/// to its block number via the changeset offsets sidecar.
///
/// Returns `Ok(true)` when the checksummer's entry limit was reached (so the caller
/// stops iterating further ranges), `Ok(false)` when the whole range was consumed.
///
/// # Errors
///
/// Fails if the cursor runs out of rows before the offsets are exhausted, or if
/// (when the limit was not hit) rows remain after the offsets are exhausted —
/// i.e. the sidecar does not exactly cover the file's rows.
fn checksum_change_based_segment<H: Hasher>(
    checksummer: &mut Checksummer<H>,
    input: ChangeBasedChecksumInput<'_>,
    cursor: &mut reth_db::static_file::StaticFileCursor<'_>,
) -> eyre::Result<bool> {
    let ChangeBasedChecksumInput { segment, block_range_start, start_block, end_block, offsets } =
        input;
    let is_storage = segment.is_storage_change_sets();
    let mut reached_limit = false;

    // One offsets entry per block, in block order.
    for (offset_index, offset) in offsets.iter().enumerate() {
        let block_number = block_range_start + offset_index as u64;
        let include = block_number >= start_block && block_number <= end_block;

        for _ in 0..offset.num_changes() {
            // Rows must still be consumed for excluded blocks to keep the cursor
            // aligned with the offsets.
            let row = cursor.next_row()?.ok_or_else(|| {
                eyre::eyre!(
                    "Unexpected EOF while checksumming {} static file at range starting {}",
                    segment,
                    block_range_start
                )
            })?;

            if !include {
                continue;
            }

            let done = if is_storage {
                // NOTE(review): assumes each row has at least one column; `row[0]`
                // would panic on an empty row — confirm the cursor guarantees this.
                let col = row[0];
                // Storage changesets: key = block number (BE) || 20-byte address,
                // value = remainder of the column.
                let (key, value) = split_storage_changeset_row(block_number, col)?;
                checksummer.write_entry(&key, value)
            } else {
                // Account changesets: key is just the block number (BE).
                checksummer.write_entry(&block_number.to_be_bytes(), row[0])
            };

            if done {
                reached_limit = true;
                break;
            }
        }

        if reached_limit {
            break;
        }
    }

    // If we consumed everything the offsets describe, the file must be exhausted
    // too — otherwise the sidecar is inconsistent with the data.
    if !reached_limit && cursor.next_row()?.is_some() {
        return Err(eyre::eyre!(
            "Changeset offsets do not cover all rows for {} at range starting {}",
            segment,
            block_range_start
        ));
    }

    Ok(reached_limit)
}