Skip to main content

reth_cli_commands/db/
state.rs

1use alloy_primitives::{keccak256, Address, BlockNumber, B256, U256};
2use clap::Parser;
3use parking_lot::Mutex;
4use reth_db_api::{
5    cursor::{DbCursorRO, DbDupCursorRO},
6    database::Database,
7    tables,
8    transaction::DbTx,
9};
10use reth_db_common::DbTool;
11use reth_node_builder::NodeTypesWithDB;
12use reth_provider::providers::ProviderNodeTypes;
13use reth_storage_api::{BlockNumReader, StateProvider, StorageSettingsCache};
14use reth_tasks::spawn_scoped_os_thread;
15use std::{
16    collections::BTreeSet,
17    thread,
18    time::{Duration, Instant},
19};
20use tracing::{error, info};
21
/// Minimum time between two progress log lines while scanning storage (30 seconds).
const LOG_INTERVAL: Duration = Duration::from_secs(30);
24
25/// The arguments for the `reth db state` command
26#[derive(Parser, Debug)]
27pub struct Command {
28    /// The account address to get state for
29    address: Address,
30
31    /// Block number to query state at (uses current state if not provided)
32    #[arg(long, short)]
33    block: Option<BlockNumber>,
34
35    /// Maximum number of storage slots to display
36    #[arg(long, short, default_value = "100")]
37    limit: usize,
38
39    /// Output format (table, json, csv)
40    #[arg(long, short, default_value = "table")]
41    format: OutputFormat,
42}
43
44impl Command {
45    /// Execute `db state` command
46    pub fn execute<N: NodeTypesWithDB + ProviderNodeTypes>(
47        self,
48        tool: &DbTool<N>,
49    ) -> eyre::Result<()> {
50        let address = self.address;
51        let limit = self.limit;
52
53        if let Some(block) = self.block {
54            self.execute_historical(tool, address, block, limit)
55        } else {
56            self.execute_current(tool, address, limit)
57        }
58    }
59
60    fn execute_current<N: NodeTypesWithDB + ProviderNodeTypes>(
61        &self,
62        tool: &DbTool<N>,
63        address: Address,
64        limit: usize,
65    ) -> eyre::Result<()> {
66        let use_hashed_state = tool.provider_factory.cached_storage_settings().use_hashed_state();
67
68        let entries = tool.provider_factory.db_ref().view(|tx| {
69            let (account, walker_entries) = if use_hashed_state {
70                let hashed_address = keccak256(address);
71                let account = tx.get::<tables::HashedAccounts>(hashed_address)?;
72                let mut cursor = tx.cursor_dup_read::<tables::HashedStorages>()?;
73                let walker = cursor.walk_dup(Some(hashed_address), None)?;
74                let mut entries = Vec::new();
75                let mut last_log = Instant::now();
76                for (idx, entry) in walker.enumerate() {
77                    let (_, storage_entry) = entry?;
78                    if storage_entry.value != U256::ZERO {
79                        entries.push((storage_entry.key, storage_entry.value));
80                    }
81                    if entries.len() >= limit {
82                        break;
83                    }
84                    if last_log.elapsed() >= LOG_INTERVAL {
85                        info!(
86                            target: "reth::cli",
87                            address = %address,
88                            slots_scanned = idx,
89                            "Scanning storage slots"
90                        );
91                        last_log = Instant::now();
92                    }
93                }
94                (account, entries)
95            } else {
96                // Get account info
97                let account = tx.get::<tables::PlainAccountState>(address)?;
98                // Get storage entries
99                let mut cursor = tx.cursor_dup_read::<tables::PlainStorageState>()?;
100                let walker = cursor.walk_dup(Some(address), None)?;
101                let mut entries = Vec::new();
102                let mut last_log = Instant::now();
103                for (idx, entry) in walker.enumerate() {
104                    let (_, storage_entry) = entry?;
105                    if storage_entry.value != U256::ZERO {
106                        entries.push((storage_entry.key, storage_entry.value));
107                    }
108                    if entries.len() >= limit {
109                        break;
110                    }
111                    if last_log.elapsed() >= LOG_INTERVAL {
112                        info!(
113                            target: "reth::cli",
114                            address = %address,
115                            slots_scanned = idx,
116                            "Scanning storage slots"
117                        );
118                        last_log = Instant::now();
119                    }
120                }
121                (account, entries)
122            };
123
124            Ok::<_, eyre::Report>((account, walker_entries))
125        })??;
126
127        let (account, storage_entries) = entries;
128
129        self.print_results(address, None, account, &storage_entries);
130
131        Ok(())
132    }
133
134    fn execute_historical<N: NodeTypesWithDB + ProviderNodeTypes>(
135        &self,
136        tool: &DbTool<N>,
137        address: Address,
138        block: BlockNumber,
139        limit: usize,
140    ) -> eyre::Result<()> {
141        let provider = tool.provider_factory.history_by_block_number(block)?;
142
143        // Get account info at that block
144        let account = provider.basic_account(&address)?;
145
146        // Check storage settings to determine where history is stored
147        let storage_settings = tool.provider_factory.cached_storage_settings();
148        let history_in_rocksdb = storage_settings.storage_v2;
149
150        // For historical queries, enumerate keys from history indices only
151        // (not PlainStorageState, which reflects current state)
152        let mut storage_keys = BTreeSet::new();
153
154        if history_in_rocksdb {
155            error!(
156                target: "reth::cli",
157                "Historical storage queries with RocksDB backend are not yet supported. \
158                 Use MDBX for storage history or query current state without --block."
159            );
160            return Ok(());
161        }
162
163        // Collect keys from MDBX StorageChangeSets using parallel scanning
164        self.collect_mdbx_storage_keys_parallel(tool, address, &mut storage_keys)?;
165
166        info!(
167            target: "reth::cli",
168            address = %address,
169            block = block,
170            total_keys = storage_keys.len(),
171            "Found storage keys to query"
172        );
173
174        // Now query each key at the historical block using the StateProvider
175        // This handles both MDBX and RocksDB backends transparently
176        let mut entries = Vec::new();
177        let mut last_log = Instant::now();
178
179        for (idx, key) in storage_keys.iter().enumerate() {
180            match provider.storage(address, *key) {
181                Ok(Some(value)) if value != U256::ZERO => {
182                    entries.push((*key, value));
183                }
184                _ => {}
185            }
186
187            if entries.len() >= limit {
188                break;
189            }
190
191            if last_log.elapsed() >= LOG_INTERVAL {
192                info!(
193                    target: "reth::cli",
194                    address = %address,
195                    block = block,
196                    keys_total = storage_keys.len(),
197                    slots_scanned = idx,
198                    slots_found = entries.len(),
199                    "Scanning historical storage slots"
200                );
201                last_log = Instant::now();
202            }
203        }
204
205        self.print_results(address, Some(block), account, &entries);
206
207        Ok(())
208    }
209
210    /// Collects storage keys from MDBX StorageChangeSets using parallel block range scanning.
211    fn collect_mdbx_storage_keys_parallel<N: NodeTypesWithDB + ProviderNodeTypes>(
212        &self,
213        tool: &DbTool<N>,
214        address: Address,
215        keys: &mut BTreeSet<B256>,
216    ) -> eyre::Result<()> {
217        const CHUNK_SIZE: u64 = 500_000; // 500k blocks per thread
218        let num_threads = std::thread::available_parallelism()
219            .map(|p| p.get().saturating_sub(1).max(1))
220            .unwrap_or(4);
221
222        // Get the current tip block
223        let tip = tool.provider_factory.provider()?.best_block_number()?;
224
225        if tip == 0 {
226            return Ok(());
227        }
228
229        info!(
230            target: "reth::cli",
231            address = %address,
232            tip,
233            chunk_size = CHUNK_SIZE,
234            num_threads,
235            "Starting parallel MDBX changeset scan"
236        );
237
238        // Shared state for collecting keys
239        let collected_keys: Mutex<BTreeSet<B256>> = Mutex::new(BTreeSet::new());
240        let total_entries_scanned = Mutex::new(0usize);
241
242        // Create chunk ranges
243        let mut chunks: Vec<(u64, u64)> = Vec::new();
244        let mut start = 0u64;
245        while start <= tip {
246            let end = (start + CHUNK_SIZE - 1).min(tip);
247            chunks.push((start, end));
248            start = end + 1;
249        }
250
251        let chunks_ref = &chunks;
252        let next_chunk = Mutex::new(0usize);
253        let next_chunk_ref = &next_chunk;
254        let collected_keys_ref = &collected_keys;
255        let total_entries_ref = &total_entries_scanned;
256
257        thread::scope(|s| {
258            let handles: Vec<_> = (0..num_threads)
259                .map(|thread_id| {
260                    spawn_scoped_os_thread(s, "db-state-worker", move || {
261                        loop {
262                            // Get next chunk to process
263                            let chunk_idx = {
264                                let mut idx = next_chunk_ref.lock();
265                                if *idx >= chunks_ref.len() {
266                                    return Ok::<_, eyre::Report>(());
267                                }
268                                let current = *idx;
269                                *idx += 1;
270                                current
271                            };
272
273                            let (chunk_start, chunk_end) = chunks_ref[chunk_idx];
274
275                            // Open a new read transaction for this chunk
276                            tool.provider_factory.db_ref().view(|tx| {
277                                tx.disable_long_read_transaction_safety();
278
279                                let mut changeset_cursor =
280                                    tx.cursor_read::<tables::StorageChangeSets>()?;
281                                let start_key =
282                                    reth_db_api::models::BlockNumberAddress((chunk_start, address));
283                                let end_key =
284                                    reth_db_api::models::BlockNumberAddress((chunk_end, address));
285
286                                let mut local_keys = BTreeSet::new();
287                                let mut entries_in_chunk = 0usize;
288
289                                if let Ok(walker) = changeset_cursor.walk_range(start_key..=end_key)
290                                {
291                                    for (block_addr, storage_entry) in walker.flatten() {
292                                        if block_addr.address() == address {
293                                            local_keys.insert(storage_entry.key);
294                                        }
295                                        entries_in_chunk += 1;
296                                    }
297                                }
298
299                                // Merge into global state
300                                collected_keys_ref.lock().extend(local_keys);
301                                *total_entries_ref.lock() += entries_in_chunk;
302
303                                info!(
304                                    target: "reth::cli",
305                                    thread_id,
306                                    chunk_start,
307                                    chunk_end,
308                                    entries_in_chunk,
309                                    "Thread completed chunk"
310                                );
311
312                                Ok::<_, eyre::Report>(())
313                            })??;
314                        }
315                    })
316                })
317                .collect();
318
319            for handle in handles {
320                handle.join().map_err(|_| eyre::eyre!("Thread panicked"))??;
321            }
322
323            Ok::<_, eyre::Report>(())
324        })?;
325
326        let final_keys = collected_keys.into_inner();
327        let total = *total_entries_scanned.lock();
328
329        info!(
330            target: "reth::cli",
331            address = %address,
332            total_entries = total,
333            unique_keys = final_keys.len(),
334            "Finished parallel MDBX changeset scan"
335        );
336
337        keys.extend(final_keys);
338        Ok(())
339    }
340
341    fn print_results(
342        &self,
343        address: Address,
344        block: Option<BlockNumber>,
345        account: Option<reth_primitives_traits::Account>,
346        storage: &[(alloy_primitives::B256, U256)],
347    ) {
348        match self.format {
349            OutputFormat::Table => {
350                println!("Account: {address}");
351                if let Some(b) = block {
352                    println!("Block: {b}");
353                } else {
354                    println!("Block: latest");
355                }
356                println!();
357
358                if let Some(acc) = account {
359                    println!("Nonce: {}", acc.nonce);
360                    println!("Balance: {} wei", acc.balance);
361                    if let Some(code_hash) = acc.bytecode_hash {
362                        println!("Code hash: {code_hash}");
363                    }
364                } else {
365                    println!("Account not found");
366                }
367
368                println!();
369                println!("Storage ({} slots):", storage.len());
370                println!("{:-<130}", "");
371                println!("{:<66} | {:<64}", "Slot", "Value");
372                println!("{:-<130}", "");
373                for (key, value) in storage {
374                    println!("{key} | {value:#066x}");
375                }
376            }
377            OutputFormat::Json => {
378                let output = serde_json::json!({
379                    "address": address.to_string(),
380                    "block": block,
381                    "account": account.map(|a| serde_json::json!({
382                        "nonce": a.nonce,
383                        "balance": a.balance.to_string(),
384                        "code_hash": a.bytecode_hash.map(|h| h.to_string()),
385                    })),
386                    "storage": storage.iter().map(|(k, v)| {
387                        serde_json::json!({
388                            "key": k.to_string(),
389                            "value": format!("{v:#066x}"),
390                        })
391                    }).collect::<Vec<_>>(),
392                });
393                println!("{}", serde_json::to_string_pretty(&output).unwrap());
394            }
395            OutputFormat::Csv => {
396                println!("slot,value");
397                for (key, value) in storage {
398                    println!("{key},{value:#066x}");
399                }
400            }
401        }
402    }
403}
404
405#[derive(Debug, Clone, Default, clap::ValueEnum)]
406pub enum OutputFormat {
407    #[default]
408    Table,
409    Json,
410    Csv,
411}
412
#[cfg(test)]
mod tests {
    use super::*;

    /// Address literal shared by the argument-parsing tests.
    const ACCOUNT: &str = "0xd8dA6BF26964aF9D7eEd9e03E53415D37aA96045";

    /// The positional address and `--block` are both picked up.
    #[test]
    fn parse_state_args() {
        let parsed = Command::try_parse_from(["state", ACCOUNT, "--block", "1000000"]).unwrap();
        let expected: Address = ACCOUNT.parse().unwrap();

        assert_eq!(parsed.address, expected);
        assert_eq!(parsed.block, Some(1_000_000));
    }

    /// Omitting `--block` leaves the block unset.
    #[test]
    fn parse_state_args_no_block() {
        let parsed = Command::try_parse_from(["state", ACCOUNT]).unwrap();

        assert!(parsed.block.is_none());
    }
}