1use alloy_primitives::{keccak256, B256};
2use itertools::Itertools;
3use reth_config::config::{EtlConfig, HashingConfig};
4use reth_db_api::{
5 cursor::{DbCursorRO, DbCursorRW},
6 tables,
7 transaction::{DbTx, DbTxMut},
8 RawKey, RawTable, RawValue,
9};
10use reth_etl::Collector;
11use reth_primitives_traits::Account;
12use reth_provider::{AccountExtReader, DBProvider, HashingWriter, StatsReader};
13use reth_stages_api::{
14 AccountHashingCheckpoint, EntitiesCheckpoint, ExecInput, ExecOutput, Stage, StageCheckpoint,
15 StageError, StageId, UnwindInput, UnwindOutput,
16};
17use reth_storage_errors::provider::ProviderResult;
18use std::{
19 fmt::Debug,
20 ops::{Range, RangeInclusive},
21 sync::mpsc::{self, Receiver},
22};
23use tracing::*;
24
/// Upper bound on how many per-chunk worker channels may pile up during a full rebuild before
/// they are drained into the ETL collector.
const MAXIMUM_CHANNELS: usize = 10_000;

/// How many `PlainAccountState` rows each rayon hashing task receives.
const WORKER_CHUNK_SIZE: usize = 100;
30
/// Sync stage that writes plain-state accounts into the `HashedAccounts` table, keyed by the
/// keccak256 hash of their address.
#[derive(Clone, Debug)]
pub struct AccountHashingStage {
    /// Rebuild threshold for `execute`: if the block range to process spans more than this many
    /// blocks (`to_block - from_block > clean_threshold`), or starts at block 1, the
    /// `HashedAccounts` table is cleared and rebuilt from all of `PlainAccountState` instead of
    /// being updated incrementally.
    pub clean_threshold: u64,
    /// Threshold handed to `unwind_block_range_with_threshold` in `unwind`; presumably caps how
    /// many blocks a single unwind call processes — TODO confirm against `UnwindInput`.
    pub commit_threshold: u64,
    /// ETL settings (`file_size`, `dir`) for the collector used during a full rebuild.
    pub etl_config: EtlConfig,
}
43
44impl AccountHashingStage {
45 pub const fn new(config: HashingConfig, etl_config: EtlConfig) -> Self {
47 Self {
48 clean_threshold: config.clean_threshold,
49 commit_threshold: config.commit_threshold,
50 etl_config,
51 }
52 }
53}
54
#[cfg(any(test, feature = "test-utils"))]
impl AccountHashingStage {
    /// Seeds the database behind `provider` with random data for exercising this stage.
    ///
    /// Steps, in order:
    /// 1. Generates the blocks of `opts.blocks` with `random_block_range` (zero parent hash,
    ///    `opts.txs` as `tx_count`) and inserts each one as a historical block.
    /// 2. Commits the latest static-file writer for the `Headers` segment.
    /// 3. Generates `opts.accounts` random EOA accounts, sorts them by address and appends them
    ///    to `PlainAccountState`.
    /// 4. Pairs block numbers from `opts.blocks` with the sorted accounts. Pairing is in order
    ///    and stops at the shorter of the two. For each pair it appends an `AccountChangeSets`
    ///    entry whose previous state is the account with nonce and balance reduced by one and
    ///    no bytecode hash.
    ///
    /// Returns the generated accounts, sorted by address.
    ///
    /// # Errors
    ///
    /// Returns a [`StageError`] if opening a write cursor or appending a row fails.
    ///
    /// # Panics
    ///
    /// Panics (via `unwrap`) if block sender recovery, block insertion, or the static-file
    /// writer lookup/commit fails.
    pub fn seed<Tx: DbTx + DbTxMut + 'static, N: reth_provider::providers::ProviderNodeTypes>(
        provider: &reth_provider::DatabaseProvider<Tx, N>,
        opts: SeedOpts,
    ) -> Result<Vec<(alloy_primitives::Address, Account)>, StageError>
    where
        N::Primitives: reth_primitives_traits::FullNodePrimitives<
            Block = reth_ethereum_primitives::Block,
            BlockHeader = reth_primitives_traits::Header,
        >,
    {
        use alloy_primitives::U256;
        use reth_db_api::models::AccountBeforeTx;
        use reth_provider::{StaticFileProviderFactory, StaticFileWriter};
        use reth_testing_utils::{
            generators,
            generators::{random_block_range, random_eoa_accounts, BlockRangeParams},
        };

        // All random data below is drawn from this one generator, so the order of the
        // generator calls determines the seeded contents.
        let mut rng = generators::rng();

        let blocks = random_block_range(
            &mut rng,
            opts.blocks.clone(),
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: opts.txs, ..Default::default() },
        );

        for block in blocks {
            provider.insert_historical_block(block.try_recover().unwrap()).unwrap();
        }
        provider
            .static_file_provider()
            .latest_writer(reth_static_file_types::StaticFileSegment::Headers)
            .unwrap()
            .commit()
            .unwrap();
        let mut accounts = random_eoa_accounts(&mut rng, opts.accounts);
        {
            // Plain account state: `append` needs keys in ascending order, hence the sort.
            let mut account_cursor =
                provider.tx_ref().cursor_write::<tables::PlainAccountState>()?;
            accounts.sort_by(|a, b| a.0.cmp(&b.0));
            for (addr, acc) in &accounts {
                account_cursor.append(*addr, acc)?;
            }

            // One changeset per (block, account) pair, recording a synthetic "previous" state.
            let mut acc_changeset_cursor =
                provider.tx_ref().cursor_write::<tables::AccountChangeSets>()?;
            for (t, (addr, acc)) in opts.blocks.zip(&accounts) {
                let Account { nonce, balance, .. } = acc;
                // NOTE(review): `nonce - 1` / `balance - 1` underflow if a generated account has
                // a zero nonce or balance; presumably negligible for random data — confirm.
                let prev_acc = Account {
                    nonce: nonce - 1,
                    balance: balance - U256::from(1),
                    bytecode_hash: None,
                };
                let acc_before_tx = AccountBeforeTx { address: *addr, info: Some(prev_acc) };
                acc_changeset_cursor.append(t, &acc_before_tx)?;
            }
        }

        Ok(accounts)
    }
}
124
125impl Default for AccountHashingStage {
126 fn default() -> Self {
127 Self {
128 clean_threshold: 500_000,
129 commit_threshold: 100_000,
130 etl_config: EtlConfig::default(),
131 }
132 }
133}
134
135impl<Provider> Stage<Provider> for AccountHashingStage
136where
137 Provider: DBProvider<Tx: DbTxMut> + HashingWriter + AccountExtReader + StatsReader,
138{
139 fn id(&self) -> StageId {
141 StageId::AccountHashing
142 }
143
144 fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result<ExecOutput, StageError> {
146 if input.target_reached() {
147 return Ok(ExecOutput::done(input.checkpoint()))
148 }
149
150 let (from_block, to_block) = input.next_block_range().into_inner();
151
152 if to_block - from_block > self.clean_threshold || from_block == 1 {
157 let tx = provider.tx_ref();
158
159 tx.clear::<tables::HashedAccounts>()?;
161
162 let mut accounts_cursor = tx.cursor_read::<RawTable<tables::PlainAccountState>>()?;
163 let mut collector =
164 Collector::new(self.etl_config.file_size, self.etl_config.dir.clone());
165 let mut channels = Vec::with_capacity(MAXIMUM_CHANNELS);
166
167 for chunk in &accounts_cursor.walk(None)?.chunks(WORKER_CHUNK_SIZE) {
169 let (tx, rx) = mpsc::channel();
171 channels.push(rx);
172
173 let chunk = chunk.collect::<Result<Vec<_>, _>>()?;
174 rayon::spawn(move || {
176 for (address, account) in chunk {
177 let address = address.key().unwrap();
178 let _ = tx.send((RawKey::new(keccak256(address)), account));
179 }
180 });
181
182 if !channels.is_empty() && channels.len() % MAXIMUM_CHANNELS == 0 {
184 collect(&mut channels, &mut collector)?;
185 }
186 }
187
188 collect(&mut channels, &mut collector)?;
189
190 let mut hashed_account_cursor =
191 tx.cursor_write::<RawTable<tables::HashedAccounts>>()?;
192
193 let total_hashes = collector.len();
194 let interval = (total_hashes / 10).max(1);
195 for (index, item) in collector.iter()?.enumerate() {
196 if index > 0 && index % interval == 0 {
197 info!(
198 target: "sync::stages::hashing_account",
199 progress = %format!("{:.2}%", (index as f64 / total_hashes as f64) * 100.0),
200 "Inserting hashes"
201 );
202 }
203
204 let (key, value) = item?;
205 hashed_account_cursor
206 .append(RawKey::<B256>::from_vec(key), &RawValue::<Account>::from_vec(value))?;
207 }
208 } else {
209 let lists = provider.changed_accounts_with_range(from_block..=to_block)?;
212 let accounts = provider.basic_accounts(lists)?;
216 provider.insert_account_for_hashing(accounts)?;
218 }
219
220 let checkpoint = StageCheckpoint::new(input.target())
223 .with_account_hashing_stage_checkpoint(AccountHashingCheckpoint {
224 progress: stage_checkpoint_progress(provider)?,
225 ..Default::default()
226 });
227
228 Ok(ExecOutput { checkpoint, done: true })
229 }
230
231 fn unwind(
233 &mut self,
234 provider: &Provider,
235 input: UnwindInput,
236 ) -> Result<UnwindOutput, StageError> {
237 let (range, unwind_progress, _) =
238 input.unwind_block_range_with_threshold(self.commit_threshold);
239
240 provider.unwind_account_hashing_range(range)?;
242
243 let mut stage_checkpoint =
244 input.checkpoint.account_hashing_stage_checkpoint().unwrap_or_default();
245
246 stage_checkpoint.progress = stage_checkpoint_progress(provider)?;
247
248 Ok(UnwindOutput {
249 checkpoint: StageCheckpoint::new(unwind_progress)
250 .with_account_hashing_stage_checkpoint(stage_checkpoint),
251 })
252 }
253}
254
255fn collect(
257 channels: &mut Vec<Receiver<(RawKey<B256>, RawValue<Account>)>>,
258 collector: &mut Collector<RawKey<B256>, RawValue<Account>>,
259) -> Result<(), StageError> {
260 for channel in channels.iter_mut() {
261 while let Ok((key, v)) = channel.recv() {
262 collector.insert(key, v)?;
263 }
264 }
265 info!(target: "sync::stages::hashing_account", "Hashed {} entries", collector.len());
266 channels.clear();
267 Ok(())
268}
269
/// Parameters for `AccountHashingStage::seed`.
#[derive(Clone, Debug)]
pub struct SeedOpts {
    /// Block numbers to generate and insert. The same range is zipped with the generated
    /// accounts to place one account changeset per block.
    pub blocks: RangeInclusive<u64>,
    /// Number of random EOA accounts written to `PlainAccountState`.
    pub accounts: usize,
    /// Passed as `tx_count` to `BlockRangeParams`; presumably the range of transaction counts per
    /// generated block — TODO confirm against `random_block_range`.
    pub txs: Range<u8>,
}
290
291fn stage_checkpoint_progress(provider: &impl StatsReader) -> ProviderResult<EntitiesCheckpoint> {
292 Ok(EntitiesCheckpoint {
293 processed: provider.count_entries::<tables::HashedAccounts>()? as u64,
294 total: provider.count_entries::<tables::PlainAccountState>()? as u64,
295 })
296}
297
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::{
        stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError,
        UnwindStageTestRunner,
    };
    use alloy_primitives::U256;
    use assert_matches::assert_matches;
    use reth_primitives_traits::Account;
    use reth_provider::providers::StaticFileWriter;
    use reth_stages_api::StageUnitCheckpoint;
    use test_utils::*;

    // Generates the shared stage test suite for `AccountHashingTestRunner`.
    stage_test_suite_ext!(AccountHashingTestRunner, account_hashing);

    /// Forces the full-rebuild path and checks that every plain account ends up hashed.
    #[tokio::test]
    async fn execute_clean_account_hashing() {
        let (previous_stage, stage_progress) = (20, 10);
        let mut runner = AccountHashingTestRunner::default();
        // Range 11..=20 spans 9 blocks, which exceeds a threshold of 1, so `execute` clears and
        // rebuilds `HashedAccounts`.
        runner.set_clean_threshold(1);

        let input = ExecInput {
            target: Some(previous_stage),
            checkpoint: Some(StageCheckpoint::new(stage_progress)),
        };

        runner.seed_execution(input).expect("failed to seed execution");

        let rx = runner.execute(input);
        let result = rx.await.unwrap();

        // After a full rebuild the hashed row count must equal the plain-state row count.
        assert_matches!(
            result,
            Ok(ExecOutput {
                checkpoint: StageCheckpoint {
                    block_number,
                    stage_checkpoint: Some(StageUnitCheckpoint::Account(AccountHashingCheckpoint {
                        progress: EntitiesCheckpoint {
                            processed,
                            total,
                        },
                        ..
                    })),
                },
                done: true,
            }) if block_number == previous_stage &&
                processed == total &&
                total == runner.db.table::<tables::PlainAccountState>().unwrap().len() as u64
        );

        // Validate the stage execution
        assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
    }

    mod test_utils {
        use super::*;
        use crate::test_utils::TestStageDB;
        use alloy_primitives::Address;
        use reth_provider::DatabaseProviderFactory;

        /// Test harness that builds an `AccountHashingStage` with configurable thresholds over a
        /// `TestStageDB`.
        pub(crate) struct AccountHashingTestRunner {
            pub(crate) db: TestStageDB,
            commit_threshold: u64,
            clean_threshold: u64,
            etl_config: EtlConfig,
        }

        impl AccountHashingTestRunner {
            /// Sets the `clean_threshold` given to stages built by this runner.
            pub(crate) fn set_clean_threshold(&mut self, threshold: u64) {
                self.clean_threshold = threshold;
            }

            /// Sets the `commit_threshold` given to stages built by this runner.
            // Not called by the tests in this module; kept so future tests can tune unwinding.
            #[allow(dead_code)]
            pub(crate) fn set_commit_threshold(&mut self, threshold: u64) {
                self.commit_threshold = threshold;
            }

            /// Asserts that every plain account that has a hashed entry stores the same value.
            ///
            /// Plain accounts with no entry in `HashedAccounts` are skipped, not reported. This
            /// matters for incremental runs whose range contains no seeded changesets, which
            /// leave those accounts unhashed.
            pub(crate) fn check_hashed_accounts(&self) -> Result<(), TestRunnerError> {
                self.db.query(|tx| {
                    let mut acc_cursor = tx.cursor_read::<tables::PlainAccountState>()?;
                    let mut hashed_acc_cursor = tx.cursor_read::<tables::HashedAccounts>()?;

                    while let Some((address, account)) = acc_cursor.next()? {
                        let hashed_addr = keccak256(address);
                        if let Some((_, acc)) = hashed_acc_cursor.seek_exact(hashed_addr)? {
                            assert_eq!(acc, account)
                        }
                    }
                    Ok(())
                })?;

                Ok(())
            }

            /// Asserts that every hashed account equals the "previous" state written by
            /// `AccountHashingStage::seed`: nonce and balance one lower, no bytecode hash.
            ///
            /// Plain accounts with no hashed entry are skipped.
            pub(crate) fn check_old_hashed_accounts(&self) -> Result<(), TestRunnerError> {
                self.db.query(|tx| {
                    let mut acc_cursor = tx.cursor_read::<tables::PlainAccountState>()?;
                    let mut hashed_acc_cursor = tx.cursor_read::<tables::HashedAccounts>()?;

                    while let Some((address, account)) = acc_cursor.next()? {
                        let Account { nonce, balance, .. } = account;
                        // Mirrors the changeset formula used in `AccountHashingStage::seed`.
                        let old_acc = Account {
                            nonce: nonce - 1,
                            balance: balance - U256::from(1),
                            bytecode_hash: None,
                        };
                        let hashed_addr = keccak256(address);
                        if let Some((_, acc)) = hashed_acc_cursor.seek_exact(hashed_addr)? {
                            assert_eq!(acc, old_acc)
                        }
                    }
                    Ok(())
                })?;

                Ok(())
            }
        }

        impl Default for AccountHashingTestRunner {
            fn default() -> Self {
                Self {
                    db: TestStageDB::default(),
                    commit_threshold: 1000,
                    clean_threshold: 1000,
                    etl_config: EtlConfig::default(),
                }
            }
        }

        impl StageTestRunner for AccountHashingTestRunner {
            type S = AccountHashingStage;

            fn db(&self) -> &TestStageDB {
                &self.db
            }

            /// Builds a stage using this runner's thresholds and ETL config.
            fn stage(&self) -> Self::S {
                Self::S {
                    commit_threshold: self.commit_threshold,
                    clean_threshold: self.clean_threshold,
                    etl_config: self.etl_config.clone(),
                }
            }
        }

        impl ExecuteStageTestRunner for AccountHashingTestRunner {
            type Seed = Vec<(Address, Account)>;

            /// Seeds blocks `1..=target` and 10 random accounts, then commits the provider.
            fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
                let provider = self.db.factory.database_provider_rw()?;
                let res = Ok(AccountHashingStage::seed(
                    &provider,
                    SeedOpts { blocks: 1..=input.target(), accounts: 10, txs: 0..3 },
                )
                .unwrap());
                provider.commit().expect("failed to commit");
                res
            }

            /// Checks hashed accounts, unless the output shows that no blocks were executed.
            fn validate_execution(
                &self,
                input: ExecInput,
                output: Option<ExecOutput>,
            ) -> Result<(), TestRunnerError> {
                if let Some(output) = output {
                    let start_block = input.next_block();
                    let end_block = output.checkpoint.block_number;
                    // Empty range: the stage short-circuited, so there is nothing to verify.
                    if start_block > end_block {
                        return Ok(())
                    }
                }
                self.check_hashed_accounts()
            }
        }

        impl UnwindStageTestRunner for AccountHashingTestRunner {
            /// After an unwind, hashed accounts should hold the seeded "previous" state.
            fn validate_unwind(&self, _input: UnwindInput) -> Result<(), TestRunnerError> {
                self.check_old_hashed_accounts()
            }
        }
    }
}