1use alloy_primitives::{keccak256, Address, BlockNumber, B256, U256};
2use clap::Parser;
3use parking_lot::Mutex;
4use reth_db_api::{
5 cursor::{DbCursorRO, DbDupCursorRO},
6 database::Database,
7 tables,
8 transaction::DbTx,
9};
10use reth_db_common::DbTool;
11use reth_node_builder::NodeTypesWithDB;
12use reth_provider::{providers::ProviderNodeTypes, StaticFileProviderFactory};
13use reth_storage_api::{BlockNumReader, StateProvider, StorageSettingsCache};
14use reth_tasks::spawn_scoped_os_thread;
15use std::{
16 collections::BTreeSet,
17 thread,
18 time::{Duration, Instant},
19};
20use tracing::info;
21
/// How often progress log messages are emitted during long-running scans.
const LOG_INTERVAL: Duration = Duration::from_secs(30);

/// Show the state of an account (nonce, balance, code hash) and its non-zero
/// storage slots, either at the latest block or at a historical block.
#[derive(Parser, Debug)]
pub struct Command {
    /// The account address to inspect.
    address: Address,

    /// Historical block number to query the state at.
    /// When omitted, the current (latest) state is shown.
    #[arg(long, short)]
    block: Option<BlockNumber>,

    /// Maximum number of non-zero storage slots to collect.
    #[arg(long, short, default_value = "100")]
    limit: usize,

    /// Output format for the results.
    #[arg(long, short, default_value = "table")]
    format: OutputFormat,
}
43
44impl Command {
45 pub fn execute<N: NodeTypesWithDB + ProviderNodeTypes>(
47 self,
48 tool: &DbTool<N>,
49 ) -> eyre::Result<()> {
50 let address = self.address;
51 let limit = self.limit;
52
53 if let Some(block) = self.block {
54 self.execute_historical(tool, address, block, limit)
55 } else {
56 self.execute_current(tool, address, limit)
57 }
58 }
59
60 fn execute_current<N: NodeTypesWithDB + ProviderNodeTypes>(
61 &self,
62 tool: &DbTool<N>,
63 address: Address,
64 limit: usize,
65 ) -> eyre::Result<()> {
66 let use_hashed_state = tool.provider_factory.cached_storage_settings().use_hashed_state();
67
68 let entries = tool.provider_factory.db_ref().view(|tx| {
69 let (account, walker_entries) = if use_hashed_state {
70 let hashed_address = keccak256(address);
71 let account = tx.get::<tables::HashedAccounts>(hashed_address)?;
72 let mut cursor = tx.cursor_dup_read::<tables::HashedStorages>()?;
73 let walker = cursor.walk_dup(Some(hashed_address), None)?;
74 let mut entries = Vec::new();
75 let mut last_log = Instant::now();
76 for (idx, entry) in walker.enumerate() {
77 let (_, storage_entry) = entry?;
78 if storage_entry.value != U256::ZERO {
79 entries.push((storage_entry.key, storage_entry.value));
80 }
81 if entries.len() >= limit {
82 break;
83 }
84 if last_log.elapsed() >= LOG_INTERVAL {
85 info!(
86 target: "reth::cli",
87 address = %address,
88 slots_scanned = idx,
89 "Scanning storage slots"
90 );
91 last_log = Instant::now();
92 }
93 }
94 (account, entries)
95 } else {
96 let account = tx.get::<tables::PlainAccountState>(address)?;
98 let mut cursor = tx.cursor_dup_read::<tables::PlainStorageState>()?;
100 let walker = cursor.walk_dup(Some(address), None)?;
101 let mut entries = Vec::new();
102 let mut last_log = Instant::now();
103 for (idx, entry) in walker.enumerate() {
104 let (_, storage_entry) = entry?;
105 if storage_entry.value != U256::ZERO {
106 entries.push((storage_entry.key, storage_entry.value));
107 }
108 if entries.len() >= limit {
109 break;
110 }
111 if last_log.elapsed() >= LOG_INTERVAL {
112 info!(
113 target: "reth::cli",
114 address = %address,
115 slots_scanned = idx,
116 "Scanning storage slots"
117 );
118 last_log = Instant::now();
119 }
120 }
121 (account, entries)
122 };
123
124 Ok::<_, eyre::Report>((account, walker_entries))
125 })??;
126
127 let (account, storage_entries) = entries;
128
129 self.print_results(address, None, account, &storage_entries);
130
131 Ok(())
132 }
133
134 fn execute_historical<N: NodeTypesWithDB + ProviderNodeTypes>(
135 &self,
136 tool: &DbTool<N>,
137 address: Address,
138 block: BlockNumber,
139 limit: usize,
140 ) -> eyre::Result<()> {
141 let provider = tool.provider_factory.history_by_block_number(block)?;
142
143 let account = provider.basic_account(&address)?;
145
146 let storage_settings = tool.provider_factory.cached_storage_settings();
148 let history_in_rocksdb = storage_settings.storage_v2;
149
150 let mut storage_keys = BTreeSet::new();
153
154 if history_in_rocksdb {
155 self.collect_staticfile_storage_keys(tool, address, &mut storage_keys)?;
156 } else {
157 self.collect_mdbx_storage_keys_parallel(tool, address, &mut storage_keys)?;
158 }
159
160 info!(
161 target: "reth::cli",
162 address = %address,
163 block = block,
164 total_keys = storage_keys.len(),
165 "Found storage keys to query"
166 );
167
168 let mut entries = Vec::new();
171 let mut last_log = Instant::now();
172
173 for (idx, key) in storage_keys.iter().enumerate() {
174 match provider.storage(address, *key) {
175 Ok(Some(value)) if value != U256::ZERO => {
176 entries.push((*key, value));
177 }
178 _ => {}
179 }
180
181 if entries.len() >= limit {
182 break;
183 }
184
185 if last_log.elapsed() >= LOG_INTERVAL {
186 info!(
187 target: "reth::cli",
188 address = %address,
189 block = block,
190 keys_total = storage_keys.len(),
191 slots_scanned = idx,
192 slots_found = entries.len(),
193 "Scanning historical storage slots"
194 );
195 last_log = Instant::now();
196 }
197 }
198
199 self.print_results(address, Some(block), account, &entries);
200
201 Ok(())
202 }
203
204 fn collect_staticfile_storage_keys<N: NodeTypesWithDB + ProviderNodeTypes>(
206 &self,
207 tool: &DbTool<N>,
208 address: Address,
209 keys: &mut BTreeSet<B256>,
210 ) -> eyre::Result<()> {
211 let tip = tool.provider_factory.provider()?.best_block_number()?;
212
213 if tip == 0 {
214 return Ok(());
215 }
216
217 info!(
218 target: "reth::cli",
219 address = %address,
220 tip,
221 "Scanning static file storage changesets"
222 );
223
224 let static_file_provider = tool.provider_factory.static_file_provider();
225 let walker = static_file_provider.walk_storage_changeset_range(0..=tip);
226
227 let mut total_scanned = 0usize;
228 let mut last_log = Instant::now();
229
230 for changeset_result in walker {
231 let (block_addr, storage_entry) = changeset_result?;
232 total_scanned += 1;
233
234 if block_addr.address() == address {
235 keys.insert(storage_entry.key);
236 }
237
238 if last_log.elapsed() >= LOG_INTERVAL {
239 info!(
240 target: "reth::cli",
241 address = %address,
242 entries_scanned = total_scanned,
243 unique_keys = keys.len(),
244 "Scanning static file storage changesets"
245 );
246 last_log = Instant::now();
247 }
248 }
249
250 info!(
251 target: "reth::cli",
252 address = %address,
253 total_entries = total_scanned,
254 unique_keys = keys.len(),
255 "Finished static file storage changeset scan"
256 );
257
258 Ok(())
259 }
260
261 fn collect_mdbx_storage_keys_parallel<N: NodeTypesWithDB + ProviderNodeTypes>(
263 &self,
264 tool: &DbTool<N>,
265 address: Address,
266 keys: &mut BTreeSet<B256>,
267 ) -> eyre::Result<()> {
268 const CHUNK_SIZE: u64 = 500_000; let num_threads = std::thread::available_parallelism()
270 .map(|p| p.get().saturating_sub(1).max(1))
271 .unwrap_or(4);
272
273 let tip = tool.provider_factory.provider()?.best_block_number()?;
275
276 if tip == 0 {
277 return Ok(());
278 }
279
280 info!(
281 target: "reth::cli",
282 address = %address,
283 tip,
284 chunk_size = CHUNK_SIZE,
285 num_threads,
286 "Starting parallel MDBX changeset scan"
287 );
288
289 let collected_keys: Mutex<BTreeSet<B256>> = Mutex::new(BTreeSet::new());
291 let total_entries_scanned = Mutex::new(0usize);
292
293 let mut chunks: Vec<(u64, u64)> = Vec::new();
295 let mut start = 0u64;
296 while start <= tip {
297 let end = (start + CHUNK_SIZE - 1).min(tip);
298 chunks.push((start, end));
299 start = end + 1;
300 }
301
302 let chunks_ref = &chunks;
303 let next_chunk = Mutex::new(0usize);
304 let next_chunk_ref = &next_chunk;
305 let collected_keys_ref = &collected_keys;
306 let total_entries_ref = &total_entries_scanned;
307
308 thread::scope(|s| {
309 let handles: Vec<_> = (0..num_threads)
310 .map(|thread_id| {
311 spawn_scoped_os_thread(s, "db-state-worker", move || {
312 loop {
313 let chunk_idx = {
315 let mut idx = next_chunk_ref.lock();
316 if *idx >= chunks_ref.len() {
317 return Ok::<_, eyre::Report>(());
318 }
319 let current = *idx;
320 *idx += 1;
321 current
322 };
323
324 let (chunk_start, chunk_end) = chunks_ref[chunk_idx];
325
326 tool.provider_factory.db_ref().view(|tx| {
328 tx.disable_long_read_transaction_safety();
329
330 let mut changeset_cursor =
331 tx.cursor_read::<tables::StorageChangeSets>()?;
332 let start_key =
333 reth_db_api::models::BlockNumberAddress((chunk_start, address));
334 let end_key =
335 reth_db_api::models::BlockNumberAddress((chunk_end, address));
336
337 let mut local_keys = BTreeSet::new();
338 let mut entries_in_chunk = 0usize;
339
340 if let Ok(walker) = changeset_cursor.walk_range(start_key..=end_key)
341 {
342 for (block_addr, storage_entry) in walker.flatten() {
343 if block_addr.address() == address {
344 local_keys.insert(storage_entry.key);
345 }
346 entries_in_chunk += 1;
347 }
348 }
349
350 collected_keys_ref.lock().extend(local_keys);
352 *total_entries_ref.lock() += entries_in_chunk;
353
354 info!(
355 target: "reth::cli",
356 thread_id,
357 chunk_start,
358 chunk_end,
359 entries_in_chunk,
360 "Thread completed chunk"
361 );
362
363 Ok::<_, eyre::Report>(())
364 })??;
365 }
366 })
367 })
368 .collect();
369
370 for handle in handles {
371 handle.join().map_err(|_| eyre::eyre!("Thread panicked"))??;
372 }
373
374 Ok::<_, eyre::Report>(())
375 })?;
376
377 let final_keys = collected_keys.into_inner();
378 let total = *total_entries_scanned.lock();
379
380 info!(
381 target: "reth::cli",
382 address = %address,
383 total_entries = total,
384 unique_keys = final_keys.len(),
385 "Finished parallel MDBX changeset scan"
386 );
387
388 keys.extend(final_keys);
389 Ok(())
390 }
391
392 fn print_results(
393 &self,
394 address: Address,
395 block: Option<BlockNumber>,
396 account: Option<reth_primitives_traits::Account>,
397 storage: &[(alloy_primitives::B256, U256)],
398 ) {
399 match self.format {
400 OutputFormat::Table => {
401 println!("Account: {address}");
402 if let Some(b) = block {
403 println!("Block: {b}");
404 } else {
405 println!("Block: latest");
406 }
407 println!();
408
409 if let Some(acc) = account {
410 println!("Nonce: {}", acc.nonce);
411 println!("Balance: {} wei", acc.balance);
412 if let Some(code_hash) = acc.bytecode_hash {
413 println!("Code hash: {code_hash}");
414 }
415 } else {
416 println!("Account not found");
417 }
418
419 println!();
420 println!("Storage ({} slots):", storage.len());
421 println!("{:-<130}", "");
422 println!("{:<66} | {:<64}", "Slot", "Value");
423 println!("{:-<130}", "");
424 for (key, value) in storage {
425 println!("{key} | {value:#066x}");
426 }
427 }
428 OutputFormat::Json => {
429 let output = serde_json::json!({
430 "address": address.to_string(),
431 "block": block,
432 "account": account.map(|a| serde_json::json!({
433 "nonce": a.nonce,
434 "balance": a.balance.to_string(),
435 "code_hash": a.bytecode_hash.map(|h| h.to_string()),
436 })),
437 "storage": storage.iter().map(|(k, v)| {
438 serde_json::json!({
439 "key": k.to_string(),
440 "value": format!("{v:#066x}"),
441 })
442 }).collect::<Vec<_>>(),
443 });
444 println!("{}", serde_json::to_string_pretty(&output).unwrap());
445 }
446 OutputFormat::Csv => {
447 println!("slot,value");
448 for (key, value) in storage {
449 println!("{key},{value:#066x}");
450 }
451 }
452 }
453 }
454}
455
/// Output format for the command's results.
#[derive(Debug, Clone, Default, clap::ValueEnum)]
pub enum OutputFormat {
    /// Human-readable aligned table (default).
    #[default]
    Table,
    /// Pretty-printed JSON object.
    Json,
    /// CSV with `slot,value` rows.
    Csv,
}
463
#[cfg(test)]
mod tests {
    use super::*;

    /// `--block` is parsed into `Some(block)` alongside the positional
    /// address argument.
    #[test]
    fn parse_state_args() {
        let cmd = Command::try_parse_from([
            "state",
            "0xd8dA6BF26964aF9D7eEd9e03E53415D37aA96045",
            "--block",
            "1000000",
        ])
        .unwrap();
        assert_eq!(
            cmd.address,
            "0xd8dA6BF26964aF9D7eEd9e03E53415D37aA96045".parse::<Address>().unwrap()
        );
        assert_eq!(cmd.block, Some(1000000));
    }

    /// Omitting `--block` leaves `cmd.block` as `None` (latest state).
    #[test]
    fn parse_state_args_no_block() {
        let cmd = Command::try_parse_from(["state", "0xd8dA6BF26964aF9D7eEd9e03E53415D37aA96045"])
            .unwrap();
        assert_eq!(cmd.block, None);
    }
}