1use alloy_primitives::{b256, bytes::BufMut, keccak256, Address, B256};
2use itertools::Itertools;
3use reth_config::config::{EtlConfig, HashingConfig};
4use reth_db_api::{
5 cursor::{DbCursorRO, DbDupCursorRW},
6 models::CompactU256,
7 table::Decompress,
8 tables,
9 transaction::{DbTx, DbTxMut},
10};
11use reth_etl::Collector;
12use reth_primitives_traits::StorageEntry;
13use reth_provider::{DBProvider, HashingWriter, StatsReader, StorageReader};
14use reth_stages_api::{
15 EntitiesCheckpoint, ExecInput, ExecOutput, Stage, StageCheckpoint, StageError, StageId,
16 StorageHashingCheckpoint, UnwindInput, UnwindOutput,
17};
18use reth_storage_api::StorageSettingsCache;
19use reth_storage_errors::provider::ProviderResult;
20use std::{
21 fmt::Debug,
22 sync::mpsc::{self, Receiver},
23};
24use tracing::*;
25
/// Maximum number of worker result channels kept in memory before the stage drains
/// them into the ETL collector (see the flush check in `execute`).
const MAXIMUM_CHANNELS: usize = 10_000;

/// Number of plain storage entries hashed per rayon worker job.
const WORKER_CHUNK_SIZE: usize = 100;

/// Pre-computed hash used to seed the per-chunk `(Address::ZERO, hash)` cache in the
/// hashing workers, so the first cache comparison is against a valid pair.
const HASHED_ZERO_ADDRESS: B256 =
    b256!("0x5380c7b7ae81a58eb98d9c78de4a1fd7fd9535fc953ed2be602daaa41767312a");
35
/// Stage that hashes plain storage state into [`tables::HashedStorages`].
#[derive(Debug)]
pub struct StorageHashingStage {
    /// The threshold (in number of blocks) for switching between incremental
    /// hashing and a full rebuild of the hashed-storage table.
    pub clean_threshold: u64,
    /// The maximum number of blocks to process before the unwind commits progress.
    pub commit_threshold: u64,
    /// ETL configuration (in-memory buffer size and spill directory).
    pub etl_config: EtlConfig,
}
48
49impl StorageHashingStage {
50 pub const fn new(config: HashingConfig, etl_config: EtlConfig) -> Self {
52 Self {
53 clean_threshold: config.clean_threshold,
54 commit_threshold: config.commit_threshold,
55 etl_config,
56 }
57 }
58}
59
60impl Default for StorageHashingStage {
61 fn default() -> Self {
62 Self {
63 clean_threshold: 500_000,
64 commit_threshold: 100_000,
65 etl_config: EtlConfig::default(),
66 }
67 }
68}
69
impl<Provider> Stage<Provider> for StorageHashingStage
where
    Provider: DBProvider<Tx: DbTxMut>
        + StorageReader
        + HashingWriter
        + StatsReader
        + StorageSettingsCache,
{
    /// Return the id of the stage.
    fn id(&self) -> StageId {
        StageId::StorageHashing
    }

    /// Execute the stage: populate [`tables::HashedStorages`] for the next block range.
    fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result<ExecOutput, StageError> {
        let tx = provider.tx_ref();
        if input.target_reached() {
            return Ok(ExecOutput::done(input.checkpoint()))
        }

        // If state is already stored hashed, there is nothing for this stage to do.
        if provider.cached_storage_settings().use_hashed_state() {
            return Ok(ExecOutput::done(input.checkpoint().with_block_number(input.target())));
        }

        let (from_block, to_block) = input.next_block_range().into_inner();

        // Wide range (or starting from genesis): cheaper to clear the table and
        // re-hash every plain entry than to replay per-block changesets.
        if to_block - from_block > self.clean_threshold || from_block == 1 {
            tx.clear::<tables::HashedStorages>()?;

            let mut storage_cursor = tx.cursor_read::<tables::PlainStorageState>()?;
            let mut collector =
                Collector::new(self.etl_config.file_size, self.etl_config.dir.clone());
            let mut channels = Vec::with_capacity(MAXIMUM_CHANNELS);

            for chunk in &storage_cursor.walk(None)?.chunks(WORKER_CHUNK_SIZE) {
                // Channel through which the rayon worker sends back hashed entries.
                let (tx, rx) = mpsc::channel();
                channels.push(rx);

                let chunk = chunk.collect::<Result<Vec<_>, _>>()?;
                rayon::spawn(move || {
                    // Cache the last hashed address; consecutive cursor entries often
                    // share an address, so re-hashing it per slot would be wasted work.
                    let (mut last_addr, mut hashed_addr) = (Address::ZERO, HASHED_ZERO_ADDRESS);
                    for (address, slot) in chunk {
                        if address != last_addr {
                            last_addr = address;
                            hashed_addr = keccak256(address);
                        }
                        // Composite key: hashed address (32 bytes) || hashed slot key (32 bytes).
                        let mut addr_key = Vec::with_capacity(64);
                        addr_key.put_slice(hashed_addr.as_slice());
                        addr_key.put_slice(keccak256(slot.key).as_slice());
                        // Send failure means the receiver was dropped; safe to ignore.
                        let _ = tx.send((addr_key, CompactU256::from(slot.value)));
                    }
                });

                // Periodically drain pending channels into the collector to bound memory.
                if !channels.is_empty() && channels.len().is_multiple_of(MAXIMUM_CHANNELS) {
                    collect(&mut channels, &mut collector)?;
                }
            }

            // Drain whatever workers are still outstanding.
            collect(&mut channels, &mut collector)?;

            let total_hashes = collector.len();
            let interval = (total_hashes / 10).max(1);
            let mut cursor = tx.cursor_dup_write::<tables::HashedStorages>()?;
            for (index, item) in collector.iter()?.enumerate() {
                // Log progress roughly every 10%.
                if index > 0 && index.is_multiple_of(interval) {
                    info!(
                        target: "sync::stages::hashing_storage",
                        progress = %format!("{:.2}%", (index as f64 / total_hashes as f64) * 100.0),
                        "Inserting hashes"
                    );
                }

                let (addr_key, value) = item?;
                // NOTE(review): `append_dup` requires keys in sorted order — presumably the
                // ETL collector iterates sorted by key; confirm against `reth_etl` docs.
                cursor.append_dup(
                    B256::from_slice(&addr_key[..32]),
                    StorageEntry {
                        key: B256::from_slice(&addr_key[32..]),
                        value: CompactU256::decompress_owned(value)?.into(),
                    },
                )?;
            }
        } else {
            // Incremental path: gather the storage slots changed in the range and
            // re-hash only those from plain state.
            let lists = provider.changed_storages_with_range(from_block..=to_block)?;
            let storages = provider.plain_state_storages(lists)?;
            provider.insert_storage_for_hashing(storages)?;
        }

        // The stage always completes in one run; record entry-count progress.
        let checkpoint = StageCheckpoint::new(input.target())
            .with_storage_hashing_stage_checkpoint(StorageHashingCheckpoint {
                progress: stage_checkpoint_progress(provider)?,
                ..Default::default()
            });

        Ok(ExecOutput { checkpoint, done: true })
    }

    /// Unwind the stage: revert hashed storage for the unwound block range.
    fn unwind(
        &mut self,
        provider: &Provider,
        input: UnwindInput,
    ) -> Result<UnwindOutput, StageError> {
        // Bound the unwound range by `commit_threshold` blocks per invocation.
        let (range, unwind_progress, _) =
            input.unwind_block_range_with_threshold(self.commit_threshold);

        provider.unwind_storage_hashing_range(range)?;

        let mut stage_checkpoint =
            input.checkpoint.storage_hashing_stage_checkpoint().unwrap_or_default();

        stage_checkpoint.progress = stage_checkpoint_progress(provider)?;

        Ok(UnwindOutput {
            checkpoint: StageCheckpoint::new(unwind_progress)
                .with_storage_hashing_stage_checkpoint(stage_checkpoint),
        })
    }
}
211
212fn collect(
214 channels: &mut Vec<Receiver<(Vec<u8>, CompactU256)>>,
215 collector: &mut Collector<Vec<u8>, CompactU256>,
216) -> Result<(), StageError> {
217 for channel in channels.iter_mut() {
218 while let Ok((key, v)) = channel.recv() {
219 collector.insert(key, v)?;
220 }
221 }
222 info!(target: "sync::stages::hashing_storage", "Hashed {} entries", collector.len());
223 channels.clear();
224 Ok(())
225}
226
227fn stage_checkpoint_progress(provider: &impl StatsReader) -> ProviderResult<EntitiesCheckpoint> {
228 Ok(EntitiesCheckpoint {
229 processed: provider.count_entries::<tables::HashedStorages>()? as u64,
230 total: provider.count_entries::<tables::PlainStorageState>()? as u64,
231 })
232}
233
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::{
        stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError,
        TestStageDB, UnwindStageTestRunner,
    };
    use alloy_primitives::{Address, U256};
    use assert_matches::assert_matches;
    use rand::Rng;
    use reth_db_api::{
        cursor::{DbCursorRW, DbDupCursorRO},
        models::{BlockNumberAddress, StoredBlockBodyIndices},
    };
    use reth_ethereum_primitives::Block;
    use reth_primitives_traits::SealedBlock;
    use reth_provider::providers::StaticFileWriter;
    use reth_testing_utils::generators::{
        self, random_block_range, random_contract_account_range, BlockRangeParams,
    };

    stage_test_suite_ext!(StorageHashingTestRunner, storage_hashing);

    /// Drives the stage to completion with both thresholds set to 1, so every
    /// run takes the "clean" (full rebuild) path in `execute`.
    #[tokio::test]
    async fn execute_clean_storage_hashing() {
        let (previous_stage, stage_progress) = (500, 100);

        // Set up the runner.
        let mut runner = StorageHashingTestRunner::default();

        // Low clean threshold forces hashing the whole storage table.
        runner.set_clean_threshold(1);

        // Low commit threshold so progress is checkpointed at every step.
        runner.set_commit_threshold(1);

        let mut input = ExecInput {
            target: Some(previous_stage),
            checkpoint: Some(StageCheckpoint::new(stage_progress)),
        };

        runner.seed_execution(input).expect("failed to seed execution");

        loop {
            if let Ok(result @ ExecOutput { checkpoint, done }) =
                runner.execute(input).await.unwrap()
            {
                if !done {
                    // Intermediate checkpoint: processed count advances by one entry
                    // per iteration, total matches the plain-state entry count.
                    let previous_checkpoint = input
                        .checkpoint
                        .and_then(|checkpoint| checkpoint.storage_hashing_stage_checkpoint())
                        .unwrap_or_default();
                    assert_matches!(checkpoint.storage_hashing_stage_checkpoint(), Some(StorageHashingCheckpoint {
                        progress: EntitiesCheckpoint {
                            processed,
                            total,
                        },
                        ..
                    }) if processed == previous_checkpoint.progress.processed + 1 &&
                        total == runner.db.count_entries::<tables::PlainStorageState>().unwrap() as u64);

                    // Continue from the partial checkpoint on the next loop round.
                    input.checkpoint = Some(checkpoint);
                    continue
                }
                // Final checkpoint: at the target block with everything processed.
                assert_eq!(checkpoint.block_number, previous_stage);
                assert_matches!(checkpoint.storage_hashing_stage_checkpoint(), Some(StorageHashingCheckpoint {
                    progress: EntitiesCheckpoint {
                        processed,
                        total,
                    },
                    ..
                }) if processed == total &&
                    total == runner.db.count_entries::<tables::PlainStorageState>().unwrap() as u64);

                // Validate the stage execution result against the database.
                assert!(
                    runner.validate_execution(input, Some(result)).is_ok(),
                    "execution validation"
                );

                break
            }
            panic!("Failed execution");
        }
    }

    /// Test runner backed by an in-memory test database.
    struct StorageHashingTestRunner {
        db: TestStageDB,
        commit_threshold: u64,
        clean_threshold: u64,
        etl_config: EtlConfig,
    }

    impl Default for StorageHashingTestRunner {
        fn default() -> Self {
            Self {
                db: TestStageDB::default(),
                commit_threshold: 1000,
                clean_threshold: 1000,
                etl_config: EtlConfig::default(),
            }
        }
    }

    impl StageTestRunner for StorageHashingTestRunner {
        type S = StorageHashingStage;

        fn db(&self) -> &TestStageDB {
            &self.db
        }

        /// Builds the stage under test from the runner's configured thresholds.
        fn stage(&self) -> Self::S {
            Self::S {
                commit_threshold: self.commit_threshold,
                clean_threshold: self.clean_threshold,
                etl_config: self.etl_config.clone(),
            }
        }
    }

    impl ExecuteStageTestRunner for StorageHashingTestRunner {
        type Seed = Vec<SealedBlock<Block>>;

        /// Seeds the db with random blocks, transactions, storage entries and
        /// matching changesets over the stage's input range.
        fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
            let stage_progress = input.next_block();
            let end = input.target();
            let mut rng = generators::rng();

            let n_accounts = 31;
            let mut accounts = random_contract_account_range(&mut rng, &mut (0..n_accounts));

            let blocks = random_block_range(
                &mut rng,
                stage_progress..=end,
                BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..3, ..Default::default() },
            );

            self.db.insert_headers(blocks.iter().map(|block| block.sealed_header()))?;
            let mut tx_hash_numbers = Vec::new();

            let iter = blocks.iter();
            let mut next_tx_num = 0;
            let mut first_tx_num = next_tx_num;
            for progress in iter {
                let block_number = progress.number;
                self.db.commit(|tx| {
                    progress.body().transactions.iter().try_for_each(
                        |transaction| -> Result<(), reth_db::DatabaseError> {
                            tx_hash_numbers.push((*transaction.tx_hash(), next_tx_num));
                            tx.put::<tables::Transactions>(next_tx_num, transaction.clone())?;

                            // Pick a random seeded account to attach storage to.
                            let (addr, _) = accounts
                                .get_mut((rng.random::<u64>() % n_accounts) as usize)
                                .unwrap();

                            // Two random storage writes per transaction.
                            for _ in 0..2 {
                                let new_entry = StorageEntry {
                                    key: keccak256([rng.random::<u8>()]),
                                    value: U256::from(rng.random::<u8>() % 30 + 1),
                                };
                                self.insert_storage_entry(
                                    tx,
                                    (block_number, *addr).into(),
                                    new_entry,
                                    progress.number == stage_progress,
                                )?;
                            }

                            next_tx_num += 1;
                            Ok(())
                        },
                    )?;

                    // Randomly add a reward-style storage entry for a fresh address.
                    let has_reward: bool = rng.random();
                    if has_reward {
                        self.insert_storage_entry(
                            tx,
                            (block_number, Address::random()).into(),
                            StorageEntry {
                                key: keccak256("mining"),
                                value: U256::from(rng.random::<u32>()),
                            },
                            progress.number == stage_progress,
                        )?;
                    }

                    let body = StoredBlockBodyIndices {
                        first_tx_num,
                        tx_count: progress.transaction_count() as u64,
                    };

                    first_tx_num = next_tx_num;

                    tx.put::<tables::BlockBodyIndices>(progress.number, body)?;
                    Ok(())
                })?;
            }
            self.db.insert_tx_hash_numbers(tx_hash_numbers)?;

            Ok(blocks)
        }

        fn validate_execution(
            &self,
            input: ExecInput,
            output: Option<ExecOutput>,
        ) -> Result<(), TestRunnerError> {
            if let Some(output) = output {
                let start_block = input.checkpoint().block_number + 1;
                let end_block = output.checkpoint.block_number;
                // Nothing was executed; nothing to validate.
                if start_block > end_block {
                    return Ok(())
                }
            }
            self.check_hashed_storage()
        }
    }

    impl UnwindStageTestRunner for StorageHashingTestRunner {
        fn validate_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError> {
            self.unwind_storage(input)?;
            self.check_hashed_storage()
        }
    }

    impl StorageHashingTestRunner {
        fn set_clean_threshold(&mut self, threshold: u64) {
            self.clean_threshold = threshold;
        }

        fn set_commit_threshold(&mut self, threshold: u64) {
            self.commit_threshold = threshold;
        }

        /// Asserts every plain-state entry has a matching hashed entry, and that
        /// the hashed table contains nothing beyond those entries.
        fn check_hashed_storage(&self) -> Result<(), TestRunnerError> {
            self.db
                .query(|tx| {
                    let mut storage_cursor = tx.cursor_dup_read::<tables::PlainStorageState>()?;
                    let mut hashed_storage_cursor =
                        tx.cursor_dup_read::<tables::HashedStorages>()?;

                    let mut expected = 0;

                    while let Some((address, entry)) = storage_cursor.next()? {
                        let key = keccak256(entry.key);
                        let got =
                            hashed_storage_cursor.seek_by_key_subkey(keccak256(address), key)?;
                        assert_eq!(
                            got,
                            Some(StorageEntry { key, ..entry }),
                            "{expected}: {address:?}"
                        );
                        expected += 1;
                    }
                    // No extra rows may exist in the hashed table.
                    let count = tx.cursor_dup_read::<tables::HashedStorages>()?.walk(None)?.count();

                    assert_eq!(count, expected);
                    Ok(())
                })
                .map_err(|e| e.into())
        }

        /// Writes `entry` to plain state (optionally also to hashed state when
        /// `hash` is set) and records the previous value in the changeset table.
        fn insert_storage_entry<TX: DbTxMut>(
            &self,
            tx: &TX,
            bn_address: BlockNumberAddress,
            entry: StorageEntry,
            hash: bool,
        ) -> Result<(), reth_db::DatabaseError> {
            let mut storage_cursor = tx.cursor_dup_write::<tables::PlainStorageState>()?;
            // Replace-on-write: remember the previous entry (or a zero-valued one)
            // so the changeset reflects the pre-state.
            let prev_entry =
                match storage_cursor.seek_by_key_subkey(bn_address.address(), entry.key)? {
                    Some(e) if e.key == entry.key => {
                        tx.delete::<tables::PlainStorageState>(bn_address.address(), Some(e))
                            .expect("failed to delete entry");
                        e
                    }
                    _ => StorageEntry { key: entry.key, value: U256::from(0) },
                };
            tx.put::<tables::PlainStorageState>(bn_address.address(), entry)?;

            if hash {
                let hashed_address = keccak256(bn_address.address());
                let hashed_entry = StorageEntry { key: keccak256(entry.key), value: entry.value };

                // Delete a pre-existing hashed entry for the same slot, if any.
                if let Some(e) = tx
                    .cursor_dup_write::<tables::HashedStorages>()?
                    .seek_by_key_subkey(hashed_address, hashed_entry.key)?
                    .filter(|e| e.key == hashed_entry.key)
                {
                    tx.delete::<tables::HashedStorages>(hashed_address, Some(e))
                        .expect("failed to delete entry");
                }

                tx.put::<tables::HashedStorages>(hashed_address, hashed_entry)?;
            }

            tx.put::<tables::StorageChangeSets>(bn_address, prev_entry)?;
            Ok(())
        }

        /// Reverts plain storage above `input.unwind_to` by walking the
        /// changesets backwards and restoring the recorded previous values.
        fn unwind_storage(&self, input: UnwindInput) -> Result<(), TestRunnerError> {
            tracing::debug!("unwinding storage...");
            let target_block = input.unwind_to;
            self.db.commit(|tx| {
                let mut storage_cursor = tx.cursor_dup_write::<tables::PlainStorageState>()?;
                let mut changeset_cursor = tx.cursor_dup_read::<tables::StorageChangeSets>()?;

                let mut rev_changeset_walker = changeset_cursor.walk_back(None)?;

                while let Some((bn_address, entry)) = rev_changeset_walker.next().transpose()? {
                    // Changesets at or below the unwind target stay applied.
                    if bn_address.block_number() < target_block {
                        break
                    }

                    // Remove the current value for this slot if present.
                    if storage_cursor
                        .seek_by_key_subkey(bn_address.address(), entry.key)?
                        .filter(|e| e.key == entry.key)
                        .is_some()
                    {
                        storage_cursor.delete_current()?;
                    }

                    // Restore the previous value unless it was zero (i.e. slot unset).
                    if !entry.value.is_zero() {
                        storage_cursor.upsert(bn_address.address(), &entry)?;
                    }
                }
                Ok(())
            })?;
            Ok(())
        }
    }
}