Skip to main content

reth_stages/stages/
hashing_account.rs

1use alloy_primitives::{keccak256, B256};
2use itertools::Itertools;
3use reth_config::config::{EtlConfig, HashingConfig};
4use reth_db_api::{
5    cursor::{DbCursorRO, DbCursorRW},
6    tables,
7    transaction::{DbTx, DbTxMut},
8    RawKey, RawTable, RawValue,
9};
10use reth_etl::Collector;
11use reth_primitives_traits::Account;
12use reth_provider::{
13    AccountExtReader, DBProvider, HashingWriter, StatsReader, StorageSettingsCache,
14};
15use reth_stages_api::{
16    AccountHashingCheckpoint, BlockRangeOutput, EntitiesCheckpoint, ExecInput, ExecOutput, Stage,
17    StageCheckpoint, StageError, StageId, UnwindInput, UnwindOutput,
18};
19use reth_storage_errors::provider::ProviderResult;
20use std::{
21    collections::BTreeSet,
22    fmt::Debug,
23    ops::{Range, RangeInclusive},
24    sync::mpsc::{self, Receiver},
25};
26use tracing::*;
27
/// Maximum number of result channels (one per spawned rayon hashing job) kept in
/// memory before their contents are flushed to the ETL collector.
const MAXIMUM_CHANNELS: usize = 10_000;

/// Maximum number of accounts to hash per rayon worker job.
const WORKER_CHUNK_SIZE: usize = 100;
33
/// Account hashing stage hashes plain account.
/// This is preparation before generating intermediate hashes and calculating Merkle tree root.
#[derive(Clone, Debug)]
pub struct AccountHashingStage {
    /// The threshold (in number of blocks) for switching between incremental
    /// hashing and full storage hashing.
    pub clean_threshold: u64,
    /// The maximum number of accounts to process before committing during unwind.
    pub commit_threshold: u64,
    /// The maximum number of changeset entries to process before committing. The stage commits
    /// after either `commit_threshold` blocks or `commit_entries` entries, whichever comes first.
    pub commit_entries: u64,
    /// ETL configuration used when rehashing the full plain state from scratch.
    pub etl_config: EtlConfig,
}
49
50impl AccountHashingStage {
51    /// Create new instance of [`AccountHashingStage`].
52    pub const fn new(config: HashingConfig, etl_config: EtlConfig) -> Self {
53        Self {
54            clean_threshold: config.clean_threshold,
55            commit_threshold: config.commit_threshold,
56            commit_entries: config.commit_entries,
57            etl_config,
58        }
59    }
60}
61
62#[cfg(any(test, feature = "test-utils"))]
63impl AccountHashingStage {
64    /// Initializes the `PlainAccountState` table with `num_accounts` having some random state
65    /// at the target block, with `txs_range` transactions in each block.
66    ///
67    /// Proceeds to go to the `BlockTransitionIndex` end, go back `transitions` and change the
68    /// account state in the `AccountChangeSets` table.
69    pub fn seed<Tx: DbTx + DbTxMut + 'static, N: reth_provider::providers::ProviderNodeTypes>(
70        provider: &reth_provider::DatabaseProvider<Tx, N>,
71        opts: SeedOpts,
72    ) -> Result<Vec<(alloy_primitives::Address, Account)>, StageError>
73    where
74        N::Primitives: reth_primitives_traits::NodePrimitives<
75            Block = reth_ethereum_primitives::Block,
76            BlockHeader = reth_primitives_traits::Header,
77        >,
78    {
79        use alloy_primitives::U256;
80        use reth_db_api::models::AccountBeforeTx;
81        use reth_provider::{BlockWriter, StaticFileProviderFactory, StaticFileWriter};
82        use reth_testing_utils::{
83            generators,
84            generators::{random_block_range, random_eoa_accounts, BlockRangeParams},
85        };
86
87        let mut rng = generators::rng();
88
89        let blocks = random_block_range(
90            &mut rng,
91            opts.blocks.clone(),
92            BlockRangeParams { parent: Some(B256::ZERO), tx_count: opts.txs, ..Default::default() },
93        );
94
95        for block in blocks {
96            provider.insert_block(&block.try_recover().unwrap()).unwrap();
97        }
98        provider
99            .static_file_provider()
100            .latest_writer(reth_static_file_types::StaticFileSegment::Headers)
101            .unwrap()
102            .commit()
103            .unwrap();
104        let mut accounts = random_eoa_accounts(&mut rng, opts.accounts);
105        {
106            // Account State generator
107            let mut account_cursor =
108                provider.tx_ref().cursor_write::<tables::PlainAccountState>()?;
109            accounts.sort_by_key(|a| a.0);
110            for (addr, acc) in &accounts {
111                account_cursor.append(*addr, acc)?;
112            }
113
114            let mut acc_changeset_cursor =
115                provider.tx_ref().cursor_write::<tables::AccountChangeSets>()?;
116            for (t, (addr, acc)) in opts.blocks.zip(&accounts) {
117                let Account { nonce, balance, .. } = acc;
118                let prev_acc = Account {
119                    nonce: nonce - 1,
120                    balance: balance - U256::from(1),
121                    bytecode_hash: None,
122                };
123                let acc_before_tx = AccountBeforeTx { address: *addr, info: Some(prev_acc) };
124                acc_changeset_cursor.append(t, &acc_before_tx)?;
125            }
126        }
127
128        Ok(accounts)
129    }
130}
131
132impl Default for AccountHashingStage {
133    fn default() -> Self {
134        Self {
135            clean_threshold: 500_000,
136            commit_threshold: 100_000,
137            commit_entries: 30_000_000,
138            etl_config: EtlConfig::default(),
139        }
140    }
141}
142
impl<Provider> Stage<Provider> for AccountHashingStage
where
    Provider: DBProvider<Tx: DbTxMut>
        + HashingWriter
        + AccountExtReader
        + StatsReader
        + StorageSettingsCache,
{
    /// Return the id of the stage
    fn id(&self) -> StageId {
        StageId::AccountHashing
    }

    /// Execute the stage.
    ///
    /// When `use_hashed_state` is enabled, this stage is a no-op because the execution stage
    /// writes directly to `HashedAccounts`. Otherwise, it hashes plain state to populate hashed
    /// tables.
    fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result<ExecOutput, StageError> {
        if input.target_reached() {
            return Ok(ExecOutput::done(input.checkpoint()))
        }

        // If using hashed state as canonical, execution already writes to `HashedAccounts`,
        // so this stage becomes a no-op.
        if provider.cached_storage_settings().use_hashed_state() {
            return Ok(ExecOutput::done(input.checkpoint().with_block_number(input.target())));
        }

        // Use the total remaining range to decide clean vs incremental.
        let total_range = input.target() - input.checkpoint().block_number;
        let from_block = input.next_block();

        if total_range > self.clean_threshold || from_block == 1 {
            // if there are more blocks than threshold it is faster to go over Plain state and
            // hash all accounts otherwise take changesets aggregate the sets and apply
            // hashing to HashedAccounts table. Also, if we start from genesis, we need to
            // hash from scratch, as genesis accounts are not in changeset.
            let tx = provider.tx_ref();

            // clear table, load all accounts and hash them
            tx.clear::<tables::HashedAccounts>()?;

            let mut accounts_cursor = tx.cursor_read::<RawTable<tables::PlainAccountState>>()?;
            let mut collector =
                Collector::new(self.etl_config.file_size, self.etl_config.dir.clone());
            let mut channels = Vec::with_capacity(MAXIMUM_CHANNELS);

            // channels used to return result of account hashing
            for chunk in &accounts_cursor.walk(None)?.chunks(WORKER_CHUNK_SIZE) {
                // An _unordered_ channel to receive results from a rayon job
                let chunk = chunk.collect::<Result<Vec<_>, _>>()?;
                // NOTE: `tx` here shadows the database transaction inside the loop body;
                // it is the mpsc sender that is moved into the rayon job below.
                let (tx, rx) = mpsc::sync_channel(chunk.len());
                channels.push(rx);
                // Spawn the hashing task onto the global rayon pool
                rayon::spawn(move || {
                    for (address, account) in chunk {
                        let address = address.key().unwrap();
                        // Send errors are ignored: a dropped receiver just discards results.
                        let _ = tx.send((RawKey::new(keccak256(address)), account));
                    }
                });

                // Flush to ETL when channels length reaches MAXIMUM_CHANNELS
                // (`collect` empties `channels`, so the is_empty guard avoids re-flushing
                // immediately after a flush when len() == 0).
                if !channels.is_empty() && channels.len().is_multiple_of(MAXIMUM_CHANNELS) {
                    collect(&mut channels, &mut collector)?;
                }
            }

            // Drain whatever jobs are still outstanding after the cursor is exhausted.
            collect(&mut channels, &mut collector)?;

            let mut hashed_account_cursor =
                tx.cursor_write::<RawTable<tables::HashedAccounts>>()?;

            let total_hashes = collector.len();
            // Log progress roughly every 10% of inserted hashes.
            let interval = (total_hashes / 10).max(1);
            for (index, item) in collector.iter()?.enumerate() {
                if index > 0 && index.is_multiple_of(interval) {
                    info!(
                        target: "sync::stages::hashing_account",
                        progress = %format!("{:.2}%", (index as f64 / total_hashes as f64) * 100.0),
                        "Inserting hashes"
                    );
                }

                let (key, value) = item?;
                // ETL yields keys in sorted order, so `append` is valid here.
                hashed_account_cursor
                    .append(RawKey::<B256>::from_vec(key), &RawValue::<Account>::from_vec(value))?;
            }

            // The full rehash always completes the whole range in one invocation.
            let checkpoint = StageCheckpoint::new(input.target())
                .with_account_hashing_stage_checkpoint(AccountHashingCheckpoint {
                    progress: stage_checkpoint_progress(provider)?,
                    ..Default::default()
                });

            Ok(ExecOutput { checkpoint, done: true })
        } else {
            // Stream changesets entry-by-entry, bounded by both block count
            // (commit_threshold) and entry count (commit_entries), whichever comes first.
            let BlockRangeOutput { block_range, is_final_range } =
                input.next_block_range_with_threshold(self.commit_threshold);
            let (from_block, to_block) = block_range.into_inner();

            let tx = provider.tx_ref();
            let mut changeset_cursor = tx.cursor_read::<tables::AccountChangeSets>()?;
            // Deduplicated set of addresses touched in the processed range.
            let mut changed = BTreeSet::new();
            let mut total_entries = 0u64;
            let mut last_block = from_block;

            for entry in changeset_cursor.walk_range(from_block..=to_block)? {
                let (block_number, account_before) = entry?;

                // Check the entry limit only at block boundaries so we never
                // checkpoint mid-block (which would skip the remaining entries
                // for that block on the next invocation).
                if block_number != last_block && total_entries >= self.commit_entries {
                    break;
                }

                last_block = block_number;
                changed.insert(account_before.address);
                total_entries += 1;
            }

            // Re-read current plain state for the touched addresses and write their
            // keccak256-keyed entries into `HashedAccounts`.
            let accounts = provider.basic_accounts(changed)?;
            provider.insert_account_for_hashing(accounts)?;

            // If the entry budget was not hit, the whole block range was consumed.
            let exhausted = total_entries < self.commit_entries;
            let done = exhausted && is_final_range;
            // When we broke early, checkpoint at the last fully processed block.
            let progress_block = if exhausted { to_block } else { last_block };

            let checkpoint = StageCheckpoint::new(progress_block)
                .with_account_hashing_stage_checkpoint(AccountHashingCheckpoint {
                    progress: stage_checkpoint_progress(provider)?,
                    ..Default::default()
                });

            Ok(ExecOutput { checkpoint, done })
        }
    }

    /// Unwind the stage.
    fn unwind(
        &mut self,
        provider: &Provider,
        input: UnwindInput,
    ) -> Result<UnwindOutput, StageError> {
        // NOTE: this runs in both v1 and v2 mode. In v2 mode, execution writes
        // directly to `HashedAccounts`, but the unwind must still revert those
        // entries here because `MerkleUnwind` runs after this stage (in unwind
        // order) and needs `HashedAccounts` to reflect the target block state
        // before it can verify the state root.
        let (range, unwind_progress, _) =
            input.unwind_block_range_with_threshold(self.commit_threshold);

        provider.unwind_account_hashing_range(range)?;

        // Preserve any existing stage checkpoint details, only refreshing progress.
        let mut stage_checkpoint =
            input.checkpoint.account_hashing_stage_checkpoint().unwrap_or_default();

        stage_checkpoint.progress = stage_checkpoint_progress(provider)?;

        Ok(UnwindOutput {
            checkpoint: StageCheckpoint::new(unwind_progress)
                .with_account_hashing_stage_checkpoint(stage_checkpoint),
        })
    }
}
311
312/// Flushes channels hashes to ETL collector.
313fn collect(
314    channels: &mut Vec<Receiver<(RawKey<B256>, RawValue<Account>)>>,
315    collector: &mut Collector<RawKey<B256>, RawValue<Account>>,
316) -> Result<(), StageError> {
317    for channel in channels.iter_mut() {
318        while let Ok((key, v)) = channel.recv() {
319            collector.insert(key, v)?;
320        }
321    }
322    info!(target: "sync::stages::hashing_account", "Hashed {} entries", collector.len());
323    channels.clear();
324    Ok(())
325}
326
// TODO: Rewrite this
/// `SeedOpts` provides configuration parameters for calling `AccountHashingStage::seed`
/// in unit tests or benchmarks to generate an initial database state for running the
/// stage.
///
/// In order to check the "full hashing" mode of the stage you want to generate more
/// transitions than `AccountHashingStage.clean_threshold`. This requires:
/// 1. Creating enough blocks so there's enough transactions to generate the required transition
///    keys in the `BlockTransitionIndex` (which depends on the `TxTransitionIndex` internally)
/// 2. Setting `blocks.len() > clean_threshold` so that there's enough diffs to actually take the
///    2nd codepath
#[derive(Clone, Debug)]
pub struct SeedOpts {
    /// The range of blocks to be generated
    pub blocks: RangeInclusive<u64>,
    /// The number of accounts to be generated
    pub accounts: usize,
    /// The range of transactions to be generated per block.
    pub txs: Range<u8>,
}
347
348fn stage_checkpoint_progress(provider: &impl StatsReader) -> ProviderResult<EntitiesCheckpoint> {
349    Ok(EntitiesCheckpoint {
350        processed: provider.count_entries::<tables::HashedAccounts>()? as u64,
351        total: provider.count_entries::<tables::PlainAccountState>()? as u64,
352    })
353}
354
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::{
        stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError,
        UnwindStageTestRunner,
    };
    use alloy_primitives::U256;
    use assert_matches::assert_matches;
    use reth_primitives_traits::Account;
    use reth_provider::providers::StaticFileWriter;
    use reth_stages_api::StageUnitCheckpoint;
    use test_utils::*;

    // Generates the shared execute/unwind test suite for this stage.
    stage_test_suite_ext!(AccountHashingTestRunner, account_hashing);

    #[tokio::test]
    async fn execute_clean_account_hashing() {
        let (previous_stage, stage_progress) = (20, 10);
        // Set up the runner
        let mut runner = AccountHashingTestRunner::default();
        // Force the "clean" (full rehash) codepath regardless of range size.
        runner.set_clean_threshold(1);

        let input = ExecInput {
            target: Some(previous_stage),
            checkpoint: Some(StageCheckpoint::new(stage_progress)),
        };

        runner.seed_execution(input).expect("failed to seed execution");

        let rx = runner.execute(input);
        let result = rx.await.unwrap();

        // The clean path must finish the full range in one pass and report every
        // plain-state account as processed.
        assert_matches!(
            result,
            Ok(ExecOutput {
                checkpoint: StageCheckpoint {
                    block_number,
                    stage_checkpoint: Some(StageUnitCheckpoint::Account(AccountHashingCheckpoint {
                        progress: EntitiesCheckpoint {
                            processed,
                            total,
                        },
                        ..
                    })),
                },
                done: true,
            }) if block_number == previous_stage &&
                processed == total &&
                total == runner.db.count_entries::<tables::PlainAccountState>().unwrap() as u64
        );

        // Validate the stage execution
        assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
    }

    mod test_utils {
        use super::*;
        use crate::test_utils::TestStageDB;
        use alloy_primitives::Address;
        use reth_provider::DatabaseProviderFactory;

        // Test harness wiring `AccountHashingStage` to a temporary database.
        pub(crate) struct AccountHashingTestRunner {
            pub(crate) db: TestStageDB,
            commit_threshold: u64,
            clean_threshold: u64,
            commit_entries: u64,
            etl_config: EtlConfig,
        }

        impl AccountHashingTestRunner {
            pub(crate) fn set_clean_threshold(&mut self, threshold: u64) {
                self.clean_threshold = threshold;
            }

            #[expect(dead_code)]
            pub(crate) fn set_commit_threshold(&mut self, threshold: u64) {
                self.commit_threshold = threshold;
            }

            /// Iterates over `PlainAccount` table and checks that the accounts match the ones
            /// in the `HashedAccounts` table
            pub(crate) fn check_hashed_accounts(&self) -> Result<(), TestRunnerError> {
                self.db.query(|tx| {
                    let mut acc_cursor = tx.cursor_read::<tables::PlainAccountState>()?;
                    let mut hashed_acc_cursor = tx.cursor_read::<tables::HashedAccounts>()?;

                    while let Some((address, account)) = acc_cursor.next()? {
                        let hashed_addr = keccak256(address);
                        // Only compare when the hashed entry exists; missing entries are
                        // not asserted here.
                        if let Some((_, acc)) = hashed_acc_cursor.seek_exact(hashed_addr)? {
                            assert_eq!(acc, account)
                        }
                    }
                    Ok(())
                })?;

                Ok(())
            }

            /// Same as `check_hashed_accounts`, only that checks with the old account state,
            /// namely, the same account with nonce - 1 and balance - 1.
            pub(crate) fn check_old_hashed_accounts(&self) -> Result<(), TestRunnerError> {
                self.db.query(|tx| {
                    let mut acc_cursor = tx.cursor_read::<tables::PlainAccountState>()?;
                    let mut hashed_acc_cursor = tx.cursor_read::<tables::HashedAccounts>()?;

                    while let Some((address, account)) = acc_cursor.next()? {
                        // Mirror the "previous state" written by `seed` into the changesets.
                        let Account { nonce, balance, .. } = account;
                        let old_acc = Account {
                            nonce: nonce - 1,
                            balance: balance - U256::from(1),
                            bytecode_hash: None,
                        };
                        let hashed_addr = keccak256(address);
                        if let Some((_, acc)) = hashed_acc_cursor.seek_exact(hashed_addr)? {
                            assert_eq!(acc, old_acc)
                        }
                    }
                    Ok(())
                })?;

                Ok(())
            }
        }

        impl Default for AccountHashingTestRunner {
            fn default() -> Self {
                Self {
                    db: TestStageDB::default(),
                    commit_threshold: 1000,
                    clean_threshold: 1000,
                    // Unlimited by default so only the block threshold applies.
                    commit_entries: u64::MAX,
                    etl_config: EtlConfig::default(),
                }
            }
        }

        impl StageTestRunner for AccountHashingTestRunner {
            type S = AccountHashingStage;

            fn db(&self) -> &TestStageDB {
                &self.db
            }

            fn stage(&self) -> Self::S {
                Self::S {
                    commit_threshold: self.commit_threshold,
                    clean_threshold: self.clean_threshold,
                    commit_entries: self.commit_entries,
                    etl_config: self.etl_config.clone(),
                }
            }
        }

        impl ExecuteStageTestRunner for AccountHashingTestRunner {
            type Seed = Vec<(Address, Account)>;

            fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
                let provider = self.db.factory.database_provider_rw()?;
                let res = Ok(AccountHashingStage::seed(
                    &provider,
                    SeedOpts { blocks: 0..=input.target(), accounts: 10, txs: 0..3 },
                )
                .unwrap());
                provider.commit().expect("failed to commit");
                res
            }

            fn validate_execution(
                &self,
                input: ExecInput,
                output: Option<ExecOutput>,
            ) -> Result<(), TestRunnerError> {
                if let Some(output) = output {
                    let start_block = input.next_block();
                    let end_block = output.checkpoint.block_number;
                    // Nothing was processed; there is nothing to validate.
                    if start_block > end_block {
                        return Ok(())
                    }
                }
                self.check_hashed_accounts()
            }
        }

        impl UnwindStageTestRunner for AccountHashingTestRunner {
            fn validate_unwind(&self, _input: UnwindInput) -> Result<(), TestRunnerError> {
                self.check_old_hashed_accounts()
            }
        }
    }
}