reth_stages/stages/
mod.rs

1/// The bodies stage.
2mod bodies;
3/// The execution stage that generates state diff.
4mod execution;
5/// The finish stage
6mod finish;
7/// Account hashing stage.
8mod hashing_account;
9/// Storage hashing stage.
10mod hashing_storage;
11/// The headers stage.
12mod headers;
13/// Index history of account changes
14mod index_account_history;
15/// Index history of storage changes
16mod index_storage_history;
17/// Stage for computing state root.
18mod merkle;
19mod prune;
20/// The s3 download stage
21mod s3;
22/// The sender recovery stage.
23mod sender_recovery;
24/// The transaction lookup stage
25mod tx_lookup;
26
27pub use bodies::*;
28pub use execution::*;
29pub use finish::*;
30pub use hashing_account::*;
31pub use hashing_storage::*;
32pub use headers::*;
33pub use index_account_history::*;
34pub use index_storage_history::*;
35pub use merkle::*;
36pub use prune::*;
37pub use s3::*;
38pub use sender_recovery::*;
39pub use tx_lookup::*;
40
41mod utils;
42use utils::*;
43
44#[cfg(test)]
45mod tests {
46    use super::*;
47    use crate::test_utils::{StorageKind, TestStageDB};
48    use alloy_consensus::{SignableTransaction, TxLegacy};
49    use alloy_primitives::{
50        address, hex_literal::hex, keccak256, BlockNumber, Signature, B256, U256,
51    };
52    use alloy_rlp::Decodable;
53    use reth_chainspec::ChainSpecBuilder;
54    use reth_db::mdbx::{cursor::Cursor, RW};
55    use reth_db_api::{
56        cursor::{DbCursorRO, DbCursorRW},
57        table::Table,
58        tables,
59        transaction::{DbTx, DbTxMut},
60        AccountsHistory,
61    };
62    use reth_ethereum_consensus::EthBeaconConsensus;
63    use reth_ethereum_primitives::Block;
64    use reth_evm_ethereum::execute::EthExecutorProvider;
65    use reth_exex::ExExManagerHandle;
66    use reth_primitives_traits::{Account, Bytecode, SealedBlock};
67    use reth_provider::{
68        providers::{StaticFileProvider, StaticFileWriter},
69        test_utils::MockNodeTypesWithDB,
70        AccountExtReader, BlockBodyIndicesProvider, DatabaseProviderFactory, ProviderFactory,
71        ProviderResult, ReceiptProvider, StageCheckpointWriter, StaticFileProviderFactory,
72        StorageReader,
73    };
74    use reth_prune_types::{PruneMode, PruneModes};
75    use reth_stages_api::{
76        ExecInput, ExecutionStageThresholds, PipelineTarget, Stage, StageCheckpoint, StageId,
77    };
78    use reth_static_file_types::StaticFileSegment;
79    use reth_testing_utils::generators::{
80        self, random_block, random_block_range, random_receipt, BlockRangeParams,
81    };
82    use std::{io::Write, sync::Arc};
83
84    #[tokio::test]
85    #[ignore]
86    async fn test_prune() {
87        let test_db = TestStageDB::default();
88
89        let provider_rw = test_db.factory.provider_rw().unwrap();
90        let tip = 66;
91        let input = ExecInput { target: Some(tip), checkpoint: None };
92        let mut genesis_rlp = hex!("f901faf901f5a00000000000000000000000000000000000000000000000000000000000000000a01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa045571b40ae66ca7480791bbb2887286e4e4c4b1b298b191c889d6959023a32eda056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421a056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421b901000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000083020000808502540be400808000a00000000000000000000000000000000000000000000000000000000000000000880000000000000000c0c0").as_slice();
93        let genesis = SealedBlock::<Block>::decode(&mut genesis_rlp).unwrap();
94        let mut block_rlp = hex!("f90262f901f9a075c371ba45999d87f4542326910a11af515897aebce5265d3f6acd1f1161f82fa01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa098f2dcd87c8ae4083e7017a05456c14eea4b1db2032126e27b3b1563d57d7cc0a08151d548273f6683169524b66ca9fe338b9ce42bc3540046c828fd939ae23bcba03f4e5c2ec5b2170b711d97ee755c160457bb58d8daa338e835ec02ae6860bbabb901000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000083020000018502540be40082a8798203e800a00000000000000000000000000000000000000000000000000000000000000000880000000000000000f863f861800a8405f5e10094100000000000000000000000000000000000000080801ba07e09e26678ed4fac08a249ebe8ed680bf9051a5e14ad223e4b2b9d26e0208f37a05f6e3f188e3e6eab7d7d3b6568f5eac7d687b08d307d3154ccd8c87b4630509bc0").as_slice();
95        let block = SealedBlock::<Block>::decode(&mut block_rlp).unwrap();
96        provider_rw.insert_historical_block(genesis.try_recover().unwrap()).unwrap();
97        provider_rw.insert_historical_block(block.clone().try_recover().unwrap()).unwrap();
98
99        // Fill with bogus blocks to respect PruneMode distance.
100        let mut head = block.hash();
101        let mut rng = generators::rng();
102        for block_number in 2..=tip {
103            let nblock = random_block(
104                &mut rng,
105                block_number,
106                generators::BlockParams { parent: Some(head), ..Default::default() },
107            );
108            head = nblock.hash();
109            provider_rw.insert_historical_block(nblock.try_recover().unwrap()).unwrap();
110        }
111        provider_rw
112            .static_file_provider()
113            .latest_writer(StaticFileSegment::Headers)
114            .unwrap()
115            .commit()
116            .unwrap();
117        provider_rw.commit().unwrap();
118
119        // insert pre state
120        let provider_rw = test_db.factory.provider_rw().unwrap();
121        let code = hex!("5a465a905090036002900360015500");
122        let code_hash = keccak256(hex!("5a465a905090036002900360015500"));
123        provider_rw
124            .tx_ref()
125            .put::<tables::PlainAccountState>(
126                address!("0x1000000000000000000000000000000000000000"),
127                Account { nonce: 0, balance: U256::ZERO, bytecode_hash: Some(code_hash) },
128            )
129            .unwrap();
130        provider_rw
131            .tx_ref()
132            .put::<tables::PlainAccountState>(
133                address!("0xa94f5374fce5edbc8e2a8697c15331677e6ebf0b"),
134                Account {
135                    nonce: 0,
136                    balance: U256::from(0x3635c9adc5dea00000u128),
137                    bytecode_hash: None,
138                },
139            )
140            .unwrap();
141        provider_rw
142            .tx_ref()
143            .put::<tables::Bytecodes>(code_hash, Bytecode::new_raw(code.to_vec().into()))
144            .unwrap();
145        provider_rw.commit().unwrap();
146
147        let check_pruning = |factory: ProviderFactory<MockNodeTypesWithDB>,
148                             prune_modes: PruneModes,
149                             expect_num_receipts: usize,
150                             expect_num_acc_changesets: usize,
151                             expect_num_storage_changesets: usize| async move {
152            let provider = factory.database_provider_rw().unwrap();
153
154            // Check execution and create receipts and changesets according to the pruning
155            // configuration
156            let mut execution_stage = ExecutionStage::new(
157                EthExecutorProvider::ethereum(Arc::new(
158                    ChainSpecBuilder::mainnet().berlin_activated().build(),
159                )),
160                Arc::new(EthBeaconConsensus::new(Arc::new(
161                    ChainSpecBuilder::mainnet().berlin_activated().build(),
162                ))),
163                ExecutionStageThresholds {
164                    max_blocks: Some(100),
165                    max_changes: None,
166                    max_cumulative_gas: None,
167                    max_duration: None,
168                },
169                MERKLE_STAGE_DEFAULT_CLEAN_THRESHOLD,
170                ExExManagerHandle::empty(),
171            );
172
173            execution_stage.execute(&provider, input).unwrap();
174            assert_eq!(
175                provider.receipts_by_block(1.into()).unwrap().unwrap().len(),
176                expect_num_receipts
177            );
178
179            assert_eq!(
180                provider.changed_storages_and_blocks_with_range(0..=1000).unwrap().len(),
181                expect_num_storage_changesets
182            );
183
184            assert_eq!(
185                provider.changed_accounts_and_blocks_with_range(0..=1000).unwrap().len(),
186                expect_num_acc_changesets
187            );
188
189            // Check AccountHistory
190            let mut acc_indexing_stage = IndexAccountHistoryStage {
191                prune_mode: prune_modes.account_history,
192                ..Default::default()
193            };
194
195            if prune_modes.account_history == Some(PruneMode::Full) {
196                // Full is not supported
197                assert!(acc_indexing_stage.execute(&provider, input).is_err());
198            } else {
199                acc_indexing_stage.execute(&provider, input).unwrap();
200                let mut account_history: Cursor<RW, AccountsHistory> =
201                    provider.tx_ref().cursor_read::<tables::AccountsHistory>().unwrap();
202                assert_eq!(account_history.walk(None).unwrap().count(), expect_num_acc_changesets);
203            }
204
205            // Check StorageHistory
206            let mut storage_indexing_stage = IndexStorageHistoryStage {
207                prune_mode: prune_modes.storage_history,
208                ..Default::default()
209            };
210
211            if prune_modes.storage_history == Some(PruneMode::Full) {
212                // Full is not supported
213                assert!(acc_indexing_stage.execute(&provider, input).is_err());
214            } else {
215                storage_indexing_stage.execute(&provider, input).unwrap();
216
217                let mut storage_history =
218                    provider.tx_ref().cursor_read::<tables::StoragesHistory>().unwrap();
219                assert_eq!(
220                    storage_history.walk(None).unwrap().count(),
221                    expect_num_storage_changesets
222                );
223            }
224        };
225
226        // In an unpruned configuration there is 1 receipt, 3 changed accounts and 1 changed
227        // storage.
228        let mut prune = PruneModes::none();
229        check_pruning(test_db.factory.clone(), prune.clone(), 1, 3, 1).await;
230
231        prune.receipts = Some(PruneMode::Full);
232        prune.account_history = Some(PruneMode::Full);
233        prune.storage_history = Some(PruneMode::Full);
234        // This will result in error for account_history and storage_history, which is caught.
235        check_pruning(test_db.factory.clone(), prune.clone(), 0, 0, 0).await;
236
237        prune.receipts = Some(PruneMode::Before(1));
238        prune.account_history = Some(PruneMode::Before(1));
239        prune.storage_history = Some(PruneMode::Before(1));
240        check_pruning(test_db.factory.clone(), prune.clone(), 1, 3, 1).await;
241
242        prune.receipts = Some(PruneMode::Before(2));
243        prune.account_history = Some(PruneMode::Before(2));
244        prune.storage_history = Some(PruneMode::Before(2));
245        // The one account is the miner
246        check_pruning(test_db.factory.clone(), prune.clone(), 0, 1, 0).await;
247
248        prune.receipts = Some(PruneMode::Distance(66));
249        prune.account_history = Some(PruneMode::Distance(66));
250        prune.storage_history = Some(PruneMode::Distance(66));
251        check_pruning(test_db.factory.clone(), prune.clone(), 1, 3, 1).await;
252
253        prune.receipts = Some(PruneMode::Distance(64));
254        prune.account_history = Some(PruneMode::Distance(64));
255        prune.storage_history = Some(PruneMode::Distance(64));
256        // The one account is the miner
257        check_pruning(test_db.factory.clone(), prune.clone(), 0, 1, 0).await;
258    }
259
260    /// It will generate `num_blocks`, push them to static files and set all stage checkpoints to
261    /// `num_blocks - 1`.
262    fn seed_data(num_blocks: usize) -> ProviderResult<TestStageDB> {
263        let db = TestStageDB::default();
264        let mut rng = generators::rng();
265        let genesis_hash = B256::ZERO;
266        let tip = (num_blocks - 1) as u64;
267
268        let blocks = random_block_range(
269            &mut rng,
270            0..=tip,
271            BlockRangeParams { parent: Some(genesis_hash), tx_count: 2..3, ..Default::default() },
272        );
273        db.insert_blocks(blocks.iter(), StorageKind::Static)?;
274
275        let mut receipts = Vec::with_capacity(blocks.len());
276        let mut tx_num = 0u64;
277        for block in &blocks {
278            let mut block_receipts = Vec::with_capacity(block.transaction_count());
279            for transaction in &block.body().transactions {
280                block_receipts.push((tx_num, random_receipt(&mut rng, transaction, Some(0))));
281                tx_num += 1;
282            }
283            receipts.push((block.number, block_receipts));
284        }
285        db.insert_receipts_by_block(receipts, StorageKind::Static)?;
286
287        // simulate pipeline by setting all checkpoints to inserted height.
288        let provider_rw = db.factory.provider_rw()?;
289        for stage in StageId::ALL {
290            provider_rw.save_stage_checkpoint(stage, StageCheckpoint::new(tip))?;
291        }
292        provider_rw.commit()?;
293
294        Ok(db)
295    }
296
297    /// Simulates losing data to corruption and compare the check consistency result
298    /// against the expected one.
299    fn simulate_behind_checkpoint_corruption(
300        db: &TestStageDB,
301        prune_count: usize,
302        segment: StaticFileSegment,
303        is_full_node: bool,
304        expected: Option<PipelineTarget>,
305    ) {
306        // We recreate the static file provider, since consistency heals are done on fetching the
307        // writer for the first time.
308        let mut static_file_provider = db.factory.static_file_provider();
309        static_file_provider = StaticFileProvider::read_write(static_file_provider.path()).unwrap();
310
311        // Simulate corruption by removing `prune_count` rows from the data file without updating
312        // its offset list and configuration.
313        {
314            let mut headers_writer = static_file_provider.latest_writer(segment).unwrap();
315            let reader = headers_writer.inner().jar().open_data_reader().unwrap();
316            let columns = headers_writer.inner().jar().columns();
317            let data_file = headers_writer.inner().data_file();
318            let last_offset = reader.reverse_offset(prune_count * columns).unwrap();
319            data_file.get_mut().set_len(last_offset).unwrap();
320            data_file.flush().unwrap();
321            data_file.get_ref().sync_all().unwrap();
322        }
323
324        // We recreate the static file provider, since consistency heals are done on fetching the
325        // writer for the first time.
326        let mut static_file_provider = db.factory.static_file_provider();
327        static_file_provider = StaticFileProvider::read_write(static_file_provider.path()).unwrap();
328        assert!(matches!(
329            static_file_provider
330                .check_consistency(&db.factory.database_provider_ro().unwrap(), is_full_node,),
331            Ok(e) if e == expected
332        ));
333    }
334
335    /// Saves a checkpoint with `checkpoint_block_number` and compare the check consistency result
336    /// against the expected one.
337    fn save_checkpoint_and_check(
338        db: &TestStageDB,
339        stage_id: StageId,
340        checkpoint_block_number: BlockNumber,
341        expected: Option<PipelineTarget>,
342    ) {
343        let provider_rw = db.factory.provider_rw().unwrap();
344        provider_rw
345            .save_stage_checkpoint(stage_id, StageCheckpoint::new(checkpoint_block_number))
346            .unwrap();
347        provider_rw.commit().unwrap();
348
349        assert!(matches!(
350            db.factory
351                .static_file_provider()
352                .check_consistency(&db.factory.database_provider_ro().unwrap(), false,),
353            Ok(e) if e == expected
354        ));
355    }
356
357    /// Inserts a dummy value at key and compare the check consistency result against the expected
358    /// one.
359    fn update_db_and_check<T: Table<Key = u64>>(
360        db: &TestStageDB,
361        key: u64,
362        expected: Option<PipelineTarget>,
363    ) where
364        <T as Table>::Value: Default,
365    {
366        update_db_with_and_check::<T>(db, key, expected, &Default::default());
367    }
368
369    /// Inserts the given value at key and compare the check consistency result against the expected
370    /// one.
371    fn update_db_with_and_check<T: Table<Key = u64>>(
372        db: &TestStageDB,
373        key: u64,
374        expected: Option<PipelineTarget>,
375        value: &T::Value,
376    ) {
377        let provider_rw = db.factory.provider_rw().unwrap();
378        let mut cursor = provider_rw.tx_ref().cursor_write::<T>().unwrap();
379        cursor.insert(key, value).unwrap();
380        provider_rw.commit().unwrap();
381
382        assert!(matches!(
383            db.factory
384                .static_file_provider()
385                .check_consistency(&db.factory.database_provider_ro().unwrap(), false),
386            Ok(e) if e == expected
387        ));
388    }
389
390    #[test]
391    fn test_consistency() {
392        let db = seed_data(90).unwrap();
393        let db_provider = db.factory.database_provider_ro().unwrap();
394
395        assert!(matches!(
396            db.factory.static_file_provider().check_consistency(&db_provider, false),
397            Ok(None)
398        ));
399    }
400
401    #[test]
402    fn test_consistency_no_commit_prune() {
403        let db = seed_data(90).unwrap();
404        let full_node = true;
405        let archive_node = !full_node;
406
407        // Full node does not use receipts, therefore doesn't check for consistency on receipts
408        // segment
409        simulate_behind_checkpoint_corruption(&db, 1, StaticFileSegment::Receipts, full_node, None);
410
411        // there are 2 to 3 transactions per block. however, if we lose one tx, we need to unwind to
412        // the previous block.
413        simulate_behind_checkpoint_corruption(
414            &db,
415            1,
416            StaticFileSegment::Receipts,
417            archive_node,
418            Some(PipelineTarget::Unwind(88)),
419        );
420
421        simulate_behind_checkpoint_corruption(
422            &db,
423            3,
424            StaticFileSegment::Headers,
425            archive_node,
426            Some(PipelineTarget::Unwind(86)),
427        );
428    }
429
430    #[test]
431    fn test_consistency_checkpoints() {
432        let db = seed_data(90).unwrap();
433
434        // When a checkpoint is behind, we delete data from static files.
435        let block = 87;
436        save_checkpoint_and_check(&db, StageId::Bodies, block, None);
437        assert_eq!(
438            db.factory
439                .static_file_provider()
440                .get_highest_static_file_block(StaticFileSegment::Transactions),
441            Some(block)
442        );
443        assert_eq!(
444            db.factory
445                .static_file_provider()
446                .get_highest_static_file_tx(StaticFileSegment::Transactions),
447            db.factory.block_body_indices(block).unwrap().map(|b| b.last_tx_num())
448        );
449
450        let block = 86;
451        save_checkpoint_and_check(&db, StageId::Execution, block, None);
452        assert_eq!(
453            db.factory
454                .static_file_provider()
455                .get_highest_static_file_block(StaticFileSegment::Receipts),
456            Some(block)
457        );
458        assert_eq!(
459            db.factory
460                .static_file_provider()
461                .get_highest_static_file_tx(StaticFileSegment::Receipts),
462            db.factory.block_body_indices(block).unwrap().map(|b| b.last_tx_num())
463        );
464
465        let block = 80;
466        save_checkpoint_and_check(&db, StageId::Headers, block, None);
467        assert_eq!(
468            db.factory
469                .static_file_provider()
470                .get_highest_static_file_block(StaticFileSegment::Headers),
471            Some(block)
472        );
473
474        // When a checkpoint is ahead, we request a pipeline unwind.
475        save_checkpoint_and_check(&db, StageId::Headers, 91, Some(PipelineTarget::Unwind(block)));
476    }
477
478    #[test]
479    fn test_consistency_headers_gap() {
480        let db = seed_data(90).unwrap();
481        let current = db
482            .factory
483            .static_file_provider()
484            .get_highest_static_file_block(StaticFileSegment::Headers)
485            .unwrap();
486
487        // Creates a gap of one header: static_file <missing> db
488        update_db_and_check::<tables::Headers>(&db, current + 2, Some(PipelineTarget::Unwind(89)));
489
490        // Fill the gap, and ensure no unwind is necessary.
491        update_db_and_check::<tables::Headers>(&db, current + 1, None);
492    }
493
494    #[test]
495    fn test_consistency_tx_gap() {
496        let db = seed_data(90).unwrap();
497        let current = db
498            .factory
499            .static_file_provider()
500            .get_highest_static_file_tx(StaticFileSegment::Transactions)
501            .unwrap();
502
503        // Creates a gap of one transaction: static_file <missing> db
504        update_db_with_and_check::<tables::Transactions>(
505            &db,
506            current + 2,
507            Some(PipelineTarget::Unwind(89)),
508            &TxLegacy::default().into_signed(Signature::test_signature()).into(),
509        );
510
511        // Fill the gap, and ensure no unwind is necessary.
512        update_db_with_and_check::<tables::Transactions>(
513            &db,
514            current + 1,
515            None,
516            &TxLegacy::default().into_signed(Signature::test_signature()).into(),
517        );
518    }
519
520    #[test]
521    fn test_consistency_receipt_gap() {
522        let db = seed_data(90).unwrap();
523        let current = db
524            .factory
525            .static_file_provider()
526            .get_highest_static_file_tx(StaticFileSegment::Receipts)
527            .unwrap();
528
529        // Creates a gap of one receipt: static_file <missing> db
530        update_db_and_check::<tables::Receipts>(&db, current + 2, Some(PipelineTarget::Unwind(89)));
531
532        // Fill the gap, and ensure no unwind is necessary.
533        update_db_and_check::<tables::Receipts>(&db, current + 1, None);
534    }
535}