//! Storage hashing stage (`reth_stages/stages/hashing_storage.rs`).
1use alloy_primitives::{b256, bytes::BufMut, keccak256, Address, B256};
2use itertools::Itertools;
3use reth_config::config::{EtlConfig, HashingConfig};
4use reth_db_api::{
5    cursor::{DbCursorRO, DbDupCursorRW},
6    models::{BlockNumberAddress, CompactU256},
7    table::Decompress,
8    tables,
9    transaction::{DbTx, DbTxMut},
10};
11use reth_etl::Collector;
12use reth_primitives_traits::StorageEntry;
13use reth_provider::{DBProvider, HashingWriter, StatsReader, StorageReader};
14use reth_stages_api::{
15    BlockRangeOutput, EntitiesCheckpoint, ExecInput, ExecOutput, Stage, StageCheckpoint,
16    StageError, StageId, StorageHashingCheckpoint, UnwindInput, UnwindOutput,
17};
18use reth_storage_api::StorageSettingsCache;
19use reth_storage_errors::provider::ProviderResult;
20use std::{
21    collections::{BTreeMap, BTreeSet},
22    fmt::Debug,
23    sync::mpsc::{self, Receiver},
24};
25use tracing::*;
26
/// Maximum number of in-flight result channels (one per rayon hashing job)
/// that may exist in memory before they are drained into the ETL collector.
const MAXIMUM_CHANNELS: usize = 10_000;

/// Maximum number of storage entries to hash per rayon worker job.
const WORKER_CHUNK_SIZE: usize = 100;

/// Keccak256 hash of the zero address.
///
/// Used to seed the per-chunk hashed-address cache so the cache is consistent
/// with its `Address::ZERO` initial key.
const HASHED_ZERO_ADDRESS: B256 =
    b256!("0x5380c7b7ae81a58eb98d9c78de4a1fd7fd9535fc953ed2be602daaa41767312a");
/// Storage hashing stage hashes plain storage.
/// This is preparation before generating intermediate hashes and calculating Merkle tree root.
#[derive(Debug)]
pub struct StorageHashingStage {
    /// The threshold (in number of blocks) for switching between incremental
    /// hashing and full storage hashing.
    pub clean_threshold: u64,
    /// The maximum number of slots to process before committing during unwind.
    pub commit_threshold: u64,
    /// The maximum number of changeset entries to process before committing. The stage commits
    /// after either `commit_threshold` blocks or `commit_entries` entries, whichever comes first.
    pub commit_entries: u64,
    /// ETL configuration (file size and spill directory) used by the full
    /// storage-hashing path when rebuilding `HashedStorages` from scratch.
    pub etl_config: EtlConfig,
}
52
53impl StorageHashingStage {
54    /// Create new instance of [`StorageHashingStage`].
55    pub const fn new(config: HashingConfig, etl_config: EtlConfig) -> Self {
56        Self {
57            clean_threshold: config.clean_threshold,
58            commit_threshold: config.commit_threshold,
59            commit_entries: config.commit_entries,
60            etl_config,
61        }
62    }
63}
64
65impl Default for StorageHashingStage {
66    fn default() -> Self {
67        Self {
68            clean_threshold: 500_000,
69            commit_threshold: 100_000,
70            commit_entries: 30_000_000,
71            etl_config: EtlConfig::default(),
72        }
73    }
74}
75
76impl<Provider> Stage<Provider> for StorageHashingStage
77where
78    Provider: DBProvider<Tx: DbTxMut>
79        + StorageReader
80        + HashingWriter
81        + StatsReader
82        + StorageSettingsCache,
83{
84    /// Return the id of the stage
85    fn id(&self) -> StageId {
86        StageId::StorageHashing
87    }
88
    /// Execute the stage.
    ///
    /// Picks one of two strategies based on the remaining block range:
    /// - **Full rebuild**: if the range exceeds `clean_threshold` or starts at
    ///   genesis, clears `HashedStorages` and re-hashes the whole
    ///   `PlainStorageState` table via rayon workers + an ETL collector.
    /// - **Incremental**: otherwise, walks `StorageChangeSets` for the range
    ///   and re-hashes only the touched slots.
    fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result<ExecOutput, StageError> {
        let tx = provider.tx_ref();
        if input.target_reached() {
            return Ok(ExecOutput::done(input.checkpoint()))
        }

        // If use_hashed_state is enabled, execution writes directly to `HashedStorages`,
        // so this stage becomes a no-op.
        if provider.cached_storage_settings().use_hashed_state() {
            return Ok(ExecOutput::done(input.checkpoint().with_block_number(input.target())));
        }

        // Use the total remaining range to decide clean vs incremental.
        let total_range = input.target() - input.checkpoint().block_number;
        let from_block = input.next_block();

        if total_range > self.clean_threshold || from_block == 1 {
            // if there are more blocks than threshold it is faster to go over Plain state and
            // hash all storage otherwise take changesets aggregate the sets and apply
            // hashing to HashedStorages table. Also, if we start from genesis, we need to
            // hash from scratch, as genesis accounts are not in changeset, along with their
            // storages.

            // clear table, load all entries and hash them
            tx.clear::<tables::HashedStorages>()?;

            let mut storage_cursor = tx.cursor_read::<tables::PlainStorageState>()?;
            let mut collector =
                Collector::new(self.etl_config.file_size, self.etl_config.dir.clone());
            let mut channels = Vec::with_capacity(MAXIMUM_CHANNELS);

            for chunk in &storage_cursor.walk(None)?.chunks(WORKER_CHUNK_SIZE) {
                // An _unordered_ channel to receive results from a rayon job
                let chunk = chunk.collect::<Result<Vec<_>, _>>()?;
                // Channel capacity equals the chunk size, so the worker can
                // send every result without blocking and then exit.
                let (tx, rx) = mpsc::sync_channel(chunk.len());
                channels.push(rx);
                // Spawn the hashing task onto the global rayon pool
                rayon::spawn(move || {
                    // Cache hashed address since PlainStorageState is sorted by address
                    let (mut last_addr, mut hashed_addr) = (Address::ZERO, HASHED_ZERO_ADDRESS);
                    for (address, slot) in chunk {
                        if address != last_addr {
                            last_addr = address;
                            hashed_addr = keccak256(address);
                        }
                        // Key layout: 32-byte hashed address || 32-byte hashed slot key.
                        let mut addr_key = Vec::with_capacity(64);
                        addr_key.put_slice(hashed_addr.as_slice());
                        addr_key.put_slice(keccak256(slot.key).as_slice());
                        let _ = tx.send((addr_key, CompactU256::from(slot.value)));
                    }
                });

                // Flush to ETL when channels length reaches MAXIMUM_CHANNELS
                if !channels.is_empty() && channels.len().is_multiple_of(MAXIMUM_CHANNELS) {
                    collect(&mut channels, &mut collector)?;
                }
            }

            // Drain the remaining (fewer than MAXIMUM_CHANNELS) channels.
            collect(&mut channels, &mut collector)?;

            let total_hashes = collector.len();
            let interval = (total_hashes / 10).max(1);
            let mut cursor = tx.cursor_dup_write::<tables::HashedStorages>()?;
            // The collector yields keys in sorted order, so `append_dup` is valid.
            for (index, item) in collector.iter()?.enumerate() {
                // Log progress roughly every 10% of inserted hashes.
                if index > 0 && index.is_multiple_of(interval) {
                    info!(
                        target: "sync::stages::hashing_storage",
                        progress = %format!("{:.2}%", (index as f64 / total_hashes as f64) * 100.0),
                        "Inserting hashes"
                    );
                }

                let (addr_key, value) = item?;
                cursor.append_dup(
                    B256::from_slice(&addr_key[..32]),
                    StorageEntry {
                        key: B256::from_slice(&addr_key[32..]),
                        value: CompactU256::decompress_owned(value)?.into(),
                    },
                )?;
            }

            let checkpoint = StageCheckpoint::new(input.target())
                .with_storage_hashing_stage_checkpoint(StorageHashingCheckpoint {
                    progress: stage_checkpoint_progress(provider)?,
                    ..Default::default()
                });

            Ok(ExecOutput { checkpoint, done: true })
        } else {
            // Stream changesets entry-by-entry, bounded by both block count
            // (commit_threshold) and entry count (commit_entries), whichever comes first.
            let BlockRangeOutput { block_range, is_final_range } =
                input.next_block_range_with_threshold(self.commit_threshold);
            let (from_block, to_block) = block_range.into_inner();

            let mut changeset_cursor = tx.cursor_read::<tables::StorageChangeSets>()?;
            // Aggregate touched slots per address; BTree keeps deterministic order.
            let mut changed: BTreeMap<Address, BTreeSet<B256>> = BTreeMap::new();
            let mut total_entries = 0u64;
            let mut last_block = from_block;

            for entry in
                changeset_cursor.walk_range(BlockNumberAddress::range(from_block..=to_block))?
            {
                let (BlockNumberAddress((block_number, address)), storage_entry) = entry?;

                // Check the entry limit only at block boundaries so we never
                // checkpoint mid-block (which would skip the remaining entries
                // for that block on the next invocation).
                if block_number != last_block && total_entries >= self.commit_entries {
                    break;
                }

                last_block = block_number;
                changed.entry(address).or_default().insert(storage_entry.key);
                total_entries += 1;
            }

            // Re-read current plain state for the touched slots and write the
            // hashed entries.
            let storages = provider.plain_state_storages(changed)?;
            provider.insert_storage_for_hashing(storages)?;

            let exhausted = total_entries < self.commit_entries;
            let done = exhausted && is_final_range;
            // If we stopped early due to the entry limit, checkpoint at the
            // last fully-processed block rather than `to_block`.
            let progress_block = if exhausted { to_block } else { last_block };

            let checkpoint = StageCheckpoint::new(progress_block)
                .with_storage_hashing_stage_checkpoint(StorageHashingCheckpoint {
                    progress: stage_checkpoint_progress(provider)?,
                    ..Default::default()
                });

            Ok(ExecOutput { checkpoint, done })
        }
    }
224
225    /// Unwind the stage.
226    fn unwind(
227        &mut self,
228        provider: &Provider,
229        input: UnwindInput,
230    ) -> Result<UnwindOutput, StageError> {
231        // NOTE: this runs in both v1 and v2 mode. In v2 mode, execution writes
232        // directly to `HashedStorages`, but the unwind must still revert those
233        // entries here because `MerkleUnwind` runs after this stage (in unwind
234        // order) and needs `HashedStorages` to reflect the target block state
235        // before it can verify the state root.
236        let (range, unwind_progress, _) =
237            input.unwind_block_range_with_threshold(self.commit_threshold);
238
239        provider.unwind_storage_hashing_range(range)?;
240
241        let mut stage_checkpoint =
242            input.checkpoint.storage_hashing_stage_checkpoint().unwrap_or_default();
243
244        stage_checkpoint.progress = stage_checkpoint_progress(provider)?;
245
246        Ok(UnwindOutput {
247            checkpoint: StageCheckpoint::new(unwind_progress)
248                .with_storage_hashing_stage_checkpoint(stage_checkpoint),
249        })
250    }
251}
252
253/// Flushes channels hashes to ETL collector.
254fn collect(
255    channels: &mut Vec<Receiver<(Vec<u8>, CompactU256)>>,
256    collector: &mut Collector<Vec<u8>, CompactU256>,
257) -> Result<(), StageError> {
258    for channel in channels.iter_mut() {
259        while let Ok((key, v)) = channel.recv() {
260            collector.insert(key, v)?;
261        }
262    }
263    info!(target: "sync::stages::hashing_storage", "Hashed {} entries", collector.len());
264    channels.clear();
265    Ok(())
266}
267
268fn stage_checkpoint_progress(provider: &impl StatsReader) -> ProviderResult<EntitiesCheckpoint> {
269    Ok(EntitiesCheckpoint {
270        processed: provider.count_entries::<tables::HashedStorages>()? as u64,
271        total: provider.count_entries::<tables::PlainStorageState>()? as u64,
272    })
273}
274
275#[cfg(test)]
276mod tests {
277    use super::*;
278    use crate::test_utils::{
279        stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError,
280        TestStageDB, UnwindStageTestRunner,
281    };
282    use alloy_primitives::{Address, U256};
283    use assert_matches::assert_matches;
284    use rand::Rng;
285    use reth_db_api::{
286        cursor::{DbCursorRW, DbDupCursorRO},
287        models::{BlockNumberAddress, StoredBlockBodyIndices},
288    };
289    use reth_ethereum_primitives::Block;
290    use reth_primitives_traits::SealedBlock;
291    use reth_provider::providers::StaticFileWriter;
292    use reth_testing_utils::generators::{
293        self, random_block_range, random_contract_account_range, BlockRangeParams,
294    };
295
296    stage_test_suite_ext!(StorageHashingTestRunner, storage_hashing);
297
    /// Execute with low clean threshold so as to hash whole storage.
    ///
    /// Runs the stage repeatedly from a checkpoint until `done`, asserting
    /// after each partial run that exactly one more entry was processed and
    /// that the final checkpoint reaches the target with `processed == total`.
    #[tokio::test]
    async fn execute_clean_storage_hashing() {
        let (previous_stage, stage_progress) = (500, 100);

        // Set up the runner
        let mut runner = StorageHashingTestRunner::default();

        // set low clean threshold so we hash the whole storage
        runner.set_clean_threshold(1);

        // set low commit threshold so we force each entry to be a tx.commit and make sure we don't
        // hang on one key. Seed execution inserts more than one storage entry per address.
        runner.set_commit_threshold(1);

        let mut input = ExecInput {
            target: Some(previous_stage),
            checkpoint: Some(StageCheckpoint::new(stage_progress)),
        };

        runner.seed_execution(input).expect("failed to seed execution");

        loop {
            if let Ok(result @ ExecOutput { checkpoint, done }) =
                runner.execute(input).await.unwrap()
            {
                if !done {
                    // Partial run: the checkpoint must advance by exactly one
                    // processed entry over the previous checkpoint.
                    let previous_checkpoint = input
                        .checkpoint
                        .and_then(|checkpoint| checkpoint.storage_hashing_stage_checkpoint())
                        .unwrap_or_default();
                    assert_matches!(checkpoint.storage_hashing_stage_checkpoint(), Some(StorageHashingCheckpoint {
                        progress: EntitiesCheckpoint {
                            processed,
                            total,
                        },
                        ..
                    }) if processed == previous_checkpoint.progress.processed + 1 &&
                        total == runner.db.count_entries::<tables::PlainStorageState>().unwrap() as u64);

                    // Continue from checkpoint
                    input.checkpoint = Some(checkpoint);
                    continue
                }
                // Final run: checkpoint must reach the target and be complete.
                assert_eq!(checkpoint.block_number, previous_stage);
                assert_matches!(checkpoint.storage_hashing_stage_checkpoint(), Some(StorageHashingCheckpoint {
                        progress: EntitiesCheckpoint {
                            processed,
                            total,
                        },
                        ..
                    }) if processed == total &&
                        total == runner.db.count_entries::<tables::PlainStorageState>().unwrap() as u64);

                // Validate the stage execution
                assert!(
                    runner.validate_execution(input, Some(result)).is_ok(),
                    "execution validation"
                );

                break
            }
            panic!("Failed execution");
        }
    }
363
    /// Test harness wiring a [`StorageHashingStage`] to an in-memory test
    /// database, with tunable thresholds so tests can force either the clean
    /// or the incremental hashing path.
    struct StorageHashingTestRunner {
        // Test database the stage executes against.
        db: TestStageDB,
        // Forwarded to `StorageHashingStage::commit_threshold`.
        commit_threshold: u64,
        // Forwarded to `StorageHashingStage::clean_threshold`.
        clean_threshold: u64,
        // Forwarded to `StorageHashingStage::commit_entries`.
        commit_entries: u64,
        // Forwarded to `StorageHashingStage::etl_config`.
        etl_config: EtlConfig,
    }
371
372    impl Default for StorageHashingTestRunner {
373        fn default() -> Self {
374            Self {
375                db: TestStageDB::default(),
376                commit_threshold: 1000,
377                clean_threshold: 1000,
378                commit_entries: u64::MAX,
379                etl_config: EtlConfig::default(),
380            }
381        }
382    }
383
384    impl StageTestRunner for StorageHashingTestRunner {
385        type S = StorageHashingStage;
386
387        fn db(&self) -> &TestStageDB {
388            &self.db
389        }
390
391        fn stage(&self) -> Self::S {
392            Self::S {
393                commit_threshold: self.commit_threshold,
394                clean_threshold: self.clean_threshold,
395                commit_entries: self.commit_entries,
396                etl_config: self.etl_config.clone(),
397            }
398        }
399    }
400
    impl ExecuteStageTestRunner for StorageHashingTestRunner {
        type Seed = Vec<SealedBlock<Block>>;

        /// Seeds the database with a random block range and, per transaction,
        /// random storage entries (plus changesets) for a fixed pool of
        /// contract accounts. Entries at the starting block are additionally
        /// pre-hashed into `HashedStorages`.
        fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
            let stage_progress = input.next_block();
            let end = input.target();
            let mut rng = generators::rng();

            let n_accounts = 31;
            let mut accounts = random_contract_account_range(&mut rng, &mut (0..n_accounts));

            let blocks = random_block_range(
                &mut rng,
                stage_progress..=end,
                BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..3, ..Default::default() },
            );

            self.db.insert_headers(blocks.iter().map(|block| block.sealed_header()))?;
            let mut tx_hash_numbers = Vec::new();

            let iter = blocks.iter();
            let mut next_tx_num = 0;
            let mut first_tx_num = next_tx_num;
            for progress in iter {
                // Insert last progress data
                let block_number = progress.number;
                self.db.commit(|tx| {
                    progress.body().transactions.iter().try_for_each(
                        |transaction| -> Result<(), reth_db::DatabaseError> {
                            tx_hash_numbers.push((*transaction.tx_hash(), next_tx_num));
                            tx.put::<tables::Transactions>(next_tx_num, transaction.clone())?;

                            // Pick a random seeded account to receive storage writes.
                            let (addr, _) = accounts
                                .get_mut((rng.random::<u64>() % n_accounts) as usize)
                                .unwrap();

                            // Two storage entries per transaction, so addresses get
                            // multiple slots (exercises dup-sorted tables).
                            for _ in 0..2 {
                                let new_entry = StorageEntry {
                                    key: keccak256([rng.random::<u8>()]),
                                    value: U256::from(rng.random::<u8>() % 30 + 1),
                                };
                                self.insert_storage_entry(
                                    tx,
                                    (block_number, *addr).into(),
                                    new_entry,
                                    // Only pre-hash entries at the starting block.
                                    progress.number == stage_progress,
                                )?;
                            }

                            next_tx_num += 1;
                            Ok(())
                        },
                    )?;

                    // Randomize rewards
                    let has_reward: bool = rng.random();
                    if has_reward {
                        self.insert_storage_entry(
                            tx,
                            (block_number, Address::random()).into(),
                            StorageEntry {
                                key: keccak256("mining"),
                                value: U256::from(rng.random::<u32>()),
                            },
                            progress.number == stage_progress,
                        )?;
                    }

                    let body = StoredBlockBodyIndices {
                        first_tx_num,
                        tx_count: progress.transaction_count() as u64,
                    };

                    first_tx_num = next_tx_num;

                    tx.put::<tables::BlockBodyIndices>(progress.number, body)?;
                    Ok(())
                })?;
            }
            self.db.insert_tx_hash_numbers(tx_hash_numbers)?;

            Ok(blocks)
        }

        /// Verifies hashed storage matches plain storage, unless the run made
        /// no progress (empty block range), in which case there is nothing to
        /// check.
        fn validate_execution(
            &self,
            input: ExecInput,
            output: Option<ExecOutput>,
        ) -> Result<(), TestRunnerError> {
            if let Some(output) = output {
                let start_block = input.checkpoint().block_number + 1;
                let end_block = output.checkpoint.block_number;
                if start_block > end_block {
                    return Ok(())
                }
            }
            self.check_hashed_storage()
        }
    }
500
501    impl UnwindStageTestRunner for StorageHashingTestRunner {
502        fn validate_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError> {
503            self.unwind_storage(input)?;
504            self.check_hashed_storage()
505        }
506    }
507
    impl StorageHashingTestRunner {
        /// Sets the clean-vs-incremental switch threshold for the stage under test.
        fn set_clean_threshold(&mut self, threshold: u64) {
            self.clean_threshold = threshold;
        }

        /// Sets the per-run block commit threshold for the stage under test.
        fn set_commit_threshold(&mut self, threshold: u64) {
            self.commit_threshold = threshold;
        }

        /// Asserts that every `PlainStorageState` entry has a matching hashed
        /// entry in `HashedStorages`, and that the two tables have the same
        /// number of rows (no stale hashed entries).
        fn check_hashed_storage(&self) -> Result<(), TestRunnerError> {
            self.db
                .query(|tx| {
                    let mut storage_cursor = tx.cursor_dup_read::<tables::PlainStorageState>()?;
                    let mut hashed_storage_cursor =
                        tx.cursor_dup_read::<tables::HashedStorages>()?;

                    let mut expected = 0;

                    while let Some((address, entry)) = storage_cursor.next()? {
                        let key = keccak256(entry.key);
                        let got =
                            hashed_storage_cursor.seek_by_key_subkey(keccak256(address), key)?;
                        assert_eq!(
                            got,
                            Some(StorageEntry { key, ..entry }),
                            "{expected}: {address:?}"
                        );
                        expected += 1;
                    }
                    // Row counts must match, otherwise HashedStorages holds extras.
                    let count = tx.cursor_dup_read::<tables::HashedStorages>()?.walk(None)?.count();

                    assert_eq!(count, expected);
                    Ok(())
                })
                .map_err(|e| e.into())
        }

        /// Upserts a plain storage entry and records the previous value in
        /// `StorageChangeSets`; when `hash` is set, also mirrors the entry
        /// (keccak-hashed address and key) into `HashedStorages`.
        fn insert_storage_entry<TX: DbTxMut>(
            &self,
            tx: &TX,
            bn_address: BlockNumberAddress,
            entry: StorageEntry,
            hash: bool,
        ) -> Result<(), reth_db::DatabaseError> {
            let mut storage_cursor = tx.cursor_dup_write::<tables::PlainStorageState>()?;
            // Capture the pre-image for the changeset; a missing slot is
            // recorded as a zero-valued entry.
            let prev_entry =
                match storage_cursor.seek_by_key_subkey(bn_address.address(), entry.key)? {
                    Some(e) if e.key == entry.key => {
                        tx.delete::<tables::PlainStorageState>(bn_address.address(), Some(e))
                            .expect("failed to delete entry");
                        e
                    }
                    _ => StorageEntry { key: entry.key, value: U256::from(0) },
                };
            tx.put::<tables::PlainStorageState>(bn_address.address(), entry)?;

            if hash {
                let hashed_address = keccak256(bn_address.address());
                let hashed_entry = StorageEntry { key: keccak256(entry.key), value: entry.value };

                // Replace (delete + put) any existing hashed entry for this slot.
                if let Some(e) = tx
                    .cursor_dup_write::<tables::HashedStorages>()?
                    .seek_by_key_subkey(hashed_address, hashed_entry.key)?
                    .filter(|e| e.key == hashed_entry.key)
                {
                    tx.delete::<tables::HashedStorages>(hashed_address, Some(e))
                        .expect("failed to delete entry");
                }

                tx.put::<tables::HashedStorages>(hashed_address, hashed_entry)?;
            }

            tx.put::<tables::StorageChangeSets>(bn_address, prev_entry)?;
            Ok(())
        }

        /// Reverts plain storage to `input.unwind_to` by walking the
        /// changesets backwards and restoring each slot's previous value
        /// (deleting the slot when the previous value was zero).
        fn unwind_storage(&self, input: UnwindInput) -> Result<(), TestRunnerError> {
            tracing::debug!("unwinding storage...");
            let target_block = input.unwind_to;
            self.db.commit(|tx| {
                let mut storage_cursor = tx.cursor_dup_write::<tables::PlainStorageState>()?;
                let mut changeset_cursor = tx.cursor_dup_read::<tables::StorageChangeSets>()?;

                let mut rev_changeset_walker = changeset_cursor.walk_back(None)?;

                while let Some((bn_address, entry)) = rev_changeset_walker.next().transpose()? {
                    if bn_address.block_number() < target_block {
                        break
                    }

                    // Drop the current value if the slot exists...
                    if storage_cursor
                        .seek_by_key_subkey(bn_address.address(), entry.key)?
                        .is_some_and(|e| e.key == entry.key)
                    {
                        storage_cursor.delete_current()?;
                    }

                    // ...and restore the previous value unless it was zero.
                    if !entry.value.is_zero() {
                        storage_cursor.upsert(bn_address.address(), &entry)?;
                    }
                }
                Ok(())
            })?;
            Ok(())
        }
    }
614}