reth_stages/stages/hashing_storage.rs

use alloy_primitives::{b256, bytes::BufMut, keccak256, Address, B256};
use itertools::Itertools;
use reth_config::config::{EtlConfig, HashingConfig};
use reth_db_api::{
    cursor::{DbCursorRO, DbDupCursorRW},
    models::{BlockNumberAddress, CompactU256},
    table::Decompress,
    tables,
    transaction::{DbTx, DbTxMut},
};
use reth_etl::Collector;
use reth_primitives_traits::StorageEntry;
use reth_provider::{DBProvider, HashingWriter, StatsReader, StorageReader};
use reth_stages_api::{
    EntitiesCheckpoint, ExecInput, ExecOutput, Stage, StageCheckpoint, StageError, StageId,
    StorageHashingCheckpoint, UnwindInput, UnwindOutput,
};
use reth_storage_errors::provider::ProviderResult;
use std::{
    fmt::Debug,
    sync::mpsc::{self, Receiver},
};
use tracing::*;

/// Maximum number of channels that can exist in memory.
const MAXIMUM_CHANNELS: usize = 10_000;

/// Maximum number of storage entries to hash per rayon worker job.
const WORKER_CHUNK_SIZE: usize = 100;

/// Keccak256 hash of the zero address.
const HASHED_ZERO_ADDRESS: B256 =
    b256!("0x5380c7b7ae81a58eb98d9c78de4a1fd7fd9535fc953ed2be602daaa41767312a");
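// NOTE: this is `keccak256(Address::ZERO)`; it seeds the `(last_addr, hashed_addr)`
// cache in the full-hashing loop below so the starting pair is consistent.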

/// Storage hashing stage hashes plain storage.
/// This is a preparation step before generating intermediate hashes and calculating
/// the Merkle tree root.
#[derive(Debug)]
pub struct StorageHashingStage {
    /// The threshold (in number of blocks) for switching between incremental
    /// hashing and full storage hashing.
    pub clean_threshold: u64,
    /// The maximum number of slots to process before committing during unwind.
    pub commit_threshold: u64,
    /// ETL configuration
    pub etl_config: EtlConfig,
}

impl StorageHashingStage {
    /// Create a new instance of [`StorageHashingStage`].
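    ///
    /// A minimal construction sketch (assuming both config types provide `Default`
    /// implementations, hence the ignored doctest):
    ///
    /// ```ignore
    /// use reth_config::config::{EtlConfig, HashingConfig};
    ///
    /// // Thresholds come from the hashing config; ETL scratch-file settings are separate.
    /// let stage = StorageHashingStage::new(HashingConfig::default(), EtlConfig::default());
    /// assert_eq!(stage.clean_threshold, HashingConfig::default().clean_threshold);
    /// ```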
    pub const fn new(config: HashingConfig, etl_config: EtlConfig) -> Self {
        Self {
            clean_threshold: config.clean_threshold,
            commit_threshold: config.commit_threshold,
            etl_config,
        }
    }
}

impl Default for StorageHashingStage {
    fn default() -> Self {
        Self {
            clean_threshold: 500_000,
            commit_threshold: 100_000,
            etl_config: EtlConfig::default(),
        }
    }
}

impl<Provider> Stage<Provider> for StorageHashingStage
where
    Provider: DBProvider<Tx: DbTxMut> + StorageReader + HashingWriter + StatsReader,
{
    /// Return the id of the stage
    fn id(&self) -> StageId {
        StageId::StorageHashing
    }

    /// Execute the stage.
    fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result<ExecOutput, StageError> {
        let tx = provider.tx_ref();
        if input.target_reached() {
            return Ok(ExecOutput::done(input.checkpoint()))
        }

        let (from_block, to_block) = input.next_block_range().into_inner();

        // If the block range is wider than the threshold, it is faster to iterate over
        // the plain state and hash all storage entries; otherwise, aggregate the
        // changesets and apply the hashing to the HashedStorages table. Also, if we
        // start from genesis, we need to hash from scratch, as genesis accounts and
        // their storages are not in the changesets.
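        // For example, with the default `clean_threshold` of 500_000, a sync that
        // advances more than 500_000 blocks takes the full-rebuild path below.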
        if to_block - from_block > self.clean_threshold || from_block == 1 {
            // Clear the table, then load and hash all storage entries.
            tx.clear::<tables::HashedStorages>()?;

            let mut storage_cursor = tx.cursor_read::<tables::PlainStorageState>()?;
            let mut collector =
                Collector::new(self.etl_config.file_size, self.etl_config.dir.clone());
            let mut channels = Vec::with_capacity(MAXIMUM_CHANNELS);

            for chunk in &storage_cursor.walk(None)?.chunks(WORKER_CHUNK_SIZE) {
                // An _unordered_ channel to receive results from a rayon job
                let (tx, rx) = mpsc::channel();
                channels.push(rx);

                let chunk = chunk.collect::<Result<Vec<_>, _>>()?;
                // Spawn the hashing task onto the global rayon pool
                rayon::spawn(move || {
                    // Cache the hashed address, since PlainStorageState is sorted by address
                    let (mut last_addr, mut hashed_addr) = (Address::ZERO, HASHED_ZERO_ADDRESS);
                    for (address, slot) in chunk {
                        if address != last_addr {
                            last_addr = address;
                            hashed_addr = keccak256(address);
                        }
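                        // Build the 64-byte ETL key: the 32-byte hashed address
                        // followed by the 32-byte hashed slot key, so the collector
                        // sorts entries grouped by account.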
                        let mut addr_key = Vec::with_capacity(64);
                        addr_key.put_slice(hashed_addr.as_slice());
                        addr_key.put_slice(keccak256(slot.key).as_slice());
                        let _ = tx.send((addr_key, CompactU256::from(slot.value)));
                    }
                });

                // Flush to ETL when the number of channels reaches MAXIMUM_CHANNELS
                if !channels.is_empty() && channels.len().is_multiple_of(MAXIMUM_CHANNELS) {
                    collect(&mut channels, &mut collector)?;
                }
            }

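            // Drain whatever is still buffered in the remaining channels before
            // reading the sorted entries back out of the collector.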
            collect(&mut channels, &mut collector)?;

            let total_hashes = collector.len();
            let interval = (total_hashes / 10).max(1);
            let mut cursor = tx.cursor_dup_write::<tables::HashedStorages>()?;
            for (index, item) in collector.iter()?.enumerate() {
                if index > 0 && index.is_multiple_of(interval) {
                    info!(
                        target: "sync::stages::hashing_storage",
                        progress = %format!("{:.2}%", (index as f64 / total_hashes as f64) * 100.0),
                        "Inserting hashes"
                    );
                }

                let (addr_key, value) = item?;
                cursor.append_dup(
                    B256::from_slice(&addr_key[..32]),
                    StorageEntry {
                        key: B256::from_slice(&addr_key[32..]),
                        value: CompactU256::decompress_owned(value)?.into(),
                    },
                )?;
            }
        } else {
            // Aggregate all changesets and make a list of the storages that have
            // changed.
            let lists = provider.changed_storages_with_range(from_block..=to_block)?;
            // Iterate over the plain state to get the newest storage values. The
            // assumption we rely on is that the plain state represents the state at
            // `previous_stage_progress`.
            let storages = provider.plain_state_storages(lists)?;
            provider.insert_storage_for_hashing(storages)?;
        }

        // We finished the hashing stage, and no further iterations are expected for
        // the same block range, so no intermediate checkpoint is needed.
        let checkpoint = StageCheckpoint::new(input.target())
            .with_storage_hashing_stage_checkpoint(StorageHashingCheckpoint {
                progress: stage_checkpoint_progress(provider)?,
                ..Default::default()
            });

        Ok(ExecOutput { checkpoint, done: true })
    }

    /// Unwind the stage.
    fn unwind(
        &mut self,
        provider: &Provider,
        input: UnwindInput,
    ) -> Result<UnwindOutput, StageError> {
        let (range, unwind_progress, _) =
            input.unwind_block_range_with_threshold(self.commit_threshold);

        provider.unwind_storage_hashing_range(BlockNumberAddress::range(range))?;

        let mut stage_checkpoint =
            input.checkpoint.storage_hashing_stage_checkpoint().unwrap_or_default();

        stage_checkpoint.progress = stage_checkpoint_progress(provider)?;

        Ok(UnwindOutput {
            checkpoint: StageCheckpoint::new(unwind_progress)
                .with_storage_hashing_stage_checkpoint(stage_checkpoint),
        })
    }
}

/// Drains the hashed entries from every channel into the ETL collector.
fn collect(
    channels: &mut Vec<Receiver<(Vec<u8>, CompactU256)>>,
    collector: &mut Collector<Vec<u8>, CompactU256>,
) -> Result<(), StageError> {
    for channel in channels.iter_mut() {
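        // `recv` keeps yielding until the corresponding rayon job has sent all of
        // its results and dropped the sender, at which point it returns `Err`.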
        while let Ok((key, v)) = channel.recv() {
            collector.insert(key, v)?;
        }
    }
    info!(target: "sync::stages::hashing_storage", "Hashed {} entries", collector.len());
    channels.clear();
    Ok(())
}

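/// Computes stage progress as the number of hashed storage entries written so far
/// against the total number of plain storage entries.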
fn stage_checkpoint_progress(provider: &impl StatsReader) -> ProviderResult<EntitiesCheckpoint> {
    Ok(EntitiesCheckpoint {
        processed: provider.count_entries::<tables::HashedStorages>()? as u64,
        total: provider.count_entries::<tables::PlainStorageState>()? as u64,
    })
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::{
        stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError,
        TestStageDB, UnwindStageTestRunner,
    };
    use alloy_primitives::{Address, U256};
    use assert_matches::assert_matches;
    use rand::Rng;
    use reth_db_api::{
        cursor::{DbCursorRW, DbDupCursorRO},
        models::StoredBlockBodyIndices,
    };
    use reth_ethereum_primitives::Block;
    use reth_primitives_traits::SealedBlock;
    use reth_provider::providers::StaticFileWriter;
    use reth_testing_utils::generators::{
        self, random_block_range, random_contract_account_range, BlockRangeParams,
    };

    stage_test_suite_ext!(StorageHashingTestRunner, storage_hashing);

    /// Execute with a low clean threshold so the whole storage is hashed.
    #[tokio::test]
    async fn execute_clean_storage_hashing() {
        let (previous_stage, stage_progress) = (500, 100);

        // Set up the runner
        let mut runner = StorageHashingTestRunner::default();

        // Set a low clean threshold so we hash the whole storage.
        runner.set_clean_threshold(1);

        // Set a low commit threshold so every entry forces a `tx.commit`, making sure
        // we don't hang on one key. Seed execution inserts more than one storage
        // entry per address.
        runner.set_commit_threshold(1);

        let mut input = ExecInput {
            target: Some(previous_stage),
            checkpoint: Some(StageCheckpoint::new(stage_progress)),
        };

        runner.seed_execution(input).expect("failed to seed execution");

        loop {
            if let Ok(result @ ExecOutput { checkpoint, done }) =
                runner.execute(input).await.unwrap()
            {
                if !done {
                    let previous_checkpoint = input
                        .checkpoint
                        .and_then(|checkpoint| checkpoint.storage_hashing_stage_checkpoint())
                        .unwrap_or_default();
                    assert_matches!(checkpoint.storage_hashing_stage_checkpoint(), Some(StorageHashingCheckpoint {
                        progress: EntitiesCheckpoint {
                            processed,
                            total,
                        },
                        ..
                    }) if processed == previous_checkpoint.progress.processed + 1 &&
                        total == runner.db.count_entries::<tables::PlainStorageState>().unwrap() as u64);

                    // Continue from checkpoint
                    input.checkpoint = Some(checkpoint);
                    continue
                }
                assert_eq!(checkpoint.block_number, previous_stage);
                assert_matches!(checkpoint.storage_hashing_stage_checkpoint(), Some(StorageHashingCheckpoint {
                        progress: EntitiesCheckpoint {
                            processed,
                            total,
                        },
                        ..
                    }) if processed == total &&
                        total == runner.db.count_entries::<tables::PlainStorageState>().unwrap() as u64);

                // Validate the stage execution
                assert!(
                    runner.validate_execution(input, Some(result)).is_ok(),
                    "execution validation"
                );

                break
            }
            panic!("Failed execution");
        }
    }

    struct StorageHashingTestRunner {
        db: TestStageDB,
        commit_threshold: u64,
        clean_threshold: u64,
        etl_config: EtlConfig,
    }

    impl Default for StorageHashingTestRunner {
        fn default() -> Self {
            Self {
                db: TestStageDB::default(),
                commit_threshold: 1000,
                clean_threshold: 1000,
                etl_config: EtlConfig::default(),
            }
        }
    }

    impl StageTestRunner for StorageHashingTestRunner {
        type S = StorageHashingStage;

        fn db(&self) -> &TestStageDB {
            &self.db
        }

        fn stage(&self) -> Self::S {
            Self::S {
                commit_threshold: self.commit_threshold,
                clean_threshold: self.clean_threshold,
                etl_config: self.etl_config.clone(),
            }
        }
    }

    impl ExecuteStageTestRunner for StorageHashingTestRunner {
        type Seed = Vec<SealedBlock<Block>>;

        fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
            let stage_progress = input.next_block();
            let end = input.target();
            let mut rng = generators::rng();

            let n_accounts = 31;
            let mut accounts = random_contract_account_range(&mut rng, &mut (0..n_accounts));

            let blocks = random_block_range(
                &mut rng,
                stage_progress..=end,
                BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..3, ..Default::default() },
            );

            self.db.insert_headers(blocks.iter().map(|block| block.sealed_header()))?;

            let iter = blocks.iter();
            let mut next_tx_num = 0;
            let mut first_tx_num = next_tx_num;
            for progress in iter {
                // Insert last progress data
                let block_number = progress.number;
                self.db.commit(|tx| {
                    progress.body().transactions.iter().try_for_each(
                        |transaction| -> Result<(), reth_db::DatabaseError> {
                            tx.put::<tables::TransactionHashNumbers>(
                                *transaction.tx_hash(),
                                next_tx_num,
                            )?;
                            tx.put::<tables::Transactions>(next_tx_num, transaction.clone())?;

                            let (addr, _) = accounts
                                .get_mut((rng.random::<u64>() % n_accounts) as usize)
                                .unwrap();

                            for _ in 0..2 {
                                let new_entry = StorageEntry {
                                    key: keccak256([rng.random::<u8>()]),
                                    value: U256::from(rng.random::<u8>() % 30 + 1),
                                };
                                self.insert_storage_entry(
                                    tx,
                                    (block_number, *addr).into(),
                                    new_entry,
                                    progress.number == stage_progress,
                                )?;
                            }

                            next_tx_num += 1;
                            Ok(())
                        },
                    )?;

                    // Randomize rewards
                    let has_reward: bool = rng.random();
                    if has_reward {
                        self.insert_storage_entry(
                            tx,
                            (block_number, Address::random()).into(),
                            StorageEntry {
                                key: keccak256("mining"),
                                value: U256::from(rng.random::<u32>()),
                            },
                            progress.number == stage_progress,
                        )?;
                    }

                    let body = StoredBlockBodyIndices {
                        first_tx_num,
                        tx_count: progress.transaction_count() as u64,
                    };

                    first_tx_num = next_tx_num;

                    tx.put::<tables::BlockBodyIndices>(progress.number, body)?;
                    Ok(())
                })?;
            }

            Ok(blocks)
        }

        fn validate_execution(
            &self,
            input: ExecInput,
            output: Option<ExecOutput>,
        ) -> Result<(), TestRunnerError> {
            if let Some(output) = output {
                let start_block = input.checkpoint().block_number + 1;
                let end_block = output.checkpoint.block_number;
                if start_block > end_block {
                    return Ok(())
                }
            }
            self.check_hashed_storage()
        }
    }

    impl UnwindStageTestRunner for StorageHashingTestRunner {
        fn validate_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError> {
            self.unwind_storage(input)?;
            self.check_hashed_storage()
        }
    }

    impl StorageHashingTestRunner {
        fn set_clean_threshold(&mut self, threshold: u64) {
            self.clean_threshold = threshold;
        }

        fn set_commit_threshold(&mut self, threshold: u64) {
            self.commit_threshold = threshold;
        }

        fn check_hashed_storage(&self) -> Result<(), TestRunnerError> {
            self.db
                .query(|tx| {
                    let mut storage_cursor = tx.cursor_dup_read::<tables::PlainStorageState>()?;
                    let mut hashed_storage_cursor =
                        tx.cursor_dup_read::<tables::HashedStorages>()?;

                    let mut expected = 0;

                    while let Some((address, entry)) = storage_cursor.next()? {
                        let key = keccak256(entry.key);
                        let got =
                            hashed_storage_cursor.seek_by_key_subkey(keccak256(address), key)?;
                        assert_eq!(
                            got,
                            Some(StorageEntry { key, ..entry }),
                            "{expected}: {address:?}"
                        );
                        expected += 1;
                    }
                    let count = tx.cursor_dup_read::<tables::HashedStorages>()?.walk(None)?.count();

                    assert_eq!(count, expected);
                    Ok(())
                })
                .map_err(|e| e.into())
        }

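        /// Upserts `entry` into the plain storage state, records the previous value
        /// in the storage changesets and, when `hash` is set, mirrors the write into
        /// `HashedStorages`.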
        fn insert_storage_entry<TX: DbTxMut>(
            &self,
            tx: &TX,
            bn_address: BlockNumberAddress,
            entry: StorageEntry,
            hash: bool,
        ) -> Result<(), reth_db::DatabaseError> {
            let mut storage_cursor = tx.cursor_dup_write::<tables::PlainStorageState>()?;
            let prev_entry =
                match storage_cursor.seek_by_key_subkey(bn_address.address(), entry.key)? {
                    Some(e) if e.key == entry.key => {
                        tx.delete::<tables::PlainStorageState>(bn_address.address(), Some(e))
                            .expect("failed to delete entry");
                        e
                    }
                    _ => StorageEntry { key: entry.key, value: U256::from(0) },
                };
            tx.put::<tables::PlainStorageState>(bn_address.address(), entry)?;

            if hash {
                let hashed_address = keccak256(bn_address.address());
                let hashed_entry = StorageEntry { key: keccak256(entry.key), value: entry.value };

                if let Some(e) = tx
                    .cursor_dup_write::<tables::HashedStorages>()?
                    .seek_by_key_subkey(hashed_address, hashed_entry.key)?
                    .filter(|e| e.key == hashed_entry.key)
                {
                    tx.delete::<tables::HashedStorages>(hashed_address, Some(e))
                        .expect("failed to delete entry");
                }

                tx.put::<tables::HashedStorages>(hashed_address, hashed_entry)?;
            }

            tx.put::<tables::StorageChangeSets>(bn_address, prev_entry)?;
            Ok(())
        }

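        /// Reverts plain storage to the state at `input.unwind_to` by walking the
        /// storage changesets backwards and re-applying the previous values.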
        fn unwind_storage(&self, input: UnwindInput) -> Result<(), TestRunnerError> {
            tracing::debug!("unwinding storage...");
            let target_block = input.unwind_to;
            self.db.commit(|tx| {
                let mut storage_cursor = tx.cursor_dup_write::<tables::PlainStorageState>()?;
                let mut changeset_cursor = tx.cursor_dup_read::<tables::StorageChangeSets>()?;

                let mut rev_changeset_walker = changeset_cursor.walk_back(None)?;

                while let Some((bn_address, entry)) = rev_changeset_walker.next().transpose()? {
                    if bn_address.block_number() < target_block {
                        break
                    }

                    if storage_cursor
                        .seek_by_key_subkey(bn_address.address(), entry.key)?
                        .filter(|e| e.key == entry.key)
                        .is_some()
                    {
                        storage_cursor.delete_current()?;
                    }

                    if !entry.value.is_zero() {
                        storage_cursor.upsert(bn_address.address(), &entry)?;
                    }
                }
                Ok(())
            })?;
            Ok(())
        }
    }
}