// reth_cli_commands/db/checksum/mod.rs
use crate::{
2 common::CliNodeTypes,
3 db::get::{maybe_json_value_parser, table_key},
4};
5use alloy_primitives::map::foldhash::fast::FixedState;
6use clap::Parser;
7use itertools::Itertools;
8use reth_chainspec::EthereumHardforks;
9use reth_db::{static_file::iter_static_files, DatabaseEnv};
10use reth_db_api::{
11 cursor::DbCursorRO, table::Table, transaction::DbTx, RawKey, RawTable, RawValue, TableViewer,
12 Tables,
13};
14use reth_db_common::DbTool;
15use reth_node_builder::{NodeTypesWithDB, NodeTypesWithDBAdapter};
16use reth_provider::{providers::ProviderNodeTypes, DBProvider, StaticFileProviderFactory};
17use reth_static_file_types::StaticFileSegment;
18use std::{
19 hash::{BuildHasher, Hasher},
20 time::{Duration, Instant},
21};
22use tracing::{info, warn};
23
// RocksDB checksumming is only compiled on Unix with the `rocksdb` feature enabled.
#[cfg(all(unix, feature = "rocksdb"))]
mod rocksdb;

/// Emit a progress log line every this many hashed entries.
const PROGRESS_LOG_INTERVAL: usize = 100_000;
29
// Arguments for the `db checksum` command.
//
// NOTE(review): using `//` (not `///`) on purpose — clap turns doc comments on
// derive items into help text, which would change CLI output.
#[derive(Parser, Debug)]
pub struct Command {
    // Which backend to checksum: MDBX table, static-file segment, or RocksDB table.
    #[command(subcommand)]
    subcommand: Subcommand,
}
36
// Selects the storage backend to checksum. Plain `//` comments are used so no
// clap help text is introduced or changed.
#[derive(clap::Subcommand, Debug)]
enum Subcommand {
    // Checksum an MDBX table, optionally restricted to a key range and/or row limit.
    Mdbx {
        // The table to checksum.
        table: Tables,

        // Inclusive start key of the range to hash (parsed via `table_key`).
        #[arg(long, value_parser = maybe_json_value_parser)]
        start_key: Option<String>,

        // Inclusive end key of the range to hash (parsed via `table_key`).
        #[arg(long, value_parser = maybe_json_value_parser)]
        end_key: Option<String>,

        // Maximum number of entries to hash; unlimited when absent.
        #[arg(long)]
        limit: Option<usize>,
    },
    // Checksum a static-file segment, optionally bounded by a block window.
    StaticFile {
        // The static-file segment to checksum.
        #[arg(value_enum)]
        segment: StaticFileSegment,

        // Files whose block range ends before this block are skipped.
        #[arg(long)]
        start_block: Option<u64>,

        // Files whose block range starts after this block are skipped.
        #[arg(long)]
        end_block: Option<u64>,

        // Maximum number of rows to hash; unlimited when absent.
        #[arg(long)]
        limit: Option<usize>,
    },
    // Checksum a RocksDB table (only available on Unix with the `rocksdb` feature).
    #[cfg(all(unix, feature = "rocksdb"))]
    Rocksdb {
        // The RocksDB table to checksum.
        #[arg(value_enum)]
        table: rocksdb::RocksDbTable,

        // Maximum number of entries to hash; unlimited when absent.
        #[arg(long)]
        limit: Option<usize>,
    },
}
87
impl Command {
    /// Executes the `db checksum` command.
    ///
    /// Dispatches to the backend selected on the command line and logs the
    /// resulting checksum. Returns any error from the underlying backend.
    pub fn execute<N: CliNodeTypes<ChainSpec: EthereumHardforks>>(
        self,
        tool: &DbTool<NodeTypesWithDBAdapter<N, DatabaseEnv>>,
    ) -> eyre::Result<()> {
        // Warn up front — presumably a concurrently-running node would mutate the
        // database mid-scan and make the checksum meaningless; confirm against
        // the other `db` subcommands' conventions.
        warn!("This command should be run without the node running!");

        match self.subcommand {
            Subcommand::Mdbx { table, start_key, end_key, limit } => {
                // `Tables::view` calls back into the viewer with the concrete table type.
                table.view(&ChecksumViewer { tool, start_key, end_key, limit })?;
            }
            Subcommand::StaticFile { segment, start_block, end_block, limit } => {
                checksum_static_file(tool, segment, start_block, end_block, limit)?;
            }
            #[cfg(all(unix, feature = "rocksdb"))]
            Subcommand::Rocksdb { table, limit } => {
                rocksdb::checksum_rocksdb(tool, table, limit)?;
            }
        }

        Ok(())
    }
}
112
113fn checksum_hasher() -> impl Hasher {
115 FixedState::with_seed(u64::from_be_bytes(*b"RETHRETH")).build_hasher()
116}
117
/// Computes a checksum over a static-file segment.
///
/// Iterates the segment's static files in ascending block-range order, hashing
/// the raw bytes of every column of every row, and logs the final checksum.
///
/// * `start_block` / `end_block` — files lying entirely outside
///   `[start_block, end_block]` are skipped (defaults `0` / `u64::MAX`). Note
///   that all rows of a partially-overlapping file are still hashed.
/// * `limit` — stop after hashing this many rows (default: unlimited).
fn checksum_static_file<N: CliNodeTypes<ChainSpec: EthereumHardforks>>(
    tool: &DbTool<NodeTypesWithDBAdapter<N, DatabaseEnv>>,
    segment: StaticFileSegment,
    start_block: Option<u64>,
    end_block: Option<u64>,
    limit: Option<usize>,
) -> eyre::Result<()> {
    let static_file_provider = tool.provider_factory.static_file_provider();
    // Consistency problems are reported but deliberately do not abort the run.
    if let Err(err) = static_file_provider.check_consistency(&tool.provider_factory.provider()?) {
        warn!("Error checking consistency of static files: {err}");
    }

    let static_files = iter_static_files(static_file_provider.directory())?;

    let ranges = static_files
        .get(segment)
        .ok_or_else(|| eyre::eyre!("No static files found for segment: {}", segment))?;

    let start_time = Instant::now();
    let mut hasher = checksum_hasher();
    let mut total = 0usize;
    // `usize::MAX` acts as "no limit" so the loop condition stays uniform.
    let limit = limit.unwrap_or(usize::MAX);

    let start_block = start_block.unwrap_or(0);
    let end_block = end_block.unwrap_or(u64::MAX);

    info!(
        "Computing checksum for {} static files, start_block={}, end_block={}, limit={:?}",
        segment,
        start_block,
        end_block,
        if limit == usize::MAX { None } else { Some(limit) }
    );

    // Sort by block-range start so the checksum is deterministic regardless of
    // directory listing order.
    'outer: for (block_range, _header) in ranges.iter().sorted_by_key(|(range, _)| range.start()) {
        // Skip files wholly outside the requested block window.
        if block_range.end() < start_block || block_range.start() > end_block {
            continue;
        }

        let fixed_block_range = static_file_provider.find_fixed_range(segment, block_range.start());
        let jar_provider = static_file_provider
            .get_segment_provider_for_range(segment, || Some(fixed_block_range), None)?
            .ok_or_else(|| {
                eyre::eyre!(
                    "Failed to get segment provider for segment {} at range {}",
                    segment,
                    block_range
                )
            })?;

        let mut cursor = jar_provider.cursor()?;

        // NOTE(review): iteration stops both at end-of-file and at the first
        // cursor error — errors are silently treated as exhaustion here.
        while let Ok(Some(row)) = cursor.next_row() {
            // Feed each column's raw bytes into the hasher.
            for col_data in row.iter() {
                hasher.write(col_data);
            }

            total += 1;

            if total.is_multiple_of(PROGRESS_LOG_INTERVAL) {
                info!("Hashed {total} entries.");
            }

            if total >= limit {
                break 'outer;
            }
        }

        // Drop the provider and evict it from the cache before the next file —
        // presumably to release the mapped file early; confirm intent.
        drop(jar_provider);
        static_file_provider.remove_cached_provider(segment, fixed_block_range.end());
    }

    let checksum = hasher.finish();
    let elapsed = start_time.elapsed();

    info!(
        "Checksum for static file segment `{}`: {:#x} ({} entries, elapsed: {:?})",
        segment, checksum, total, elapsed
    );

    Ok(())
}
201
/// [`TableViewer`] that checksums a (sub)range of an MDBX table.
pub(crate) struct ChecksumViewer<'a, N: NodeTypesWithDB> {
    // Database handle used to open a read-only provider/transaction.
    tool: &'a DbTool<N>,
    // Optional inclusive start key (string form, parsed via `table_key`); `None` = table start.
    start_key: Option<String>,
    // Optional inclusive end key (string form, parsed via `table_key`); `None` = table end.
    end_key: Option<String>,
    // Maximum number of entries to hash; `None` = unlimited.
    limit: Option<usize>,
}
208
209impl<N: NodeTypesWithDB> ChecksumViewer<'_, N> {
210 pub(crate) const fn new(tool: &'_ DbTool<N>) -> ChecksumViewer<'_, N> {
211 ChecksumViewer { tool, start_key: None, end_key: None, limit: None }
212 }
213}
214
impl<N: ProviderNodeTypes> TableViewer<(u64, Duration)> for ChecksumViewer<'_, N> {
    type Error = eyre::Report;

    /// Walks the configured key range of table `T`, hashing every raw key/value
    /// pair, and returns `(checksum, elapsed)`.
    fn view<T: Table>(&self) -> Result<(u64, Duration), Self::Error> {
        // Long-read-transaction safety is disabled — presumably because a
        // full-table scan can outlive the default read-transaction watchdog.
        let provider =
            self.tool.provider_factory.provider()?.disable_long_read_transaction_safety();
        let tx = provider.tx_ref();
        info!(
            "Start computing checksum, start={:?}, end={:?}, limit={:?}",
            self.start_key, self.end_key, self.limit
        );

        // Raw cursor: hash the stored bytes directly, without decoding values.
        let mut cursor = tx.cursor_read::<RawTable<T>>()?;
        // Build the walk range from whichever bounds were supplied; both ends
        // are inclusive when present.
        let walker = match (self.start_key.as_deref(), self.end_key.as_deref()) {
            (Some(start), Some(end)) => {
                let start_key = table_key::<T>(start).map(RawKey::new)?;
                let end_key = table_key::<T>(end).map(RawKey::new)?;
                cursor.walk_range(start_key..=end_key)?
            }
            (None, Some(end)) => {
                let end_key = table_key::<T>(end).map(RawKey::new)?;

                cursor.walk_range(..=end_key)?
            }
            (Some(start), None) => {
                let start_key = table_key::<T>(start).map(RawKey::new)?;
                cursor.walk_range(start_key..)?
            }
            (None, None) => cursor.walk_range(..)?,
        };

        let start_time = Instant::now();
        let mut hasher = checksum_hasher();
        let mut total = 0;

        // `usize::MAX` stands in for "no limit".
        let limit = self.limit.unwrap_or(usize::MAX);
        // First and last keys actually visited, reported in the summary below.
        let mut enumerate_start_key = None;
        let mut enumerate_end_key = None;
        for (index, entry) in walker.enumerate() {
            let (k, v): (RawKey<T::Key>, RawValue<T::Value>) = entry?;

            // Periodic progress log (note: also fires once at index 0).
            if index.is_multiple_of(PROGRESS_LOG_INTERVAL) {
                info!("Hashed {index} entries.");
            }

            hasher.write(k.raw_key());
            hasher.write(v.raw_value());

            if enumerate_start_key.is_none() {
                enumerate_start_key = Some(k.clone());
            }
            enumerate_end_key = Some(k);

            total = index + 1;
            if total >= limit {
                break
            }
        }

        info!("Hashed {total} entries.");
        if let (Some(s), Some(e)) = (enumerate_start_key, enumerate_end_key) {
            info!("start-key: {}", serde_json::to_string(&s.key()?).unwrap_or_default());
            info!("end-key: {}", serde_json::to_string(&e.key()?).unwrap_or_default());
        }

        let checksum = hasher.finish();
        let elapsed = start_time.elapsed();

        info!("Checksum for table `{}`: {:#x} (elapsed: {:?})", T::NAME, checksum, elapsed);

        Ok((checksum, elapsed))
    }
}