reth_stages/stages/hashing_account.rs

use alloy_primitives::{keccak256, B256};
use itertools::Itertools;
use reth_config::config::{EtlConfig, HashingConfig};
use reth_db_api::{
    cursor::{DbCursorRO, DbCursorRW},
    tables,
    transaction::{DbTx, DbTxMut},
    RawKey, RawTable, RawValue,
};
use reth_etl::Collector;
use reth_primitives_traits::Account;
use reth_provider::{AccountExtReader, DBProvider, HashingWriter, StatsReader};
use reth_stages_api::{
    AccountHashingCheckpoint, EntitiesCheckpoint, ExecInput, ExecOutput, Stage, StageCheckpoint,
    StageError, StageId, UnwindInput, UnwindOutput,
};
use reth_storage_errors::provider::ProviderResult;
use std::{
    fmt::Debug,
    ops::{Range, RangeInclusive},
    sync::mpsc::{self, Receiver},
};
use tracing::*;

/// Maximum number of channels that can exist in memory.
const MAXIMUM_CHANNELS: usize = 10_000;

/// Maximum number of accounts to hash per rayon worker job.
const WORKER_CHUNK_SIZE: usize = 100;

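// Together these bound how much hashed data can sit in memory at once: at most
// `MAXIMUM_CHANNELS * WORKER_CHUNK_SIZE` (10_000 * 100 = 1_000_000) hashed entries may be
// outstanding before the channels are drained into the ETL collector.
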
/// Account hashing stage hashes plain accounts.
/// This is preparation before generating intermediate hashes and calculating the Merkle tree root.
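///
/// Depending on the size of the block range, the stage either rehashes every entry in the
/// `PlainAccountState` table from scratch (spilling results through an ETL collector) or applies
/// incremental updates derived from the account changesets.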
#[derive(Clone, Debug)]
pub struct AccountHashingStage {
    /// The threshold (in number of blocks) for switching between incremental
    /// hashing and full account hashing from scratch.
    pub clean_threshold: u64,
    /// The maximum number of accounts to process before committing during unwind.
    pub commit_threshold: u64,
    /// ETL configuration
    pub etl_config: EtlConfig,
}

impl AccountHashingStage {
    /// Creates a new instance of [`AccountHashingStage`].
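    ///
    /// # Example
    ///
    /// A minimal construction sketch (not compiled; assumes both config types implement
    /// `Default`):
    ///
    /// ```ignore
    /// let stage = AccountHashingStage::new(HashingConfig::default(), EtlConfig::default());
    /// ```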
    pub const fn new(config: HashingConfig, etl_config: EtlConfig) -> Self {
        Self {
            clean_threshold: config.clean_threshold,
            commit_threshold: config.commit_threshold,
            etl_config,
        }
    }
}

#[cfg(any(test, feature = "test-utils"))]
impl AccountHashingStage {
    /// Initializes the `PlainAccountState` table with `opts.accounts` accounts, each with some
    /// random state at the target block, and `opts.txs` transactions in each block.
    ///
    /// Proceeds to the end of the `BlockTransitionIndex`, goes back `transitions` entries, and
    /// changes the account state in the `AccountChangeSets` table.
    pub fn seed<Tx: DbTx + DbTxMut + 'static, N: reth_provider::providers::ProviderNodeTypes>(
        provider: &reth_provider::DatabaseProvider<Tx, N>,
        opts: SeedOpts,
    ) -> Result<Vec<(alloy_primitives::Address, Account)>, StageError>
    where
        N::Primitives: reth_primitives_traits::FullNodePrimitives<
            Block = reth_ethereum_primitives::Block,
            BlockHeader = reth_primitives_traits::Header,
        >,
    {
        use alloy_primitives::U256;
        use reth_db_api::models::AccountBeforeTx;
        use reth_provider::{StaticFileProviderFactory, StaticFileWriter};
        use reth_testing_utils::{
            generators,
            generators::{random_block_range, random_eoa_accounts, BlockRangeParams},
        };

        let mut rng = generators::rng();

        let blocks = random_block_range(
            &mut rng,
            opts.blocks.clone(),
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: opts.txs, ..Default::default() },
        );

        for block in blocks {
            provider.insert_historical_block(block.try_recover().unwrap()).unwrap();
        }
        provider
            .static_file_provider()
            .latest_writer(reth_static_file_types::StaticFileSegment::Headers)
            .unwrap()
            .commit()
            .unwrap();
        let mut accounts = random_eoa_accounts(&mut rng, opts.accounts);
        {
            // Account State generator
            let mut account_cursor =
                provider.tx_ref().cursor_write::<tables::PlainAccountState>()?;
            accounts.sort_by(|a, b| a.0.cmp(&b.0));
            for (addr, acc) in &accounts {
                account_cursor.append(*addr, acc)?;
            }

            let mut acc_changeset_cursor =
                provider.tx_ref().cursor_write::<tables::AccountChangeSets>()?;
            for (t, (addr, acc)) in opts.blocks.zip(&accounts) {
                let Account { nonce, balance, .. } = acc;
                let prev_acc = Account {
                    nonce: nonce - 1,
                    balance: balance - U256::from(1),
                    bytecode_hash: None,
                };
                let acc_before_tx = AccountBeforeTx { address: *addr, info: Some(prev_acc) };
                acc_changeset_cursor.append(t, &acc_before_tx)?;
            }
        }

        Ok(accounts)
    }
}

impl Default for AccountHashingStage {
    fn default() -> Self {
        Self {
            clean_threshold: 500_000,
            commit_threshold: 100_000,
            etl_config: EtlConfig::default(),
        }
    }
}

impl<Provider> Stage<Provider> for AccountHashingStage
where
    Provider: DBProvider<Tx: DbTxMut> + HashingWriter + AccountExtReader + StatsReader,
{
    /// Return the id of the stage
    fn id(&self) -> StageId {
        StageId::AccountHashing
    }

    /// Execute the stage.
    fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result<ExecOutput, StageError> {
        if input.target_reached() {
            return Ok(ExecOutput::done(input.checkpoint()))
        }

        let (from_block, to_block) = input.next_block_range().into_inner();

        // If the range covers more blocks than the threshold, it is faster to iterate over the
        // plain state and hash all accounts; otherwise, aggregate the changesets and apply the
        // hashing to the `HashedAccounts` table. Also, if we start from genesis, we need to hash
        // from scratch, as genesis accounts are not in the changesets.
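        // Illustrative example (numbers are assumptions, not read from the node's config): with
        // the default `clean_threshold` of 500_000, a 600_000-block range takes the full-rehash
        // path below, while a 10_000-block range takes the incremental changeset path in the
        // `else` branch.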
        if to_block - from_block > self.clean_threshold || from_block == 1 {
            let tx = provider.tx_ref();

            // Clear the table, then load all accounts and hash them.
            tx.clear::<tables::HashedAccounts>()?;

            let mut accounts_cursor = tx.cursor_read::<RawTable<tables::PlainAccountState>>()?;
            let mut collector =
                Collector::new(self.etl_config.file_size, self.etl_config.dir.clone());
            let mut channels = Vec::with_capacity(MAXIMUM_CHANNELS);

            // Channels used to return the results of account hashing
            for chunk in &accounts_cursor.walk(None)?.chunks(WORKER_CHUNK_SIZE) {
                // An _unordered_ channel to receive results from a rayon job
                let (tx, rx) = mpsc::channel();
                channels.push(rx);

                let chunk = chunk.collect::<Result<Vec<_>, _>>()?;
                // Spawn the hashing task onto the global rayon pool
                rayon::spawn(move || {
                    for (address, account) in chunk {
                        let address = address.key().unwrap();
                        let _ = tx.send((RawKey::new(keccak256(address)), account));
                    }
                });

                // Flush to ETL when the number of channels reaches MAXIMUM_CHANNELS
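                // (keeps the number of in-flight receivers, and the hashed results still sitting
                // in their channel buffers, bounded)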
                if !channels.is_empty() && channels.len() % MAXIMUM_CHANNELS == 0 {
                    collect(&mut channels, &mut collector)?;
                }
            }

            collect(&mut channels, &mut collector)?;

            let mut hashed_account_cursor =
                tx.cursor_write::<RawTable<tables::HashedAccounts>>()?;

            let total_hashes = collector.len();
            let interval = (total_hashes / 10).max(1);
            for (index, item) in collector.iter()?.enumerate() {
                if index > 0 && index % interval == 0 {
                    info!(
                        target: "sync::stages::hashing_account",
                        progress = %format!("{:.2}%", (index as f64 / total_hashes as f64) * 100.0),
                        "Inserting hashes"
                    );
                }

                let (key, value) = item?;
                hashed_account_cursor
                    .append(RawKey::<B256>::from_vec(key), &RawValue::<Account>::from_vec(value))?;
            }
        } else {
            // Aggregate all transition changesets and make a list of accounts that have been
            // changed.
            let lists = provider.changed_accounts_with_range(from_block..=to_block)?;
            // Iterate over the plain state and get the newest value.
            // The assumption we are okay to make is that the plain state represents the
            // `previous_stage_progress` state.
            let accounts = provider.basic_accounts(lists)?;
            // Hash the accounts and insert them into the hashing table
            provider.insert_account_for_hashing(accounts)?;
        }

        // We finished the hashing stage; no further iterations are expected for the same block
        // range, so no intermediate checkpoint is needed.
        let checkpoint = StageCheckpoint::new(input.target())
            .with_account_hashing_stage_checkpoint(AccountHashingCheckpoint {
                progress: stage_checkpoint_progress(provider)?,
                ..Default::default()
            });

        Ok(ExecOutput { checkpoint, done: true })
    }

    /// Unwind the stage.
    fn unwind(
        &mut self,
        provider: &Provider,
        input: UnwindInput,
    ) -> Result<UnwindOutput, StageError> {
        let (range, unwind_progress, _) =
            input.unwind_block_range_with_threshold(self.commit_threshold);

        // Aggregate all transition changesets and make a list of accounts that have been changed.
        provider.unwind_account_hashing_range(range)?;

        let mut stage_checkpoint =
            input.checkpoint.account_hashing_stage_checkpoint().unwrap_or_default();

        stage_checkpoint.progress = stage_checkpoint_progress(provider)?;

        Ok(UnwindOutput {
            checkpoint: StageCheckpoint::new(unwind_progress)
                .with_account_hashing_stage_checkpoint(stage_checkpoint),
        })
    }
}

/// Flushes the hashes received on the channels to the ETL collector.
fn collect(
    channels: &mut Vec<Receiver<(RawKey<B256>, RawValue<Account>)>>,
    collector: &mut Collector<RawKey<B256>, RawValue<Account>>,
) -> Result<(), StageError> {
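    // Drain each channel in turn; `recv` keeps yielding results until the corresponding rayon
    // job finishes and drops its sender, at which point it returns `Err` and we move on.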
    for channel in channels.iter_mut() {
        while let Ok((key, v)) = channel.recv() {
            collector.insert(key, v)?;
        }
    }
    info!(target: "sync::stages::hashing_account", "Hashed {} entries", collector.len());
    channels.clear();
    Ok(())
}

// TODO: Rewrite this
/// `SeedOpts` provides configuration parameters for calling `AccountHashingStage::seed`
/// in unit tests or benchmarks to generate an initial database state for running the
/// stage.
///
/// In order to check the "full hashing" mode of the stage, you want to generate more
/// transitions than `AccountHashingStage.clean_threshold`. This requires:
/// 1. Creating enough blocks so there are enough transactions to generate the required transition
///    keys in the `BlockTransitionIndex` (which depends on the `TxTransitionIndex` internally)
/// 2. Setting `blocks.len() > clean_threshold` so that there are enough diffs to actually take the
///    2nd codepath
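///
/// A minimal construction sketch (illustrative values mirroring the test runner below):
///
/// ```ignore
/// let opts = SeedOpts { blocks: 1..=100, accounts: 10, txs: 0..3 };
/// ```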
#[derive(Clone, Debug)]
pub struct SeedOpts {
    /// The range of blocks to be generated
    pub blocks: RangeInclusive<u64>,
    /// The number of accounts to be generated
    pub accounts: usize,
    /// The range of transactions to be generated per block.
    pub txs: Range<u8>,
}

fn stage_checkpoint_progress(provider: &impl StatsReader) -> ProviderResult<EntitiesCheckpoint> {
    Ok(EntitiesCheckpoint {
        processed: provider.count_entries::<tables::HashedAccounts>()? as u64,
        total: provider.count_entries::<tables::PlainAccountState>()? as u64,
    })
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::{
        stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError,
        UnwindStageTestRunner,
    };
    use alloy_primitives::U256;
    use assert_matches::assert_matches;
    use reth_primitives_traits::Account;
    use reth_provider::providers::StaticFileWriter;
    use reth_stages_api::StageUnitCheckpoint;
    use test_utils::*;

    stage_test_suite_ext!(AccountHashingTestRunner, account_hashing);

    #[tokio::test]
    async fn execute_clean_account_hashing() {
        let (previous_stage, stage_progress) = (20, 10);
        // Set up the runner
        let mut runner = AccountHashingTestRunner::default();
        runner.set_clean_threshold(1);

        let input = ExecInput {
            target: Some(previous_stage),
            checkpoint: Some(StageCheckpoint::new(stage_progress)),
        };

        runner.seed_execution(input).expect("failed to seed execution");

        let rx = runner.execute(input);
        let result = rx.await.unwrap();

        assert_matches!(
            result,
            Ok(ExecOutput {
                checkpoint: StageCheckpoint {
                    block_number,
                    stage_checkpoint: Some(StageUnitCheckpoint::Account(AccountHashingCheckpoint {
                        progress: EntitiesCheckpoint {
                            processed,
                            total,
                        },
                        ..
                    })),
                },
                done: true,
            }) if block_number == previous_stage &&
                processed == total &&
                total == runner.db.table::<tables::PlainAccountState>().unwrap().len() as u64
        );

        // Validate the stage execution
        assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
    }

    mod test_utils {
        use super::*;
        use crate::test_utils::TestStageDB;
        use alloy_primitives::Address;
        use reth_provider::DatabaseProviderFactory;

        pub(crate) struct AccountHashingTestRunner {
            pub(crate) db: TestStageDB,
            commit_threshold: u64,
            clean_threshold: u64,
            etl_config: EtlConfig,
        }

        impl AccountHashingTestRunner {
            pub(crate) fn set_clean_threshold(&mut self, threshold: u64) {
                self.clean_threshold = threshold;
            }

            #[allow(dead_code)]
            pub(crate) fn set_commit_threshold(&mut self, threshold: u64) {
                self.commit_threshold = threshold;
            }

            /// Iterates over the `PlainAccountState` table and checks that the accounts match the
            /// ones in the `HashedAccounts` table.
            pub(crate) fn check_hashed_accounts(&self) -> Result<(), TestRunnerError> {
                self.db.query(|tx| {
                    let mut acc_cursor = tx.cursor_read::<tables::PlainAccountState>()?;
                    let mut hashed_acc_cursor = tx.cursor_read::<tables::HashedAccounts>()?;

                    while let Some((address, account)) = acc_cursor.next()? {
                        let hashed_addr = keccak256(address);
                        if let Some((_, acc)) = hashed_acc_cursor.seek_exact(hashed_addr)? {
                            assert_eq!(acc, account)
                        }
                    }
                    Ok(())
                })?;

                Ok(())
            }

            /// Same as `check_hashed_accounts`, but checks against the old account state, i.e.
            /// the same account with `nonce - 1` and `balance - 1`.
            pub(crate) fn check_old_hashed_accounts(&self) -> Result<(), TestRunnerError> {
                self.db.query(|tx| {
                    let mut acc_cursor = tx.cursor_read::<tables::PlainAccountState>()?;
                    let mut hashed_acc_cursor = tx.cursor_read::<tables::HashedAccounts>()?;

                    while let Some((address, account)) = acc_cursor.next()? {
                        let Account { nonce, balance, .. } = account;
                        let old_acc = Account {
                            nonce: nonce - 1,
                            balance: balance - U256::from(1),
                            bytecode_hash: None,
                        };
                        let hashed_addr = keccak256(address);
                        if let Some((_, acc)) = hashed_acc_cursor.seek_exact(hashed_addr)? {
                            assert_eq!(acc, old_acc)
                        }
                    }
                    Ok(())
                })?;

                Ok(())
            }
        }

        impl Default for AccountHashingTestRunner {
            fn default() -> Self {
                Self {
                    db: TestStageDB::default(),
                    commit_threshold: 1000,
                    clean_threshold: 1000,
                    etl_config: EtlConfig::default(),
                }
            }
        }

        impl StageTestRunner for AccountHashingTestRunner {
            type S = AccountHashingStage;

            fn db(&self) -> &TestStageDB {
                &self.db
            }

            fn stage(&self) -> Self::S {
                Self::S {
                    commit_threshold: self.commit_threshold,
                    clean_threshold: self.clean_threshold,
                    etl_config: self.etl_config.clone(),
                }
            }
        }

        impl ExecuteStageTestRunner for AccountHashingTestRunner {
            type Seed = Vec<(Address, Account)>;

            fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
                let provider = self.db.factory.database_provider_rw()?;
                let res = Ok(AccountHashingStage::seed(
                    &provider,
                    SeedOpts { blocks: 1..=input.target(), accounts: 10, txs: 0..3 },
                )
                .unwrap());
                provider.commit().expect("failed to commit");
                res
            }

            fn validate_execution(
                &self,
                input: ExecInput,
                output: Option<ExecOutput>,
            ) -> Result<(), TestRunnerError> {
                if let Some(output) = output {
                    let start_block = input.next_block();
                    let end_block = output.checkpoint.block_number;
                    if start_block > end_block {
                        return Ok(())
                    }
                }
                self.check_hashed_accounts()
            }
        }

        impl UnwindStageTestRunner for AccountHashingTestRunner {
            fn validate_unwind(&self, _input: UnwindInput) -> Result<(), TestRunnerError> {
                self.check_old_hashed_accounts()
            }
        }
    }
}