reth_stages/stages/hashing_storage.rs

use alloy_primitives::{b256, bytes::BufMut, keccak256, Address, B256};
use itertools::Itertools;
use reth_config::config::{EtlConfig, HashingConfig};
use reth_db_api::{
    cursor::{DbCursorRO, DbDupCursorRW},
    models::CompactU256,
    table::Decompress,
    tables,
    transaction::{DbTx, DbTxMut},
};
use reth_etl::Collector;
use reth_primitives_traits::StorageEntry;
use reth_provider::{DBProvider, HashingWriter, StatsReader, StorageReader};
use reth_stages_api::{
    EntitiesCheckpoint, ExecInput, ExecOutput, Stage, StageCheckpoint, StageError, StageId,
    StorageHashingCheckpoint, UnwindInput, UnwindOutput,
};
use reth_storage_api::StorageSettingsCache;
use reth_storage_errors::provider::ProviderResult;
use std::{
    fmt::Debug,
    sync::mpsc::{self, Receiver},
};
use tracing::*;

/// Maximum number of channels that can exist in memory.
const MAXIMUM_CHANNELS: usize = 10_000;

/// Maximum number of storage entries to hash per rayon worker job.
const WORKER_CHUNK_SIZE: usize = 100;

/// Keccak256 hash of the zero address.
const HASHED_ZERO_ADDRESS: B256 =
    b256!("0x5380c7b7ae81a58eb98d9c78de4a1fd7fd9535fc953ed2be602daaa41767312a");

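// A minimal sanity-check sketch: the constant above is simply `keccak256` of the
// 20-byte zero address, so it can be verified with an assertion like:
//
//     assert_eq!(keccak256(Address::ZERO), HASHED_ZERO_ADDRESS);
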
/// Storage hashing stage hashes plain storage.
/// This is preparation before generating intermediate hashes and calculating the Merkle tree
/// root.
#[derive(Debug)]
pub struct StorageHashingStage {
    /// The threshold (in number of blocks) for switching between incremental
    /// hashing and full storage hashing.
    pub clean_threshold: u64,
    /// The maximum number of slots to process before committing during unwind.
    pub commit_threshold: u64,
    /// ETL configuration
    pub etl_config: EtlConfig,
}

impl StorageHashingStage {
    /// Creates a new instance of [`StorageHashingStage`].
    pub const fn new(config: HashingConfig, etl_config: EtlConfig) -> Self {
        Self {
            clean_threshold: config.clean_threshold,
            commit_threshold: config.commit_threshold,
            etl_config,
        }
    }
}

impl Default for StorageHashingStage {
    fn default() -> Self {
        Self {
            clean_threshold: 500_000,
            commit_threshold: 100_000,
            etl_config: EtlConfig::default(),
        }
    }
}

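// A minimal construction sketch, assuming `HashingConfig` exposes exactly the two
// threshold fields read by `new` above (hypothetical literal, for illustration only):
//
//     let stage = StorageHashingStage::new(
//         HashingConfig { clean_threshold: 500_000, commit_threshold: 100_000 },
//         EtlConfig::default(),
//     );
//
// `StorageHashingStage::default()` yields the same configuration without an explicit
// `HashingConfig`.
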
impl<Provider> Stage<Provider> for StorageHashingStage
where
    Provider: DBProvider<Tx: DbTxMut>
        + StorageReader
        + HashingWriter
        + StatsReader
        + StorageSettingsCache,
{
    /// Return the id of the stage
    fn id(&self) -> StageId {
        StageId::StorageHashing
    }

    /// Execute the stage.
    fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result<ExecOutput, StageError> {
        let tx = provider.tx_ref();
        if input.target_reached() {
            return Ok(ExecOutput::done(input.checkpoint()))
        }

        // If use_hashed_state is enabled, execution writes directly to `HashedStorages`,
        // so this stage becomes a no-op.
        if provider.cached_storage_settings().use_hashed_state() {
            return Ok(ExecOutput::done(input.checkpoint().with_block_number(input.target())));
        }

        let (from_block, to_block) = input.next_block_range().into_inner();

        // If the block range exceeds the clean threshold, it is faster to walk the plain
        // storage state and hash everything from scratch; otherwise, aggregate the changesets
        // and apply the hashes incrementally to the `HashedStorages` table. Also, if we start
        // from genesis, we need to hash from scratch, as genesis accounts and their storages
        // are not in the changesets.
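        // For example, with the default `clean_threshold` of 500_000: syncing from block
        // 100 to block 700_100 (a 700_000-block range) takes the full-rehash path below,
        // while syncing from block 100 to block 10_100 (a 10_000-block range) takes the
        // incremental changeset path.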
        if to_block - from_block > self.clean_threshold || from_block == 1 {
            // Clear the table, then load all storage entries and hash them.
            tx.clear::<tables::HashedStorages>()?;

            let mut storage_cursor = tx.cursor_read::<tables::PlainStorageState>()?;
            let mut collector =
                Collector::new(self.etl_config.file_size, self.etl_config.dir.clone());
            let mut channels = Vec::with_capacity(MAXIMUM_CHANNELS);

            for chunk in &storage_cursor.walk(None)?.chunks(WORKER_CHUNK_SIZE) {
                // An _unordered_ channel to receive results from a rayon job
                let (tx, rx) = mpsc::channel();
                channels.push(rx);

                let chunk = chunk.collect::<Result<Vec<_>, _>>()?;
                // Spawn the hashing task onto the global rayon pool
                rayon::spawn(move || {
                    // Cache hashed address since PlainStorageState is sorted by address
                    let (mut last_addr, mut hashed_addr) = (Address::ZERO, HASHED_ZERO_ADDRESS);
                    for (address, slot) in chunk {
                        if address != last_addr {
                            last_addr = address;
                            hashed_addr = keccak256(address);
                        }
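                        // The ETL key below is 64 bytes: the hashed address (32 bytes)
                        // followed by the hashed slot key (32 bytes). The insertion loop
                        // further down splits it back apart at byte 32.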
                        let mut addr_key = Vec::with_capacity(64);
                        addr_key.put_slice(hashed_addr.as_slice());
                        addr_key.put_slice(keccak256(slot.key).as_slice());
                        let _ = tx.send((addr_key, CompactU256::from(slot.value)));
                    }
                });

                // Flush to ETL when the number of open channels reaches MAXIMUM_CHANNELS
                if !channels.is_empty() && channels.len().is_multiple_of(MAXIMUM_CHANNELS) {
                    collect(&mut channels, &mut collector)?;
                }
            }

            collect(&mut channels, &mut collector)?;

            let total_hashes = collector.len();
            let interval = (total_hashes / 10).max(1);
            let mut cursor = tx.cursor_dup_write::<tables::HashedStorages>()?;
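            // The ETL collector yields entries sorted by key, i.e. ordered by hashed
            // address and then hashed slot, which is what makes the append-only
            // `append_dup` insertion below valid.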
            for (index, item) in collector.iter()?.enumerate() {
                if index > 0 && index.is_multiple_of(interval) {
                    info!(
                        target: "sync::stages::hashing_storage",
                        progress = %format!("{:.2}%", (index as f64 / total_hashes as f64) * 100.0),
                        "Inserting hashes"
                    );
                }

                let (addr_key, value) = item?;
                cursor.append_dup(
                    B256::from_slice(&addr_key[..32]),
                    StorageEntry {
                        key: B256::from_slice(&addr_key[32..]),
                        value: CompactU256::decompress_owned(value)?.into(),
                    },
                )?;
            }
        } else {
            // Aggregate all changesets and build the list of storage slots that have
            // changed.
            let lists = provider.changed_storages_with_range(from_block..=to_block)?;
            // Iterate over the plain state and get the newest storage value. The
            // assumption we accept is that the plain state represents the
            // `previous_stage_progress` state.
            let storages = provider.plain_state_storages(lists)?;
            provider.insert_storage_for_hashing(storages)?;
        }

        // We finished the hashing stage; no further iterations are expected for the same
        // block range, so no intermediate checkpoint is needed.
        let checkpoint = StageCheckpoint::new(input.target())
            .with_storage_hashing_stage_checkpoint(StorageHashingCheckpoint {
                progress: stage_checkpoint_progress(provider)?,
                ..Default::default()
            });

        Ok(ExecOutput { checkpoint, done: true })
    }

    /// Unwind the stage.
    fn unwind(
        &mut self,
        provider: &Provider,
        input: UnwindInput,
    ) -> Result<UnwindOutput, StageError> {
        // NOTE: this runs in both v1 and v2 mode. In v2 mode, execution writes
        // directly to `HashedStorages`, but the unwind must still revert those
        // entries here because `MerkleUnwind` runs after this stage (in unwind
        // order) and needs `HashedStorages` to reflect the target block state
        // before it can verify the state root.
        let (range, unwind_progress, _) =
            input.unwind_block_range_with_threshold(self.commit_threshold);

        provider.unwind_storage_hashing_range(range)?;

        let mut stage_checkpoint =
            input.checkpoint.storage_hashing_stage_checkpoint().unwrap_or_default();

        stage_checkpoint.progress = stage_checkpoint_progress(provider)?;

        Ok(UnwindOutput {
            checkpoint: StageCheckpoint::new(unwind_progress)
                .with_storage_hashing_stage_checkpoint(stage_checkpoint),
        })
    }
}

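// A minimal driver sketch, assuming a `provider` that satisfies the trait bounds on the
// `Stage` impl above and a hypothetical `head` block number:
//
//     let mut stage = StorageHashingStage::default();
//     let input = ExecInput { target: Some(head), checkpoint: None };
//     let output = stage.execute(&provider, input)?;
//     assert!(output.done);
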
/// Flushes the hashed entries received on the channels into the ETL collector.
fn collect(
    channels: &mut Vec<Receiver<(Vec<u8>, CompactU256)>>,
    collector: &mut Collector<Vec<u8>, CompactU256>,
) -> Result<(), StageError> {
    for channel in channels.iter_mut() {
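        // `recv` yields entries until the corresponding rayon job finishes and drops
        // its sender, at which point it errors and we move on to the next channel, so
        // each channel is drained completely before being discarded.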
        while let Ok((key, v)) = channel.recv() {
            collector.insert(key, v)?;
        }
    }
    info!(target: "sync::stages::hashing_storage", "Hashed {} entries", collector.len());
    channels.clear();
    Ok(())
}

fn stage_checkpoint_progress(provider: &impl StatsReader) -> ProviderResult<EntitiesCheckpoint> {
    Ok(EntitiesCheckpoint {
        processed: provider.count_entries::<tables::HashedStorages>()? as u64,
        total: provider.count_entries::<tables::PlainStorageState>()? as u64,
    })
}
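
// A small worked example of the checkpoint ratio: with 2_000 entries already in
// `HashedStorages` and 10_000 in `PlainStorageState`, the reported progress is
// processed / total = 20% (hypothetical counts, for illustration only).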

#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::{
        stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError,
        TestStageDB, UnwindStageTestRunner,
    };
    use alloy_primitives::{Address, U256};
    use assert_matches::assert_matches;
    use rand::Rng;
    use reth_db_api::{
        cursor::{DbCursorRW, DbDupCursorRO},
        models::{BlockNumberAddress, StoredBlockBodyIndices},
    };
    use reth_ethereum_primitives::Block;
    use reth_primitives_traits::SealedBlock;
    use reth_provider::providers::StaticFileWriter;
    use reth_testing_utils::generators::{
        self, random_block_range, random_contract_account_range, BlockRangeParams,
    };

    stage_test_suite_ext!(StorageHashingTestRunner, storage_hashing);

    /// Execute with a low clean threshold so as to hash the whole storage.
    #[tokio::test]
    async fn execute_clean_storage_hashing() {
        let (previous_stage, stage_progress) = (500, 100);

        // Set up the runner
        let mut runner = StorageHashingTestRunner::default();

        // Set a low clean threshold so we hash the whole storage.
        runner.set_clean_threshold(1);

        // Set a low commit threshold so we force a commit for each entry and make sure we
        // don't hang on one key. Seed execution inserts more than one storage entry per
        // address.
        runner.set_commit_threshold(1);

        let mut input = ExecInput {
            target: Some(previous_stage),
            checkpoint: Some(StageCheckpoint::new(stage_progress)),
        };

        runner.seed_execution(input).expect("failed to seed execution");

        loop {
            if let Ok(result @ ExecOutput { checkpoint, done }) =
                runner.execute(input).await.unwrap()
            {
                if !done {
                    let previous_checkpoint = input
                        .checkpoint
                        .and_then(|checkpoint| checkpoint.storage_hashing_stage_checkpoint())
                        .unwrap_or_default();
                    assert_matches!(checkpoint.storage_hashing_stage_checkpoint(), Some(StorageHashingCheckpoint {
                        progress: EntitiesCheckpoint {
                            processed,
                            total,
                        },
                        ..
                    }) if processed == previous_checkpoint.progress.processed + 1 &&
                        total == runner.db.count_entries::<tables::PlainStorageState>().unwrap() as u64);

                    // Continue from checkpoint
                    input.checkpoint = Some(checkpoint);
                    continue
                }
                assert_eq!(checkpoint.block_number, previous_stage);
                assert_matches!(checkpoint.storage_hashing_stage_checkpoint(), Some(StorageHashingCheckpoint {
                        progress: EntitiesCheckpoint {
                            processed,
                            total,
                        },
                        ..
                    }) if processed == total &&
                        total == runner.db.count_entries::<tables::PlainStorageState>().unwrap() as u64);

                // Validate the stage execution
                assert!(
                    runner.validate_execution(input, Some(result)).is_ok(),
                    "execution validation"
                );

                break
            }
            panic!("Failed execution");
        }
    }

    struct StorageHashingTestRunner {
        db: TestStageDB,
        commit_threshold: u64,
        clean_threshold: u64,
        etl_config: EtlConfig,
    }

    impl Default for StorageHashingTestRunner {
        fn default() -> Self {
            Self {
                db: TestStageDB::default(),
                commit_threshold: 1000,
                clean_threshold: 1000,
                etl_config: EtlConfig::default(),
            }
        }
    }

    impl StageTestRunner for StorageHashingTestRunner {
        type S = StorageHashingStage;

        fn db(&self) -> &TestStageDB {
            &self.db
        }

        fn stage(&self) -> Self::S {
            Self::S {
                commit_threshold: self.commit_threshold,
                clean_threshold: self.clean_threshold,
                etl_config: self.etl_config.clone(),
            }
        }
    }

    impl ExecuteStageTestRunner for StorageHashingTestRunner {
        type Seed = Vec<SealedBlock<Block>>;

        fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
            let stage_progress = input.next_block();
            let end = input.target();
            let mut rng = generators::rng();

            let n_accounts = 31;
            let mut accounts = random_contract_account_range(&mut rng, &mut (0..n_accounts));

            let blocks = random_block_range(
                &mut rng,
                stage_progress..=end,
                BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..3, ..Default::default() },
            );

            self.db.insert_headers(blocks.iter().map(|block| block.sealed_header()))?;
            let mut tx_hash_numbers = Vec::new();

            let iter = blocks.iter();
            let mut next_tx_num = 0;
            let mut first_tx_num = next_tx_num;
            for progress in iter {
                // Insert last progress data
                let block_number = progress.number;
                self.db.commit(|tx| {
                    progress.body().transactions.iter().try_for_each(
                        |transaction| -> Result<(), reth_db::DatabaseError> {
                            tx_hash_numbers.push((*transaction.tx_hash(), next_tx_num));
                            tx.put::<tables::Transactions>(next_tx_num, transaction.clone())?;

                            let (addr, _) = accounts
                                .get_mut((rng.random::<u64>() % n_accounts) as usize)
                                .unwrap();

                            for _ in 0..2 {
                                let new_entry = StorageEntry {
                                    key: keccak256([rng.random::<u8>()]),
                                    value: U256::from(rng.random::<u8>() % 30 + 1),
                                };
                                self.insert_storage_entry(
                                    tx,
                                    (block_number, *addr).into(),
                                    new_entry,
                                    progress.number == stage_progress,
                                )?;
                            }

                            next_tx_num += 1;
                            Ok(())
                        },
                    )?;

                    // Randomize rewards
                    let has_reward: bool = rng.random();
                    if has_reward {
                        self.insert_storage_entry(
                            tx,
                            (block_number, Address::random()).into(),
                            StorageEntry {
                                key: keccak256("mining"),
                                value: U256::from(rng.random::<u32>()),
                            },
                            progress.number == stage_progress,
                        )?;
                    }

                    let body = StoredBlockBodyIndices {
                        first_tx_num,
                        tx_count: progress.transaction_count() as u64,
                    };

                    first_tx_num = next_tx_num;

                    tx.put::<tables::BlockBodyIndices>(progress.number, body)?;
                    Ok(())
                })?;
            }
            self.db.insert_tx_hash_numbers(tx_hash_numbers)?;

            Ok(blocks)
        }

        fn validate_execution(
            &self,
            input: ExecInput,
            output: Option<ExecOutput>,
        ) -> Result<(), TestRunnerError> {
            if let Some(output) = output {
                let start_block = input.checkpoint().block_number + 1;
                let end_block = output.checkpoint.block_number;
                if start_block > end_block {
                    return Ok(())
                }
            }
            self.check_hashed_storage()
        }
    }

    impl UnwindStageTestRunner for StorageHashingTestRunner {
        fn validate_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError> {
            self.unwind_storage(input)?;
            self.check_hashed_storage()
        }
    }

    impl StorageHashingTestRunner {
        fn set_clean_threshold(&mut self, threshold: u64) {
            self.clean_threshold = threshold;
        }

        fn set_commit_threshold(&mut self, threshold: u64) {
            self.commit_threshold = threshold;
        }

        fn check_hashed_storage(&self) -> Result<(), TestRunnerError> {
            self.db
                .query(|tx| {
                    let mut storage_cursor = tx.cursor_dup_read::<tables::PlainStorageState>()?;
                    let mut hashed_storage_cursor =
                        tx.cursor_dup_read::<tables::HashedStorages>()?;

                    let mut expected = 0;

                    while let Some((address, entry)) = storage_cursor.next()? {
                        let key = keccak256(entry.key);
                        let got =
                            hashed_storage_cursor.seek_by_key_subkey(keccak256(address), key)?;
                        assert_eq!(
                            got,
                            Some(StorageEntry { key, ..entry }),
                            "{expected}: {address:?}"
                        );
                        expected += 1;
                    }
                    let count = tx.cursor_dup_read::<tables::HashedStorages>()?.walk(None)?.count();

                    assert_eq!(count, expected);
                    Ok(())
                })
                .map_err(|e| e.into())
        }

        fn insert_storage_entry<TX: DbTxMut>(
            &self,
            tx: &TX,
            bn_address: BlockNumberAddress,
            entry: StorageEntry,
            hash: bool,
        ) -> Result<(), reth_db::DatabaseError> {
            let mut storage_cursor = tx.cursor_dup_write::<tables::PlainStorageState>()?;
            let prev_entry =
                match storage_cursor.seek_by_key_subkey(bn_address.address(), entry.key)? {
                    Some(e) if e.key == entry.key => {
                        tx.delete::<tables::PlainStorageState>(bn_address.address(), Some(e))
                            .expect("failed to delete entry");
                        e
                    }
                    _ => StorageEntry { key: entry.key, value: U256::from(0) },
                };
            tx.put::<tables::PlainStorageState>(bn_address.address(), entry)?;

            if hash {
                let hashed_address = keccak256(bn_address.address());
                let hashed_entry = StorageEntry { key: keccak256(entry.key), value: entry.value };

                if let Some(e) = tx
                    .cursor_dup_write::<tables::HashedStorages>()?
                    .seek_by_key_subkey(hashed_address, hashed_entry.key)?
                    .filter(|e| e.key == hashed_entry.key)
                {
                    tx.delete::<tables::HashedStorages>(hashed_address, Some(e))
                        .expect("failed to delete entry");
                }

                tx.put::<tables::HashedStorages>(hashed_address, hashed_entry)?;
            }

            tx.put::<tables::StorageChangeSets>(bn_address, prev_entry)?;
            Ok(())
        }

        fn unwind_storage(&self, input: UnwindInput) -> Result<(), TestRunnerError> {
            tracing::debug!("unwinding storage...");
            let target_block = input.unwind_to;
            self.db.commit(|tx| {
                let mut storage_cursor = tx.cursor_dup_write::<tables::PlainStorageState>()?;
                let mut changeset_cursor = tx.cursor_dup_read::<tables::StorageChangeSets>()?;

                let mut rev_changeset_walker = changeset_cursor.walk_back(None)?;

                while let Some((bn_address, entry)) = rev_changeset_walker.next().transpose()? {
                    if bn_address.block_number() < target_block {
                        break
                    }

                    if storage_cursor
                        .seek_by_key_subkey(bn_address.address(), entry.key)?
                        .filter(|e| e.key == entry.key)
                        .is_some()
                    {
                        storage_cursor.delete_current()?;
                    }

                    if !entry.value.is_zero() {
                        storage_cursor.upsert(bn_address.address(), &entry)?;
                    }
                }
                Ok(())
            })?;
            Ok(())
        }
    }
}