1use alloy_primitives::{b256, bytes::BufMut, keccak256, Address, B256};
2use itertools::Itertools;
3use reth_config::config::{EtlConfig, HashingConfig};
4use reth_db_api::{
5 cursor::{DbCursorRO, DbDupCursorRW},
6 models::{BlockNumberAddress, CompactU256},
7 table::Decompress,
8 tables,
9 transaction::{DbTx, DbTxMut},
10};
11use reth_etl::Collector;
12use reth_primitives_traits::StorageEntry;
13use reth_provider::{DBProvider, HashingWriter, StatsReader, StorageReader};
14use reth_stages_api::{
15 EntitiesCheckpoint, ExecInput, ExecOutput, Stage, StageCheckpoint, StageError, StageId,
16 StorageHashingCheckpoint, UnwindInput, UnwindOutput,
17};
18use reth_storage_errors::provider::ProviderResult;
19use std::{
20 fmt::Debug,
21 sync::mpsc::{self, Receiver},
22};
23use tracing::*;
24
/// Number of per-chunk worker channels that may accumulate during a full rebuild before
/// their results are drained into the ETL collector.
const MAXIMUM_CHANNELS: usize = 10_000;
27
/// Number of plain storage entries handed to a single rayon hashing task.
const WORKER_CHUNK_SIZE: usize = 100;
30
31const HASHED_ZERO_ADDRESS: B256 =
33 b256!("0x5380c7b7ae81a58eb98d9c78de4a1fd7fd9535fc953ed2be602daaa41767312a");
34
35#[derive(Debug)]
38pub struct StorageHashingStage {
39 pub clean_threshold: u64,
42 pub commit_threshold: u64,
44 pub etl_config: EtlConfig,
46}
47
48impl StorageHashingStage {
49 pub const fn new(config: HashingConfig, etl_config: EtlConfig) -> Self {
51 Self {
52 clean_threshold: config.clean_threshold,
53 commit_threshold: config.commit_threshold,
54 etl_config,
55 }
56 }
57}
58
59impl Default for StorageHashingStage {
60 fn default() -> Self {
61 Self {
62 clean_threshold: 500_000,
63 commit_threshold: 100_000,
64 etl_config: EtlConfig::default(),
65 }
66 }
67}
68
69impl<Provider> Stage<Provider> for StorageHashingStage
70where
71 Provider: DBProvider<Tx: DbTxMut> + StorageReader + HashingWriter + StatsReader,
72{
73 fn id(&self) -> StageId {
75 StageId::StorageHashing
76 }
77
78 fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result<ExecOutput, StageError> {
80 let tx = provider.tx_ref();
81 if input.target_reached() {
82 return Ok(ExecOutput::done(input.checkpoint()))
83 }
84
85 let (from_block, to_block) = input.next_block_range().into_inner();
86
87 if to_block - from_block > self.clean_threshold || from_block == 1 {
92 tx.clear::<tables::HashedStorages>()?;
94
95 let mut storage_cursor = tx.cursor_read::<tables::PlainStorageState>()?;
96 let mut collector =
97 Collector::new(self.etl_config.file_size, self.etl_config.dir.clone());
98 let mut channels = Vec::with_capacity(MAXIMUM_CHANNELS);
99
100 for chunk in &storage_cursor.walk(None)?.chunks(WORKER_CHUNK_SIZE) {
101 let (tx, rx) = mpsc::channel();
103 channels.push(rx);
104
105 let chunk = chunk.collect::<Result<Vec<_>, _>>()?;
106 rayon::spawn(move || {
108 let (mut last_addr, mut hashed_addr) = (Address::ZERO, HASHED_ZERO_ADDRESS);
110 for (address, slot) in chunk {
111 if address != last_addr {
112 last_addr = address;
113 hashed_addr = keccak256(address);
114 }
115 let mut addr_key = Vec::with_capacity(64);
116 addr_key.put_slice(hashed_addr.as_slice());
117 addr_key.put_slice(keccak256(slot.key).as_slice());
118 let _ = tx.send((addr_key, CompactU256::from(slot.value)));
119 }
120 });
121
122 if !channels.is_empty() && channels.len().is_multiple_of(MAXIMUM_CHANNELS) {
124 collect(&mut channels, &mut collector)?;
125 }
126 }
127
128 collect(&mut channels, &mut collector)?;
129
130 let total_hashes = collector.len();
131 let interval = (total_hashes / 10).max(1);
132 let mut cursor = tx.cursor_dup_write::<tables::HashedStorages>()?;
133 for (index, item) in collector.iter()?.enumerate() {
134 if index > 0 && index.is_multiple_of(interval) {
135 info!(
136 target: "sync::stages::hashing_storage",
137 progress = %format!("{:.2}%", (index as f64 / total_hashes as f64) * 100.0),
138 "Inserting hashes"
139 );
140 }
141
142 let (addr_key, value) = item?;
143 cursor.append_dup(
144 B256::from_slice(&addr_key[..32]),
145 StorageEntry {
146 key: B256::from_slice(&addr_key[32..]),
147 value: CompactU256::decompress_owned(value)?.into(),
148 },
149 )?;
150 }
151 } else {
152 let lists = provider.changed_storages_with_range(from_block..=to_block)?;
155 let storages = provider.plain_state_storages(lists)?;
159 provider.insert_storage_for_hashing(storages)?;
160 }
161
162 let checkpoint = StageCheckpoint::new(input.target())
165 .with_storage_hashing_stage_checkpoint(StorageHashingCheckpoint {
166 progress: stage_checkpoint_progress(provider)?,
167 ..Default::default()
168 });
169
170 Ok(ExecOutput { checkpoint, done: true })
171 }
172
173 fn unwind(
175 &mut self,
176 provider: &Provider,
177 input: UnwindInput,
178 ) -> Result<UnwindOutput, StageError> {
179 let (range, unwind_progress, _) =
180 input.unwind_block_range_with_threshold(self.commit_threshold);
181
182 provider.unwind_storage_hashing_range(BlockNumberAddress::range(range))?;
183
184 let mut stage_checkpoint =
185 input.checkpoint.storage_hashing_stage_checkpoint().unwrap_or_default();
186
187 stage_checkpoint.progress = stage_checkpoint_progress(provider)?;
188
189 Ok(UnwindOutput {
190 checkpoint: StageCheckpoint::new(unwind_progress)
191 .with_storage_hashing_stage_checkpoint(stage_checkpoint),
192 })
193 }
194}
195
196fn collect(
198 channels: &mut Vec<Receiver<(Vec<u8>, CompactU256)>>,
199 collector: &mut Collector<Vec<u8>, CompactU256>,
200) -> Result<(), StageError> {
201 for channel in channels.iter_mut() {
202 while let Ok((key, v)) = channel.recv() {
203 collector.insert(key, v)?;
204 }
205 }
206 info!(target: "sync::stages::hashing_storage", "Hashed {} entries", collector.len());
207 channels.clear();
208 Ok(())
209}
210
211fn stage_checkpoint_progress(provider: &impl StatsReader) -> ProviderResult<EntitiesCheckpoint> {
212 Ok(EntitiesCheckpoint {
213 processed: provider.count_entries::<tables::HashedStorages>()? as u64,
214 total: provider.count_entries::<tables::PlainStorageState>()? as u64,
215 })
216}
217
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::{
        stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError,
        TestStageDB, UnwindStageTestRunner,
    };
    use alloy_primitives::{Address, U256};
    use assert_matches::assert_matches;
    use rand::Rng;
    use reth_db_api::{
        cursor::{DbCursorRW, DbDupCursorRO},
        models::StoredBlockBodyIndices,
    };
    use reth_ethereum_primitives::Block;
    use reth_primitives_traits::SealedBlock;
    use reth_provider::providers::StaticFileWriter;
    use reth_testing_utils::generators::{
        self, random_block_range, random_contract_account_range, BlockRangeParams,
    };

    stage_test_suite_ext!(StorageHashingTestRunner, storage_hashing);

    /// The hashing workers seed their address cache with `HASHED_ZERO_ADDRESS` paired with
    /// `Address::ZERO`; the hard-coded constant must therefore be the real hash.
    #[test]
    fn hashed_zero_address_matches_keccak() {
        assert_eq!(keccak256(Address::ZERO), HASHED_ZERO_ADDRESS);
    }

    /// Runs the stage over a large range with a clean threshold of 1, forcing the full
    /// rebuild path, and checks that the hashed table fully mirrors plain storage.
    #[tokio::test]
    async fn execute_clean_storage_hashing() {
        let (previous_stage, stage_progress) = (500, 100);

        let mut runner = StorageHashingTestRunner::default();

        // Any range longer than one block now takes the full-rebuild path.
        runner.set_clean_threshold(1);

        // Smallest possible commit threshold (only consulted by `unwind`).
        runner.set_commit_threshold(1);

        let mut input = ExecInput {
            target: Some(previous_stage),
            checkpoint: Some(StageCheckpoint::new(stage_progress)),
        };

        runner.seed_execution(input).expect("failed to seed execution");

        loop {
            if let Ok(result @ ExecOutput { checkpoint, done }) =
                runner.execute(input).await.unwrap()
            {
                // NOTE(review): `execute` currently always returns `done: true`, so this
                // branch does not appear to be exercised.
                if !done {
                    let previous_checkpoint = input
                        .checkpoint
                        .and_then(|checkpoint| checkpoint.storage_hashing_stage_checkpoint())
                        .unwrap_or_default();
                    assert_matches!(checkpoint.storage_hashing_stage_checkpoint(), Some(StorageHashingCheckpoint {
                        progress: EntitiesCheckpoint {
                            processed,
                            total,
                        },
                        ..
                    }) if processed == previous_checkpoint.progress.processed + 1 &&
                        total == runner.db.count_entries::<tables::PlainStorageState>().unwrap() as u64);

                    // Resume from the intermediate checkpoint.
                    input.checkpoint = Some(checkpoint);
                    continue
                }
                assert_eq!(checkpoint.block_number, previous_stage);
                assert_matches!(checkpoint.storage_hashing_stage_checkpoint(), Some(StorageHashingCheckpoint {
                    progress: EntitiesCheckpoint {
                        processed,
                        total,
                    },
                    ..
                }) if processed == total &&
                    total == runner.db.count_entries::<tables::PlainStorageState>().unwrap() as u64);

                assert!(
                    runner.validate_execution(input, Some(result)).is_ok(),
                    "execution validation"
                );

                break
            }
            panic!("Failed execution");
        }
    }

    /// Test harness holding a test database and the stage thresholds to use.
    struct StorageHashingTestRunner {
        db: TestStageDB,
        commit_threshold: u64,
        clean_threshold: u64,
        etl_config: EtlConfig,
    }

    impl Default for StorageHashingTestRunner {
        fn default() -> Self {
            Self {
                db: TestStageDB::default(),
                commit_threshold: 1000,
                clean_threshold: 1000,
                etl_config: EtlConfig::default(),
            }
        }
    }

    impl StageTestRunner for StorageHashingTestRunner {
        type S = StorageHashingStage;

        fn db(&self) -> &TestStageDB {
            &self.db
        }

        /// Builds a stage configured with this runner's thresholds.
        fn stage(&self) -> Self::S {
            Self::S {
                commit_threshold: self.commit_threshold,
                clean_threshold: self.clean_threshold,
                etl_config: self.etl_config.clone(),
            }
        }
    }

    impl ExecuteStageTestRunner for StorageHashingTestRunner {
        type Seed = Vec<SealedBlock<Block>>;

        /// Seeds random blocks from `input.next_block()` to `input.target()`.
        ///
        /// Each transaction writes two random slots for a random one of `n_accounts`
        /// contracts, and some blocks get one extra slot under a fresh random address.
        /// Entries of the first seeded block are also written to `HashedStorages`. Every write
        /// records the previous value in `StorageChangeSets`.
        fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
            let stage_progress = input.next_block();
            let end = input.target();
            let mut rng = generators::rng();

            let n_accounts = 31;
            let accounts = random_contract_account_range(&mut rng, &mut (0..n_accounts));

            let blocks = random_block_range(
                &mut rng,
                stage_progress..=end,
                BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..3, ..Default::default() },
            );

            self.db.insert_headers(blocks.iter().map(|block| block.sealed_header()))?;

            let mut next_tx_num = 0;
            let mut first_tx_num = next_tx_num;
            for progress in &blocks {
                let block_number = progress.number;
                self.db.commit(|tx| {
                    progress.body().transactions.iter().try_for_each(
                        |transaction| -> Result<(), reth_db::DatabaseError> {
                            tx.put::<tables::TransactionHashNumbers>(
                                *transaction.tx_hash(),
                                next_tx_num,
                            )?;
                            tx.put::<tables::Transactions>(next_tx_num, transaction.clone())?;

                            // Only the address is needed, so a shared borrow suffices.
                            let (addr, _) = accounts
                                .get((rng.random::<u64>() % n_accounts) as usize)
                                .unwrap();

                            for _ in 0..2 {
                                let new_entry = StorageEntry {
                                    key: keccak256([rng.random::<u8>()]),
                                    value: U256::from(rng.random::<u8>() % 30 + 1),
                                };
                                self.insert_storage_entry(
                                    tx,
                                    (block_number, *addr).into(),
                                    new_entry,
                                    progress.number == stage_progress,
                                )?;
                            }

                            next_tx_num += 1;
                            Ok(())
                        },
                    )?;

                    // Randomly add one extra slot under a fresh random address.
                    let has_reward: bool = rng.random();
                    if has_reward {
                        self.insert_storage_entry(
                            tx,
                            (block_number, Address::random()).into(),
                            StorageEntry {
                                key: keccak256("mining"),
                                value: U256::from(rng.random::<u32>()),
                            },
                            progress.number == stage_progress,
                        )?;
                    }

                    let body = StoredBlockBodyIndices {
                        first_tx_num,
                        tx_count: progress.transaction_count() as u64,
                    };

                    first_tx_num = next_tx_num;

                    tx.put::<tables::BlockBodyIndices>(progress.number, body)?;
                    Ok(())
                })?;
            }

            Ok(blocks)
        }

        /// Checks hashed storage against plain storage, unless the executed range was empty.
        fn validate_execution(
            &self,
            input: ExecInput,
            output: Option<ExecOutput>,
        ) -> Result<(), TestRunnerError> {
            if let Some(output) = output {
                let start_block = input.checkpoint().block_number + 1;
                let end_block = output.checkpoint.block_number;
                if start_block > end_block {
                    return Ok(())
                }
            }
            self.check_hashed_storage()
        }
    }

    impl UnwindStageTestRunner for StorageHashingTestRunner {
        /// Reverts plain storage the same way and checks that hashed storage still mirrors it.
        fn validate_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError> {
            self.unwind_storage(input)?;
            self.check_hashed_storage()
        }
    }

    impl StorageHashingTestRunner {
        fn set_clean_threshold(&mut self, threshold: u64) {
            self.clean_threshold = threshold;
        }

        fn set_commit_threshold(&mut self, threshold: u64) {
            self.commit_threshold = threshold;
        }

        /// Asserts that every plain storage entry has the matching hashed entry.
        ///
        /// Also asserts that `HashedStorages` contains no entries beyond those.
        fn check_hashed_storage(&self) -> Result<(), TestRunnerError> {
            self.db
                .query(|tx| {
                    let mut storage_cursor = tx.cursor_dup_read::<tables::PlainStorageState>()?;
                    let mut hashed_storage_cursor =
                        tx.cursor_dup_read::<tables::HashedStorages>()?;

                    let mut expected = 0;

                    while let Some((address, entry)) = storage_cursor.next()? {
                        let key = keccak256(entry.key);
                        let got =
                            hashed_storage_cursor.seek_by_key_subkey(keccak256(address), key)?;
                        assert_eq!(
                            got,
                            Some(StorageEntry { key, ..entry }),
                            "{expected}: {address:?}"
                        );
                        expected += 1;
                    }
                    let count = tx.cursor_dup_read::<tables::HashedStorages>()?.walk(None)?.count();

                    assert_eq!(count, expected);
                    Ok(())
                })
                .map_err(|e| e.into())
        }

        /// Writes `entry` into `PlainStorageState`, replacing any existing value for that slot.
        ///
        /// If `hash` is set, it also writes the hashed form into `HashedStorages`, replacing
        /// any existing entry. The previous plain value, or zero if there was none, is recorded
        /// in `StorageChangeSets` under `bn_address`.
        fn insert_storage_entry<TX: DbTxMut>(
            &self,
            tx: &TX,
            bn_address: BlockNumberAddress,
            entry: StorageEntry,
            hash: bool,
        ) -> Result<(), reth_db::DatabaseError> {
            let mut storage_cursor = tx.cursor_dup_write::<tables::PlainStorageState>()?;
            let prev_entry =
                match storage_cursor.seek_by_key_subkey(bn_address.address(), entry.key)? {
                    // `seek_by_key_subkey` may land on a later subkey; only an exact match is
                    // the previous value of this slot.
                    Some(e) if e.key == entry.key => {
                        tx.delete::<tables::PlainStorageState>(bn_address.address(), Some(e))
                            .expect("failed to delete entry");
                        e
                    }
                    _ => StorageEntry { key: entry.key, value: U256::from(0) },
                };
            tx.put::<tables::PlainStorageState>(bn_address.address(), entry)?;

            if hash {
                let hashed_address = keccak256(bn_address.address());
                let hashed_entry = StorageEntry { key: keccak256(entry.key), value: entry.value };

                if let Some(e) = tx
                    .cursor_dup_write::<tables::HashedStorages>()?
                    .seek_by_key_subkey(hashed_address, hashed_entry.key)?
                    .filter(|e| e.key == hashed_entry.key)
                {
                    tx.delete::<tables::HashedStorages>(hashed_address, Some(e))
                        .expect("failed to delete entry");
                }

                tx.put::<tables::HashedStorages>(hashed_address, hashed_entry)?;
            }

            tx.put::<tables::StorageChangeSets>(bn_address, prev_entry)?;
            Ok(())
        }

        /// Reverts `PlainStorageState` by replaying changesets backwards.
        ///
        /// Changesets of every block `>= input.unwind_to` are replayed. Restored zero values
        /// remove the slot entirely.
        fn unwind_storage(&self, input: UnwindInput) -> Result<(), TestRunnerError> {
            tracing::debug!("unwinding storage...");
            let target_block = input.unwind_to;
            self.db.commit(|tx| {
                let mut storage_cursor = tx.cursor_dup_write::<tables::PlainStorageState>()?;
                let mut changeset_cursor = tx.cursor_dup_read::<tables::StorageChangeSets>()?;

                let mut rev_changeset_walker = changeset_cursor.walk_back(None)?;

                while let Some((bn_address, entry)) = rev_changeset_walker.next().transpose()? {
                    if bn_address.block_number() < target_block {
                        break
                    }

                    if storage_cursor
                        .seek_by_key_subkey(bn_address.address(), entry.key)?
                        .filter(|e| e.key == entry.key)
                        .is_some()
                    {
                        storage_cursor.delete_current()?;
                    }

                    if !entry.value.is_zero() {
                        storage_cursor.upsert(bn_address.address(), &entry)?;
                    }
                }
                Ok(())
            })?;
            Ok(())
        }
    }
}