1use alloy_primitives::{keccak256, B256};
2use itertools::Itertools;
3use reth_config::config::{EtlConfig, HashingConfig};
4use reth_db_api::{
5 cursor::{DbCursorRO, DbCursorRW},
6 tables,
7 transaction::{DbTx, DbTxMut},
8 RawKey, RawTable, RawValue,
9};
10use reth_etl::Collector;
11use reth_primitives_traits::Account;
12use reth_provider::{
13 AccountExtReader, DBProvider, HashingWriter, StatsReader, StorageSettingsCache,
14};
15use reth_stages_api::{
16 AccountHashingCheckpoint, EntitiesCheckpoint, ExecInput, ExecOutput, Stage, StageCheckpoint,
17 StageError, StageId, UnwindInput, UnwindOutput,
18};
19use reth_storage_errors::provider::ProviderResult;
20use std::{
21 fmt::Debug,
22 ops::{Range, RangeInclusive},
23 sync::mpsc::{self, Receiver},
24};
25use tracing::*;
26
/// Upper bound on the number of pending result channels kept by the full-rehash path.
///
/// The receiver list is preallocated with this capacity and is drained into the ETL collector
/// whenever its length reaches a multiple of this value.
const MAXIMUM_CHANNELS: usize = 10_000;

/// Number of plain-state account entries bundled into a single background hashing job.
const WORKER_CHUNK_SIZE: usize = 100;
32
33#[derive(Clone, Debug)]
36pub struct AccountHashingStage {
37 pub clean_threshold: u64,
40 pub commit_threshold: u64,
42 pub etl_config: EtlConfig,
44}
45
46impl AccountHashingStage {
47 pub const fn new(config: HashingConfig, etl_config: EtlConfig) -> Self {
49 Self {
50 clean_threshold: config.clean_threshold,
51 commit_threshold: config.commit_threshold,
52 etl_config,
53 }
54 }
55}
56
#[cfg(any(test, feature = "test-utils"))]
impl AccountHashingStage {
    /// Fills `provider` with random data for exercising the stage and returns the generated
    /// accounts sorted by address.
    ///
    /// Steps, in order:
    /// 1. inserts a random block range covering `opts.blocks`, with `opts.txs` as the
    ///    per-block transaction count range, and commits the headers static-file writer;
    /// 2. generates `opts.accounts` random EOA accounts and appends them to
    ///    `PlainAccountState`;
    /// 3. pairs the block numbers of `opts.blocks` with the sorted accounts (stopping at the
    ///    shorter of the two) and appends one `AccountChangeSets` entry per pair whose
    ///    "before" account has `nonce - 1`, `balance - 1` and no bytecode hash.
    ///
    /// # Panics
    ///
    /// Panics if block recovery, block insertion or the static-file commit fails.
    ///
    /// NOTE(review): step 3 subtracts one from the generated nonce and balance, so it relies on
    /// the generator never producing zero for either — confirm.
    pub fn seed<Tx: DbTx + DbTxMut + 'static, N: reth_provider::providers::ProviderNodeTypes>(
        provider: &reth_provider::DatabaseProvider<Tx, N>,
        opts: SeedOpts,
    ) -> Result<Vec<(alloy_primitives::Address, Account)>, StageError>
    where
        N::Primitives: reth_primitives_traits::NodePrimitives<
            Block = reth_ethereum_primitives::Block,
            BlockHeader = reth_primitives_traits::Header,
        >,
    {
        use alloy_primitives::U256;
        use reth_db_api::models::AccountBeforeTx;
        use reth_provider::{BlockWriter, StaticFileProviderFactory, StaticFileWriter};
        use reth_testing_utils::{
            generators,
            generators::{random_block_range, random_eoa_accounts, BlockRangeParams},
        };

        let SeedOpts { blocks: block_range, accounts: account_count, txs: tx_count } = opts;
        let mut rng = generators::rng();

        // Random chain segment rooted at the zero hash.
        let params = BlockRangeParams { parent: Some(B256::ZERO), tx_count, ..Default::default() };
        for block in random_block_range(&mut rng, block_range.clone(), params) {
            let recovered = block.try_recover().unwrap();
            provider.insert_block(&recovered).unwrap();
        }
        provider
            .static_file_provider()
            .latest_writer(reth_static_file_types::StaticFileSegment::Headers)
            .unwrap()
            .commit()
            .unwrap();

        // `append` is used below, so the accounts have to be in ascending address order.
        let mut accounts = random_eoa_accounts(&mut rng, account_count);
        accounts.sort_by_key(|(address, _)| *address);

        {
            let tx = provider.tx_ref();

            let mut plain_state = tx.cursor_write::<tables::PlainAccountState>()?;
            for (address, account) in &accounts {
                plain_state.append(*address, account)?;
            }

            // One changeset per (block, account) pair, recording a slightly "older" account.
            let mut changesets = tx.cursor_write::<tables::AccountChangeSets>()?;
            for (block_number, (address, account)) in block_range.zip(&accounts) {
                let previous = Account {
                    nonce: account.nonce - 1,
                    balance: account.balance - U256::from(1),
                    bytecode_hash: None,
                };
                let before_tx = AccountBeforeTx { address: *address, info: Some(previous) };
                changesets.append(block_number, &before_tx)?;
            }
        }

        Ok(accounts)
    }
}
126
127impl Default for AccountHashingStage {
128 fn default() -> Self {
129 Self {
130 clean_threshold: 500_000,
131 commit_threshold: 100_000,
132 etl_config: EtlConfig::default(),
133 }
134 }
135}
136
137impl<Provider> Stage<Provider> for AccountHashingStage
138where
139 Provider: DBProvider<Tx: DbTxMut>
140 + HashingWriter
141 + AccountExtReader
142 + StatsReader
143 + StorageSettingsCache,
144{
145 fn id(&self) -> StageId {
147 StageId::AccountHashing
148 }
149
150 fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result<ExecOutput, StageError> {
156 if input.target_reached() {
157 return Ok(ExecOutput::done(input.checkpoint()))
158 }
159
160 if provider.cached_storage_settings().use_hashed_state() {
163 return Ok(ExecOutput::done(input.checkpoint().with_block_number(input.target())));
164 }
165
166 let (from_block, to_block) = input.next_block_range().into_inner();
167
168 if to_block - from_block > self.clean_threshold || from_block == 1 {
173 let tx = provider.tx_ref();
174
175 tx.clear::<tables::HashedAccounts>()?;
177
178 let mut accounts_cursor = tx.cursor_read::<RawTable<tables::PlainAccountState>>()?;
179 let mut collector =
180 Collector::new(self.etl_config.file_size, self.etl_config.dir.clone());
181 let mut channels = Vec::with_capacity(MAXIMUM_CHANNELS);
182
183 for chunk in &accounts_cursor.walk(None)?.chunks(WORKER_CHUNK_SIZE) {
185 let (tx, rx) = mpsc::channel();
187 channels.push(rx);
188
189 let chunk = chunk.collect::<Result<Vec<_>, _>>()?;
190 rayon::spawn(move || {
192 for (address, account) in chunk {
193 let address = address.key().unwrap();
194 let _ = tx.send((RawKey::new(keccak256(address)), account));
195 }
196 });
197
198 if !channels.is_empty() && channels.len().is_multiple_of(MAXIMUM_CHANNELS) {
200 collect(&mut channels, &mut collector)?;
201 }
202 }
203
204 collect(&mut channels, &mut collector)?;
205
206 let mut hashed_account_cursor =
207 tx.cursor_write::<RawTable<tables::HashedAccounts>>()?;
208
209 let total_hashes = collector.len();
210 let interval = (total_hashes / 10).max(1);
211 for (index, item) in collector.iter()?.enumerate() {
212 if index > 0 && index.is_multiple_of(interval) {
213 info!(
214 target: "sync::stages::hashing_account",
215 progress = %format!("{:.2}%", (index as f64 / total_hashes as f64) * 100.0),
216 "Inserting hashes"
217 );
218 }
219
220 let (key, value) = item?;
221 hashed_account_cursor
222 .append(RawKey::<B256>::from_vec(key), &RawValue::<Account>::from_vec(value))?;
223 }
224 } else {
225 let lists = provider.changed_accounts_with_range(from_block..=to_block)?;
228 let accounts = provider.basic_accounts(lists)?;
232 provider.insert_account_for_hashing(accounts)?;
234 }
235
236 let checkpoint = StageCheckpoint::new(input.target())
239 .with_account_hashing_stage_checkpoint(AccountHashingCheckpoint {
240 progress: stage_checkpoint_progress(provider)?,
241 ..Default::default()
242 });
243
244 Ok(ExecOutput { checkpoint, done: true })
245 }
246
247 fn unwind(
249 &mut self,
250 provider: &Provider,
251 input: UnwindInput,
252 ) -> Result<UnwindOutput, StageError> {
253 let (range, unwind_progress, _) =
259 input.unwind_block_range_with_threshold(self.commit_threshold);
260
261 provider.unwind_account_hashing_range(range)?;
262
263 let mut stage_checkpoint =
264 input.checkpoint.account_hashing_stage_checkpoint().unwrap_or_default();
265
266 stage_checkpoint.progress = stage_checkpoint_progress(provider)?;
267
268 Ok(UnwindOutput {
269 checkpoint: StageCheckpoint::new(unwind_progress)
270 .with_account_hashing_stage_checkpoint(stage_checkpoint),
271 })
272 }
273}
274
275fn collect(
277 channels: &mut Vec<Receiver<(RawKey<B256>, RawValue<Account>)>>,
278 collector: &mut Collector<RawKey<B256>, RawValue<Account>>,
279) -> Result<(), StageError> {
280 for channel in channels.iter_mut() {
281 while let Ok((key, v)) = channel.recv() {
282 collector.insert(key, v)?;
283 }
284 }
285 info!(target: "sync::stages::hashing_account", "Hashed {} entries", collector.len());
286 channels.clear();
287 Ok(())
288}
289
/// Options for [`AccountHashingStage::seed`].
#[derive(Debug, Clone)]
pub struct SeedOpts {
    /// Block numbers to generate; also paired one-to-one with the generated accounts when the
    /// account changesets are written.
    pub blocks: RangeInclusive<u64>,
    /// Number of random accounts to generate.
    pub accounts: usize,
    /// Range passed to the block generator as the per-block transaction count.
    pub txs: Range<u8>,
}
310
311fn stage_checkpoint_progress(provider: &impl StatsReader) -> ProviderResult<EntitiesCheckpoint> {
312 Ok(EntitiesCheckpoint {
313 processed: provider.count_entries::<tables::HashedAccounts>()? as u64,
314 total: provider.count_entries::<tables::PlainAccountState>()? as u64,
315 })
316}
317
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::{
        stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError,
        UnwindStageTestRunner,
    };
    use alloy_primitives::U256;
    use assert_matches::assert_matches;
    use reth_primitives_traits::Account;
    use reth_provider::providers::StaticFileWriter;
    use reth_stages_api::StageUnitCheckpoint;
    use test_utils::*;

    stage_test_suite_ext!(AccountHashingTestRunner, account_hashing);

    /// With a clean threshold of 1 the stage must take the full-rehash path and end up with as
    /// many hashed accounts as there are plain accounts.
    #[tokio::test]
    async fn execute_clean_account_hashing() {
        let previous_stage = 20;
        let stage_progress = 10;

        let mut runner = AccountHashingTestRunner::default();
        // Anything spanning more than one block now triggers a rehash from scratch.
        runner.set_clean_threshold(1);

        let input = ExecInput {
            target: Some(previous_stage),
            checkpoint: Some(StageCheckpoint::new(stage_progress)),
        };

        runner.seed_execution(input).expect("failed to seed execution");

        let result = runner.execute(input).await.unwrap();

        assert_matches!(
            result,
            Ok(ExecOutput {
                checkpoint: StageCheckpoint {
                    block_number,
                    stage_checkpoint: Some(StageUnitCheckpoint::Account(AccountHashingCheckpoint {
                        progress: EntitiesCheckpoint {
                            processed,
                            total,
                        },
                        ..
                    })),
                },
                done: true,
            }) if block_number == previous_stage &&
                processed == total &&
                total == runner.db.count_entries::<tables::PlainAccountState>().unwrap() as u64
        );

        assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
    }

    mod test_utils {
        use super::*;
        use crate::test_utils::TestStageDB;
        use alloy_primitives::Address;
        use reth_provider::DatabaseProviderFactory;

        /// Test harness holding the database and the stage parameters under test.
        pub(crate) struct AccountHashingTestRunner {
            pub(crate) db: TestStageDB,
            commit_threshold: u64,
            clean_threshold: u64,
            etl_config: EtlConfig,
        }

        impl AccountHashingTestRunner {
            /// Overrides the clean threshold of the stages built by this runner.
            pub(crate) fn set_clean_threshold(&mut self, threshold: u64) {
                self.clean_threshold = threshold;
            }

            /// Overrides the commit threshold of the stages built by this runner.
            #[expect(dead_code)]
            pub(crate) fn set_commit_threshold(&mut self, threshold: u64) {
                self.commit_threshold = threshold;
            }

            /// Walks `PlainAccountState` and, for every account that has an entry in
            /// `HashedAccounts` under the keccak of its address, asserts that the hashed entry
            /// equals the plain one. Accounts without a hashed entry are skipped.
            pub(crate) fn check_hashed_accounts(&self) -> Result<(), TestRunnerError> {
                self.db.query(|tx| {
                    let mut plain_cursor = tx.cursor_read::<tables::PlainAccountState>()?;
                    let mut hashed_cursor = tx.cursor_read::<tables::HashedAccounts>()?;

                    while let Some((address, expected)) = plain_cursor.next()? {
                        let hashed_entry = hashed_cursor.seek_exact(keccak256(address))?;
                        if let Some((_, stored)) = hashed_entry {
                            assert_eq!(stored, expected)
                        }
                    }
                    Ok(())
                })?;

                Ok(())
            }

            /// Same walk as `check_hashed_accounts`, but expects every hashed entry found to
            /// equal the plain account with nonce and balance reduced by one and no bytecode
            /// hash, i.e. the "before" account written to the changesets by the seed.
            pub(crate) fn check_old_hashed_accounts(&self) -> Result<(), TestRunnerError> {
                self.db.query(|tx| {
                    let mut plain_cursor = tx.cursor_read::<tables::PlainAccountState>()?;
                    let mut hashed_cursor = tx.cursor_read::<tables::HashedAccounts>()?;

                    while let Some((address, current)) = plain_cursor.next()? {
                        let expected_old = Account {
                            nonce: current.nonce - 1,
                            balance: current.balance - U256::from(1),
                            bytecode_hash: None,
                        };
                        let hashed_entry = hashed_cursor.seek_exact(keccak256(address))?;
                        if let Some((_, stored)) = hashed_entry {
                            assert_eq!(stored, expected_old)
                        }
                    }
                    Ok(())
                })?;

                Ok(())
            }
        }

        impl Default for AccountHashingTestRunner {
            /// Fresh test database with both thresholds at 1000 and the default ETL config.
            fn default() -> Self {
                let db = TestStageDB::default();
                let etl_config = EtlConfig::default();
                Self { db, commit_threshold: 1000, clean_threshold: 1000, etl_config }
            }
        }

        impl StageTestRunner for AccountHashingTestRunner {
            type S = AccountHashingStage;

            fn db(&self) -> &TestStageDB {
                &self.db
            }

            /// Builds a stage from the runner's current thresholds and ETL config.
            fn stage(&self) -> Self::S {
                AccountHashingStage {
                    clean_threshold: self.clean_threshold,
                    commit_threshold: self.commit_threshold,
                    etl_config: self.etl_config.clone(),
                }
            }
        }

        impl ExecuteStageTestRunner for AccountHashingTestRunner {
            type Seed = Vec<(Address, Account)>;

            /// Seeds blocks `0..=target`, 10 accounts and 0 to 2 transactions per block, then
            /// commits the provider.
            fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
                let provider = self.db.factory.database_provider_rw()?;
                let opts = SeedOpts { blocks: 0..=input.target(), accounts: 10, txs: 0..3 };
                let seeded = AccountHashingStage::seed(&provider, opts).unwrap();
                provider.commit().expect("failed to commit");
                Ok(seeded)
            }

            /// Checks the hashed accounts unless an output is present whose checkpoint lies
            /// before the first block that should have been executed.
            fn validate_execution(
                &self,
                input: ExecInput,
                output: Option<ExecOutput>,
            ) -> Result<(), TestRunnerError> {
                let nothing_executed =
                    output.is_some_and(|out| input.next_block() > out.checkpoint.block_number);
                if nothing_executed {
                    return Ok(())
                }
                self.check_hashed_accounts()
            }
        }

        impl UnwindStageTestRunner for AccountHashingTestRunner {
            /// After an unwind the hashed accounts must match the pre-change accounts.
            fn validate_unwind(&self, _input: UnwindInput) -> Result<(), TestRunnerError> {
                self.check_old_hashed_accounts()
            }
        }
    }
}