Skip to main content

reth_stages/stages/
mod.rs

1/// The bodies stage.
2mod bodies;
3mod era;
4/// The execution stage that generates state diff.
5mod execution;
6/// The finish stage
7mod finish;
8/// Account hashing stage.
9mod hashing_account;
10/// Storage hashing stage.
11mod hashing_storage;
12/// The headers stage.
13mod headers;
14/// Index history of account changes
15mod index_account_history;
16/// Index history of storage changes
17mod index_storage_history;
18/// Stage for computing state root.
19mod merkle;
20mod prune;
21/// The sender recovery stage.
22mod sender_recovery;
23/// The transaction lookup stage
24mod tx_lookup;
25
26pub use bodies::*;
27pub use era::*;
28pub use execution::*;
29pub use finish::*;
30pub use hashing_account::*;
31pub use hashing_storage::*;
32pub use headers::*;
33pub use index_account_history::*;
34pub use index_storage_history::*;
35pub use merkle::*;
36pub use prune::*;
37pub use sender_recovery::*;
38pub use tx_lookup::*;
39
40mod utils;
41use utils::*;
42
43#[cfg(test)]
44mod tests {
45    use super::*;
46    use crate::test_utils::{StorageKind, TestStageDB};
47    use alloy_consensus::{SignableTransaction, TxLegacy};
48    use alloy_primitives::{
49        address, hex_literal::hex, keccak256, BlockNumber, Signature, B256, U256,
50    };
51    use alloy_rlp::Decodable;
52    use reth_chainspec::ChainSpecBuilder;
53    use reth_db::mdbx::{cursor::Cursor, RW};
54    use reth_db_api::{
55        cursor::{DbCursorRO, DbCursorRW},
56        table::Table,
57        tables,
58        transaction::{DbTx, DbTxMut},
59        AccountsHistory,
60    };
61    use reth_ethereum_consensus::EthBeaconConsensus;
62    use reth_ethereum_primitives::Block;
63    use reth_evm_ethereum::EthEvmConfig;
64    use reth_exex::ExExManagerHandle;
65    use reth_primitives_traits::{Account, Bytecode, SealedBlock};
66    use reth_provider::{
67        providers::{StaticFileProvider, StaticFileWriter},
68        test_utils::MockNodeTypesWithDB,
69        AccountExtReader, BlockBodyIndicesProvider, BlockWriter, DatabaseProviderFactory,
70        ProviderFactory, ProviderResult, ReceiptProvider, StageCheckpointWriter,
71        StaticFileProviderFactory, StorageReader,
72    };
73    use reth_prune_types::{PruneMode, PruneModes};
74    use reth_stages_api::{
75        ExecInput, ExecutionStageThresholds, PipelineTarget, Stage, StageCheckpoint, StageId,
76    };
77    use reth_static_file_types::StaticFileSegment;
78    use reth_testing_utils::generators::{
79        self, random_block, random_block_range, random_receipt, BlockRangeParams,
80    };
81    use std::{io::Write, sync::Arc};
82
83    #[tokio::test]
84    #[ignore]
85    async fn test_prune() {
86        let test_db = TestStageDB::default();
87
88        let provider_rw = test_db.factory.provider_rw().unwrap();
89        let tip = 66;
90        let input = ExecInput { target: Some(tip), checkpoint: None };
91        let mut genesis_rlp = hex!("f901faf901f5a00000000000000000000000000000000000000000000000000000000000000000a01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa045571b40ae66ca7480791bbb2887286e4e4c4b1b298b191c889d6959023a32eda056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421a056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421b901000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000083020000808502540be400808000a00000000000000000000000000000000000000000000000000000000000000000880000000000000000c0c0").as_slice();
92        let genesis = SealedBlock::<Block>::decode(&mut genesis_rlp).unwrap();
93        let mut block_rlp = hex!("f90262f901f9a075c371ba45999d87f4542326910a11af515897aebce5265d3f6acd1f1161f82fa01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa098f2dcd87c8ae4083e7017a05456c14eea4b1db2032126e27b3b1563d57d7cc0a08151d548273f6683169524b66ca9fe338b9ce42bc3540046c828fd939ae23bcba03f4e5c2ec5b2170b711d97ee755c160457bb58d8daa338e835ec02ae6860bbabb901000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000083020000018502540be40082a8798203e800a00000000000000000000000000000000000000000000000000000000000000000880000000000000000f863f861800a8405f5e10094100000000000000000000000000000000000000080801ba07e09e26678ed4fac08a249ebe8ed680bf9051a5e14ad223e4b2b9d26e0208f37a05f6e3f188e3e6eab7d7d3b6568f5eac7d687b08d307d3154ccd8c87b4630509bc0").as_slice();
94        let block = SealedBlock::<Block>::decode(&mut block_rlp).unwrap();
95        let mut head = block.hash();
96        provider_rw.insert_block(&genesis.try_recover().unwrap()).unwrap();
97        provider_rw.insert_block(&block.try_recover().unwrap()).unwrap();
98
99        // Fill with bogus blocks to respect PruneMode distance.
100        let mut rng = generators::rng();
101        for block_number in 2..=tip {
102            let nblock = random_block(
103                &mut rng,
104                block_number,
105                generators::BlockParams { parent: Some(head), ..Default::default() },
106            );
107            head = nblock.hash();
108            provider_rw.insert_block(&nblock.try_recover().unwrap()).unwrap();
109        }
110        provider_rw
111            .static_file_provider()
112            .latest_writer(StaticFileSegment::Headers)
113            .unwrap()
114            .commit()
115            .unwrap();
116        provider_rw.commit().unwrap();
117
118        // insert pre state
119        let provider_rw = test_db.factory.provider_rw().unwrap();
120        let code = hex!("5a465a905090036002900360015500");
121        let code_hash = keccak256(hex!("5a465a905090036002900360015500"));
122        provider_rw
123            .tx_ref()
124            .put::<tables::PlainAccountState>(
125                address!("0x1000000000000000000000000000000000000000"),
126                Account { nonce: 0, balance: U256::ZERO, bytecode_hash: Some(code_hash) },
127            )
128            .unwrap();
129        provider_rw
130            .tx_ref()
131            .put::<tables::PlainAccountState>(
132                address!("0xa94f5374fce5edbc8e2a8697c15331677e6ebf0b"),
133                Account {
134                    nonce: 0,
135                    balance: U256::from(0x3635c9adc5dea00000u128),
136                    bytecode_hash: None,
137                },
138            )
139            .unwrap();
140        provider_rw
141            .tx_ref()
142            .put::<tables::Bytecodes>(code_hash, Bytecode::new_raw(code.to_vec().into()))
143            .unwrap();
144        provider_rw.commit().unwrap();
145
146        let check_pruning = |factory: ProviderFactory<MockNodeTypesWithDB>,
147                             prune_modes: PruneModes,
148                             expect_num_receipts: usize,
149                             expect_num_acc_changesets: usize,
150                             expect_num_storage_changesets: usize| async move {
151            let provider = factory.database_provider_rw().unwrap();
152
153            // Check execution and create receipts and changesets according to the pruning
154            // configuration
155            let mut execution_stage = ExecutionStage::new(
156                EthEvmConfig::ethereum(Arc::new(
157                    ChainSpecBuilder::mainnet().berlin_activated().build(),
158                )),
159                Arc::new(EthBeaconConsensus::new(Arc::new(
160                    ChainSpecBuilder::mainnet().berlin_activated().build(),
161                ))),
162                ExecutionStageThresholds {
163                    max_blocks: Some(100),
164                    max_changes: None,
165                    max_cumulative_gas: None,
166                    max_duration: None,
167                },
168                MERKLE_STAGE_DEFAULT_REBUILD_THRESHOLD,
169                ExExManagerHandle::empty(),
170            );
171
172            execution_stage.execute(&provider, input).unwrap();
173            assert_eq!(
174                provider.receipts_by_block(1.into()).unwrap().unwrap().len(),
175                expect_num_receipts
176            );
177
178            assert_eq!(
179                provider.changed_storages_and_blocks_with_range(0..=1000).unwrap().len(),
180                expect_num_storage_changesets
181            );
182
183            assert_eq!(
184                provider.changed_accounts_and_blocks_with_range(0..=1000).unwrap().len(),
185                expect_num_acc_changesets
186            );
187
188            // Check AccountHistory
189            let mut acc_indexing_stage = IndexAccountHistoryStage {
190                prune_mode: prune_modes.account_history,
191                ..Default::default()
192            };
193
194            if prune_modes.account_history == Some(PruneMode::Full) {
195                // Full is not supported
196                assert!(acc_indexing_stage.execute(&provider, input).is_err());
197            } else {
198                acc_indexing_stage.execute(&provider, input).unwrap();
199                let mut account_history: Cursor<RW, AccountsHistory> =
200                    provider.tx_ref().cursor_read::<tables::AccountsHistory>().unwrap();
201                assert_eq!(account_history.walk(None).unwrap().count(), expect_num_acc_changesets);
202            }
203
204            // Check StorageHistory
205            let mut storage_indexing_stage = IndexStorageHistoryStage {
206                prune_mode: prune_modes.storage_history,
207                ..Default::default()
208            };
209
210            if prune_modes.storage_history == Some(PruneMode::Full) {
211                // Full is not supported
212                assert!(storage_indexing_stage.execute(&provider, input).is_err());
213            } else {
214                storage_indexing_stage.execute(&provider, input).unwrap();
215
216                let mut storage_history =
217                    provider.tx_ref().cursor_read::<tables::StoragesHistory>().unwrap();
218                assert_eq!(
219                    storage_history.walk(None).unwrap().count(),
220                    expect_num_storage_changesets
221                );
222            }
223        };
224
225        // In an unpruned configuration there is 1 receipt, 3 changed accounts and 1 changed
226        // storage.
227        let mut prune = PruneModes::default();
228        check_pruning(test_db.factory.clone(), prune.clone(), 1, 3, 1).await;
229
230        prune.receipts = Some(PruneMode::Full);
231        prune.account_history = Some(PruneMode::Full);
232        prune.storage_history = Some(PruneMode::Full);
233        // This will result in error for account_history and storage_history, which is caught.
234        check_pruning(test_db.factory.clone(), prune.clone(), 0, 0, 0).await;
235
236        prune.receipts = Some(PruneMode::Before(1));
237        prune.account_history = Some(PruneMode::Before(1));
238        prune.storage_history = Some(PruneMode::Before(1));
239        check_pruning(test_db.factory.clone(), prune.clone(), 1, 3, 1).await;
240
241        prune.receipts = Some(PruneMode::Before(2));
242        prune.account_history = Some(PruneMode::Before(2));
243        prune.storage_history = Some(PruneMode::Before(2));
244        // The one account is the miner
245        check_pruning(test_db.factory.clone(), prune.clone(), 0, 1, 0).await;
246
247        prune.receipts = Some(PruneMode::Distance(66));
248        prune.account_history = Some(PruneMode::Distance(66));
249        prune.storage_history = Some(PruneMode::Distance(66));
250        check_pruning(test_db.factory.clone(), prune.clone(), 1, 3, 1).await;
251
252        prune.receipts = Some(PruneMode::Distance(64));
253        prune.account_history = Some(PruneMode::Distance(64));
254        prune.storage_history = Some(PruneMode::Distance(64));
255        // The one account is the miner
256        check_pruning(test_db.factory.clone(), prune.clone(), 0, 1, 0).await;
257    }
258
259    /// It will generate `num_blocks`, push them to static files and set all stage checkpoints to
260    /// `num_blocks - 1`.
261    fn seed_data(num_blocks: usize) -> ProviderResult<TestStageDB> {
262        let db = TestStageDB::default();
263        let mut rng = generators::rng();
264        let genesis_hash = B256::ZERO;
265        let tip = (num_blocks - 1) as u64;
266
267        let blocks = random_block_range(
268            &mut rng,
269            0..=tip,
270            BlockRangeParams { parent: Some(genesis_hash), tx_count: 2..3, ..Default::default() },
271        );
272        db.insert_blocks(blocks.iter(), StorageKind::Static)?;
273
274        let mut receipts = Vec::with_capacity(blocks.len());
275        let mut tx_num = 0u64;
276        for block in &blocks {
277            let mut block_receipts = Vec::with_capacity(block.transaction_count());
278            for transaction in &block.body().transactions {
279                block_receipts.push((tx_num, random_receipt(&mut rng, transaction, Some(0), None)));
280                tx_num += 1;
281            }
282            receipts.push((block.number, block_receipts));
283        }
284        db.insert_receipts_by_block(receipts, StorageKind::Static)?;
285
286        // simulate pipeline by setting all checkpoints to inserted height.
287        let provider_rw = db.factory.provider_rw()?;
288        for stage in StageId::ALL {
289            provider_rw.save_stage_checkpoint(stage, StageCheckpoint::new(tip))?;
290        }
291        provider_rw.commit()?;
292
293        Ok(db)
294    }
295
296    /// Simulates losing data to corruption and compare the check consistency result
297    /// against the expected one.
298    fn simulate_behind_checkpoint_corruption(
299        db: &TestStageDB,
300        prune_count: usize,
301        segment: StaticFileSegment,
302        expected: Option<PipelineTarget>,
303    ) {
304        // We recreate the static file provider, since consistency heals are done on fetching the
305        // writer for the first time.
306        let mut static_file_provider = db.factory.static_file_provider();
307        static_file_provider = StaticFileProvider::read_write(static_file_provider.path()).unwrap();
308
309        // Simulate corruption by removing `prune_count` rows from the data file without updating
310        // its offset list and configuration.
311        {
312            let mut headers_writer = static_file_provider.latest_writer(segment).unwrap();
313            let reader = headers_writer.inner().jar().open_data_reader().unwrap();
314            let columns = headers_writer.inner().jar().columns();
315            let data_file = headers_writer.inner().data_file();
316            let last_offset = reader.reverse_offset(prune_count * columns).unwrap();
317            data_file.get_mut().set_len(last_offset).unwrap();
318            data_file.flush().unwrap();
319            data_file.get_ref().sync_all().unwrap();
320        }
321
322        // We recreate the static file provider, since consistency heals are done on fetching the
323        // writer for the first time.
324        let mut static_file_provider = db.factory.static_file_provider();
325        static_file_provider = StaticFileProvider::read_write(static_file_provider.path()).unwrap();
326        assert!(matches!(
327            static_file_provider
328                .check_consistency(&db.factory.database_provider_ro().unwrap()),
329            Ok(e) if e == expected
330        ));
331    }
332
333    /// Saves a checkpoint with `checkpoint_block_number` and compare the check consistency result
334    /// against the expected one.
335    fn save_checkpoint_and_check(
336        db: &TestStageDB,
337        stage_id: StageId,
338        checkpoint_block_number: BlockNumber,
339        expected: Option<PipelineTarget>,
340    ) {
341        let provider_rw = db.factory.provider_rw().unwrap();
342        provider_rw
343            .save_stage_checkpoint(stage_id, StageCheckpoint::new(checkpoint_block_number))
344            .unwrap();
345        provider_rw.commit().unwrap();
346
347        assert!(matches!(
348            db.factory
349                .static_file_provider()
350                .check_consistency(&db.factory.database_provider_ro().unwrap(),),
351            Ok(e) if e == expected
352        ));
353    }
354
355    /// Inserts a dummy value at key and compare the check consistency result against the expected
356    /// one.
357    fn update_db_and_check<T: Table<Key = u64>>(
358        db: &TestStageDB,
359        key: u64,
360        expected: Option<PipelineTarget>,
361    ) where
362        <T as Table>::Value: Default,
363    {
364        update_db_with_and_check::<T>(db, key, expected, &Default::default());
365    }
366
367    /// Inserts the given value at key and compare the check consistency result against the expected
368    /// one.
369    fn update_db_with_and_check<T: Table<Key = u64>>(
370        db: &TestStageDB,
371        key: u64,
372        expected: Option<PipelineTarget>,
373        value: &T::Value,
374    ) {
375        let provider_rw = db.factory.provider_rw().unwrap();
376        let mut cursor = provider_rw.tx_ref().cursor_write::<T>().unwrap();
377        cursor.insert(key, value).unwrap();
378        provider_rw.commit().unwrap();
379
380        assert!(matches!(
381            db.factory
382                .static_file_provider()
383                .check_consistency(&db.factory.database_provider_ro().unwrap()),
384            Ok(e) if e == expected
385        ));
386    }
387
388    #[test]
389    fn test_consistency() {
390        let db = seed_data(90).unwrap();
391        let db_provider = db.factory.database_provider_ro().unwrap();
392
393        assert!(matches!(
394            db.factory.static_file_provider().check_consistency(&db_provider),
395            Ok(None)
396        ));
397    }
398
399    #[test]
400    fn test_consistency_no_commit_prune() {
401        // Test full node with receipt pruning
402        let mut db_full = seed_data(90).unwrap();
403        db_full.factory = db_full.factory.with_prune_modes(PruneModes {
404            receipts: Some(PruneMode::Before(1)),
405            ..Default::default()
406        });
407
408        // Full node does not use receipts, therefore doesn't check for consistency on receipts
409        // segment
410        simulate_behind_checkpoint_corruption(&db_full, 1, StaticFileSegment::Receipts, None);
411
412        // Test archive node without receipt pruning
413        let db_archive = seed_data(90).unwrap();
414
415        // there are 2 to 3 transactions per block. however, if we lose one tx, we need to unwind to
416        // the previous block.
417        simulate_behind_checkpoint_corruption(
418            &db_archive,
419            1,
420            StaticFileSegment::Receipts,
421            Some(PipelineTarget::Unwind(88)),
422        );
423
424        simulate_behind_checkpoint_corruption(
425            &db_archive,
426            3,
427            StaticFileSegment::Headers,
428            Some(PipelineTarget::Unwind(86)),
429        );
430    }
431
432    #[test]
433    fn test_consistency_checkpoints() {
434        let db = seed_data(90).unwrap();
435
436        // When a checkpoint is behind, we delete data from static files.
437        let block = 87;
438        save_checkpoint_and_check(&db, StageId::Bodies, block, None);
439        assert_eq!(
440            db.factory
441                .static_file_provider()
442                .get_highest_static_file_block(StaticFileSegment::Transactions),
443            Some(block)
444        );
445        assert_eq!(
446            db.factory
447                .static_file_provider()
448                .get_highest_static_file_tx(StaticFileSegment::Transactions),
449            db.factory.block_body_indices(block).unwrap().map(|b| b.last_tx_num())
450        );
451
452        let block = 86;
453        save_checkpoint_and_check(&db, StageId::Execution, block, None);
454        assert_eq!(
455            db.factory
456                .static_file_provider()
457                .get_highest_static_file_block(StaticFileSegment::Receipts),
458            Some(block)
459        );
460        assert_eq!(
461            db.factory
462                .static_file_provider()
463                .get_highest_static_file_tx(StaticFileSegment::Receipts),
464            db.factory.block_body_indices(block).unwrap().map(|b| b.last_tx_num())
465        );
466
467        let block = 80;
468        save_checkpoint_and_check(&db, StageId::Headers, block, None);
469        assert_eq!(
470            db.factory
471                .static_file_provider()
472                .get_highest_static_file_block(StaticFileSegment::Headers),
473            Some(block)
474        );
475
476        // When a checkpoint is ahead, we request a pipeline unwind.
477        save_checkpoint_and_check(&db, StageId::Headers, 91, Some(PipelineTarget::Unwind(block)));
478    }
479
480    #[test]
481    fn test_consistency_headers_gap() {
482        let db = seed_data(90).unwrap();
483        let current = db
484            .factory
485            .static_file_provider()
486            .get_highest_static_file_block(StaticFileSegment::Headers)
487            .unwrap();
488
489        // Creates a gap of one header: static_file <missing> db
490        update_db_and_check::<tables::Headers>(&db, current + 2, Some(PipelineTarget::Unwind(89)));
491
492        // Fill the gap, and ensure no unwind is necessary.
493        update_db_and_check::<tables::Headers>(&db, current + 1, None);
494    }
495
496    #[test]
497    fn test_consistency_tx_gap() {
498        let db = seed_data(90).unwrap();
499        let current = db
500            .factory
501            .static_file_provider()
502            .get_highest_static_file_tx(StaticFileSegment::Transactions)
503            .unwrap();
504
505        // Creates a gap of one transaction: static_file <missing> db
506        update_db_with_and_check::<tables::Transactions>(
507            &db,
508            current + 2,
509            Some(PipelineTarget::Unwind(89)),
510            &TxLegacy::default().into_signed(Signature::test_signature()).into(),
511        );
512
513        // Fill the gap, and ensure no unwind is necessary.
514        update_db_with_and_check::<tables::Transactions>(
515            &db,
516            current + 1,
517            None,
518            &TxLegacy::default().into_signed(Signature::test_signature()).into(),
519        );
520    }
521
522    #[test]
523    fn test_consistency_receipt_gap() {
524        let db = seed_data(90).unwrap();
525        let current = db
526            .factory
527            .static_file_provider()
528            .get_highest_static_file_tx(StaticFileSegment::Receipts)
529            .unwrap();
530
531        // Creates a gap of one receipt: static_file <missing> db
532        update_db_and_check::<tables::Receipts>(&db, current + 2, Some(PipelineTarget::Unwind(89)));
533
534        // Fill the gap, and ensure no unwind is necessary.
535        update_db_and_check::<tables::Receipts>(&db, current + 1, None);
536    }
537}