1use alloy_primitives::{keccak256, B256};
2use itertools::Itertools;
3use reth_config::config::{EtlConfig, HashingConfig};
4use reth_db_api::{
5 cursor::{DbCursorRO, DbCursorRW},
6 tables,
7 transaction::{DbTx, DbTxMut},
8 RawKey, RawTable, RawValue,
9};
10use reth_etl::Collector;
11use reth_primitives_traits::Account;
12use reth_provider::{
13 AccountExtReader, DBProvider, HashingWriter, StatsReader, StorageSettingsCache,
14};
15use reth_stages_api::{
16 AccountHashingCheckpoint, BlockRangeOutput, EntitiesCheckpoint, ExecInput, ExecOutput, Stage,
17 StageCheckpoint, StageError, StageId, UnwindInput, UnwindOutput,
18};
19use reth_storage_errors::provider::ProviderResult;
20use std::{
21 collections::BTreeSet,
22 fmt::Debug,
23 ops::{Range, RangeInclusive},
24 sync::mpsc::{self, Receiver},
25};
26use tracing::*;
27
/// Maximum number of worker result channels to buffer before draining them into the
/// ETL collector, bounding peak memory during the parallel hashing phase.
const MAXIMUM_CHANNELS: usize = 10_000;

/// Number of plain-state `(address, account)` entries handed to each rayon worker
/// for hashing.
const WORKER_CHUNK_SIZE: usize = 100;
/// Account hashing stage: hashes plain account addresses and writes the result into
/// [`tables::HashedAccounts`].
#[derive(Clone, Debug)]
pub struct AccountHashingStage {
    /// Block-range size above which the stage rebuilds the hashed table from scratch
    /// (ETL path) instead of applying changesets incrementally.
    pub clean_threshold: u64,
    /// Maximum number of blocks to process per execution round in the incremental path.
    pub commit_threshold: u64,
    /// Maximum number of changeset entries to process before stopping (only checked at
    /// block boundaries) in the incremental path.
    pub commit_entries: u64,
    /// ETL configuration (file size and spill directory) used by the rebuild path.
    pub etl_config: EtlConfig,
}
49
impl AccountHashingStage {
    /// Creates a new [`AccountHashingStage`] from the hashing and ETL configuration.
    pub const fn new(config: HashingConfig, etl_config: EtlConfig) -> Self {
        Self {
            clean_threshold: config.clean_threshold,
            commit_threshold: config.commit_threshold,
            commit_entries: config.commit_entries,
            etl_config,
        }
    }
}
61
#[cfg(any(test, feature = "test-utils"))]
impl AccountHashingStage {
    /// Initializes the `PlainAccountState` and `AccountChangeSets` tables with a set of
    /// random blocks and EOA accounts so the stage can be executed in tests.
    ///
    /// For every block in `opts.blocks` a changeset entry is written pairing an account
    /// with a synthetic previous state (nonce and balance decremented by one).
    ///
    /// Returns the inserted `(address, account)` pairs, sorted by address.
    pub fn seed<Tx: DbTx + DbTxMut + 'static, N: reth_provider::providers::ProviderNodeTypes>(
        provider: &reth_provider::DatabaseProvider<Tx, N>,
        opts: SeedOpts,
    ) -> Result<Vec<(alloy_primitives::Address, Account)>, StageError>
    where
        N::Primitives: reth_primitives_traits::NodePrimitives<
            Block = reth_ethereum_primitives::Block,
            BlockHeader = reth_primitives_traits::Header,
        >,
    {
        use alloy_primitives::U256;
        use reth_db_api::models::AccountBeforeTx;
        use reth_provider::{BlockWriter, StaticFileProviderFactory, StaticFileWriter};
        use reth_testing_utils::{
            generators,
            generators::{random_block_range, random_eoa_accounts, BlockRangeParams},
        };

        let mut rng = generators::rng();

        let blocks = random_block_range(
            &mut rng,
            opts.blocks.clone(),
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: opts.txs, ..Default::default() },
        );

        for block in blocks {
            provider.insert_block(&block.try_recover().unwrap()).unwrap();
        }
        // Headers go through the static-file writer; commit so they are visible to readers.
        provider
            .static_file_provider()
            .latest_writer(reth_static_file_types::StaticFileSegment::Headers)
            .unwrap()
            .commit()
            .unwrap();
        let mut accounts = random_eoa_accounts(&mut rng, opts.accounts);
        {
            // Insert accounts sorted by address so the ascending-key `append` succeeds.
            let mut account_cursor =
                provider.tx_ref().cursor_write::<tables::PlainAccountState>()?;
            accounts.sort_by_key(|a| a.0);
            for (addr, acc) in &accounts {
                account_cursor.append(*addr, acc)?;
            }

            // One changeset entry per block number, zipped with the accounts: the "old"
            // state is the current one with nonce - 1 and balance - 1.
            let mut acc_changeset_cursor =
                provider.tx_ref().cursor_write::<tables::AccountChangeSets>()?;
            for (t, (addr, acc)) in opts.blocks.zip(&accounts) {
                let Account { nonce, balance, .. } = acc;
                let prev_acc = Account {
                    nonce: nonce - 1,
                    balance: balance - U256::from(1),
                    bytecode_hash: None,
                };
                let acc_before_tx = AccountBeforeTx { address: *addr, info: Some(prev_acc) };
                acc_changeset_cursor.append(t, &acc_before_tx)?;
            }
        }

        Ok(accounts)
    }
}
131
132impl Default for AccountHashingStage {
133 fn default() -> Self {
134 Self {
135 clean_threshold: 500_000,
136 commit_threshold: 100_000,
137 commit_entries: 30_000_000,
138 etl_config: EtlConfig::default(),
139 }
140 }
141}
142
impl<Provider> Stage<Provider> for AccountHashingStage
where
    Provider: DBProvider<Tx: DbTxMut>
        + HashingWriter
        + AccountExtReader
        + StatsReader
        + StorageSettingsCache,
{
    /// Return the id of the stage.
    fn id(&self) -> StageId {
        StageId::AccountHashing
    }

    /// Execute the stage: populate [`tables::HashedAccounts`] up to the input target.
    fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result<ExecOutput, StageError> {
        if input.target_reached() {
            return Ok(ExecOutput::done(input.checkpoint()))
        }

        // If the node already stores state hashed, there is nothing for this stage to
        // do — report the target as reached.
        if provider.cached_storage_settings().use_hashed_state() {
            return Ok(ExecOutput::done(input.checkpoint().with_block_number(input.target())));
        }

        let total_range = input.target() - input.checkpoint().block_number;
        let from_block = input.next_block();

        // For large ranges (or from genesis) it is cheaper to drop the hashed table and
        // rebuild it from the full plain state via ETL than to replay changesets.
        if total_range > self.clean_threshold || from_block == 1 {
            let tx = provider.tx_ref();

            // Rebuild from scratch: clear the destination table first.
            tx.clear::<tables::HashedAccounts>()?;

            let mut accounts_cursor = tx.cursor_read::<RawTable<tables::PlainAccountState>>()?;
            let mut collector =
                Collector::new(self.etl_config.file_size, self.etl_config.dir.clone());
            let mut channels = Vec::with_capacity(MAXIMUM_CHANNELS);

            // Hash addresses in parallel: each chunk of plain-state entries goes to a
            // rayon worker that sends back `(keccak256(address), account)` pairs.
            for chunk in &accounts_cursor.walk(None)?.chunks(WORKER_CHUNK_SIZE) {
                let chunk = chunk.collect::<Result<Vec<_>, _>>()?;
                let (tx, rx) = mpsc::sync_channel(chunk.len());
                channels.push(rx);
                rayon::spawn(move || {
                    for (address, account) in chunk {
                        let address = address.key().unwrap();
                        // Send failures are ignored: the receiver side decides when to stop.
                        let _ = tx.send((RawKey::new(keccak256(address)), account));
                    }
                });

                // Periodically drain accumulated channels into the collector to bound
                // the number of pending channels/results held in memory.
                if !channels.is_empty() && channels.len().is_multiple_of(MAXIMUM_CHANNELS) {
                    collect(&mut channels, &mut collector)?;
                }
            }

            // Drain the remaining workers.
            collect(&mut channels, &mut collector)?;

            let mut hashed_account_cursor =
                tx.cursor_write::<RawTable<tables::HashedAccounts>>()?;

            let total_hashes = collector.len();
            // Log progress roughly every 10% of inserted hashes.
            let interval = (total_hashes / 10).max(1);
            for (index, item) in collector.iter()?.enumerate() {
                if index > 0 && index.is_multiple_of(interval) {
                    info!(
                        target: "sync::stages::hashing_account",
                        progress = %format!("{:.2}%", (index as f64 / total_hashes as f64) * 100.0),
                        "Inserting hashes"
                    );
                }

                let (key, value) = item?;
                hashed_account_cursor
                    .append(RawKey::<B256>::from_vec(key), &RawValue::<Account>::from_vec(value))?;
            }

            let checkpoint = StageCheckpoint::new(input.target())
                .with_account_hashing_stage_checkpoint(AccountHashingCheckpoint {
                    progress: stage_checkpoint_progress(provider)?,
                    ..Default::default()
                });

            Ok(ExecOutput { checkpoint, done: true })
        } else {
            // Incremental path: re-hash only the accounts touched in the next block
            // range, as recorded in the account changesets.
            let BlockRangeOutput { block_range, is_final_range } =
                input.next_block_range_with_threshold(self.commit_threshold);
            let (from_block, to_block) = block_range.into_inner();

            let tx = provider.tx_ref();
            let mut changeset_cursor = tx.cursor_read::<tables::AccountChangeSets>()?;
            let mut changed = BTreeSet::new();
            let mut total_entries = 0u64;
            let mut last_block = from_block;

            for entry in changeset_cursor.walk_range(from_block..=to_block)? {
                let (block_number, account_before) = entry?;

                // Only stop at a block boundary once the entry budget is exhausted, so a
                // block's changesets are never split across two rounds.
                if block_number != last_block && total_entries >= self.commit_entries {
                    break;
                }

                last_block = block_number;
                changed.insert(account_before.address);
                total_entries += 1;
            }

            // Read the current plain state of the touched accounts and write it hashed.
            let accounts = provider.basic_accounts(changed)?;
            provider.insert_account_for_hashing(accounts)?;

            // `exhausted` means the range ran out before the entry budget did: then the
            // whole range was handled and progress is `to_block`. Otherwise progress is
            // the last fully processed block.
            let exhausted = total_entries < self.commit_entries;
            let done = exhausted && is_final_range;
            let progress_block = if exhausted { to_block } else { last_block };

            let checkpoint = StageCheckpoint::new(progress_block)
                .with_account_hashing_stage_checkpoint(AccountHashingCheckpoint {
                    progress: stage_checkpoint_progress(provider)?,
                    ..Default::default()
                });

            Ok(ExecOutput { checkpoint, done })
        }
    }

    /// Unwind the stage: revert hashed accounts over the unwind range and update the
    /// entities checkpoint.
    fn unwind(
        &mut self,
        provider: &Provider,
        input: UnwindInput,
    ) -> Result<UnwindOutput, StageError> {
        let (range, unwind_progress, _) =
            input.unwind_block_range_with_threshold(self.commit_threshold);

        provider.unwind_account_hashing_range(range)?;

        let mut stage_checkpoint =
            input.checkpoint.account_hashing_stage_checkpoint().unwrap_or_default();

        stage_checkpoint.progress = stage_checkpoint_progress(provider)?;

        Ok(UnwindOutput {
            checkpoint: StageCheckpoint::new(unwind_progress)
                .with_account_hashing_stage_checkpoint(stage_checkpoint),
        })
    }
}
311
312fn collect(
314 channels: &mut Vec<Receiver<(RawKey<B256>, RawValue<Account>)>>,
315 collector: &mut Collector<RawKey<B256>, RawValue<Account>>,
316) -> Result<(), StageError> {
317 for channel in channels.iter_mut() {
318 while let Ok((key, v)) = channel.recv() {
319 collector.insert(key, v)?;
320 }
321 }
322 info!(target: "sync::stages::hashing_account", "Hashed {} entries", collector.len());
323 channels.clear();
324 Ok(())
325}
326
/// Options for the [`AccountHashingStage::seed`] test helper.
#[derive(Clone, Debug)]
pub struct SeedOpts {
    /// The range of blocks to be generated.
    pub blocks: RangeInclusive<u64>,
    /// The number of accounts to be generated.
    pub accounts: usize,
    /// The range of transaction counts per generated block.
    pub txs: Range<u8>,
}
347
348fn stage_checkpoint_progress(provider: &impl StatsReader) -> ProviderResult<EntitiesCheckpoint> {
349 Ok(EntitiesCheckpoint {
350 processed: provider.count_entries::<tables::HashedAccounts>()? as u64,
351 total: provider.count_entries::<tables::PlainAccountState>()? as u64,
352 })
353}
354
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::{
        stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError,
        UnwindStageTestRunner,
    };
    use alloy_primitives::U256;
    use assert_matches::assert_matches;
    use reth_primitives_traits::Account;
    use reth_provider::providers::StaticFileWriter;
    use reth_stages_api::StageUnitCheckpoint;
    use test_utils::*;

    stage_test_suite_ext!(AccountHashingTestRunner, account_hashing);

    /// Runs the stage with `clean_threshold == 1` so the clean-rebuild (ETL) path is
    /// exercised, then verifies the checkpoint covers every plain-state account.
    #[tokio::test]
    async fn execute_clean_account_hashing() {
        let (previous_stage, stage_progress) = (20, 10);
        // Set up the runner with a threshold that forces the rebuild path.
        let mut runner = AccountHashingTestRunner::default();
        runner.set_clean_threshold(1);

        let input = ExecInput {
            target: Some(previous_stage),
            checkpoint: Some(StageCheckpoint::new(stage_progress)),
        };

        runner.seed_execution(input).expect("failed to seed execution");

        let rx = runner.execute(input);
        let result = rx.await.unwrap();

        // The stage must finish at the target with processed == total entities.
        assert_matches!(
            result,
            Ok(ExecOutput {
                checkpoint: StageCheckpoint {
                    block_number,
                    stage_checkpoint: Some(StageUnitCheckpoint::Account(AccountHashingCheckpoint {
                        progress: EntitiesCheckpoint {
                            processed,
                            total,
                        },
                        ..
                    })),
                },
                done: true,
            }) if block_number == previous_stage &&
                processed == total &&
                total == runner.db.count_entries::<tables::PlainAccountState>().unwrap() as u64
        );

        // Validate the stage execution against the database contents.
        assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
    }

    mod test_utils {
        use super::*;
        use crate::test_utils::TestStageDB;
        use alloy_primitives::Address;
        use reth_provider::DatabaseProviderFactory;

        /// Test harness wiring [`AccountHashingStage`] to a [`TestStageDB`].
        pub(crate) struct AccountHashingTestRunner {
            pub(crate) db: TestStageDB,
            commit_threshold: u64,
            clean_threshold: u64,
            commit_entries: u64,
            etl_config: EtlConfig,
        }

        impl AccountHashingTestRunner {
            pub(crate) fn set_clean_threshold(&mut self, threshold: u64) {
                self.clean_threshold = threshold;
            }

            #[expect(dead_code)]
            pub(crate) fn set_commit_threshold(&mut self, threshold: u64) {
                self.commit_threshold = threshold;
            }

            /// Iterates over the `PlainAccountState` table and checks that each account
            /// present in `HashedAccounts` matches its plain-state counterpart.
            pub(crate) fn check_hashed_accounts(&self) -> Result<(), TestRunnerError> {
                self.db.query(|tx| {
                    let mut acc_cursor = tx.cursor_read::<tables::PlainAccountState>()?;
                    let mut hashed_acc_cursor = tx.cursor_read::<tables::HashedAccounts>()?;

                    while let Some((address, account)) = acc_cursor.next()? {
                        let hashed_addr = keccak256(address);
                        if let Some((_, acc)) = hashed_acc_cursor.seek_exact(hashed_addr)? {
                            assert_eq!(acc, account)
                        }
                    }
                    Ok(())
                })?;

                Ok(())
            }

            /// Same as `check_hashed_accounts`, but compares against the synthetic
            /// previous state written by `seed` (nonce - 1, balance - 1) — used after
            /// an unwind.
            pub(crate) fn check_old_hashed_accounts(&self) -> Result<(), TestRunnerError> {
                self.db.query(|tx| {
                    let mut acc_cursor = tx.cursor_read::<tables::PlainAccountState>()?;
                    let mut hashed_acc_cursor = tx.cursor_read::<tables::HashedAccounts>()?;

                    while let Some((address, account)) = acc_cursor.next()? {
                        let Account { nonce, balance, .. } = account;
                        let old_acc = Account {
                            nonce: nonce - 1,
                            balance: balance - U256::from(1),
                            bytecode_hash: None,
                        };
                        let hashed_addr = keccak256(address);
                        if let Some((_, acc)) = hashed_acc_cursor.seek_exact(hashed_addr)? {
                            assert_eq!(acc, old_acc)
                        }
                    }
                    Ok(())
                })?;

                Ok(())
            }
        }

        impl Default for AccountHashingTestRunner {
            fn default() -> Self {
                Self {
                    db: TestStageDB::default(),
                    commit_threshold: 1000,
                    clean_threshold: 1000,
                    commit_entries: u64::MAX,
                    etl_config: EtlConfig::default(),
                }
            }
        }

        impl StageTestRunner for AccountHashingTestRunner {
            type S = AccountHashingStage;

            fn db(&self) -> &TestStageDB {
                &self.db
            }

            fn stage(&self) -> Self::S {
                Self::S {
                    commit_threshold: self.commit_threshold,
                    clean_threshold: self.clean_threshold,
                    commit_entries: self.commit_entries,
                    etl_config: self.etl_config.clone(),
                }
            }
        }

        impl ExecuteStageTestRunner for AccountHashingTestRunner {
            type Seed = Vec<(Address, Account)>;

            fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
                let provider = self.db.factory.database_provider_rw()?;
                let res = Ok(AccountHashingStage::seed(
                    &provider,
                    SeedOpts { blocks: 0..=input.target(), accounts: 10, txs: 0..3 },
                )
                .unwrap());
                provider.commit().expect("failed to commit");
                res
            }

            fn validate_execution(
                &self,
                input: ExecInput,
                output: Option<ExecOutput>,
            ) -> Result<(), TestRunnerError> {
                if let Some(output) = output {
                    let start_block = input.next_block();
                    let end_block = output.checkpoint.block_number;
                    // Nothing was executed — nothing to validate.
                    if start_block > end_block {
                        return Ok(())
                    }
                }
                self.check_hashed_accounts()
            }
        }

        impl UnwindStageTestRunner for AccountHashingTestRunner {
            fn validate_unwind(&self, _input: UnwindInput) -> Result<(), TestRunnerError> {
                self.check_old_hashed_accounts()
            }
        }
    }
}