// reth_stages/stages/hashing_storage.rs

1use alloy_primitives::{bytes::BufMut, keccak256, B256};
2use itertools::Itertools;
3use reth_config::config::{EtlConfig, HashingConfig};
4use reth_db_api::{
5    cursor::{DbCursorRO, DbDupCursorRW},
6    models::{BlockNumberAddress, CompactU256},
7    table::Decompress,
8    tables,
9    transaction::{DbTx, DbTxMut},
10};
11use reth_etl::Collector;
12use reth_primitives_traits::StorageEntry;
13use reth_provider::{DBProvider, HashingWriter, StatsReader, StorageReader};
14use reth_stages_api::{
15    EntitiesCheckpoint, ExecInput, ExecOutput, Stage, StageCheckpoint, StageError, StageId,
16    StorageHashingCheckpoint, UnwindInput, UnwindOutput,
17};
18use reth_storage_errors::provider::ProviderResult;
19use std::{
20    fmt::Debug,
21    sync::mpsc::{self, Receiver},
22};
23use tracing::*;
24
/// Maximum number of in-flight result channels (one per spawned rayon job) kept in
/// memory before their hashed entries are flushed into the ETL collector.
const MAXIMUM_CHANNELS: usize = 10_000;

/// Maximum number of storage entries to hash per rayon worker job.
const WORKER_CHUNK_SIZE: usize = 100;
30
31/// Storage hashing stage hashes plain storage.
32/// This is preparation before generating intermediate hashes and calculating Merkle tree root.
/// Storage hashing stage hashes plain storage.
/// This is preparation before generating intermediate hashes and calculating Merkle tree root.
#[derive(Debug)]
pub struct StorageHashingStage {
    /// The threshold (in number of blocks) for switching between incremental
    /// hashing and full storage hashing.
    pub clean_threshold: u64,
    /// The maximum number of slots to process before committing during unwind.
    pub commit_threshold: u64,
    /// ETL configuration used when re-hashing the whole storage (clean path).
    pub etl_config: EtlConfig,
}
43
44impl StorageHashingStage {
45    /// Create new instance of [`StorageHashingStage`].
46    pub const fn new(config: HashingConfig, etl_config: EtlConfig) -> Self {
47        Self {
48            clean_threshold: config.clean_threshold,
49            commit_threshold: config.commit_threshold,
50            etl_config,
51        }
52    }
53}
54
55impl Default for StorageHashingStage {
56    fn default() -> Self {
57        Self {
58            clean_threshold: 500_000,
59            commit_threshold: 100_000,
60            etl_config: EtlConfig::default(),
61        }
62    }
63}
64
impl<Provider> Stage<Provider> for StorageHashingStage
where
    Provider: DBProvider<Tx: DbTxMut> + StorageReader + HashingWriter + StatsReader,
{
    /// Return the id of the stage
    fn id(&self) -> StageId {
        StageId::StorageHashing
    }

    /// Execute the stage.
    ///
    /// Picks one of two strategies based on the size of the block range:
    /// - clean path: wipe [`tables::HashedStorages`] and re-hash all of plain storage via
    ///   rayon workers and an ETL collector;
    /// - incremental path: hash only the storages touched by changesets in the range.
    fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result<ExecOutput, StageError> {
        let tx = provider.tx_ref();
        // Nothing to do if we are already at the target block.
        if input.target_reached() {
            return Ok(ExecOutput::done(input.checkpoint()))
        }

        let (from_block, to_block) = input.next_block_range().into_inner();

        // If the range is wider than the threshold it is faster to wipe `HashedStorages` and
        // hash all of plain storage; otherwise aggregate the storage changesets and apply
        // hashing incrementally to the `HashedStorages` table. Also, if we start from genesis,
        // we need to hash from scratch, as genesis accounts are not in changeset, along with
        // their storages.
        if to_block - from_block > self.clean_threshold || from_block == 1 {
            // clear table, load all accounts and hash it
            tx.clear::<tables::HashedStorages>()?;

            let mut storage_cursor = tx.cursor_read::<tables::PlainStorageState>()?;
            let mut collector =
                Collector::new(self.etl_config.file_size, self.etl_config.dir.clone());
            let mut channels = Vec::with_capacity(MAXIMUM_CHANNELS);

            for chunk in &storage_cursor.walk(None)?.chunks(WORKER_CHUNK_SIZE) {
                // An _unordered_ channel to receive results from a rayon job
                let (tx, rx) = mpsc::channel();
                channels.push(rx);

                let chunk = chunk.collect::<Result<Vec<_>, _>>()?;
                // Spawn the hashing task onto the global rayon pool
                rayon::spawn(move || {
                    for (address, slot) in chunk {
                        // 64-byte key: keccak(address) ++ keccak(slot key). Dropping `tx` when
                        // the job ends terminates the matching `recv` loop in `collect`.
                        let mut addr_key = Vec::with_capacity(64);
                        addr_key.put_slice(keccak256(address).as_slice());
                        addr_key.put_slice(keccak256(slot.key).as_slice());
                        let _ = tx.send((addr_key, CompactU256::from(slot.value)));
                    }
                });

                // Flush to ETL when channels length reaches MAXIMUM_CHANNELS
                if !channels.is_empty() && channels.len() % MAXIMUM_CHANNELS == 0 {
                    collect(&mut channels, &mut collector)?;
                }
            }

            // Drain whatever channels remain after the last partial batch.
            collect(&mut channels, &mut collector)?;

            let total_hashes = collector.len();
            let interval = (total_hashes / 10).max(1);
            let mut cursor = tx.cursor_dup_write::<tables::HashedStorages>()?;
            for (index, item) in collector.iter()?.enumerate() {
                // Log progress roughly every 10% of total entries.
                if index > 0 && index % interval == 0 {
                    info!(
                        target: "sync::stages::hashing_storage",
                        progress = %format!("{:.2}%", (index as f64 / total_hashes as f64) * 100.0),
                        "Inserting hashes"
                    );
                }

                // `append_dup` requires ascending key order; the ETL collector is presumed to
                // yield entries sorted by key — confirm against `reth_etl::Collector` docs.
                let (addr_key, value) = item?;
                cursor.append_dup(
                    B256::from_slice(&addr_key[..32]),
                    StorageEntry {
                        key: B256::from_slice(&addr_key[32..]),
                        value: CompactU256::decompress_owned(value)?.into(),
                    },
                )?;
            }
        } else {
            // Aggregate all changesets and make list of storages that have been
            // changed.
            let lists = provider.changed_storages_with_range(from_block..=to_block)?;
            // iterate over plain state and get newest storage value.
            // Assumption we are okay with is that plain state represent
            // `previous_stage_progress` state.
            let storages = provider.plain_state_storages(lists)?;
            provider.insert_storage_for_hashing(storages)?;
        }

        // We finished the hashing stage, no future iterations is expected for the same block range,
        // so no checkpoint is needed.
        let checkpoint = StageCheckpoint::new(input.target())
            .with_storage_hashing_stage_checkpoint(StorageHashingCheckpoint {
                progress: stage_checkpoint_progress(provider)?,
                ..Default::default()
            });

        Ok(ExecOutput { checkpoint, done: true })
    }

    /// Unwind the stage.
    ///
    /// Reverts hashed storage over (up to `commit_threshold` blocks of) the unwind range and
    /// refreshes the entity-count checkpoint.
    fn unwind(
        &mut self,
        provider: &Provider,
        input: UnwindInput,
    ) -> Result<UnwindOutput, StageError> {
        let (range, unwind_progress, _) =
            input.unwind_block_range_with_threshold(self.commit_threshold);

        provider.unwind_storage_hashing_range(BlockNumberAddress::range(range))?;

        let mut stage_checkpoint =
            input.checkpoint.storage_hashing_stage_checkpoint().unwrap_or_default();

        stage_checkpoint.progress = stage_checkpoint_progress(provider)?;

        Ok(UnwindOutput {
            checkpoint: StageCheckpoint::new(unwind_progress)
                .with_storage_hashing_stage_checkpoint(stage_checkpoint),
        })
    }
}
185
186/// Flushes channels hashes to ETL collector.
187fn collect(
188    channels: &mut Vec<Receiver<(Vec<u8>, CompactU256)>>,
189    collector: &mut Collector<Vec<u8>, CompactU256>,
190) -> Result<(), StageError> {
191    for channel in channels.iter_mut() {
192        while let Ok((key, v)) = channel.recv() {
193            collector.insert(key, v)?;
194        }
195    }
196    info!(target: "sync::stages::hashing_storage", "Hashed {} entries", collector.len());
197    channels.clear();
198    Ok(())
199}
200
201fn stage_checkpoint_progress(provider: &impl StatsReader) -> ProviderResult<EntitiesCheckpoint> {
202    Ok(EntitiesCheckpoint {
203        processed: provider.count_entries::<tables::HashedStorages>()? as u64,
204        total: provider.count_entries::<tables::PlainStorageState>()? as u64,
205    })
206}
207
208#[cfg(test)]
209mod tests {
210    use super::*;
211    use crate::test_utils::{
212        stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError,
213        TestStageDB, UnwindStageTestRunner,
214    };
215    use alloy_primitives::{Address, U256};
216    use assert_matches::assert_matches;
217    use rand::Rng;
218    use reth_db_api::{
219        cursor::{DbCursorRW, DbDupCursorRO},
220        models::StoredBlockBodyIndices,
221    };
222    use reth_ethereum_primitives::Block;
223    use reth_primitives_traits::{SealedBlock, SignedTransaction};
224    use reth_provider::providers::StaticFileWriter;
225    use reth_testing_utils::generators::{
226        self, random_block_range, random_contract_account_range, BlockRangeParams,
227    };
228
    // Runs the shared stage test suite against this runner.
    stage_test_suite_ext!(StorageHashingTestRunner, storage_hashing);

    /// Execute with low clean threshold so as to hash whole storage
    #[tokio::test]
    async fn execute_clean_storage_hashing() {
        // Checkpoint (100) is behind the target (500), so the stage has work to do.
        let (previous_stage, stage_progress) = (500, 100);

        // Set up the runner
        let mut runner = StorageHashingTestRunner::default();

        // set low clean threshold so we hash the whole storage
        runner.set_clean_threshold(1);

        // set low commit threshold so we force each entry to be a tx.commit and make sure we don't
        // hang on one key. Seed execution inserts more than one storage entry per address.
        runner.set_commit_threshold(1);

        let mut input = ExecInput {
            target: Some(previous_stage),
            checkpoint: Some(StageCheckpoint::new(stage_progress)),
        };

        runner.seed_execution(input).expect("failed to seed execution");

        // Re-run the stage from each intermediate checkpoint until it reports `done`.
        loop {
            if let Ok(result @ ExecOutput { checkpoint, done }) =
                runner.execute(input).await.unwrap()
            {
                if !done {
                    // Intermediate run: progress must advance by exactly one processed entry.
                    let previous_checkpoint = input
                        .checkpoint
                        .and_then(|checkpoint| checkpoint.storage_hashing_stage_checkpoint())
                        .unwrap_or_default();
                    assert_matches!(checkpoint.storage_hashing_stage_checkpoint(), Some(StorageHashingCheckpoint {
                        progress: EntitiesCheckpoint {
                            processed,
                            total,
                        },
                        ..
                    }) if processed == previous_checkpoint.progress.processed + 1 &&
                        total == runner.db.table::<tables::PlainStorageState>().unwrap().len() as u64);

                    // Continue from checkpoint
                    input.checkpoint = Some(checkpoint);
                    continue
                }
                // Final run: the checkpoint must be at the target and fully processed.
                assert_eq!(checkpoint.block_number, previous_stage);
                assert_matches!(checkpoint.storage_hashing_stage_checkpoint(), Some(StorageHashingCheckpoint {
                        progress: EntitiesCheckpoint {
                            processed,
                            total,
                        },
                        ..
                    }) if processed == total &&
                        total == runner.db.table::<tables::PlainStorageState>().unwrap().len() as u64);

                // Validate the stage execution
                assert!(
                    runner.validate_execution(input, Some(result)).is_ok(),
                    "execution validation"
                );

                break
            }
            panic!("Failed execution");
        }
    }
296
    /// Test harness state: a temporary database plus the stage parameters under test.
    struct StorageHashingTestRunner {
        // Test database the stage executes against.
        db: TestStageDB,
        // Forwarded to [`StorageHashingStage::commit_threshold`].
        commit_threshold: u64,
        // Forwarded to [`StorageHashingStage::clean_threshold`].
        clean_threshold: u64,
        // Forwarded to [`StorageHashingStage::etl_config`].
        etl_config: EtlConfig,
    }
303
304    impl Default for StorageHashingTestRunner {
305        fn default() -> Self {
306            Self {
307                db: TestStageDB::default(),
308                commit_threshold: 1000,
309                clean_threshold: 1000,
310                etl_config: EtlConfig::default(),
311            }
312        }
313    }
314
315    impl StageTestRunner for StorageHashingTestRunner {
316        type S = StorageHashingStage;
317
318        fn db(&self) -> &TestStageDB {
319            &self.db
320        }
321
322        fn stage(&self) -> Self::S {
323            Self::S {
324                commit_threshold: self.commit_threshold,
325                clean_threshold: self.clean_threshold,
326                etl_config: self.etl_config.clone(),
327            }
328        }
329    }
330
    impl ExecuteStageTestRunner for StorageHashingTestRunner {
        type Seed = Vec<SealedBlock<Block>>;

        /// Seeds the database with random blocks over the input's range, inserting
        /// transactions, per-address storage entries (with matching changesets), optional
        /// "reward" storage writes, and block body indices.
        fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
            let stage_progress = input.next_block();
            let end = input.target();
            let mut rng = generators::rng();

            let n_accounts = 31;
            let mut accounts = random_contract_account_range(&mut rng, &mut (0..n_accounts));

            let blocks = random_block_range(
                &mut rng,
                stage_progress..=end,
                BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..3, ..Default::default() },
            );

            self.db.insert_headers(blocks.iter().map(|block| block.sealed_header()))?;

            let iter = blocks.iter();
            let mut next_tx_num = 0;
            let mut first_tx_num = next_tx_num;
            for progress in iter {
                // Insert last progress data
                let block_number = progress.number;
                self.db.commit(|tx| {
                    progress.body().transactions.iter().try_for_each(
                        |transaction| -> Result<(), reth_db::DatabaseError> {
                            tx.put::<tables::TransactionHashNumbers>(
                                *transaction.tx_hash(),
                                next_tx_num,
                            )?;
                            tx.put::<tables::Transactions>(next_tx_num, transaction.clone())?;

                            // Pick a random seeded account to receive this tx's storage writes.
                            let (addr, _) =
                                accounts.get_mut(rng.gen::<usize>() % n_accounts as usize).unwrap();

                            // Two storage entries per transaction; hashed copies are written only
                            // for the first block (`progress.number == stage_progress`).
                            for _ in 0..2 {
                                let new_entry = StorageEntry {
                                    key: keccak256([rng.gen::<u8>()]),
                                    value: U256::from(rng.gen::<u8>() % 30 + 1),
                                };
                                self.insert_storage_entry(
                                    tx,
                                    (block_number, *addr).into(),
                                    new_entry,
                                    progress.number == stage_progress,
                                )?;
                            }

                            next_tx_num += 1;
                            Ok(())
                        },
                    )?;

                    // Randomize rewards
                    let has_reward: bool = rng.gen();
                    if has_reward {
                        self.insert_storage_entry(
                            tx,
                            (block_number, Address::random()).into(),
                            StorageEntry {
                                key: keccak256("mining"),
                                value: U256::from(rng.gen::<u32>()),
                            },
                            progress.number == stage_progress,
                        )?;
                    }

                    let body = StoredBlockBodyIndices {
                        first_tx_num,
                        tx_count: progress.transaction_count() as u64,
                    };

                    first_tx_num = next_tx_num;

                    tx.put::<tables::BlockBodyIndices>(progress.number, body)?;
                    Ok(())
                })?;
            }

            Ok(blocks)
        }

        /// Passes trivially when the executed range is empty; otherwise verifies the hashed
        /// storage table mirrors plain state.
        fn validate_execution(
            &self,
            input: ExecInput,
            output: Option<ExecOutput>,
        ) -> Result<(), TestRunnerError> {
            if let Some(output) = output {
                let start_block = input.checkpoint().block_number + 1;
                let end_block = output.checkpoint.block_number;
                if start_block > end_block {
                    return Ok(())
                }
            }
            self.check_hashed_storage()
        }
    }
430
431    impl UnwindStageTestRunner for StorageHashingTestRunner {
432        fn validate_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError> {
433            self.unwind_storage(input)?;
434            self.check_hashed_storage()
435        }
436    }
437
    impl StorageHashingTestRunner {
        // Sets the stage's clean threshold for the next `stage()` build.
        fn set_clean_threshold(&mut self, threshold: u64) {
            self.clean_threshold = threshold;
        }

        // Sets the stage's commit threshold for the next `stage()` build.
        fn set_commit_threshold(&mut self, threshold: u64) {
            self.commit_threshold = threshold;
        }

        /// Asserts that every plain storage entry has a matching hashed entry (keyed by
        /// keccak of address and slot) and that the hashed table holds no extras.
        fn check_hashed_storage(&self) -> Result<(), TestRunnerError> {
            self.db
                .query(|tx| {
                    let mut storage_cursor = tx.cursor_dup_read::<tables::PlainStorageState>()?;
                    let mut hashed_storage_cursor =
                        tx.cursor_dup_read::<tables::HashedStorages>()?;

                    let mut expected = 0;

                    while let Some((address, entry)) = storage_cursor.next()? {
                        let key = keccak256(entry.key);
                        let got =
                            hashed_storage_cursor.seek_by_key_subkey(keccak256(address), key)?;
                        assert_eq!(
                            got,
                            Some(StorageEntry { key, ..entry }),
                            "{expected}: {address:?}"
                        );
                        expected += 1;
                    }
                    // No stray rows: the hashed table must have exactly as many entries as
                    // plain storage.
                    let count = tx.cursor_dup_read::<tables::HashedStorages>()?.walk(None)?.count();

                    assert_eq!(count, expected);
                    Ok(())
                })
                .map_err(|e| e.into())
        }

        /// Upserts `entry` into plain storage at `bn_address`, records the previous value in
        /// the storage changeset, and (when `hash` is set) mirrors the write into the hashed
        /// storage table.
        fn insert_storage_entry<TX: DbTxMut>(
            &self,
            tx: &TX,
            bn_address: BlockNumberAddress,
            entry: StorageEntry,
            hash: bool,
        ) -> Result<(), reth_db::DatabaseError> {
            let mut storage_cursor = tx.cursor_dup_write::<tables::PlainStorageState>()?;
            // Replace any existing entry for this slot; a zero-valued placeholder stands in as
            // the "previous" value when the slot was empty.
            let prev_entry =
                match storage_cursor.seek_by_key_subkey(bn_address.address(), entry.key)? {
                    Some(e) if e.key == entry.key => {
                        tx.delete::<tables::PlainStorageState>(bn_address.address(), Some(e))
                            .expect("failed to delete entry");
                        e
                    }
                    _ => StorageEntry { key: entry.key, value: U256::from(0) },
                };
            tx.put::<tables::PlainStorageState>(bn_address.address(), entry)?;

            if hash {
                let hashed_address = keccak256(bn_address.address());
                let hashed_entry = StorageEntry { key: keccak256(entry.key), value: entry.value };

                // Same replace-then-put dance for the hashed table.
                if let Some(e) = tx
                    .cursor_dup_write::<tables::HashedStorages>()?
                    .seek_by_key_subkey(hashed_address, hashed_entry.key)?
                    .filter(|e| e.key == hashed_entry.key)
                {
                    tx.delete::<tables::HashedStorages>(hashed_address, Some(e))
                        .expect("failed to delete entry");
                }

                tx.put::<tables::HashedStorages>(hashed_address, hashed_entry)?;
            }

            tx.put::<tables::StorageChangeSets>(bn_address, prev_entry)?;
            Ok(())
        }

        /// Reverts plain storage back to `input.unwind_to` by walking the storage changesets
        /// in reverse and restoring each slot's previous value (deleting zero values).
        fn unwind_storage(&self, input: UnwindInput) -> Result<(), TestRunnerError> {
            tracing::debug!("unwinding storage...");
            let target_block = input.unwind_to;
            self.db.commit(|tx| {
                let mut storage_cursor = tx.cursor_dup_write::<tables::PlainStorageState>()?;
                let mut changeset_cursor = tx.cursor_dup_read::<tables::StorageChangeSets>()?;

                let mut rev_changeset_walker = changeset_cursor.walk_back(None)?;

                while let Some((bn_address, entry)) = rev_changeset_walker.next().transpose()? {
                    // Stop once we reach changesets at or below the unwind target.
                    if bn_address.block_number() < target_block {
                        break
                    }

                    // Remove the current value for the slot, if present.
                    if storage_cursor
                        .seek_by_key_subkey(bn_address.address(), entry.key)?
                        .filter(|e| e.key == entry.key)
                        .is_some()
                    {
                        storage_cursor.delete_current()?;
                    }

                    // Restore the previous value; zero means the slot did not exist before.
                    if !entry.value.is_zero() {
                        storage_cursor.upsert(bn_address.address(), &entry)?;
                    }
                }
                Ok(())
            })?;
            Ok(())
        }
    }
545}