reth_stages/stages/hashing_account.rs

use alloy_primitives::{keccak256, B256};
use itertools::Itertools;
use reth_config::config::{EtlConfig, HashingConfig};
use reth_db_api::{
    cursor::{DbCursorRO, DbCursorRW},
    tables,
    transaction::{DbTx, DbTxMut},
    RawKey, RawTable, RawValue,
};
use reth_etl::Collector;
use reth_primitives_traits::Account;
use reth_provider::{
    AccountExtReader, DBProvider, HashingWriter, StatsReader, StorageSettingsCache,
};
use reth_stages_api::{
    AccountHashingCheckpoint, EntitiesCheckpoint, ExecInput, ExecOutput, Stage, StageCheckpoint,
    StageError, StageId, UnwindInput, UnwindOutput,
};
use reth_storage_errors::provider::ProviderResult;
use std::{
    fmt::Debug,
    ops::{Range, RangeInclusive},
    sync::mpsc::{self, Receiver},
};
use tracing::*;

/// Maximum number of channels that can exist in memory.
const MAXIMUM_CHANNELS: usize = 10_000;

/// Maximum number of accounts to hash per rayon worker job.
const WORKER_CHUNK_SIZE: usize = 100;
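
// Taken together, these constants bound how much is buffered in memory during `execute`:
// hashing results for up to MAXIMUM_CHANNELS * WORKER_CHUNK_SIZE = 10_000 * 100 = 1_000_000
// accounts can sit in channels before they are flushed to the ETL collector.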

/// Account hashing stage hashes plain accounts.
/// This is preparation before generating intermediate hashes and calculating the Merkle tree root.
#[derive(Clone, Debug)]
pub struct AccountHashingStage {
    /// The threshold (in number of blocks) for switching between incremental
    /// hashing and full storage hashing.
    pub clean_threshold: u64,
    /// The maximum number of accounts to process before committing during unwind.
    pub commit_threshold: u64,
    /// ETL configuration
    pub etl_config: EtlConfig,
}

impl AccountHashingStage {
    /// Create new instance of [`AccountHashingStage`].
    pub const fn new(config: HashingConfig, etl_config: EtlConfig) -> Self {
        Self {
            clean_threshold: config.clean_threshold,
            commit_threshold: config.commit_threshold,
            etl_config,
        }
    }
}
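
// Illustrative usage only (a sketch, not part of this module): assuming both config types
// provide `Default` values, the stage can be constructed as
// `AccountHashingStage::new(HashingConfig::default(), EtlConfig::default())`; see also the
// `Default` impl for `AccountHashingStage` below.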

#[cfg(any(test, feature = "test-utils"))]
impl AccountHashingStage {
    /// Initializes the `PlainAccountState` table with `num_accounts` having some random state
    /// at the target block, with `txs_range` transactions in each block.
    ///
    /// Proceeds to go to the `BlockTransitionIndex` end, go back `transitions` and change the
    /// account state in the `AccountChangeSets` table.
    pub fn seed<Tx: DbTx + DbTxMut + 'static, N: reth_provider::providers::ProviderNodeTypes>(
        provider: &reth_provider::DatabaseProvider<Tx, N>,
        opts: SeedOpts,
    ) -> Result<Vec<(alloy_primitives::Address, Account)>, StageError>
    where
        N::Primitives: reth_primitives_traits::NodePrimitives<
            Block = reth_ethereum_primitives::Block,
            BlockHeader = reth_primitives_traits::Header,
        >,
    {
        use alloy_primitives::U256;
        use reth_db_api::models::AccountBeforeTx;
        use reth_provider::{BlockWriter, StaticFileProviderFactory, StaticFileWriter};
        use reth_testing_utils::{
            generators,
            generators::{random_block_range, random_eoa_accounts, BlockRangeParams},
        };

        let mut rng = generators::rng();

        let blocks = random_block_range(
            &mut rng,
            opts.blocks.clone(),
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: opts.txs, ..Default::default() },
        );

        for block in blocks {
            provider.insert_block(&block.try_recover().unwrap()).unwrap();
        }
        provider
            .static_file_provider()
            .latest_writer(reth_static_file_types::StaticFileSegment::Headers)
            .unwrap()
            .commit()
            .unwrap();
        let mut accounts = random_eoa_accounts(&mut rng, opts.accounts);
        {
            // Account State generator
            let mut account_cursor =
                provider.tx_ref().cursor_write::<tables::PlainAccountState>()?;
            accounts.sort_by_key(|a| a.0);
            for (addr, acc) in &accounts {
                account_cursor.append(*addr, acc)?;
            }

            let mut acc_changeset_cursor =
                provider.tx_ref().cursor_write::<tables::AccountChangeSets>()?;
            for (t, (addr, acc)) in opts.blocks.zip(&accounts) {
                let Account { nonce, balance, .. } = acc;
                let prev_acc = Account {
                    nonce: nonce - 1,
                    balance: balance - U256::from(1),
                    bytecode_hash: None,
                };
                let acc_before_tx = AccountBeforeTx { address: *addr, info: Some(prev_acc) };
                acc_changeset_cursor.append(t, &acc_before_tx)?;
            }
        }

        Ok(accounts)
    }
}

impl Default for AccountHashingStage {
    fn default() -> Self {
        Self {
            clean_threshold: 500_000,
            commit_threshold: 100_000,
            etl_config: EtlConfig::default(),
        }
    }
}

impl<Provider> Stage<Provider> for AccountHashingStage
where
    Provider: DBProvider<Tx: DbTxMut>
        + HashingWriter
        + AccountExtReader
        + StatsReader
        + StorageSettingsCache,
{
    /// Return the id of the stage
    fn id(&self) -> StageId {
        StageId::AccountHashing
    }

    /// Execute the stage.
    ///
    /// When `use_hashed_state` is enabled, this stage is a no-op because the execution stage
    /// writes directly to `HashedAccounts`. Otherwise, it hashes plain state to populate hashed
    /// tables.
    fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result<ExecOutput, StageError> {
        if input.target_reached() {
            return Ok(ExecOutput::done(input.checkpoint()))
        }

        // If using hashed state as canonical, execution already writes to `HashedAccounts`,
        // so this stage becomes a no-op.
        if provider.cached_storage_settings().use_hashed_state() {
            return Ok(ExecOutput::done(input.checkpoint().with_block_number(input.target())));
        }

        let (from_block, to_block) = input.next_block_range().into_inner();

        // If there are more blocks than the threshold, it is faster to iterate over the plain
        // state and hash all accounts; otherwise, take the changesets, aggregate the changed
        // accounts, and hash only those into the `HashedAccounts` table. Also, if we start from
        // genesis, we need to hash from scratch, as genesis accounts are not in the changesets.
        if to_block - from_block > self.clean_threshold || from_block == 1 {
            let tx = provider.tx_ref();

            // clear the table, then load all plain accounts and hash them
            tx.clear::<tables::HashedAccounts>()?;

            let mut accounts_cursor = tx.cursor_read::<RawTable<tables::PlainAccountState>>()?;
            let mut collector =
                Collector::new(self.etl_config.file_size, self.etl_config.dir.clone());
            let mut channels = Vec::with_capacity(MAXIMUM_CHANNELS);

            // channels used to return results of account hashing
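            // Each chunk of plain accounts is hashed on the global rayon pool; the hashed
            // (key, account) pairs are sent back over a per-chunk channel and later drained
            // into the ETL collector by `collect`.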
            for chunk in &accounts_cursor.walk(None)?.chunks(WORKER_CHUNK_SIZE) {
                // An _unordered_ channel to receive results from a rayon job
                let (tx, rx) = mpsc::channel();
                channels.push(rx);

                let chunk = chunk.collect::<Result<Vec<_>, _>>()?;
                // Spawn the hashing task onto the global rayon pool
                rayon::spawn(move || {
                    for (address, account) in chunk {
                        let address = address.key().unwrap();
                        let _ = tx.send((RawKey::new(keccak256(address)), account));
                    }
                });

                // Flush to ETL once the number of channels reaches MAXIMUM_CHANNELS
                if !channels.is_empty() && channels.len().is_multiple_of(MAXIMUM_CHANNELS) {
                    collect(&mut channels, &mut collector)?;
                }
            }

            collect(&mut channels, &mut collector)?;

            let mut hashed_account_cursor =
                tx.cursor_write::<RawTable<tables::HashedAccounts>>()?;

            let total_hashes = collector.len();
            let interval = (total_hashes / 10).max(1);
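
            // The ETL collector yields its entries sorted by hashed key, which is what makes the
            // cursor `append` below valid (append requires keys in ascending order).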
            for (index, item) in collector.iter()?.enumerate() {
                if index > 0 && index.is_multiple_of(interval) {
                    info!(
                        target: "sync::stages::hashing_account",
                        progress = %format!("{:.2}%", (index as f64 / total_hashes as f64) * 100.0),
                        "Inserting hashes"
                    );
                }

                let (key, value) = item?;
                hashed_account_cursor
                    .append(RawKey::<B256>::from_vec(key), &RawValue::<Account>::from_vec(value))?;
            }
        } else {
            // Aggregate all transition changesets and make a list of accounts that have been
            // changed.
            let lists = provider.changed_accounts_with_range(from_block..=to_block)?;
            // Iterate over the plain state and take the newest value for each changed account.
            // The assumption we are okay to make is that the plain state represents the state at
            // `previous_stage_progress`.
            let accounts = provider.basic_accounts(lists)?;
            // Insert and hash the accounts into the hashing table
            provider.insert_account_for_hashing(accounts)?;
        }

        // We finished the hashing stage; no further iterations are expected for the same block
        // range, so no intermediate progress checkpoint is needed.
        let checkpoint = StageCheckpoint::new(input.target())
            .with_account_hashing_stage_checkpoint(AccountHashingCheckpoint {
                progress: stage_checkpoint_progress(provider)?,
                ..Default::default()
            });

        Ok(ExecOutput { checkpoint, done: true })
    }

    /// Unwind the stage.
    fn unwind(
        &mut self,
        provider: &Provider,
        input: UnwindInput,
    ) -> Result<UnwindOutput, StageError> {
        // NOTE: this runs in both v1 and v2 mode. In v2 mode, execution writes
        // directly to `HashedAccounts`, but the unwind must still revert those
        // entries here because `MerkleUnwind` runs after this stage (in unwind
        // order) and needs `HashedAccounts` to reflect the target block state
        // before it can verify the state root.
        let (range, unwind_progress, _) =
            input.unwind_block_range_with_threshold(self.commit_threshold);

        provider.unwind_account_hashing_range(range)?;

        let mut stage_checkpoint =
            input.checkpoint.account_hashing_stage_checkpoint().unwrap_or_default();

        stage_checkpoint.progress = stage_checkpoint_progress(provider)?;

        Ok(UnwindOutput {
            checkpoint: StageCheckpoint::new(unwind_progress)
                .with_account_hashing_stage_checkpoint(stage_checkpoint),
        })
    }
}

/// Flushes the hashes received on the channels into the ETL collector.
fn collect(
    channels: &mut Vec<Receiver<(RawKey<B256>, RawValue<Account>)>>,
    collector: &mut Collector<RawKey<B256>, RawValue<Account>>,
) -> Result<(), StageError> {
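    // `recv` blocks until the rayon task that owns the corresponding sender has sent all of its
    // results and dropped the sender, so each channel is fully drained before moving on.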
    for channel in channels.iter_mut() {
        while let Ok((key, v)) = channel.recv() {
            collector.insert(key, v)?;
        }
    }
    info!(target: "sync::stages::hashing_account", "Hashed {} entries", collector.len());
    channels.clear();
    Ok(())
}

// TODO: Rewrite this
/// `SeedOpts` provides configuration parameters for calling `AccountHashingStage::seed`
/// in unit tests or benchmarks to generate an initial database state for running the
/// stage.
///
/// In order to check the "full hashing" mode of the stage you want to generate more
/// transitions than `AccountHashingStage.clean_threshold`. This requires:
/// 1. Creating enough blocks so there are enough transactions to generate the required transition
///    keys in the `BlockTransitionIndex` (which depends on the `TxTransitionIndex` internally)
/// 2. Setting `blocks.len() > clean_threshold` so that there are enough diffs to actually take the
///    2nd codepath
#[derive(Clone, Debug)]
pub struct SeedOpts {
    /// The range of blocks to be generated
    pub blocks: RangeInclusive<u64>,
    /// The number of accounts to be generated
    pub accounts: usize,
    /// The range of transactions to be generated per block.
    pub txs: Range<u8>,
}

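// Illustrative only: values a test might pass (arbitrary numbers):
// `SeedOpts { blocks: 1..=100, accounts: 10, txs: 0..3 }`.

/// Reports hashing progress as the number of entries already in `HashedAccounts` (processed)
/// out of the number of entries in `PlainAccountState` (total).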
fn stage_checkpoint_progress(provider: &impl StatsReader) -> ProviderResult<EntitiesCheckpoint> {
    Ok(EntitiesCheckpoint {
        processed: provider.count_entries::<tables::HashedAccounts>()? as u64,
        total: provider.count_entries::<tables::PlainAccountState>()? as u64,
    })
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::{
        stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError,
        UnwindStageTestRunner,
    };
    use alloy_primitives::U256;
    use assert_matches::assert_matches;
    use reth_primitives_traits::Account;
    use reth_provider::providers::StaticFileWriter;
    use reth_stages_api::StageUnitCheckpoint;
    use test_utils::*;

    stage_test_suite_ext!(AccountHashingTestRunner, account_hashing);
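    // Note: `stage_test_suite_ext!` (from `crate::test_utils`) presumably expands to the shared
    // execute/unwind test suite for this runner; the test below covers the full-hashing path.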

    #[tokio::test]
    async fn execute_clean_account_hashing() {
        let (previous_stage, stage_progress) = (20, 10);
        // Set up the runner
        let mut runner = AccountHashingTestRunner::default();
        runner.set_clean_threshold(1);

        let input = ExecInput {
            target: Some(previous_stage),
            checkpoint: Some(StageCheckpoint::new(stage_progress)),
        };

        runner.seed_execution(input).expect("failed to seed execution");

        let rx = runner.execute(input);
        let result = rx.await.unwrap();

        assert_matches!(
            result,
            Ok(ExecOutput {
                checkpoint: StageCheckpoint {
                    block_number,
                    stage_checkpoint: Some(StageUnitCheckpoint::Account(AccountHashingCheckpoint {
                        progress: EntitiesCheckpoint {
                            processed,
                            total,
                        },
                        ..
                    })),
                },
                done: true,
            }) if block_number == previous_stage &&
                processed == total &&
                total == runner.db.count_entries::<tables::PlainAccountState>().unwrap() as u64
        );

        // Validate the stage execution
        assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
    }

    mod test_utils {
        use super::*;
        use crate::test_utils::TestStageDB;
        use alloy_primitives::Address;
        use reth_provider::DatabaseProviderFactory;

        pub(crate) struct AccountHashingTestRunner {
            pub(crate) db: TestStageDB,
            commit_threshold: u64,
            clean_threshold: u64,
            etl_config: EtlConfig,
        }

        impl AccountHashingTestRunner {
            pub(crate) fn set_clean_threshold(&mut self, threshold: u64) {
                self.clean_threshold = threshold;
            }

            #[expect(dead_code)]
            pub(crate) fn set_commit_threshold(&mut self, threshold: u64) {
                self.commit_threshold = threshold;
            }

            /// Iterates over the `PlainAccountState` table and checks that the accounts match the
            /// ones in the `HashedAccounts` table.
            pub(crate) fn check_hashed_accounts(&self) -> Result<(), TestRunnerError> {
                self.db.query(|tx| {
                    let mut acc_cursor = tx.cursor_read::<tables::PlainAccountState>()?;
                    let mut hashed_acc_cursor = tx.cursor_read::<tables::HashedAccounts>()?;

                    while let Some((address, account)) = acc_cursor.next()? {
                        let hashed_addr = keccak256(address);
                        if let Some((_, acc)) = hashed_acc_cursor.seek_exact(hashed_addr)? {
                            assert_eq!(acc, account)
                        }
                    }
                    Ok(())
                })?;

                Ok(())
            }

            /// Same as `check_hashed_accounts`, but checks against the old account state, i.e.
            /// the same account with `nonce - 1` and `balance - 1`.
            pub(crate) fn check_old_hashed_accounts(&self) -> Result<(), TestRunnerError> {
                self.db.query(|tx| {
                    let mut acc_cursor = tx.cursor_read::<tables::PlainAccountState>()?;
                    let mut hashed_acc_cursor = tx.cursor_read::<tables::HashedAccounts>()?;

                    while let Some((address, account)) = acc_cursor.next()? {
                        let Account { nonce, balance, .. } = account;
                        let old_acc = Account {
                            nonce: nonce - 1,
                            balance: balance - U256::from(1),
                            bytecode_hash: None,
                        };
                        let hashed_addr = keccak256(address);
                        if let Some((_, acc)) = hashed_acc_cursor.seek_exact(hashed_addr)? {
                            assert_eq!(acc, old_acc)
                        }
                    }
                    Ok(())
                })?;

                Ok(())
            }
        }

        impl Default for AccountHashingTestRunner {
            fn default() -> Self {
                Self {
                    db: TestStageDB::default(),
                    commit_threshold: 1000,
                    clean_threshold: 1000,
                    etl_config: EtlConfig::default(),
                }
            }
        }

        impl StageTestRunner for AccountHashingTestRunner {
            type S = AccountHashingStage;

            fn db(&self) -> &TestStageDB {
                &self.db
            }

            fn stage(&self) -> Self::S {
                Self::S {
                    commit_threshold: self.commit_threshold,
                    clean_threshold: self.clean_threshold,
                    etl_config: self.etl_config.clone(),
                }
            }
        }

        impl ExecuteStageTestRunner for AccountHashingTestRunner {
            type Seed = Vec<(Address, Account)>;

            fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
                let provider = self.db.factory.database_provider_rw()?;
                let res = Ok(AccountHashingStage::seed(
                    &provider,
                    SeedOpts { blocks: 0..=input.target(), accounts: 10, txs: 0..3 },
                )
                .unwrap());
                provider.commit().expect("failed to commit");
                res
            }

            fn validate_execution(
                &self,
                input: ExecInput,
                output: Option<ExecOutput>,
            ) -> Result<(), TestRunnerError> {
                if let Some(output) = output {
                    let start_block = input.next_block();
                    let end_block = output.checkpoint.block_number;
                    if start_block > end_block {
                        return Ok(())
                    }
                }
                self.check_hashed_accounts()
            }
        }

        impl UnwindStageTestRunner for AccountHashingTestRunner {
            fn validate_unwind(&self, _input: UnwindInput) -> Result<(), TestRunnerError> {
                self.check_old_hashed_accounts()
            }
        }
    }
}