Skip to main content

reth_cli_commands/db/
state.rs

1use alloy_primitives::{keccak256, Address, BlockNumber, B256, U256};
2use clap::Parser;
3use parking_lot::Mutex;
4use reth_db_api::{
5    cursor::{DbCursorRO, DbDupCursorRO},
6    database::Database,
7    tables,
8    transaction::DbTx,
9};
10use reth_db_common::DbTool;
11use reth_node_builder::NodeTypesWithDB;
12use reth_provider::{providers::ProviderNodeTypes, StaticFileProviderFactory};
13use reth_storage_api::{BlockNumReader, StateProvider, StorageSettingsCache};
14use reth_tasks::spawn_scoped_os_thread;
15use std::{
16    collections::BTreeSet,
17    thread,
18    time::{Duration, Instant},
19};
20use tracing::info;
21
/// Minimum time between two progress log lines emitted by the long-running scans in this
/// module (every scan loop compares `Instant::elapsed` against this value): 30 seconds.
const LOG_INTERVAL: Duration = Duration::from_secs(30);
24
/// The arguments for the `reth db state` command
// NOTE(review): the `///` comments on this struct and its fields are intentionally left
// untouched; the clap derive presumably surfaces them as `--help` text, so rewording them
// would change user-visible output. Maintainer notes therefore use plain `//` comments.
#[derive(Parser, Debug)]
pub struct Command {
    /// The account address to get state for
    // Positional argument (it carries no `#[arg]` attribute; the tests pass it positionally).
    address: Address,

    /// Block number to query state at (uses current state if not provided)
    // `Some(_)` makes `execute` take the historical path, `None` the current-state path.
    #[arg(long, short)]
    block: Option<BlockNumber>,

    /// Maximum number of storage slots to display
    // Upper bound on the number of non-zero slots collected by either query path.
    #[arg(long, short, default_value = "100")]
    limit: usize,

    /// Output format (table, json, csv)
    // Selects which branch of `print_results` renders the output.
    #[arg(long, short, default_value = "table")]
    format: OutputFormat,
}
43
44impl Command {
45    /// Execute `db state` command
46    pub fn execute<N: NodeTypesWithDB + ProviderNodeTypes>(
47        self,
48        tool: &DbTool<N>,
49    ) -> eyre::Result<()> {
50        let address = self.address;
51        let limit = self.limit;
52
53        if let Some(block) = self.block {
54            self.execute_historical(tool, address, block, limit)
55        } else {
56            self.execute_current(tool, address, limit)
57        }
58    }
59
60    fn execute_current<N: NodeTypesWithDB + ProviderNodeTypes>(
61        &self,
62        tool: &DbTool<N>,
63        address: Address,
64        limit: usize,
65    ) -> eyre::Result<()> {
66        let use_hashed_state = tool.provider_factory.cached_storage_settings().use_hashed_state();
67
68        let entries = tool.provider_factory.db_ref().view(|tx| {
69            let (account, walker_entries) = if use_hashed_state {
70                let hashed_address = keccak256(address);
71                let account = tx.get::<tables::HashedAccounts>(hashed_address)?;
72                let mut cursor = tx.cursor_dup_read::<tables::HashedStorages>()?;
73                let walker = cursor.walk_dup(Some(hashed_address), None)?;
74                let mut entries = Vec::new();
75                let mut last_log = Instant::now();
76                for (idx, entry) in walker.enumerate() {
77                    let (_, storage_entry) = entry?;
78                    if storage_entry.value != U256::ZERO {
79                        entries.push((storage_entry.key, storage_entry.value));
80                    }
81                    if entries.len() >= limit {
82                        break;
83                    }
84                    if last_log.elapsed() >= LOG_INTERVAL {
85                        info!(
86                            target: "reth::cli",
87                            address = %address,
88                            slots_scanned = idx,
89                            "Scanning storage slots"
90                        );
91                        last_log = Instant::now();
92                    }
93                }
94                (account, entries)
95            } else {
96                // Get account info
97                let account = tx.get::<tables::PlainAccountState>(address)?;
98                // Get storage entries
99                let mut cursor = tx.cursor_dup_read::<tables::PlainStorageState>()?;
100                let walker = cursor.walk_dup(Some(address), None)?;
101                let mut entries = Vec::new();
102                let mut last_log = Instant::now();
103                for (idx, entry) in walker.enumerate() {
104                    let (_, storage_entry) = entry?;
105                    if storage_entry.value != U256::ZERO {
106                        entries.push((storage_entry.key, storage_entry.value));
107                    }
108                    if entries.len() >= limit {
109                        break;
110                    }
111                    if last_log.elapsed() >= LOG_INTERVAL {
112                        info!(
113                            target: "reth::cli",
114                            address = %address,
115                            slots_scanned = idx,
116                            "Scanning storage slots"
117                        );
118                        last_log = Instant::now();
119                    }
120                }
121                (account, entries)
122            };
123
124            Ok::<_, eyre::Report>((account, walker_entries))
125        })??;
126
127        let (account, storage_entries) = entries;
128
129        self.print_results(address, None, account, &storage_entries);
130
131        Ok(())
132    }
133
134    fn execute_historical<N: NodeTypesWithDB + ProviderNodeTypes>(
135        &self,
136        tool: &DbTool<N>,
137        address: Address,
138        block: BlockNumber,
139        limit: usize,
140    ) -> eyre::Result<()> {
141        let provider = tool.provider_factory.history_by_block_number(block)?;
142
143        // Get account info at that block
144        let account = provider.basic_account(&address)?;
145
146        // Check storage settings to determine where history is stored
147        let storage_settings = tool.provider_factory.cached_storage_settings();
148        let history_in_rocksdb = storage_settings.storage_v2;
149
150        // For historical queries, enumerate keys from history indices only
151        // (not PlainStorageState, which reflects current state)
152        let mut storage_keys = BTreeSet::new();
153
154        if history_in_rocksdb {
155            self.collect_staticfile_storage_keys(tool, address, &mut storage_keys)?;
156        } else {
157            self.collect_mdbx_storage_keys_parallel(tool, address, &mut storage_keys)?;
158        }
159
160        info!(
161            target: "reth::cli",
162            address = %address,
163            block = block,
164            total_keys = storage_keys.len(),
165            "Found storage keys to query"
166        );
167
168        // Now query each key at the historical block using the StateProvider
169        // This handles both MDBX and RocksDB backends transparently
170        let mut entries = Vec::new();
171        let mut last_log = Instant::now();
172
173        for (idx, key) in storage_keys.iter().enumerate() {
174            match provider.storage(address, *key) {
175                Ok(Some(value)) if value != U256::ZERO => {
176                    entries.push((*key, value));
177                }
178                _ => {}
179            }
180
181            if entries.len() >= limit {
182                break;
183            }
184
185            if last_log.elapsed() >= LOG_INTERVAL {
186                info!(
187                    target: "reth::cli",
188                    address = %address,
189                    block = block,
190                    keys_total = storage_keys.len(),
191                    slots_scanned = idx,
192                    slots_found = entries.len(),
193                    "Scanning historical storage slots"
194                );
195                last_log = Instant::now();
196            }
197        }
198
199        self.print_results(address, Some(block), account, &entries);
200
201        Ok(())
202    }
203
204    /// Collects storage keys from static file StorageChangeSets (storage_v2).
205    fn collect_staticfile_storage_keys<N: NodeTypesWithDB + ProviderNodeTypes>(
206        &self,
207        tool: &DbTool<N>,
208        address: Address,
209        keys: &mut BTreeSet<B256>,
210    ) -> eyre::Result<()> {
211        let tip = tool.provider_factory.provider()?.best_block_number()?;
212
213        if tip == 0 {
214            return Ok(());
215        }
216
217        info!(
218            target: "reth::cli",
219            address = %address,
220            tip,
221            "Scanning static file storage changesets"
222        );
223
224        let static_file_provider = tool.provider_factory.static_file_provider();
225        let walker = static_file_provider.walk_storage_changeset_range(0..=tip);
226
227        let mut total_scanned = 0usize;
228        let mut last_log = Instant::now();
229
230        for changeset_result in walker {
231            let (block_addr, storage_entry) = changeset_result?;
232            total_scanned += 1;
233
234            if block_addr.address() == address {
235                keys.insert(storage_entry.key);
236            }
237
238            if last_log.elapsed() >= LOG_INTERVAL {
239                info!(
240                    target: "reth::cli",
241                    address = %address,
242                    entries_scanned = total_scanned,
243                    unique_keys = keys.len(),
244                    "Scanning static file storage changesets"
245                );
246                last_log = Instant::now();
247            }
248        }
249
250        info!(
251            target: "reth::cli",
252            address = %address,
253            total_entries = total_scanned,
254            unique_keys = keys.len(),
255            "Finished static file storage changeset scan"
256        );
257
258        Ok(())
259    }
260
261    /// Collects storage keys from MDBX StorageChangeSets using parallel block range scanning.
262    fn collect_mdbx_storage_keys_parallel<N: NodeTypesWithDB + ProviderNodeTypes>(
263        &self,
264        tool: &DbTool<N>,
265        address: Address,
266        keys: &mut BTreeSet<B256>,
267    ) -> eyre::Result<()> {
268        const CHUNK_SIZE: u64 = 500_000; // 500k blocks per thread
269        let num_threads = std::thread::available_parallelism()
270            .map(|p| p.get().saturating_sub(1).max(1))
271            .unwrap_or(4);
272
273        // Get the current tip block
274        let tip = tool.provider_factory.provider()?.best_block_number()?;
275
276        if tip == 0 {
277            return Ok(());
278        }
279
280        info!(
281            target: "reth::cli",
282            address = %address,
283            tip,
284            chunk_size = CHUNK_SIZE,
285            num_threads,
286            "Starting parallel MDBX changeset scan"
287        );
288
289        // Shared state for collecting keys
290        let collected_keys: Mutex<BTreeSet<B256>> = Mutex::new(BTreeSet::new());
291        let total_entries_scanned = Mutex::new(0usize);
292
293        // Create chunk ranges
294        let mut chunks: Vec<(u64, u64)> = Vec::new();
295        let mut start = 0u64;
296        while start <= tip {
297            let end = (start + CHUNK_SIZE - 1).min(tip);
298            chunks.push((start, end));
299            start = end + 1;
300        }
301
302        let chunks_ref = &chunks;
303        let next_chunk = Mutex::new(0usize);
304        let next_chunk_ref = &next_chunk;
305        let collected_keys_ref = &collected_keys;
306        let total_entries_ref = &total_entries_scanned;
307
308        thread::scope(|s| {
309            let handles: Vec<_> = (0..num_threads)
310                .map(|thread_id| {
311                    spawn_scoped_os_thread(s, "db-state-worker", move || {
312                        loop {
313                            // Get next chunk to process
314                            let chunk_idx = {
315                                let mut idx = next_chunk_ref.lock();
316                                if *idx >= chunks_ref.len() {
317                                    return Ok::<_, eyre::Report>(());
318                                }
319                                let current = *idx;
320                                *idx += 1;
321                                current
322                            };
323
324                            let (chunk_start, chunk_end) = chunks_ref[chunk_idx];
325
326                            // Open a new read transaction for this chunk
327                            tool.provider_factory.db_ref().view(|tx| {
328                                tx.disable_long_read_transaction_safety();
329
330                                let mut changeset_cursor =
331                                    tx.cursor_read::<tables::StorageChangeSets>()?;
332                                let start_key =
333                                    reth_db_api::models::BlockNumberAddress((chunk_start, address));
334                                let end_key =
335                                    reth_db_api::models::BlockNumberAddress((chunk_end, address));
336
337                                let mut local_keys = BTreeSet::new();
338                                let mut entries_in_chunk = 0usize;
339
340                                if let Ok(walker) = changeset_cursor.walk_range(start_key..=end_key)
341                                {
342                                    for (block_addr, storage_entry) in walker.flatten() {
343                                        if block_addr.address() == address {
344                                            local_keys.insert(storage_entry.key);
345                                        }
346                                        entries_in_chunk += 1;
347                                    }
348                                }
349
350                                // Merge into global state
351                                collected_keys_ref.lock().extend(local_keys);
352                                *total_entries_ref.lock() += entries_in_chunk;
353
354                                info!(
355                                    target: "reth::cli",
356                                    thread_id,
357                                    chunk_start,
358                                    chunk_end,
359                                    entries_in_chunk,
360                                    "Thread completed chunk"
361                                );
362
363                                Ok::<_, eyre::Report>(())
364                            })??;
365                        }
366                    })
367                })
368                .collect();
369
370            for handle in handles {
371                handle.join().map_err(|_| eyre::eyre!("Thread panicked"))??;
372            }
373
374            Ok::<_, eyre::Report>(())
375        })?;
376
377        let final_keys = collected_keys.into_inner();
378        let total = *total_entries_scanned.lock();
379
380        info!(
381            target: "reth::cli",
382            address = %address,
383            total_entries = total,
384            unique_keys = final_keys.len(),
385            "Finished parallel MDBX changeset scan"
386        );
387
388        keys.extend(final_keys);
389        Ok(())
390    }
391
392    fn print_results(
393        &self,
394        address: Address,
395        block: Option<BlockNumber>,
396        account: Option<reth_primitives_traits::Account>,
397        storage: &[(alloy_primitives::B256, U256)],
398    ) {
399        match self.format {
400            OutputFormat::Table => {
401                println!("Account: {address}");
402                if let Some(b) = block {
403                    println!("Block: {b}");
404                } else {
405                    println!("Block: latest");
406                }
407                println!();
408
409                if let Some(acc) = account {
410                    println!("Nonce: {}", acc.nonce);
411                    println!("Balance: {} wei", acc.balance);
412                    if let Some(code_hash) = acc.bytecode_hash {
413                        println!("Code hash: {code_hash}");
414                    }
415                } else {
416                    println!("Account not found");
417                }
418
419                println!();
420                println!("Storage ({} slots):", storage.len());
421                println!("{:-<130}", "");
422                println!("{:<66} | {:<64}", "Slot", "Value");
423                println!("{:-<130}", "");
424                for (key, value) in storage {
425                    println!("{key} | {value:#066x}");
426                }
427            }
428            OutputFormat::Json => {
429                let output = serde_json::json!({
430                    "address": address.to_string(),
431                    "block": block,
432                    "account": account.map(|a| serde_json::json!({
433                        "nonce": a.nonce,
434                        "balance": a.balance.to_string(),
435                        "code_hash": a.bytecode_hash.map(|h| h.to_string()),
436                    })),
437                    "storage": storage.iter().map(|(k, v)| {
438                        serde_json::json!({
439                            "key": k.to_string(),
440                            "value": format!("{v:#066x}"),
441                        })
442                    }).collect::<Vec<_>>(),
443                });
444                println!("{}", serde_json::to_string_pretty(&output).unwrap());
445            }
446            OutputFormat::Csv => {
447                println!("slot,value");
448                for (key, value) in storage {
449                    println!("{key},{value:#066x}");
450                }
451            }
452        }
453    }
454}
455
/// Output format used by `Command::print_results` when rendering the queried state.
// NOTE(review): the variants are documented with plain `//` comments on purpose; doc comments
// on `clap::ValueEnum` variants may be picked up by the derive as help text — confirm before
// converting them to `///`.
#[derive(Debug, Clone, Default, clap::ValueEnum)]
pub enum OutputFormat {
    // Human-readable listing: account fields followed by a slot/value table (the default).
    #[default]
    Table,
    // A single pretty-printed JSON object with address, block, account and storage.
    Json,
    // `slot,value` rows only; account fields are not included.
    Csv,
}
463
#[cfg(test)]
mod tests {
    use super::*;

    /// Address literal shared by the argument-parsing tests.
    const ACCOUNT: &str = "0xd8dA6BF26964aF9D7eEd9e03E53415D37aA96045";

    /// A positional address plus `--block` populates both fields.
    #[test]
    fn parse_state_args() {
        let expected_address: Address = ACCOUNT.parse().unwrap();

        let parsed = Command::try_parse_from(["state", ACCOUNT, "--block", "1000000"]).unwrap();

        assert_eq!(parsed.address, expected_address);
        assert_eq!(parsed.block, Some(1000000));
    }

    /// Omitting `--block` leaves the block unset.
    #[test]
    fn parse_state_args_no_block() {
        let parsed = Command::try_parse_from(["state", ACCOUNT]).unwrap();

        assert_eq!(parsed.block, None);
    }
}