1use alloy_primitives::{bytes::BufMut, keccak256, B256};
2use itertools::Itertools;
3use reth_config::config::{EtlConfig, HashingConfig};
4use reth_db_api::{
5 cursor::{DbCursorRO, DbDupCursorRW},
6 models::{BlockNumberAddress, CompactU256},
7 table::Decompress,
8 tables,
9 transaction::{DbTx, DbTxMut},
10};
11use reth_etl::Collector;
12use reth_primitives_traits::StorageEntry;
13use reth_provider::{DBProvider, HashingWriter, StatsReader, StorageReader};
14use reth_stages_api::{
15 EntitiesCheckpoint, ExecInput, ExecOutput, Stage, StageCheckpoint, StageError, StageId,
16 StorageHashingCheckpoint, UnwindInput, UnwindOutput,
17};
18use reth_storage_errors::provider::ProviderResult;
19use std::{
20 fmt::Debug,
21 sync::mpsc::{self, Receiver},
22};
23use tracing::*;
24
/// Maximum number of in-flight result channels kept in memory before their contents
/// are drained into the ETL collector (see the `channels.len() % MAXIMUM_CHANNELS` flush
/// in `execute`).
const MAXIMUM_CHANNELS: usize = 10_000;

/// Number of plain-storage entries handed to each spawned rayon hashing job.
const WORKER_CHUNK_SIZE: usize = 100;
/// Stage that hashes plain storage entries into [`tables::HashedStorages`].
#[derive(Debug)]
pub struct StorageHashingStage {
    /// The threshold (in number of blocks) above which (or when starting from genesis)
    /// the stage wipes the hashed table and re-hashes the full plain state instead of
    /// applying per-block changesets incrementally.
    pub clean_threshold: u64,
    /// The maximum number of blocks to process per unwind commit (used via
    /// `unwind_block_range_with_threshold`).
    pub commit_threshold: u64,
    /// ETL configuration (file size and spill directory for the collector).
    pub etl_config: EtlConfig,
}
43
44impl StorageHashingStage {
45 pub const fn new(config: HashingConfig, etl_config: EtlConfig) -> Self {
47 Self {
48 clean_threshold: config.clean_threshold,
49 commit_threshold: config.commit_threshold,
50 etl_config,
51 }
52 }
53}
54
55impl Default for StorageHashingStage {
56 fn default() -> Self {
57 Self {
58 clean_threshold: 500_000,
59 commit_threshold: 100_000,
60 etl_config: EtlConfig::default(),
61 }
62 }
63}
64
impl<Provider> Stage<Provider> for StorageHashingStage
where
    Provider: DBProvider<Tx: DbTxMut> + StorageReader + HashingWriter + StatsReader,
{
    /// Returns the unique id of this stage.
    fn id(&self) -> StageId {
        StageId::StorageHashing
    }

    /// Hashes the storage state for the block range given by `input`.
    fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result<ExecOutput, StageError> {
        let tx = provider.tx_ref();
        if input.target_reached() {
            return Ok(ExecOutput::done(input.checkpoint()))
        }

        let (from_block, to_block) = input.next_block_range().into_inner();

        // For a wide range (or from block 1, i.e. right after genesis) it is cheaper to wipe
        // the hashed table and re-hash the entire plain state than to replay changesets.
        if to_block - from_block > self.clean_threshold || from_block == 1 {
            tx.clear::<tables::HashedStorages>()?;

            let mut storage_cursor = tx.cursor_read::<tables::PlainStorageState>()?;
            let mut collector =
                Collector::new(self.etl_config.file_size, self.etl_config.dir.clone());
            let mut channels = Vec::with_capacity(MAXIMUM_CHANNELS);

            for chunk in &storage_cursor.walk(None)?.chunks(WORKER_CHUNK_SIZE) {
                // One channel per spawned job; `tx` here shadows the db transaction only
                // inside this loop body.
                let (tx, rx) = mpsc::channel();
                channels.push(rx);

                let chunk = chunk.collect::<Result<Vec<_>, _>>()?;
                // Hash the chunk on the rayon pool; send errors are ignored because a
                // dropped receiver only happens if this function already bailed out.
                rayon::spawn(move || {
                    for (address, slot) in chunk {
                        // Key layout: keccak256(address) (32 bytes) ++ keccak256(slot key) (32 bytes).
                        let mut addr_key = Vec::with_capacity(64);
                        addr_key.put_slice(keccak256(address).as_slice());
                        addr_key.put_slice(keccak256(slot.key).as_slice());
                        let _ = tx.send((addr_key, CompactU256::from(slot.value)));
                    }
                });

                // Periodically drain finished jobs into the collector so the number of
                // live channels stays bounded.
                if !channels.is_empty() && channels.len() % MAXIMUM_CHANNELS == 0 {
                    collect(&mut channels, &mut collector)?;
                }
            }

            // Drain whatever jobs are still outstanding.
            collect(&mut channels, &mut collector)?;

            let total_hashes = collector.len();
            // Log roughly every 10% of inserted entries; `max(1)` guards the modulo below
            // against a division by zero when the collector is (nearly) empty.
            let interval = (total_hashes / 10).max(1);
            let mut cursor = tx.cursor_dup_write::<tables::HashedStorages>()?;
            for (index, item) in collector.iter()?.enumerate() {
                if index > 0 && index % interval == 0 {
                    info!(
                        target: "sync::stages::hashing_storage",
                        progress = %format!("{:.2}%", (index as f64 / total_hashes as f64) * 100.0),
                        "Inserting hashes"
                    );
                }

                // The collector yields keys in sorted order, so `append_dup` is valid here.
                let (addr_key, value) = item?;
                cursor.append_dup(
                    B256::from_slice(&addr_key[..32]),
                    StorageEntry {
                        key: B256::from_slice(&addr_key[32..]),
                        value: CompactU256::decompress_owned(value)?.into(),
                    },
                )?;
            }
        } else {
            // Incremental path: hash only the storage slots touched in the range,
            // aggregated from the changesets.
            let lists = provider.changed_storages_with_range(from_block..=to_block)?;
            let storages = provider.plain_state_storages(lists)?;
            provider.insert_storage_for_hashing(storages)?;
        }

        // The whole range is processed in one pass, so the checkpoint jumps straight
        // to the target; `done` is unconditionally true below.
        let checkpoint = StageCheckpoint::new(input.target())
            .with_storage_hashing_stage_checkpoint(StorageHashingCheckpoint {
                progress: stage_checkpoint_progress(provider)?,
                ..Default::default()
            });

        Ok(ExecOutput { checkpoint, done: true })
    }

    /// Unwinds hashed storage back to `input.unwind_to`, bounded by `commit_threshold`.
    fn unwind(
        &mut self,
        provider: &Provider,
        input: UnwindInput,
    ) -> Result<UnwindOutput, StageError> {
        let (range, unwind_progress, _) =
            input.unwind_block_range_with_threshold(self.commit_threshold);

        provider.unwind_storage_hashing_range(BlockNumberAddress::range(range))?;

        let mut stage_checkpoint =
            input.checkpoint.storage_hashing_stage_checkpoint().unwrap_or_default();

        // Recompute entity progress so the checkpoint reflects post-unwind table sizes.
        stage_checkpoint.progress = stage_checkpoint_progress(provider)?;

        Ok(UnwindOutput {
            checkpoint: StageCheckpoint::new(unwind_progress)
                .with_storage_hashing_stage_checkpoint(stage_checkpoint),
        })
    }
}
185
186fn collect(
188 channels: &mut Vec<Receiver<(Vec<u8>, CompactU256)>>,
189 collector: &mut Collector<Vec<u8>, CompactU256>,
190) -> Result<(), StageError> {
191 for channel in channels.iter_mut() {
192 while let Ok((key, v)) = channel.recv() {
193 collector.insert(key, v)?;
194 }
195 }
196 info!(target: "sync::stages::hashing_storage", "Hashed {} entries", collector.len());
197 channels.clear();
198 Ok(())
199}
200
201fn stage_checkpoint_progress(provider: &impl StatsReader) -> ProviderResult<EntitiesCheckpoint> {
202 Ok(EntitiesCheckpoint {
203 processed: provider.count_entries::<tables::HashedStorages>()? as u64,
204 total: provider.count_entries::<tables::PlainStorageState>()? as u64,
205 })
206}
207
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::{
        stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError,
        TestStageDB, UnwindStageTestRunner,
    };
    use alloy_primitives::{Address, U256};
    use assert_matches::assert_matches;
    use rand::Rng;
    use reth_db_api::{
        cursor::{DbCursorRW, DbDupCursorRO},
        models::StoredBlockBodyIndices,
    };
    use reth_ethereum_primitives::Block;
    use reth_primitives_traits::SealedBlock;
    use reth_provider::providers::StaticFileWriter;
    use reth_testing_utils::generators::{
        self, random_block_range, random_contract_account_range, BlockRangeParams,
    };

    stage_test_suite_ext!(StorageHashingTestRunner, storage_hashing);

    /// Runs the stage repeatedly with clean/commit thresholds of 1 and checks that the
    /// reported checkpoint progress advances until the target block is reached.
    #[tokio::test]
    async fn execute_clean_storage_hashing() {
        let (previous_stage, stage_progress) = (500, 100);

        let mut runner = StorageHashingTestRunner::default();

        // Force the "clean" (wipe-and-rehash) path on every execution.
        runner.set_clean_threshold(1);

        runner.set_commit_threshold(1);

        let mut input = ExecInput {
            target: Some(previous_stage),
            checkpoint: Some(StageCheckpoint::new(stage_progress)),
        };

        runner.seed_execution(input).expect("failed to seed execution");

        loop {
            if let Ok(result @ ExecOutput { checkpoint, done }) =
                runner.execute(input).await.unwrap()
            {
                if !done {
                    // Intermediate iteration: progress must advance by exactly one entry
                    // and `total` must match the plain state table size.
                    let previous_checkpoint = input
                        .checkpoint
                        .and_then(|checkpoint| checkpoint.storage_hashing_stage_checkpoint())
                        .unwrap_or_default();
                    assert_matches!(checkpoint.storage_hashing_stage_checkpoint(), Some(StorageHashingCheckpoint {
                        progress: EntitiesCheckpoint {
                            processed,
                            total,
                        },
                        ..
                    }) if processed == previous_checkpoint.progress.processed + 1 &&
                        total == runner.db.table::<tables::PlainStorageState>().unwrap().len() as u64);

                    // Continue from the new checkpoint on the next loop iteration.
                    input.checkpoint = Some(checkpoint);
                    continue
                }
                // Final iteration: everything processed, checkpoint at the target block.
                assert_eq!(checkpoint.block_number, previous_stage);
                assert_matches!(checkpoint.storage_hashing_stage_checkpoint(), Some(StorageHashingCheckpoint {
                    progress: EntitiesCheckpoint {
                        processed,
                        total,
                    },
                    ..
                }) if processed == total &&
                    total == runner.db.table::<tables::PlainStorageState>().unwrap().len() as u64);

                assert!(
                    runner.validate_execution(input, Some(result)).is_ok(),
                    "execution validation"
                );

                break
            }
            panic!("Failed execution");
        }
    }

    /// Test harness that seeds a database with random blocks/storages and drives the stage.
    struct StorageHashingTestRunner {
        db: TestStageDB,
        commit_threshold: u64,
        clean_threshold: u64,
        etl_config: EtlConfig,
    }

    impl Default for StorageHashingTestRunner {
        fn default() -> Self {
            Self {
                db: TestStageDB::default(),
                commit_threshold: 1000,
                clean_threshold: 1000,
                etl_config: EtlConfig::default(),
            }
        }
    }

    impl StageTestRunner for StorageHashingTestRunner {
        type S = StorageHashingStage;

        fn db(&self) -> &TestStageDB {
            &self.db
        }

        // Builds a fresh stage from the runner's current thresholds.
        fn stage(&self) -> Self::S {
            Self::S {
                commit_threshold: self.commit_threshold,
                clean_threshold: self.clean_threshold,
                etl_config: self.etl_config.clone(),
            }
        }
    }

    impl ExecuteStageTestRunner for StorageHashingTestRunner {
        type Seed = Vec<SealedBlock<Block>>;

        // Seeds headers, transactions, random storage entries (+ changesets), and block
        // body indices for the execution range.
        fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
            let stage_progress = input.next_block();
            let end = input.target();
            let mut rng = generators::rng();

            let n_accounts = 31;
            let mut accounts = random_contract_account_range(&mut rng, &mut (0..n_accounts));

            let blocks = random_block_range(
                &mut rng,
                stage_progress..=end,
                BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..3, ..Default::default() },
            );

            self.db.insert_headers(blocks.iter().map(|block| block.sealed_header()))?;

            let iter = blocks.iter();
            let mut next_tx_num = 0;
            let mut first_tx_num = next_tx_num;
            for progress in iter {
                let block_number = progress.number;
                self.db.commit(|tx| {
                    progress.body().transactions.iter().try_for_each(
                        |transaction| -> Result<(), reth_db::DatabaseError> {
                            tx.put::<tables::TransactionHashNumbers>(
                                *transaction.tx_hash(),
                                next_tx_num,
                            )?;
                            tx.put::<tables::Transactions>(next_tx_num, transaction.clone())?;

                            // Pick a random seeded account to attach storage writes to.
                            let (addr, _) = accounts
                                .get_mut((rng.random::<u64>() % n_accounts) as usize)
                                .unwrap();

                            // Two storage entries per transaction; only the first block of
                            // the range is pre-hashed (the stage must hash the rest).
                            for _ in 0..2 {
                                let new_entry = StorageEntry {
                                    key: keccak256([rng.random::<u8>()]),
                                    value: U256::from(rng.random::<u8>() % 30 + 1),
                                };
                                self.insert_storage_entry(
                                    tx,
                                    (block_number, *addr).into(),
                                    new_entry,
                                    progress.number == stage_progress,
                                )?;
                            }

                            next_tx_num += 1;
                            Ok(())
                        },
                    )?;

                    // Randomly add a "reward" storage entry for a fresh address.
                    let has_reward: bool = rng.random();
                    if has_reward {
                        self.insert_storage_entry(
                            tx,
                            (block_number, Address::random()).into(),
                            StorageEntry {
                                key: keccak256("mining"),
                                value: U256::from(rng.random::<u32>()),
                            },
                            progress.number == stage_progress,
                        )?;
                    }

                    let body = StoredBlockBodyIndices {
                        first_tx_num,
                        tx_count: progress.transaction_count() as u64,
                    };

                    first_tx_num = next_tx_num;

                    tx.put::<tables::BlockBodyIndices>(progress.number, body)?;
                    Ok(())
                })?;
            }

            Ok(blocks)
        }

        fn validate_execution(
            &self,
            input: ExecInput,
            output: Option<ExecOutput>,
        ) -> Result<(), TestRunnerError> {
            if let Some(output) = output {
                let start_block = input.checkpoint().block_number + 1;
                let end_block = output.checkpoint.block_number;
                // Nothing was executed; nothing to validate.
                if start_block > end_block {
                    return Ok(())
                }
            }
            self.check_hashed_storage()
        }
    }

    impl UnwindStageTestRunner for StorageHashingTestRunner {
        // Replays changesets onto the plain state, then checks it matches hashed storage.
        fn validate_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError> {
            self.unwind_storage(input)?;
            self.check_hashed_storage()
        }
    }

    impl StorageHashingTestRunner {
        fn set_clean_threshold(&mut self, threshold: u64) {
            self.clean_threshold = threshold;
        }

        fn set_commit_threshold(&mut self, threshold: u64) {
            self.commit_threshold = threshold;
        }

        // Asserts that every plain-storage entry has a matching hashed entry, and that the
        // hashed table contains no extras.
        fn check_hashed_storage(&self) -> Result<(), TestRunnerError> {
            self.db
                .query(|tx| {
                    let mut storage_cursor = tx.cursor_dup_read::<tables::PlainStorageState>()?;
                    let mut hashed_storage_cursor =
                        tx.cursor_dup_read::<tables::HashedStorages>()?;

                    let mut expected = 0;

                    while let Some((address, entry)) = storage_cursor.next()? {
                        let key = keccak256(entry.key);
                        let got =
                            hashed_storage_cursor.seek_by_key_subkey(keccak256(address), key)?;
                        assert_eq!(
                            got,
                            Some(StorageEntry { key, ..entry }),
                            "{expected}: {address:?}"
                        );
                        expected += 1;
                    }
                    let count = tx.cursor_dup_read::<tables::HashedStorages>()?.walk(None)?.count();

                    assert_eq!(count, expected);
                    Ok(())
                })
                .map_err(|e| e.into())
        }

        // Upserts a plain-storage entry (optionally mirrored into the hashed table) and
        // records the previous value in the changeset table.
        fn insert_storage_entry<TX: DbTxMut>(
            &self,
            tx: &TX,
            bn_address: BlockNumberAddress,
            entry: StorageEntry,
            hash: bool,
        ) -> Result<(), reth_db::DatabaseError> {
            let mut storage_cursor = tx.cursor_dup_write::<tables::PlainStorageState>()?;
            // Remember the previous value (zero if absent) for the changeset below.
            let prev_entry =
                match storage_cursor.seek_by_key_subkey(bn_address.address(), entry.key)? {
                    Some(e) if e.key == entry.key => {
                        tx.delete::<tables::PlainStorageState>(bn_address.address(), Some(e))
                            .expect("failed to delete entry");
                        e
                    }
                    _ => StorageEntry { key: entry.key, value: U256::from(0) },
                };
            tx.put::<tables::PlainStorageState>(bn_address.address(), entry)?;

            if hash {
                let hashed_address = keccak256(bn_address.address());
                let hashed_entry = StorageEntry { key: keccak256(entry.key), value: entry.value };

                // Replace any existing hashed entry for this slot.
                if let Some(e) = tx
                    .cursor_dup_write::<tables::HashedStorages>()?
                    .seek_by_key_subkey(hashed_address, hashed_entry.key)?
                    .filter(|e| e.key == hashed_entry.key)
                {
                    tx.delete::<tables::HashedStorages>(hashed_address, Some(e))
                        .expect("failed to delete entry");
                }

                tx.put::<tables::HashedStorages>(hashed_address, hashed_entry)?;
            }

            tx.put::<tables::StorageChangeSets>(bn_address, prev_entry)?;
            Ok(())
        }

        // Walks the changesets backwards down to `unwind_to`, restoring each slot's
        // previous value in the plain state (deleting it when the old value was zero).
        fn unwind_storage(&self, input: UnwindInput) -> Result<(), TestRunnerError> {
            tracing::debug!("unwinding storage...");
            let target_block = input.unwind_to;
            self.db.commit(|tx| {
                let mut storage_cursor = tx.cursor_dup_write::<tables::PlainStorageState>()?;
                let mut changeset_cursor = tx.cursor_dup_read::<tables::StorageChangeSets>()?;

                let mut rev_changeset_walker = changeset_cursor.walk_back(None)?;

                while let Some((bn_address, entry)) = rev_changeset_walker.next().transpose()? {
                    if bn_address.block_number() < target_block {
                        break
                    }

                    if storage_cursor
                        .seek_by_key_subkey(bn_address.address(), entry.key)?
                        .filter(|e| e.key == entry.key)
                        .is_some()
                    {
                        storage_cursor.delete_current()?;
                    }

                    if !entry.value.is_zero() {
                        storage_cursor.upsert(bn_address.address(), &entry)?;
                    }
                }
                Ok(())
            })?;
            Ok(())
        }
    }
}