// reth_cli_commands/db/checksum/mod.rs
use crate::{
2 common::CliNodeTypes,
3 db::get::{maybe_json_value_parser, table_key},
4};
5use alloy_primitives::map::foldhash::fast::FixedState;
6use clap::Parser;
7use itertools::Itertools;
8use reth_chainspec::EthereumHardforks;
9use reth_db::{static_file::iter_static_files, DatabaseEnv};
10use reth_db_api::{
11 cursor::DbCursorRO, table::Table, transaction::DbTx, RawKey, RawTable, RawValue, TableViewer,
12 Tables,
13};
14use reth_db_common::DbTool;
15use reth_node_builder::{NodeTypesWithDB, NodeTypesWithDBAdapter};
16use reth_provider::{providers::ProviderNodeTypes, DBProvider, StaticFileProviderFactory};
17use reth_static_file_types::StaticFileSegment;
18use std::{
19 hash::{BuildHasher, Hasher},
20 sync::Arc,
21 time::{Duration, Instant},
22};
23use tracing::{info, warn};
24
// RocksDB checksumming is only available on unix with the `edge` feature.
#[cfg(all(unix, feature = "edge"))]
mod rocksdb;

// Emit a progress log line every this-many hashed entries.
const PROGRESS_LOG_INTERVAL: usize = 100_000;
// `reth db checksum` command: computes a deterministic checksum over database
// contents (MDBX tables, static files, or RocksDB tables).
// NOTE: `//` comments are used deliberately — `///` doc comments on clap-derive
// items become CLI help text and would change runtime output.
#[derive(Parser, Debug)]
pub struct Command {
    #[command(subcommand)]
    subcommand: Subcommand,
}
37
// Storage backend to checksum. Each variant carries its own range/limit bounds.
// NOTE: `//` comments are used deliberately — `///` doc comments on clap-derive
// items become CLI help text and would change runtime output.
#[derive(clap::Subcommand, Debug)]
enum Subcommand {
    // Checksum an MDBX table, optionally bounded by a key range and entry limit.
    Mdbx {
        // Table to checksum.
        table: Tables,

        // Inclusive start key; parsed with `table_key` (accepts JSON or raw values).
        #[arg(long, value_parser = maybe_json_value_parser)]
        start_key: Option<String>,

        // Inclusive end key; parsed with `table_key` (accepts JSON or raw values).
        #[arg(long, value_parser = maybe_json_value_parser)]
        end_key: Option<String>,

        // Maximum number of entries to hash.
        #[arg(long)]
        limit: Option<usize>,
    },
    // Checksum a static file segment, optionally bounded by a block range.
    StaticFile {
        // Static file segment to checksum.
        #[arg(value_enum)]
        segment: StaticFileSegment,

        // Jars whose range ends before this block are skipped.
        #[arg(long)]
        start_block: Option<u64>,

        // Jars whose range starts after this block are skipped.
        #[arg(long)]
        end_block: Option<u64>,

        // Maximum number of rows to hash.
        #[arg(long)]
        limit: Option<usize>,
    },
    // Checksum a RocksDB table (unix + `edge` feature only).
    #[cfg(all(unix, feature = "edge"))]
    Rocksdb {
        // RocksDB table to checksum.
        #[arg(value_enum)]
        table: rocksdb::RocksDbTable,

        // Maximum number of entries to hash.
        #[arg(long)]
        limit: Option<usize>,
    },
}
88
89impl Command {
90 pub fn execute<N: CliNodeTypes<ChainSpec: EthereumHardforks>>(
92 self,
93 tool: &DbTool<NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>,
94 ) -> eyre::Result<()> {
95 warn!("This command should be run without the node running!");
96
97 match self.subcommand {
98 Subcommand::Mdbx { table, start_key, end_key, limit } => {
99 table.view(&ChecksumViewer { tool, start_key, end_key, limit })?;
100 }
101 Subcommand::StaticFile { segment, start_block, end_block, limit } => {
102 checksum_static_file(tool, segment, start_block, end_block, limit)?;
103 }
104 #[cfg(all(unix, feature = "edge"))]
105 Subcommand::Rocksdb { table, limit } => {
106 rocksdb::checksum_rocksdb(tool, table, limit)?;
107 }
108 }
109
110 Ok(())
111 }
112}
113
114fn checksum_hasher() -> impl Hasher {
116 FixedState::with_seed(u64::from_be_bytes(*b"RETHRETH")).build_hasher()
117}
118
119fn checksum_static_file<N: CliNodeTypes<ChainSpec: EthereumHardforks>>(
120 tool: &DbTool<NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>,
121 segment: StaticFileSegment,
122 start_block: Option<u64>,
123 end_block: Option<u64>,
124 limit: Option<usize>,
125) -> eyre::Result<()> {
126 let static_file_provider = tool.provider_factory.static_file_provider();
127 if let Err(err) = static_file_provider.check_consistency(&tool.provider_factory.provider()?) {
128 warn!("Error checking consistency of static files: {err}");
129 }
130
131 let static_files = iter_static_files(static_file_provider.directory())?;
132
133 let ranges = static_files
134 .get(segment)
135 .ok_or_else(|| eyre::eyre!("No static files found for segment: {}", segment))?;
136
137 let start_time = Instant::now();
138 let mut hasher = checksum_hasher();
139 let mut total = 0usize;
140 let limit = limit.unwrap_or(usize::MAX);
141
142 let start_block = start_block.unwrap_or(0);
143 let end_block = end_block.unwrap_or(u64::MAX);
144
145 info!(
146 "Computing checksum for {} static files, start_block={}, end_block={}, limit={:?}",
147 segment,
148 start_block,
149 end_block,
150 if limit == usize::MAX { None } else { Some(limit) }
151 );
152
153 'outer: for (block_range, _header) in ranges.iter().sorted_by_key(|(range, _)| range.start()) {
154 if block_range.end() < start_block || block_range.start() > end_block {
155 continue;
156 }
157
158 let fixed_block_range = static_file_provider.find_fixed_range(segment, block_range.start());
159 let jar_provider = static_file_provider
160 .get_segment_provider_for_range(segment, || Some(fixed_block_range), None)?
161 .ok_or_else(|| {
162 eyre::eyre!(
163 "Failed to get segment provider for segment {} at range {}",
164 segment,
165 block_range
166 )
167 })?;
168
169 let mut cursor = jar_provider.cursor()?;
170
171 while let Ok(Some(row)) = cursor.next_row() {
172 for col_data in row.iter() {
173 hasher.write(col_data);
174 }
175
176 total += 1;
177
178 if total.is_multiple_of(PROGRESS_LOG_INTERVAL) {
179 info!("Hashed {total} entries.");
180 }
181
182 if total >= limit {
183 break 'outer;
184 }
185 }
186
187 drop(jar_provider);
189 static_file_provider.remove_cached_provider(segment, fixed_block_range.end());
190 }
191
192 let checksum = hasher.finish();
193 let elapsed = start_time.elapsed();
194
195 info!(
196 "Checksum for static file segment `{}`: {:#x} ({} entries, elapsed: {:?})",
197 segment, checksum, total, elapsed
198 );
199
200 Ok(())
201}
202
/// [`TableViewer`] implementation that checksums the raw contents of an MDBX
/// table, optionally bounded by a key range and an entry limit.
pub(crate) struct ChecksumViewer<'a, N: NodeTypesWithDB> {
    /// Database tool providing access to the provider factory.
    tool: &'a DbTool<N>,
    /// Inclusive start key, if any; decoded per-table via `table_key`.
    start_key: Option<String>,
    /// Inclusive end key, if any; decoded per-table via `table_key`.
    end_key: Option<String>,
    /// Maximum number of entries to hash; `None` means unlimited.
    limit: Option<usize>,
}
209
impl<N: NodeTypesWithDB> ChecksumViewer<'_, N> {
    /// Creates a viewer that checksums the entire table: no key bounds and no
    /// entry limit.
    pub(crate) const fn new(tool: &'_ DbTool<N>) -> ChecksumViewer<'_, N> {
        ChecksumViewer { tool, start_key: None, end_key: None, limit: None }
    }
}
215
impl<N: ProviderNodeTypes> TableViewer<(u64, Duration)> for ChecksumViewer<'_, N> {
    type Error = eyre::Report;

    /// Walks the raw key/value pairs of table `T` — bounded by
    /// `start_key`/`end_key` when set, and capped at `limit` entries — feeding
    /// the raw bytes of each pair into the deterministic checksum hasher.
    ///
    /// Returns the final checksum and the elapsed wall-clock time.
    fn view<T: Table>(&self) -> Result<(u64, Duration), Self::Error> {
        // A full-table walk can easily exceed the long-read-transaction
        // threshold, so disable that safety mechanism for this read.
        let provider =
            self.tool.provider_factory.provider()?.disable_long_read_transaction_safety();
        let tx = provider.tx_ref();
        info!(
            "Start computing checksum, start={:?}, end={:?}, limit={:?}",
            self.start_key, self.end_key, self.limit
        );

        // Raw cursor: hash the encoded bytes directly, without decoding.
        let mut cursor = tx.cursor_read::<RawTable<T>>()?;
        // Build the walker for whichever combination of bounds was supplied;
        // both bounds are inclusive.
        let walker = match (self.start_key.as_deref(), self.end_key.as_deref()) {
            (Some(start), Some(end)) => {
                let start_key = table_key::<T>(start).map(RawKey::new)?;
                let end_key = table_key::<T>(end).map(RawKey::new)?;
                cursor.walk_range(start_key..=end_key)?
            }
            (None, Some(end)) => {
                let end_key = table_key::<T>(end).map(RawKey::new)?;

                cursor.walk_range(..=end_key)?
            }
            (Some(start), None) => {
                let start_key = table_key::<T>(start).map(RawKey::new)?;
                cursor.walk_range(start_key..)?
            }
            (None, None) => cursor.walk_range(..)?,
        };

        let start_time = Instant::now();
        let mut hasher = checksum_hasher();
        let mut total = 0;

        let limit = self.limit.unwrap_or(usize::MAX);
        // Track the first and last keys actually hashed, for reporting below.
        let mut enumerate_start_key = None;
        let mut enumerate_end_key = None;
        for (index, entry) in walker.enumerate() {
            let (k, v): (RawKey<T::Key>, RawValue<T::Value>) = entry?;

            if index.is_multiple_of(PROGRESS_LOG_INTERVAL) {
                info!("Hashed {index} entries.");
            }

            hasher.write(k.raw_key());
            hasher.write(v.raw_value());

            if enumerate_start_key.is_none() {
                enumerate_start_key = Some(k.clone());
            }
            enumerate_end_key = Some(k);

            // `index` is 0-based, so the running count is `index + 1`.
            total = index + 1;
            if total >= limit {
                break
            }
        }

        info!("Hashed {total} entries.");
        if let (Some(s), Some(e)) = (enumerate_start_key, enumerate_end_key) {
            info!("start-key: {}", serde_json::to_string(&s.key()?).unwrap_or_default());
            info!("end-key: {}", serde_json::to_string(&e.key()?).unwrap_or_default());
        }

        let checksum = hasher.finish();
        let elapsed = start_time.elapsed();

        info!("Checksum for table `{}`: {:#x} (elapsed: {:?})", T::NAME, checksum, elapsed);

        Ok((checksum, elapsed))
    }
}
288}