1use alloy_primitives::{bytes::BufMut, keccak256, B256};
2use itertools::Itertools;
3use reth_config::config::{EtlConfig, HashingConfig};
4use reth_db_api::{
5 cursor::{DbCursorRO, DbDupCursorRW},
6 models::{BlockNumberAddress, CompactU256},
7 table::Decompress,
8 tables,
9 transaction::{DbTx, DbTxMut},
10};
11use reth_etl::Collector;
12use reth_primitives_traits::StorageEntry;
13use reth_provider::{DBProvider, HashingWriter, StatsReader, StorageReader};
14use reth_stages_api::{
15 EntitiesCheckpoint, ExecInput, ExecOutput, Stage, StageCheckpoint, StageError, StageId,
16 StorageHashingCheckpoint, UnwindInput, UnwindOutput,
17};
18use reth_storage_errors::provider::ProviderResult;
19use std::{
20 fmt::Debug,
21 sync::mpsc::{self, Receiver},
22};
23use tracing::*;
24
/// Upper bound on the number of pending worker result channels held in memory
/// before they are drained into the ETL collector (see `execute`).
const MAXIMUM_CHANNELS: usize = 10_000;

/// Number of plain-storage entries handed to each rayon worker for hashing.
const WORKER_CHUNK_SIZE: usize = 100;
30
/// Stage that maintains the keccak256-hashed copy of plain storage state
/// in [`tables::HashedStorages`].
#[derive(Debug)]
pub struct StorageHashingStage {
    /// Block-range size above which (or when starting from block 1) the stage
    /// clears the hashed table and rebuilds it from the full plain state
    /// instead of applying storage changesets incrementally.
    pub clean_threshold: u64,
    /// Maximum number of blocks to unwind per run; forwarded to
    /// `unwind_block_range_with_threshold`.
    pub commit_threshold: u64,
    /// ETL configuration (collector file size and spill directory).
    pub etl_config: EtlConfig,
}
43
44impl StorageHashingStage {
45 pub const fn new(config: HashingConfig, etl_config: EtlConfig) -> Self {
47 Self {
48 clean_threshold: config.clean_threshold,
49 commit_threshold: config.commit_threshold,
50 etl_config,
51 }
52 }
53}
54
55impl Default for StorageHashingStage {
56 fn default() -> Self {
57 Self {
58 clean_threshold: 500_000,
59 commit_threshold: 100_000,
60 etl_config: EtlConfig::default(),
61 }
62 }
63}
64
impl<Provider> Stage<Provider> for StorageHashingStage
where
    Provider: DBProvider<Tx: DbTxMut> + StorageReader + HashingWriter + StatsReader,
{
    /// Return the id of the stage.
    fn id(&self) -> StageId {
        StageId::StorageHashing
    }

    /// Execute the stage: populate [`tables::HashedStorages`] with
    /// `keccak256(address) ++ keccak256(slot)` keys for the block range in
    /// `input`, either by rebuilding from scratch or incrementally.
    fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result<ExecOutput, StageError> {
        let tx = provider.tx_ref();
        if input.target_reached() {
            return Ok(ExecOutput::done(input.checkpoint()))
        }

        let (from_block, to_block) = input.next_block_range().into_inner();

        // For a large enough range (or when starting from block 1) it is cheaper
        // to wipe the hashed table and re-hash the entire plain state than to
        // replay changesets incrementally.
        if to_block - from_block > self.clean_threshold || from_block == 1 {
            tx.clear::<tables::HashedStorages>()?;

            let mut storage_cursor = tx.cursor_read::<tables::PlainStorageState>()?;
            let mut collector =
                Collector::new(self.etl_config.file_size, self.etl_config.dir.clone());
            let mut channels = Vec::with_capacity(MAXIMUM_CHANNELS);

            // Fan the plain-state walk out to rayon workers in fixed-size chunks;
            // each worker streams hashed entries back over its own mpsc channel.
            for chunk in &storage_cursor.walk(None)?.chunks(WORKER_CHUNK_SIZE) {
                // Note: this `tx` (channel sender) shadows the database `tx`
                // only within the loop body.
                let (tx, rx) = mpsc::channel();
                channels.push(rx);

                let chunk = chunk.collect::<Result<Vec<_>, _>>()?;
                rayon::spawn(move || {
                    for (address, slot) in chunk {
                        // 64 bytes: keccak256(address) followed by keccak256(slot key).
                        let mut addr_key = Vec::with_capacity(64);
                        addr_key.put_slice(keccak256(address).as_slice());
                        addr_key.put_slice(keccak256(slot.key).as_slice());
                        // Ignore send errors: the receiver may already be gone
                        // if the main loop bailed out early.
                        let _ = tx.send((addr_key, CompactU256::from(slot.value)));
                    }
                });

                // Periodically drain the accumulated channels to bound memory.
                // (`collect` clears the vec, so `len()` counts channels opened
                // since the last drain.)
                if !channels.is_empty() && channels.len() % MAXIMUM_CHANNELS == 0 {
                    collect(&mut channels, &mut collector)?;
                }
            }

            // Drain whatever channels remain after the walk finishes.
            collect(&mut channels, &mut collector)?;

            let total_hashes = collector.len();
            // Log progress roughly every 10% of inserted entries.
            let interval = (total_hashes / 10).max(1);
            let mut cursor = tx.cursor_dup_write::<tables::HashedStorages>()?;
            // `append_dup` requires ascending key order; the ETL collector is
            // assumed to yield entries sorted by key — confirm in `reth_etl`.
            for (index, item) in collector.iter()?.enumerate() {
                if index > 0 && index % interval == 0 {
                    info!(
                        target: "sync::stages::hashing_storage",
                        progress = %format!("{:.2}%", (index as f64 / total_hashes as f64) * 100.0),
                        "Inserting hashes"
                    );
                }

                // Split the 64-byte composite key back into (hashed address, hashed slot).
                let (addr_key, value) = item?;
                cursor.append_dup(
                    B256::from_slice(&addr_key[..32]),
                    StorageEntry {
                        key: B256::from_slice(&addr_key[32..]),
                        value: CompactU256::decompress_owned(value)?.into(),
                    },
                )?;
            }
        } else {
            // Incremental path: hash only the storage slots that changed inside
            // the block range, reading their current values from plain state.
            let lists = provider.changed_storages_with_range(from_block..=to_block)?;
            let storages = provider.plain_state_storages(lists)?;
            provider.insert_storage_for_hashing(storages)?;
        }

        // Progress is derived from table entry counts rather than block numbers.
        let checkpoint = StageCheckpoint::new(input.target())
            .with_storage_hashing_stage_checkpoint(StorageHashingCheckpoint {
                progress: stage_checkpoint_progress(provider)?,
                ..Default::default()
            });

        Ok(ExecOutput { checkpoint, done: true })
    }

    /// Unwind the stage: revert hashed storage for the unwound block range
    /// using the storage changesets.
    fn unwind(
        &mut self,
        provider: &Provider,
        input: UnwindInput,
    ) -> Result<UnwindOutput, StageError> {
        // Cap the unwind range by the commit threshold; `unwind_progress` is the
        // block number this run actually unwinds to.
        let (range, unwind_progress, _) =
            input.unwind_block_range_with_threshold(self.commit_threshold);

        provider.unwind_storage_hashing_range(BlockNumberAddress::range(range))?;

        let mut stage_checkpoint =
            input.checkpoint.storage_hashing_stage_checkpoint().unwrap_or_default();

        stage_checkpoint.progress = stage_checkpoint_progress(provider)?;

        Ok(UnwindOutput {
            checkpoint: StageCheckpoint::new(unwind_progress)
                .with_storage_hashing_stage_checkpoint(stage_checkpoint),
        })
    }
}
185
186fn collect(
188 channels: &mut Vec<Receiver<(Vec<u8>, CompactU256)>>,
189 collector: &mut Collector<Vec<u8>, CompactU256>,
190) -> Result<(), StageError> {
191 for channel in channels.iter_mut() {
192 while let Ok((key, v)) = channel.recv() {
193 collector.insert(key, v)?;
194 }
195 }
196 info!(target: "sync::stages::hashing_storage", "Hashed {} entries", collector.len());
197 channels.clear();
198 Ok(())
199}
200
201fn stage_checkpoint_progress(provider: &impl StatsReader) -> ProviderResult<EntitiesCheckpoint> {
202 Ok(EntitiesCheckpoint {
203 processed: provider.count_entries::<tables::HashedStorages>()? as u64,
204 total: provider.count_entries::<tables::PlainStorageState>()? as u64,
205 })
206}
207
208#[cfg(test)]
209mod tests {
210 use super::*;
211 use crate::test_utils::{
212 stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError,
213 TestStageDB, UnwindStageTestRunner,
214 };
215 use alloy_primitives::{Address, U256};
216 use assert_matches::assert_matches;
217 use rand::Rng;
218 use reth_db_api::{
219 cursor::{DbCursorRW, DbDupCursorRO},
220 models::StoredBlockBodyIndices,
221 };
222 use reth_ethereum_primitives::Block;
223 use reth_primitives_traits::{SealedBlock, SignedTransaction};
224 use reth_provider::providers::StaticFileWriter;
225 use reth_testing_utils::generators::{
226 self, random_block_range, random_contract_account_range, BlockRangeParams,
227 };
228
    // Generates the shared execute/unwind test suite for this stage's runner.
    stage_test_suite_ext!(StorageHashingTestRunner, storage_hashing);
230
    /// Runs the stage with `clean_threshold = 1` so the clear-and-rebuild path
    /// is exercised, looping until the stage reports `done` and then validating
    /// the hashed table against plain state.
    #[tokio::test]
    async fn execute_clean_storage_hashing() {
        // (target block, starting checkpoint block)
        let (previous_stage, stage_progress) = (500, 100);

        let mut runner = StorageHashingTestRunner::default();

        // Force the clean (full re-hash) path on every run.
        runner.set_clean_threshold(1);

        runner.set_commit_threshold(1);

        let mut input = ExecInput {
            target: Some(previous_stage),
            checkpoint: Some(StageCheckpoint::new(stage_progress)),
        };

        runner.seed_execution(input).expect("failed to seed execution");

        loop {
            if let Ok(result @ ExecOutput { checkpoint, done }) =
                runner.execute(input).await.unwrap()
            {
                if !done {
                    let previous_checkpoint = input
                        .checkpoint
                        .and_then(|checkpoint| checkpoint.storage_hashing_stage_checkpoint())
                        .unwrap_or_default();
                    // Each intermediate run must advance `processed` by exactly
                    // one entry while `total` tracks the plain-state table size.
                    assert_matches!(checkpoint.storage_hashing_stage_checkpoint(), Some(StorageHashingCheckpoint {
                        progress: EntitiesCheckpoint {
                            processed,
                            total,
                        },
                        ..
                    }) if processed == previous_checkpoint.progress.processed + 1 &&
                        total == runner.db.table::<tables::PlainStorageState>().unwrap().len() as u64);

                    // Resume from the checkpoint the stage just produced.
                    input.checkpoint = Some(checkpoint);
                    continue
                }
                assert_eq!(checkpoint.block_number, previous_stage);
                // Once done, every plain-storage entry must have been hashed.
                assert_matches!(checkpoint.storage_hashing_stage_checkpoint(), Some(StorageHashingCheckpoint {
                    progress: EntitiesCheckpoint {
                        processed,
                        total,
                    },
                    ..
                }) if processed == total &&
                    total == runner.db.table::<tables::PlainStorageState>().unwrap().len() as u64);

                // Cross-check the hashed table contents entry by entry.
                assert!(
                    runner.validate_execution(input, Some(result)).is_ok(),
                    "execution validation"
                );

                break
            }
            panic!("Failed execution");
        }
    }
296
    /// Test harness pairing a [`TestStageDB`] with configurable stage thresholds.
    struct StorageHashingTestRunner {
        // Test database the stage runs against.
        db: TestStageDB,
        // Commit threshold forwarded to the stage under test.
        commit_threshold: u64,
        // Clean threshold forwarded to the stage under test.
        clean_threshold: u64,
        // ETL configuration forwarded to the stage under test.
        etl_config: EtlConfig,
    }
303
304 impl Default for StorageHashingTestRunner {
305 fn default() -> Self {
306 Self {
307 db: TestStageDB::default(),
308 commit_threshold: 1000,
309 clean_threshold: 1000,
310 etl_config: EtlConfig::default(),
311 }
312 }
313 }
314
315 impl StageTestRunner for StorageHashingTestRunner {
316 type S = StorageHashingStage;
317
318 fn db(&self) -> &TestStageDB {
319 &self.db
320 }
321
322 fn stage(&self) -> Self::S {
323 Self::S {
324 commit_threshold: self.commit_threshold,
325 clean_threshold: self.clean_threshold,
326 etl_config: self.etl_config.clone(),
327 }
328 }
329 }
330
    impl ExecuteStageTestRunner for StorageHashingTestRunner {
        type Seed = Vec<SealedBlock<Block>>;

        /// Seeds the database with a random block range plus per-transaction
        /// random storage writes (and matching changesets), so the stage has
        /// plain state to hash.
        fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
            let stage_progress = input.next_block();
            let end = input.target();
            let mut rng = generators::rng();

            let n_accounts = 31;
            let mut accounts = random_contract_account_range(&mut rng, &mut (0..n_accounts));

            let blocks = random_block_range(
                &mut rng,
                stage_progress..=end,
                BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..3, ..Default::default() },
            );

            self.db.insert_headers(blocks.iter().map(|block| block.sealed_header()))?;

            let iter = blocks.iter();
            // Global transaction numbering across all seeded blocks.
            let mut next_tx_num = 0;
            let mut first_tx_num = next_tx_num;
            for progress in iter {
                let block_number = progress.number;
                self.db.commit(|tx| {
                    progress.body().transactions.iter().try_for_each(
                        |transaction| -> Result<(), reth_db::DatabaseError> {
                            tx.put::<tables::TransactionHashNumbers>(
                                *transaction.tx_hash(),
                                next_tx_num,
                            )?;
                            tx.put::<tables::Transactions>(next_tx_num, transaction.clone())?;

                            // Pick a random seeded account to receive storage writes.
                            let (addr, _) =
                                accounts.get_mut(rng.gen::<usize>() % n_accounts as usize).unwrap();

                            // Two random non-zero slot writes per transaction;
                            // hashed state is mirrored only for the first block
                            // (the stage checkpoint block).
                            for _ in 0..2 {
                                let new_entry = StorageEntry {
                                    key: keccak256([rng.gen::<u8>()]),
                                    value: U256::from(rng.gen::<u8>() % 30 + 1),
                                };
                                self.insert_storage_entry(
                                    tx,
                                    (block_number, *addr).into(),
                                    new_entry,
                                    progress.number == stage_progress,
                                )?;
                            }

                            next_tx_num += 1;
                            Ok(())
                        },
                    )?;

                    // Randomly add an extra storage write for a fresh address.
                    let has_reward: bool = rng.gen();
                    if has_reward {
                        self.insert_storage_entry(
                            tx,
                            (block_number, Address::random()).into(),
                            StorageEntry {
                                key: keccak256("mining"),
                                value: U256::from(rng.gen::<u32>()),
                            },
                            progress.number == stage_progress,
                        )?;
                    }

                    let body = StoredBlockBodyIndices {
                        first_tx_num,
                        tx_count: progress.transaction_count() as u64,
                    };

                    first_tx_num = next_tx_num;

                    tx.put::<tables::BlockBodyIndices>(progress.number, body)?;
                    Ok(())
                })?;
            }

            Ok(blocks)
        }

        /// Validates that hashed storage matches plain state, unless the stage
        /// had an empty block range to process.
        fn validate_execution(
            &self,
            input: ExecInput,
            output: Option<ExecOutput>,
        ) -> Result<(), TestRunnerError> {
            if let Some(output) = output {
                let start_block = input.checkpoint().block_number + 1;
                let end_block = output.checkpoint.block_number;
                // Nothing was processed; nothing to check.
                if start_block > end_block {
                    return Ok(())
                }
            }
            self.check_hashed_storage()
        }
    }
430
431 impl UnwindStageTestRunner for StorageHashingTestRunner {
432 fn validate_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError> {
433 self.unwind_storage(input)?;
434 self.check_hashed_storage()
435 }
436 }
437
    impl StorageHashingTestRunner {
        /// Override the clean threshold used when building the stage.
        fn set_clean_threshold(&mut self, threshold: u64) {
            self.clean_threshold = threshold;
        }

        /// Override the commit threshold used when building the stage.
        fn set_commit_threshold(&mut self, threshold: u64) {
            self.commit_threshold = threshold;
        }

        /// Asserts that every `PlainStorageState` entry has an exact hashed
        /// counterpart in `HashedStorages`, and that the hashed table contains
        /// no surplus rows.
        fn check_hashed_storage(&self) -> Result<(), TestRunnerError> {
            self.db
                .query(|tx| {
                    let mut storage_cursor = tx.cursor_dup_read::<tables::PlainStorageState>()?;
                    let mut hashed_storage_cursor =
                        tx.cursor_dup_read::<tables::HashedStorages>()?;

                    let mut expected = 0;

                    while let Some((address, entry)) = storage_cursor.next()? {
                        let key = keccak256(entry.key);
                        let got =
                            hashed_storage_cursor.seek_by_key_subkey(keccak256(address), key)?;
                        // Same value, under the hashed (address, slot) key.
                        assert_eq!(
                            got,
                            Some(StorageEntry { key, ..entry }),
                            "{expected}: {address:?}"
                        );
                        expected += 1;
                    }
                    // Row counts must match exactly — no stale hashed entries.
                    let count = tx.cursor_dup_read::<tables::HashedStorages>()?.walk(None)?.count();

                    assert_eq!(count, expected);
                    Ok(())
                })
                .map_err(|e| e.into())
        }

        /// Writes `entry` into plain storage at `bn_address`, records the
        /// displaced value in `StorageChangeSets`, and — when `hash` is set —
        /// mirrors the keccak-hashed entry into `HashedStorages`.
        fn insert_storage_entry<TX: DbTxMut>(
            &self,
            tx: &TX,
            bn_address: BlockNumberAddress,
            entry: StorageEntry,
            hash: bool,
        ) -> Result<(), reth_db::DatabaseError> {
            let mut storage_cursor = tx.cursor_dup_write::<tables::PlainStorageState>()?;
            // Remove any existing value for this slot; it becomes the
            // changeset's "previous" entry (zero-valued if the slot was unset).
            let prev_entry =
                match storage_cursor.seek_by_key_subkey(bn_address.address(), entry.key)? {
                    Some(e) if e.key == entry.key => {
                        tx.delete::<tables::PlainStorageState>(bn_address.address(), Some(e))
                            .expect("failed to delete entry");
                        e
                    }
                    _ => StorageEntry { key: entry.key, value: U256::from(0) },
                };
            tx.put::<tables::PlainStorageState>(bn_address.address(), entry)?;

            if hash {
                let hashed_address = keccak256(bn_address.address());
                let hashed_entry = StorageEntry { key: keccak256(entry.key), value: entry.value };

                // Replace (rather than duplicate) any stale hashed entry for
                // the same hashed slot key.
                if let Some(e) = tx
                    .cursor_dup_write::<tables::HashedStorages>()?
                    .seek_by_key_subkey(hashed_address, hashed_entry.key)?
                    .filter(|e| e.key == hashed_entry.key)
                {
                    tx.delete::<tables::HashedStorages>(hashed_address, Some(e))
                        .expect("failed to delete entry");
                }

                tx.put::<tables::HashedStorages>(hashed_address, hashed_entry)?;
            }

            tx.put::<tables::StorageChangeSets>(bn_address, prev_entry)?;
            Ok(())
        }

        /// Reverts plain storage by walking `StorageChangeSets` backwards and
        /// restoring each recorded previous value down to `input.unwind_to`.
        fn unwind_storage(&self, input: UnwindInput) -> Result<(), TestRunnerError> {
            tracing::debug!("unwinding storage...");
            let target_block = input.unwind_to;
            self.db.commit(|tx| {
                let mut storage_cursor = tx.cursor_dup_write::<tables::PlainStorageState>()?;
                let mut changeset_cursor = tx.cursor_dup_read::<tables::StorageChangeSets>()?;

                let mut rev_changeset_walker = changeset_cursor.walk_back(None)?;

                while let Some((bn_address, entry)) = rev_changeset_walker.next().transpose()? {
                    // Stop once the walker reaches changesets below the unwind
                    // target. NOTE(review): this reverts changesets at
                    // `target_block` itself too — verify the boundary matches
                    // the stage's `unwind`, which operates on `unwind_to+1..`.
                    if bn_address.block_number() < target_block {
                        break
                    }

                    // Drop the slot's current value, if present...
                    if storage_cursor
                        .seek_by_key_subkey(bn_address.address(), entry.key)?
                        .filter(|e| e.key == entry.key)
                        .is_some()
                    {
                        storage_cursor.delete_current()?;
                    }

                    // ...and restore the previous value unless it was zero
                    // (zero in the changeset marks a previously-unset slot).
                    if !entry.value.is_zero() {
                        storage_cursor.upsert(bn_address.address(), &entry)?;
                    }
                }
                Ok(())
            })?;
            Ok(())
        }
    }
545}