1use alloy_primitives::{b256, bytes::BufMut, keccak256, Address, B256};
2use itertools::Itertools;
3use reth_config::config::{EtlConfig, HashingConfig};
4use reth_db_api::{
5 cursor::{DbCursorRO, DbDupCursorRW},
6 models::{BlockNumberAddress, CompactU256},
7 table::Decompress,
8 tables,
9 transaction::{DbTx, DbTxMut},
10};
11use reth_etl::Collector;
12use reth_primitives_traits::StorageEntry;
13use reth_provider::{DBProvider, HashingWriter, StatsReader, StorageReader};
14use reth_stages_api::{
15 BlockRangeOutput, EntitiesCheckpoint, ExecInput, ExecOutput, Stage, StageCheckpoint,
16 StageError, StageId, StorageHashingCheckpoint, UnwindInput, UnwindOutput,
17};
18use reth_storage_api::StorageSettingsCache;
19use reth_storage_errors::provider::ProviderResult;
20use std::{
21 collections::{BTreeMap, BTreeSet},
22 fmt::Debug,
23 sync::mpsc::{self, Receiver},
24};
25use tracing::*;
26
/// Maximum number of worker result channels buffered before they are drained
/// into the ETL collector (bounds memory held by pending receivers).
const MAXIMUM_CHANNELS: usize = 10_000;

/// Number of `(address, storage slot)` entries hashed per spawned worker task.
const WORKER_CHUNK_SIZE: usize = 100;

/// Precomputed `keccak256` of the zero address, paired with `Address::ZERO`
/// as the initial "last address" cache in the hashing workers so the sentinel
/// never needs rehashing.
const HASHED_ZERO_ADDRESS: B256 =
    b256!("0x5380c7b7ae81a58eb98d9c78de4a1fd7fd9535fc953ed2be602daaa41767312a");
36
/// Storage hashing stage: populates [`tables::HashedStorages`] with
/// keccak-hashed address/slot keys derived from the plain storage tables.
#[derive(Debug)]
pub struct StorageHashingStage {
    /// Block-range size above which the hashed table is rebuilt from scratch
    /// instead of being updated incrementally from changesets.
    pub clean_threshold: u64,
    /// Maximum number of blocks processed per incremental execution pass.
    pub commit_threshold: u64,
    /// Maximum number of changeset entries processed before committing.
    pub commit_entries: u64,
    /// ETL configuration (buffer file size and directory) for the collector.
    pub etl_config: EtlConfig,
}
52
impl StorageHashingStage {
    /// Creates a new stage from the hashing thresholds and ETL configuration.
    pub const fn new(config: HashingConfig, etl_config: EtlConfig) -> Self {
        Self {
            clean_threshold: config.clean_threshold,
            commit_threshold: config.commit_threshold,
            commit_entries: config.commit_entries,
            etl_config,
        }
    }
}
64
65impl Default for StorageHashingStage {
66 fn default() -> Self {
67 Self {
68 clean_threshold: 500_000,
69 commit_threshold: 100_000,
70 commit_entries: 30_000_000,
71 etl_config: EtlConfig::default(),
72 }
73 }
74}
75
impl<Provider> Stage<Provider> for StorageHashingStage
where
    Provider: DBProvider<Tx: DbTxMut>
        + StorageReader
        + HashingWriter
        + StatsReader
        + StorageSettingsCache,
{
    /// Returns the unique identifier of this stage.
    fn id(&self) -> StageId {
        StageId::StorageHashing
    }

    /// Hashes storage entries up to `input.target()`.
    ///
    /// Two execution modes:
    /// * full rebuild — when the range exceeds `clean_threshold` or we start
    ///   from block 1: clears `HashedStorages` and re-hashes the entire plain
    ///   state via parallel rayon workers feeding an ETL collector;
    /// * incremental — replays `StorageChangeSets` for a bounded block range
    ///   and re-hashes only the touched slots.
    fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result<ExecOutput, StageError> {
        let tx = provider.tx_ref();
        if input.target_reached() {
            return Ok(ExecOutput::done(input.checkpoint()))
        }

        // If the node already keeps state keyed by hashed addresses, this
        // stage has nothing to do — report done at the target block.
        if provider.cached_storage_settings().use_hashed_state() {
            return Ok(ExecOutput::done(input.checkpoint().with_block_number(input.target())));
        }

        let total_range = input.target() - input.checkpoint().block_number;
        let from_block = input.next_block();

        // Full rebuild when the range is large or we are syncing from genesis.
        if total_range > self.clean_threshold || from_block == 1 {
            tx.clear::<tables::HashedStorages>()?;

            let mut storage_cursor = tx.cursor_read::<tables::PlainStorageState>()?;
            let mut collector =
                Collector::new(self.etl_config.file_size, self.etl_config.dir.clone());
            let mut channels = Vec::with_capacity(MAXIMUM_CHANNELS);

            for chunk in &storage_cursor.walk(None)?.chunks(WORKER_CHUNK_SIZE) {
                // Materialize the chunk so it can be moved into the worker.
                let chunk = chunk.collect::<Result<Vec<_>, _>>()?;
                // Channel is sized to the chunk, so the worker never blocks on send.
                let (tx, rx) = mpsc::sync_channel(chunk.len());
                channels.push(rx);
                rayon::spawn(move || {
                    // Cache the last hashed address: the plain-state walk is
                    // ordered by address, so consecutive slots usually share it.
                    let (mut last_addr, mut hashed_addr) = (Address::ZERO, HASHED_ZERO_ADDRESS);
                    for (address, slot) in chunk {
                        if address != last_addr {
                            last_addr = address;
                            hashed_addr = keccak256(address);
                        }
                        // Composite key: hashed address (32 bytes) ++ hashed slot (32 bytes).
                        let mut addr_key = Vec::with_capacity(64);
                        addr_key.put_slice(hashed_addr.as_slice());
                        addr_key.put_slice(keccak256(slot.key).as_slice());
                        let _ = tx.send((addr_key, CompactU256::from(slot.value)));
                    }
                });

                // Drain all pending receivers whenever the channel count hits a
                // multiple of MAXIMUM_CHANNELS, bounding memory usage.
                if !channels.is_empty() && channels.len().is_multiple_of(MAXIMUM_CHANNELS) {
                    collect(&mut channels, &mut collector)?;
                }
            }

            collect(&mut channels, &mut collector)?;

            // Flush the (sorted) collector into the dup-sorted table, logging
            // progress roughly every 10%.
            let total_hashes = collector.len();
            let interval = (total_hashes / 10).max(1);
            let mut cursor = tx.cursor_dup_write::<tables::HashedStorages>()?;
            for (index, item) in collector.iter()?.enumerate() {
                if index > 0 && index.is_multiple_of(interval) {
                    info!(
                        target: "sync::stages::hashing_storage",
                        progress = %format!("{:.2}%", (index as f64 / total_hashes as f64) * 100.0),
                        "Inserting hashes"
                    );
                }

                // Split the composite key back into (hashed address, hashed slot).
                let (addr_key, value) = item?;
                cursor.append_dup(
                    B256::from_slice(&addr_key[..32]),
                    StorageEntry {
                        key: B256::from_slice(&addr_key[32..]),
                        value: CompactU256::decompress_owned(value)?.into(),
                    },
                )?;
            }

            let checkpoint = StageCheckpoint::new(input.target())
                .with_storage_hashing_stage_checkpoint(StorageHashingCheckpoint {
                    progress: stage_checkpoint_progress(provider)?,
                    ..Default::default()
                });

            Ok(ExecOutput { checkpoint, done: true })
        } else {
            // Incremental path: bound the range by `commit_threshold` blocks.
            let BlockRangeOutput { block_range, is_final_range } =
                input.next_block_range_with_threshold(self.commit_threshold);
            let (from_block, to_block) = block_range.into_inner();

            let mut changeset_cursor = tx.cursor_read::<tables::StorageChangeSets>()?;
            let mut changed: BTreeMap<Address, BTreeSet<B256>> = BTreeMap::new();
            let mut total_entries = 0u64;
            let mut last_block = from_block;

            for entry in
                changeset_cursor.walk_range(BlockNumberAddress::range(from_block..=to_block))?
            {
                let (BlockNumberAddress((block_number, address)), storage_entry) = entry?;

                // Stop once `commit_entries` is reached, but only at a block
                // boundary so no block is left half-collected.
                if block_number != last_block && total_entries >= self.commit_entries {
                    break;
                }

                last_block = block_number;
                changed.entry(address).or_default().insert(storage_entry.key);
                total_entries += 1;
            }

            // Re-read the touched slots from plain state and write their
            // hashed counterparts.
            let storages = provider.plain_state_storages(changed)?;
            provider.insert_storage_for_hashing(storages)?;

            let exhausted = total_entries < self.commit_entries;
            let done = exhausted && is_final_range;
            // If we bailed out early, only claim progress up to the last
            // fully-processed block (break happens before `last_block` is
            // advanced to the new block).
            let progress_block = if exhausted { to_block } else { last_block };

            let checkpoint = StageCheckpoint::new(progress_block)
                .with_storage_hashing_stage_checkpoint(StorageHashingCheckpoint {
                    progress: stage_checkpoint_progress(provider)?,
                    ..Default::default()
                });

            Ok(ExecOutput { checkpoint, done })
        }
    }

    /// Unwinds hashed storage back toward `input.unwind_to`, bounded per pass
    /// by `commit_threshold`, and refreshes the entities checkpoint.
    fn unwind(
        &mut self,
        provider: &Provider,
        input: UnwindInput,
    ) -> Result<UnwindOutput, StageError> {
        let (range, unwind_progress, _) =
            input.unwind_block_range_with_threshold(self.commit_threshold);

        provider.unwind_storage_hashing_range(range)?;

        let mut stage_checkpoint =
            input.checkpoint.storage_hashing_stage_checkpoint().unwrap_or_default();

        stage_checkpoint.progress = stage_checkpoint_progress(provider)?;

        Ok(UnwindOutput {
            checkpoint: StageCheckpoint::new(unwind_progress)
                .with_storage_hashing_stage_checkpoint(stage_checkpoint),
        })
    }
}
252
253fn collect(
255 channels: &mut Vec<Receiver<(Vec<u8>, CompactU256)>>,
256 collector: &mut Collector<Vec<u8>, CompactU256>,
257) -> Result<(), StageError> {
258 for channel in channels.iter_mut() {
259 while let Ok((key, v)) = channel.recv() {
260 collector.insert(key, v)?;
261 }
262 }
263 info!(target: "sync::stages::hashing_storage", "Hashed {} entries", collector.len());
264 channels.clear();
265 Ok(())
266}
267
268fn stage_checkpoint_progress(provider: &impl StatsReader) -> ProviderResult<EntitiesCheckpoint> {
269 Ok(EntitiesCheckpoint {
270 processed: provider.count_entries::<tables::HashedStorages>()? as u64,
271 total: provider.count_entries::<tables::PlainStorageState>()? as u64,
272 })
273}
274
275#[cfg(test)]
276mod tests {
277 use super::*;
278 use crate::test_utils::{
279 stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError,
280 TestStageDB, UnwindStageTestRunner,
281 };
282 use alloy_primitives::{Address, U256};
283 use assert_matches::assert_matches;
284 use rand::Rng;
285 use reth_db_api::{
286 cursor::{DbCursorRW, DbDupCursorRO},
287 models::{BlockNumberAddress, StoredBlockBodyIndices},
288 };
289 use reth_ethereum_primitives::Block;
290 use reth_primitives_traits::SealedBlock;
291 use reth_provider::providers::StaticFileWriter;
292 use reth_testing_utils::generators::{
293 self, random_block_range, random_contract_account_range, BlockRangeParams,
294 };
295
    // Runs the shared extended stage test suite against this runner.
    stage_test_suite_ext!(StorageHashingTestRunner, storage_hashing);
297
    /// Drives the stage to completion with minimal thresholds (clean/commit
    /// both 1) and verifies the checkpoint reported after each pass.
    #[tokio::test]
    async fn execute_clean_storage_hashing() {
        let (previous_stage, stage_progress) = (500, 100);

        let mut runner = StorageHashingTestRunner::default();

        // Force the bulk-rebuild ("clean") path to trigger on any range.
        runner.set_clean_threshold(1);

        // Process at most one block per execution pass.
        runner.set_commit_threshold(1);

        let mut input = ExecInput {
            target: Some(previous_stage),
            checkpoint: Some(StageCheckpoint::new(stage_progress)),
        };

        runner.seed_execution(input).expect("failed to seed execution");

        loop {
            if let Ok(result @ ExecOutput { checkpoint, done }) =
                runner.execute(input).await.unwrap()
            {
                if !done {
                    // Intermediate pass: the processed counter is asserted to
                    // advance by exactly one entry relative to the previous
                    // checkpoint.
                    let previous_checkpoint = input
                        .checkpoint
                        .and_then(|checkpoint| checkpoint.storage_hashing_stage_checkpoint())
                        .unwrap_or_default();
                    assert_matches!(checkpoint.storage_hashing_stage_checkpoint(), Some(StorageHashingCheckpoint {
                        progress: EntitiesCheckpoint {
                            processed,
                            total,
                        },
                        ..
                    }) if processed == previous_checkpoint.progress.processed + 1 &&
                        total == runner.db.count_entries::<tables::PlainStorageState>().unwrap() as u64);

                    // Resume from the reported checkpoint.
                    input.checkpoint = Some(checkpoint);
                    continue
                }
                // Final pass: target reached and every entry processed.
                assert_eq!(checkpoint.block_number, previous_stage);
                assert_matches!(checkpoint.storage_hashing_stage_checkpoint(), Some(StorageHashingCheckpoint {
                    progress: EntitiesCheckpoint {
                        processed,
                        total,
                    },
                    ..
                }) if processed == total &&
                    total == runner.db.count_entries::<tables::PlainStorageState>().unwrap() as u64);

                // Validate the hashed table against plain state.
                assert!(
                    runner.validate_execution(input, Some(result)).is_ok(),
                    "execution validation"
                );

                break
            }
            panic!("Failed execution");
        }
    }
363
    /// Test harness wrapping a [`TestStageDB`] plus the stage configuration
    /// knobs mirrored from [`StorageHashingStage`].
    struct StorageHashingTestRunner {
        // Shared test database the stage executes against.
        db: TestStageDB,
        // Mirrors `StorageHashingStage::commit_threshold`.
        commit_threshold: u64,
        // Mirrors `StorageHashingStage::clean_threshold`.
        clean_threshold: u64,
        // Mirrors `StorageHashingStage::commit_entries`.
        commit_entries: u64,
        // Mirrors `StorageHashingStage::etl_config`.
        etl_config: EtlConfig,
    }
371
372 impl Default for StorageHashingTestRunner {
373 fn default() -> Self {
374 Self {
375 db: TestStageDB::default(),
376 commit_threshold: 1000,
377 clean_threshold: 1000,
378 commit_entries: u64::MAX,
379 etl_config: EtlConfig::default(),
380 }
381 }
382 }
383
384 impl StageTestRunner for StorageHashingTestRunner {
385 type S = StorageHashingStage;
386
387 fn db(&self) -> &TestStageDB {
388 &self.db
389 }
390
391 fn stage(&self) -> Self::S {
392 Self::S {
393 commit_threshold: self.commit_threshold,
394 clean_threshold: self.clean_threshold,
395 commit_entries: self.commit_entries,
396 etl_config: self.etl_config.clone(),
397 }
398 }
399 }
400
    impl ExecuteStageTestRunner for StorageHashingTestRunner {
        type Seed = Vec<SealedBlock<Block>>;

        /// Seeds the database with a random block range, random plain-storage
        /// writes, and matching changesets. Writes belonging to the first
        /// block are additionally mirrored into the hashed table (the `hash`
        /// flag passed to `insert_storage_entry`).
        fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
            let stage_progress = input.next_block();
            let end = input.target();
            let mut rng = generators::rng();

            let n_accounts = 31;
            let mut accounts = random_contract_account_range(&mut rng, &mut (0..n_accounts));

            let blocks = random_block_range(
                &mut rng,
                stage_progress..=end,
                BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..3, ..Default::default() },
            );

            self.db.insert_headers(blocks.iter().map(|block| block.sealed_header()))?;
            let mut tx_hash_numbers = Vec::new();

            let iter = blocks.iter();
            let mut next_tx_num = 0;
            let mut first_tx_num = next_tx_num;
            for progress in iter {
                let block_number = progress.number;
                self.db.commit(|tx| {
                    progress.body().transactions.iter().try_for_each(
                        |transaction| -> Result<(), reth_db::DatabaseError> {
                            tx_hash_numbers.push((*transaction.tx_hash(), next_tx_num));
                            tx.put::<tables::Transactions>(next_tx_num, transaction.clone())?;

                            // Pick a random seeded account to receive the writes.
                            let (addr, _) = accounts
                                .get_mut((rng.random::<u64>() % n_accounts) as usize)
                                .unwrap();

                            // Two random storage-slot writes per transaction.
                            for _ in 0..2 {
                                let new_entry = StorageEntry {
                                    key: keccak256([rng.random::<u8>()]),
                                    value: U256::from(rng.random::<u8>() % 30 + 1),
                                };
                                self.insert_storage_entry(
                                    tx,
                                    (block_number, *addr).into(),
                                    new_entry,
                                    progress.number == stage_progress,
                                )?;
                            }

                            next_tx_num += 1;
                            Ok(())
                        },
                    )?;

                    // Randomly add an extra storage write for a fresh address.
                    let has_reward: bool = rng.random();
                    if has_reward {
                        self.insert_storage_entry(
                            tx,
                            (block_number, Address::random()).into(),
                            StorageEntry {
                                key: keccak256("mining"),
                                value: U256::from(rng.random::<u32>()),
                            },
                            progress.number == stage_progress,
                        )?;
                    }

                    let body = StoredBlockBodyIndices {
                        first_tx_num,
                        tx_count: progress.transaction_count() as u64,
                    };

                    first_tx_num = next_tx_num;

                    tx.put::<tables::BlockBodyIndices>(progress.number, body)?;
                    Ok(())
                })?;
            }
            self.db.insert_tx_hash_numbers(tx_hash_numbers)?;

            Ok(blocks)
        }

        /// Passes trivially when the executed range is empty; otherwise every
        /// plain storage entry must have a hashed counterpart.
        fn validate_execution(
            &self,
            input: ExecInput,
            output: Option<ExecOutput>,
        ) -> Result<(), TestRunnerError> {
            if let Some(output) = output {
                let start_block = input.checkpoint().block_number + 1;
                let end_block = output.checkpoint.block_number;
                if start_block > end_block {
                    return Ok(())
                }
            }
            self.check_hashed_storage()
        }
    }
500
501 impl UnwindStageTestRunner for StorageHashingTestRunner {
502 fn validate_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError> {
503 self.unwind_storage(input)?;
504 self.check_hashed_storage()
505 }
506 }
507
    impl StorageHashingTestRunner {
        /// Overrides the clean-rebuild threshold.
        fn set_clean_threshold(&mut self, threshold: u64) {
            self.clean_threshold = threshold;
        }

        /// Overrides the per-pass block threshold.
        fn set_commit_threshold(&mut self, threshold: u64) {
            self.commit_threshold = threshold;
        }

        /// Asserts that `HashedStorages` is exactly the keccak-hashed image
        /// of `PlainStorageState`: every plain entry maps to a hashed entry
        /// with the same value, and both tables have the same row count.
        fn check_hashed_storage(&self) -> Result<(), TestRunnerError> {
            self.db
                .query(|tx| {
                    let mut storage_cursor = tx.cursor_dup_read::<tables::PlainStorageState>()?;
                    let mut hashed_storage_cursor =
                        tx.cursor_dup_read::<tables::HashedStorages>()?;

                    let mut expected = 0;

                    while let Some((address, entry)) = storage_cursor.next()? {
                        let key = keccak256(entry.key);
                        let got =
                            hashed_storage_cursor.seek_by_key_subkey(keccak256(address), key)?;
                        assert_eq!(
                            got,
                            Some(StorageEntry { key, ..entry }),
                            "{expected}: {address:?}"
                        );
                        expected += 1;
                    }
                    // The hashed table must hold no rows beyond those matched above.
                    let count = tx.cursor_dup_read::<tables::HashedStorages>()?.walk(None)?.count();

                    assert_eq!(count, expected);
                    Ok(())
                })
                .map_err(|e| e.into())
        }

        /// Writes `entry` into plain storage, records the previous value in
        /// `StorageChangeSets`, and — when `hash` is true — mirrors the write
        /// into `HashedStorages` under keccak-hashed keys.
        fn insert_storage_entry<TX: DbTxMut>(
            &self,
            tx: &TX,
            bn_address: BlockNumberAddress,
            entry: StorageEntry,
            hash: bool,
        ) -> Result<(), reth_db::DatabaseError> {
            let mut storage_cursor = tx.cursor_dup_write::<tables::PlainStorageState>()?;
            // Replace any existing entry for this slot, remembering the old
            // value (zero when the slot was previously unset) for the changeset.
            let prev_entry =
                match storage_cursor.seek_by_key_subkey(bn_address.address(), entry.key)? {
                    Some(e) if e.key == entry.key => {
                        tx.delete::<tables::PlainStorageState>(bn_address.address(), Some(e))
                            .expect("failed to delete entry");
                        e
                    }
                    _ => StorageEntry { key: entry.key, value: U256::from(0) },
                };
            tx.put::<tables::PlainStorageState>(bn_address.address(), entry)?;

            if hash {
                let hashed_address = keccak256(bn_address.address());
                let hashed_entry = StorageEntry { key: keccak256(entry.key), value: entry.value };

                // Same replace-if-present step for the hashed table.
                if let Some(e) = tx
                    .cursor_dup_write::<tables::HashedStorages>()?
                    .seek_by_key_subkey(hashed_address, hashed_entry.key)?
                    .filter(|e| e.key == hashed_entry.key)
                {
                    tx.delete::<tables::HashedStorages>(hashed_address, Some(e))
                        .expect("failed to delete entry");
                }

                tx.put::<tables::HashedStorages>(hashed_address, hashed_entry)?;
            }

            tx.put::<tables::StorageChangeSets>(bn_address, prev_entry)?;
            Ok(())
        }

        /// Reverts plain storage to `input.unwind_to` by walking changesets
        /// backwards and restoring each recorded previous value.
        fn unwind_storage(&self, input: UnwindInput) -> Result<(), TestRunnerError> {
            tracing::debug!("unwinding storage...");
            let target_block = input.unwind_to;
            self.db.commit(|tx| {
                let mut storage_cursor = tx.cursor_dup_write::<tables::PlainStorageState>()?;
                let mut changeset_cursor = tx.cursor_dup_read::<tables::StorageChangeSets>()?;

                let mut rev_changeset_walker = changeset_cursor.walk_back(None)?;

                while let Some((bn_address, entry)) = rev_changeset_walker.next().transpose()? {
                    if bn_address.block_number() < target_block {
                        break
                    }

                    // Remove the current value for this slot, if present...
                    if storage_cursor
                        .seek_by_key_subkey(bn_address.address(), entry.key)?
                        .is_some_and(|e| e.key == entry.key)
                    {
                        storage_cursor.delete_current()?;
                    }

                    // ...then restore the pre-change value (zero means the
                    // slot did not exist before, so nothing is re-inserted).
                    if !entry.value.is_zero() {
                        storage_cursor.upsert(bn_address.address(), &entry)?;
                    }
                }
                Ok(())
            })?;
            Ok(())
        }
    }
614}