use alloy_primitives::{keccak256, Address, BlockNumber, B256, U256};
use clap::Parser;
use parking_lot::Mutex;
use reth_db_api::{
    cursor::{DbCursorRO, DbDupCursorRO},
    database::Database,
    tables,
    transaction::DbTx,
};
use reth_db_common::DbTool;
use reth_node_builder::NodeTypesWithDB;
use reth_provider::providers::ProviderNodeTypes;
use reth_storage_api::{BlockNumReader, StateProvider, StorageSettingsCache};
use reth_tasks::spawn_scoped_os_thread;
use std::{
    collections::BTreeSet,
    thread,
    time::{Duration, Instant},
};
use tracing::{error, info};

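/// Interval at which progress is logged while scanning storage tables.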
const LOG_INTERVAL: Duration = Duration::from_secs(30);

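/// Inspects the account and storage state for an address, either at the current tip or at a
/// historical block.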
#[derive(Parser, Debug)]
pub struct Command {
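    /// The account address to inspect.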
    address: Address,

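    /// Query state at this block number instead of the latest state.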
    #[arg(long, short)]
    block: Option<BlockNumber>,

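    /// Maximum number of non-zero storage slots to return.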
    #[arg(long, short, default_value = "100")]
    limit: usize,

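    /// Output format for the results.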
    #[arg(long, short, default_value = "table")]
    format: OutputFormat,
}

impl Command {
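    /// Executes the command, dispatching to the historical query when `--block` is provided and
    /// to the latest-state query otherwise.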
    pub fn execute<N: NodeTypesWithDB + ProviderNodeTypes>(
        self,
        tool: &DbTool<N>,
    ) -> eyre::Result<()> {
        let address = self.address;
        let limit = self.limit;

        if let Some(block) = self.block {
            self.execute_historical(tool, address, block, limit)
        } else {
            self.execute_current(tool, address, limit)
        }
    }

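    /// Queries the latest account and storage state, reading the hashed state tables or the
    /// plain state tables depending on the node's storage settings.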
    fn execute_current<N: NodeTypesWithDB + ProviderNodeTypes>(
        &self,
        tool: &DbTool<N>,
        address: Address,
        limit: usize,
    ) -> eyre::Result<()> {
        let use_hashed_state = tool.provider_factory.cached_storage_settings().use_hashed_state();

        let entries = tool.provider_factory.db_ref().view(|tx| {
            let (account, walker_entries) = if use_hashed_state {
                let hashed_address = keccak256(address);
                let account = tx.get::<tables::HashedAccounts>(hashed_address)?;
                let mut cursor = tx.cursor_dup_read::<tables::HashedStorages>()?;
                let walker = cursor.walk_dup(Some(hashed_address), None)?;
                let mut entries = Vec::new();
                let mut last_log = Instant::now();
                for (idx, entry) in walker.enumerate() {
                    let (_, storage_entry) = entry?;
                    if storage_entry.value != U256::ZERO {
                        entries.push((storage_entry.key, storage_entry.value));
                    }
                    if entries.len() >= limit {
                        break;
                    }
                    if last_log.elapsed() >= LOG_INTERVAL {
                        info!(
                            target: "reth::cli",
                            address = %address,
                            slots_scanned = idx,
                            "Scanning storage slots"
                        );
                        last_log = Instant::now();
                    }
                }
                (account, entries)
            } else {
                let account = tx.get::<tables::PlainAccountState>(address)?;
                let mut cursor = tx.cursor_dup_read::<tables::PlainStorageState>()?;
                let walker = cursor.walk_dup(Some(address), None)?;
                let mut entries = Vec::new();
                let mut last_log = Instant::now();
                for (idx, entry) in walker.enumerate() {
                    let (_, storage_entry) = entry?;
                    if storage_entry.value != U256::ZERO {
                        entries.push((storage_entry.key, storage_entry.value));
                    }
                    if entries.len() >= limit {
                        break;
                    }
                    if last_log.elapsed() >= LOG_INTERVAL {
                        info!(
                            target: "reth::cli",
                            address = %address,
                            slots_scanned = idx,
                            "Scanning storage slots"
                        );
                        last_log = Instant::now();
                    }
                }
                (account, entries)
            };

            Ok::<_, eyre::Report>((account, walker_entries))
        })??;

        let (account, storage_entries) = entries;

        self.print_results(address, None, account, &storage_entries);

        Ok(())
    }

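    /// Queries account and storage state as of a historical block.
    ///
    /// Candidate storage keys are discovered by scanning the storage changesets, then each key is
    /// resolved through the historical state provider.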
    fn execute_historical<N: NodeTypesWithDB + ProviderNodeTypes>(
        &self,
        tool: &DbTool<N>,
        address: Address,
        block: BlockNumber,
        limit: usize,
    ) -> eyre::Result<()> {
        let provider = tool.provider_factory.history_by_block_number(block)?;

        let account = provider.basic_account(&address)?;

        let storage_settings = tool.provider_factory.cached_storage_settings();
        let history_in_rocksdb = storage_settings.storage_v2;

        let mut storage_keys = BTreeSet::new();

        if history_in_rocksdb {
            error!(
                target: "reth::cli",
                "Historical storage queries with the RocksDB backend are not yet supported. \
                 Use MDBX for storage history or query current state without --block."
            );
            return Ok(());
        }

        self.collect_mdbx_storage_keys_parallel(tool, address, &mut storage_keys)?;

        info!(
            target: "reth::cli",
            address = %address,
            block = block,
            total_keys = storage_keys.len(),
            "Found storage keys to query"
        );

        let mut entries = Vec::new();
        let mut last_log = Instant::now();

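        // Resolve each candidate key through the historical state provider, keeping only
        // non-zero values up to the requested limit.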
        for (idx, key) in storage_keys.iter().enumerate() {
            match provider.storage(address, *key) {
                Ok(Some(value)) if value != U256::ZERO => {
                    entries.push((*key, value));
                }
                _ => {}
            }

            if entries.len() >= limit {
                break;
            }

            if last_log.elapsed() >= LOG_INTERVAL {
                info!(
                    target: "reth::cli",
                    address = %address,
                    block = block,
                    keys_total = storage_keys.len(),
                    slots_scanned = idx,
                    slots_found = entries.len(),
                    "Scanning historical storage slots"
                );
                last_log = Instant::now();
            }
        }

        self.print_results(address, Some(block), account, &entries);

        Ok(())
    }

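    /// Scans the MDBX `StorageChangeSets` table in parallel over fixed-size block ranges and
    /// collects every storage key that was modified for `address` up to the current tip.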
    fn collect_mdbx_storage_keys_parallel<N: NodeTypesWithDB + ProviderNodeTypes>(
        &self,
        tool: &DbTool<N>,
        address: Address,
        keys: &mut BTreeSet<B256>,
    ) -> eyre::Result<()> {
        const CHUNK_SIZE: u64 = 500_000;
        let num_threads = std::thread::available_parallelism()
            .map(|p| p.get().saturating_sub(1).max(1))
            .unwrap_or(4);

        let tip = tool.provider_factory.provider()?.best_block_number()?;

        if tip == 0 {
            return Ok(());
        }

        info!(
            target: "reth::cli",
            address = %address,
            tip,
            chunk_size = CHUNK_SIZE,
            num_threads,
            "Starting parallel MDBX changeset scan"
        );

        let collected_keys: Mutex<BTreeSet<B256>> = Mutex::new(BTreeSet::new());
        let total_entries_scanned = Mutex::new(0usize);

        let mut chunks: Vec<(u64, u64)> = Vec::new();
        let mut start = 0u64;
        while start <= tip {
            let end = (start + CHUNK_SIZE - 1).min(tip);
            chunks.push((start, end));
            start = end + 1;
        }

        let chunks_ref = &chunks;
        let next_chunk = Mutex::new(0usize);
        let next_chunk_ref = &next_chunk;
        let collected_keys_ref = &collected_keys;
        let total_entries_ref = &total_entries_scanned;

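        // Workers pull chunk indices from a shared counter until every block range has been
        // scanned.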
        thread::scope(|s| {
            let handles: Vec<_> = (0..num_threads)
                .map(|thread_id| {
                    spawn_scoped_os_thread(s, "db-state-worker", move || {
                        loop {
                            let chunk_idx = {
                                let mut idx = next_chunk_ref.lock();
                                if *idx >= chunks_ref.len() {
                                    return Ok::<_, eyre::Report>(());
                                }
                                let current = *idx;
                                *idx += 1;
                                current
                            };

                            let (chunk_start, chunk_end) = chunks_ref[chunk_idx];

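                            // The changeset table is keyed by (block, address), so the walked
                            // range also yields entries for other addresses; those are filtered
                            // out below.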
                            tool.provider_factory.db_ref().view(|tx| {
                                tx.disable_long_read_transaction_safety();

                                let mut changeset_cursor =
                                    tx.cursor_read::<tables::StorageChangeSets>()?;
                                let start_key =
                                    reth_db_api::models::BlockNumberAddress((chunk_start, address));
                                let end_key =
                                    reth_db_api::models::BlockNumberAddress((chunk_end, address));

                                let mut local_keys = BTreeSet::new();
                                let mut entries_in_chunk = 0usize;

                                if let Ok(walker) = changeset_cursor.walk_range(start_key..=end_key)
                                {
                                    for (block_addr, storage_entry) in walker.flatten() {
                                        if block_addr.address() == address {
                                            local_keys.insert(storage_entry.key);
                                        }
                                        entries_in_chunk += 1;
                                    }
                                }

                                collected_keys_ref.lock().extend(local_keys);
                                *total_entries_ref.lock() += entries_in_chunk;

                                info!(
                                    target: "reth::cli",
                                    thread_id,
                                    chunk_start,
                                    chunk_end,
                                    entries_in_chunk,
                                    "Thread completed chunk"
                                );

                                Ok::<_, eyre::Report>(())
                            })??;
                        }
                    })
                })
                .collect();

            for handle in handles {
                handle.join().map_err(|_| eyre::eyre!("Thread panicked"))??;
            }

            Ok::<_, eyre::Report>(())
        })?;

        let final_keys = collected_keys.into_inner();
        let total = *total_entries_scanned.lock();

        info!(
            target: "reth::cli",
            address = %address,
            total_entries = total,
            unique_keys = final_keys.len(),
            "Finished parallel MDBX changeset scan"
        );

        keys.extend(final_keys);
        Ok(())
    }

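    /// Prints the account info and storage entries in the selected output format.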
    fn print_results(
        &self,
        address: Address,
        block: Option<BlockNumber>,
        account: Option<reth_primitives_traits::Account>,
        storage: &[(alloy_primitives::B256, U256)],
    ) {
        match self.format {
            OutputFormat::Table => {
                println!("Account: {address}");
                if let Some(b) = block {
                    println!("Block: {b}");
                } else {
                    println!("Block: latest");
                }
                println!();

                if let Some(acc) = account {
                    println!("Nonce: {}", acc.nonce);
                    println!("Balance: {} wei", acc.balance);
                    if let Some(code_hash) = acc.bytecode_hash {
                        println!("Code hash: {code_hash}");
                    }
                } else {
                    println!("Account not found");
                }

                println!();
                println!("Storage ({} slots):", storage.len());
                println!("{:-<130}", "");
                println!("{:<66} | {:<64}", "Slot", "Value");
                println!("{:-<130}", "");
                for (key, value) in storage {
                    println!("{key} | {value:#066x}");
                }
            }
            OutputFormat::Json => {
                let output = serde_json::json!({
                    "address": address.to_string(),
                    "block": block,
                    "account": account.map(|a| serde_json::json!({
                        "nonce": a.nonce,
                        "balance": a.balance.to_string(),
                        "code_hash": a.bytecode_hash.map(|h| h.to_string()),
                    })),
                    "storage": storage.iter().map(|(k, v)| {
                        serde_json::json!({
                            "key": k.to_string(),
                            "value": format!("{v:#066x}"),
                        })
                    }).collect::<Vec<_>>(),
                });
                println!("{}", serde_json::to_string_pretty(&output).unwrap());
            }
            OutputFormat::Csv => {
                println!("slot,value");
                for (key, value) in storage {
                    println!("{key},{value:#066x}");
                }
            }
        }
    }
}

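/// Output format for [`Command`] results.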
#[derive(Debug, Clone, Default, clap::ValueEnum)]
pub enum OutputFormat {
    #[default]
    Table,
    Json,
    Csv,
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn parse_state_args() {
        let cmd = Command::try_parse_from([
            "state",
            "0xd8dA6BF26964aF9D7eEd9e03E53415D37aA96045",
            "--block",
            "1000000",
        ])
        .unwrap();
        assert_eq!(
            cmd.address,
            "0xd8dA6BF26964aF9D7eEd9e03E53415D37aA96045".parse::<Address>().unwrap()
        );
        assert_eq!(cmd.block, Some(1000000));
    }

    #[test]
    fn parse_state_args_no_block() {
        let cmd = Command::try_parse_from(["state", "0xd8dA6BF26964aF9D7eEd9e03E53415D37aA96045"])
            .unwrap();
        assert_eq!(cmd.block, None);
    }
}