// reth_stages/stages/mod.rs

1/// The bodies stage.
2mod bodies;
3/// The execution stage that generates state diff.
4mod execution;
5/// The finish stage
6mod finish;
7/// Account hashing stage.
8mod hashing_account;
9/// Storage hashing stage.
10mod hashing_storage;
11/// The headers stage.
12mod headers;
13/// Index history of account changes
14mod index_account_history;
15/// Index history of storage changes
16mod index_storage_history;
17/// Stage for computing state root.
18mod merkle;
19/// Stage for computing merkle changesets.
20mod merkle_changesets;
21mod prune;
22/// The sender recovery stage.
23mod sender_recovery;
24/// The transaction lookup stage
25mod tx_lookup;
26
27pub use bodies::*;
28pub use era::*;
29pub use execution::*;
30pub use finish::*;
31pub use hashing_account::*;
32pub use hashing_storage::*;
33pub use headers::*;
34pub use index_account_history::*;
35pub use index_storage_history::*;
36pub use merkle::*;
37pub use merkle_changesets::*;
38pub use prune::*;
39pub use sender_recovery::*;
40pub use tx_lookup::*;
41
42mod era;
43mod utils;
44
45use utils::*;
46
47#[cfg(test)]
48mod tests {
49    use super::*;
50    use crate::test_utils::{StorageKind, TestStageDB};
51    use alloy_consensus::{SignableTransaction, TxLegacy};
52    use alloy_primitives::{
53        address, hex_literal::hex, keccak256, BlockNumber, Signature, B256, U256,
54    };
55    use alloy_rlp::Decodable;
56    use reth_chainspec::ChainSpecBuilder;
57    use reth_db::mdbx::{cursor::Cursor, RW};
58    use reth_db_api::{
59        cursor::{DbCursorRO, DbCursorRW},
60        table::Table,
61        tables,
62        transaction::{DbTx, DbTxMut},
63        AccountsHistory,
64    };
65    use reth_ethereum_consensus::EthBeaconConsensus;
66    use reth_ethereum_primitives::Block;
67    use reth_evm_ethereum::EthEvmConfig;
68    use reth_exex::ExExManagerHandle;
69    use reth_primitives_traits::{Account, Bytecode, SealedBlock};
70    use reth_provider::{
71        providers::{StaticFileProvider, StaticFileWriter},
72        test_utils::MockNodeTypesWithDB,
73        AccountExtReader, BlockBodyIndicesProvider, BlockWriter, DatabaseProviderFactory,
74        ProviderFactory, ProviderResult, ReceiptProvider, StageCheckpointWriter,
75        StaticFileProviderFactory, StorageReader,
76    };
77    use reth_prune_types::{PruneMode, PruneModes};
78    use reth_stages_api::{
79        ExecInput, ExecutionStageThresholds, PipelineTarget, Stage, StageCheckpoint, StageId,
80    };
81    use reth_static_file_types::StaticFileSegment;
82    use reth_testing_utils::generators::{
83        self, random_block, random_block_range, random_receipt, BlockRangeParams,
84    };
85    use std::{io::Write, sync::Arc};
86
87    #[tokio::test]
88    #[ignore]
89    async fn test_prune() {
90        let test_db = TestStageDB::default();
91
92        let provider_rw = test_db.factory.provider_rw().unwrap();
93        let tip = 66;
94        let input = ExecInput { target: Some(tip), checkpoint: None };
95        let mut genesis_rlp = hex!("f901faf901f5a00000000000000000000000000000000000000000000000000000000000000000a01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa045571b40ae66ca7480791bbb2887286e4e4c4b1b298b191c889d6959023a32eda056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421a056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421b901000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000083020000808502540be400808000a00000000000000000000000000000000000000000000000000000000000000000880000000000000000c0c0").as_slice();
96        let genesis = SealedBlock::<Block>::decode(&mut genesis_rlp).unwrap();
97        let mut block_rlp = hex!("f90262f901f9a075c371ba45999d87f4542326910a11af515897aebce5265d3f6acd1f1161f82fa01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa098f2dcd87c8ae4083e7017a05456c14eea4b1db2032126e27b3b1563d57d7cc0a08151d548273f6683169524b66ca9fe338b9ce42bc3540046c828fd939ae23bcba03f4e5c2ec5b2170b711d97ee755c160457bb58d8daa338e835ec02ae6860bbabb901000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000083020000018502540be40082a8798203e800a00000000000000000000000000000000000000000000000000000000000000000880000000000000000f863f861800a8405f5e10094100000000000000000000000000000000000000080801ba07e09e26678ed4fac08a249ebe8ed680bf9051a5e14ad223e4b2b9d26e0208f37a05f6e3f188e3e6eab7d7d3b6568f5eac7d687b08d307d3154ccd8c87b4630509bc0").as_slice();
98        let block = SealedBlock::<Block>::decode(&mut block_rlp).unwrap();
99        provider_rw.insert_block(genesis.try_recover().unwrap()).unwrap();
100        provider_rw.insert_block(block.clone().try_recover().unwrap()).unwrap();
101
102        // Fill with bogus blocks to respect PruneMode distance.
103        let mut head = block.hash();
104        let mut rng = generators::rng();
105        for block_number in 2..=tip {
106            let nblock = random_block(
107                &mut rng,
108                block_number,
109                generators::BlockParams { parent: Some(head), ..Default::default() },
110            );
111            head = nblock.hash();
112            provider_rw.insert_block(nblock.try_recover().unwrap()).unwrap();
113        }
114        provider_rw
115            .static_file_provider()
116            .latest_writer(StaticFileSegment::Headers)
117            .unwrap()
118            .commit()
119            .unwrap();
120        provider_rw.commit().unwrap();
121
122        // insert pre state
123        let provider_rw = test_db.factory.provider_rw().unwrap();
124        let code = hex!("5a465a905090036002900360015500");
125        let code_hash = keccak256(hex!("5a465a905090036002900360015500"));
126        provider_rw
127            .tx_ref()
128            .put::<tables::PlainAccountState>(
129                address!("0x1000000000000000000000000000000000000000"),
130                Account { nonce: 0, balance: U256::ZERO, bytecode_hash: Some(code_hash) },
131            )
132            .unwrap();
133        provider_rw
134            .tx_ref()
135            .put::<tables::PlainAccountState>(
136                address!("0xa94f5374fce5edbc8e2a8697c15331677e6ebf0b"),
137                Account {
138                    nonce: 0,
139                    balance: U256::from(0x3635c9adc5dea00000u128),
140                    bytecode_hash: None,
141                },
142            )
143            .unwrap();
144        provider_rw
145            .tx_ref()
146            .put::<tables::Bytecodes>(code_hash, Bytecode::new_raw(code.to_vec().into()))
147            .unwrap();
148        provider_rw.commit().unwrap();
149
150        let check_pruning = |factory: ProviderFactory<MockNodeTypesWithDB>,
151                             prune_modes: PruneModes,
152                             expect_num_receipts: usize,
153                             expect_num_acc_changesets: usize,
154                             expect_num_storage_changesets: usize| async move {
155            let provider = factory.database_provider_rw().unwrap();
156
157            // Check execution and create receipts and changesets according to the pruning
158            // configuration
159            let mut execution_stage = ExecutionStage::new(
160                EthEvmConfig::ethereum(Arc::new(
161                    ChainSpecBuilder::mainnet().berlin_activated().build(),
162                )),
163                Arc::new(EthBeaconConsensus::new(Arc::new(
164                    ChainSpecBuilder::mainnet().berlin_activated().build(),
165                ))),
166                ExecutionStageThresholds {
167                    max_blocks: Some(100),
168                    max_changes: None,
169                    max_cumulative_gas: None,
170                    max_duration: None,
171                },
172                MERKLE_STAGE_DEFAULT_REBUILD_THRESHOLD,
173                ExExManagerHandle::empty(),
174            );
175
176            execution_stage.execute(&provider, input).unwrap();
177            assert_eq!(
178                provider.receipts_by_block(1.into()).unwrap().unwrap().len(),
179                expect_num_receipts
180            );
181
182            assert_eq!(
183                provider.changed_storages_and_blocks_with_range(0..=1000).unwrap().len(),
184                expect_num_storage_changesets
185            );
186
187            assert_eq!(
188                provider.changed_accounts_and_blocks_with_range(0..=1000).unwrap().len(),
189                expect_num_acc_changesets
190            );
191
192            // Check AccountHistory
193            let mut acc_indexing_stage = IndexAccountHistoryStage {
194                prune_mode: prune_modes.account_history,
195                ..Default::default()
196            };
197
198            if prune_modes.account_history == Some(PruneMode::Full) {
199                // Full is not supported
200                assert!(acc_indexing_stage.execute(&provider, input).is_err());
201            } else {
202                acc_indexing_stage.execute(&provider, input).unwrap();
203                let mut account_history: Cursor<RW, AccountsHistory> =
204                    provider.tx_ref().cursor_read::<tables::AccountsHistory>().unwrap();
205                assert_eq!(account_history.walk(None).unwrap().count(), expect_num_acc_changesets);
206            }
207
208            // Check StorageHistory
209            let mut storage_indexing_stage = IndexStorageHistoryStage {
210                prune_mode: prune_modes.storage_history,
211                ..Default::default()
212            };
213
214            if prune_modes.storage_history == Some(PruneMode::Full) {
215                // Full is not supported
216                assert!(storage_indexing_stage.execute(&provider, input).is_err());
217            } else {
218                storage_indexing_stage.execute(&provider, input).unwrap();
219
220                let mut storage_history =
221                    provider.tx_ref().cursor_read::<tables::StoragesHistory>().unwrap();
222                assert_eq!(
223                    storage_history.walk(None).unwrap().count(),
224                    expect_num_storage_changesets
225                );
226            }
227        };
228
229        // In an unpruned configuration there is 1 receipt, 3 changed accounts and 1 changed
230        // storage.
231        let mut prune = PruneModes::default();
232        check_pruning(test_db.factory.clone(), prune.clone(), 1, 3, 1).await;
233
234        prune.receipts = Some(PruneMode::Full);
235        prune.account_history = Some(PruneMode::Full);
236        prune.storage_history = Some(PruneMode::Full);
237        // This will result in error for account_history and storage_history, which is caught.
238        check_pruning(test_db.factory.clone(), prune.clone(), 0, 0, 0).await;
239
240        prune.receipts = Some(PruneMode::Before(1));
241        prune.account_history = Some(PruneMode::Before(1));
242        prune.storage_history = Some(PruneMode::Before(1));
243        check_pruning(test_db.factory.clone(), prune.clone(), 1, 3, 1).await;
244
245        prune.receipts = Some(PruneMode::Before(2));
246        prune.account_history = Some(PruneMode::Before(2));
247        prune.storage_history = Some(PruneMode::Before(2));
248        // The one account is the miner
249        check_pruning(test_db.factory.clone(), prune.clone(), 0, 1, 0).await;
250
251        prune.receipts = Some(PruneMode::Distance(66));
252        prune.account_history = Some(PruneMode::Distance(66));
253        prune.storage_history = Some(PruneMode::Distance(66));
254        check_pruning(test_db.factory.clone(), prune.clone(), 1, 3, 1).await;
255
256        prune.receipts = Some(PruneMode::Distance(64));
257        prune.account_history = Some(PruneMode::Distance(64));
258        prune.storage_history = Some(PruneMode::Distance(64));
259        // The one account is the miner
260        check_pruning(test_db.factory.clone(), prune.clone(), 0, 1, 0).await;
261    }
262
263    /// It will generate `num_blocks`, push them to static files and set all stage checkpoints to
264    /// `num_blocks - 1`.
265    fn seed_data(num_blocks: usize) -> ProviderResult<TestStageDB> {
266        let db = TestStageDB::default();
267        let mut rng = generators::rng();
268        let genesis_hash = B256::ZERO;
269        let tip = (num_blocks - 1) as u64;
270
271        let blocks = random_block_range(
272            &mut rng,
273            0..=tip,
274            BlockRangeParams { parent: Some(genesis_hash), tx_count: 2..3, ..Default::default() },
275        );
276        db.insert_blocks(blocks.iter(), StorageKind::Static)?;
277
278        let mut receipts = Vec::with_capacity(blocks.len());
279        let mut tx_num = 0u64;
280        for block in &blocks {
281            let mut block_receipts = Vec::with_capacity(block.transaction_count());
282            for transaction in &block.body().transactions {
283                block_receipts.push((tx_num, random_receipt(&mut rng, transaction, Some(0), None)));
284                tx_num += 1;
285            }
286            receipts.push((block.number, block_receipts));
287        }
288        db.insert_receipts_by_block(receipts, StorageKind::Static)?;
289
290        // simulate pipeline by setting all checkpoints to inserted height.
291        let provider_rw = db.factory.provider_rw()?;
292        for stage in StageId::ALL {
293            provider_rw.save_stage_checkpoint(stage, StageCheckpoint::new(tip))?;
294        }
295        provider_rw.commit()?;
296
297        Ok(db)
298    }
299
300    /// Simulates losing data to corruption and compare the check consistency result
301    /// against the expected one.
302    fn simulate_behind_checkpoint_corruption(
303        db: &TestStageDB,
304        prune_count: usize,
305        segment: StaticFileSegment,
306        is_full_node: bool,
307        expected: Option<PipelineTarget>,
308    ) {
309        // We recreate the static file provider, since consistency heals are done on fetching the
310        // writer for the first time.
311        let mut static_file_provider = db.factory.static_file_provider();
312        static_file_provider = StaticFileProvider::read_write(static_file_provider.path()).unwrap();
313
314        // Simulate corruption by removing `prune_count` rows from the data file without updating
315        // its offset list and configuration.
316        {
317            let mut headers_writer = static_file_provider.latest_writer(segment).unwrap();
318            let reader = headers_writer.inner().jar().open_data_reader().unwrap();
319            let columns = headers_writer.inner().jar().columns();
320            let data_file = headers_writer.inner().data_file();
321            let last_offset = reader.reverse_offset(prune_count * columns).unwrap();
322            data_file.get_mut().set_len(last_offset).unwrap();
323            data_file.flush().unwrap();
324            data_file.get_ref().sync_all().unwrap();
325        }
326
327        // We recreate the static file provider, since consistency heals are done on fetching the
328        // writer for the first time.
329        let mut static_file_provider = db.factory.static_file_provider();
330        static_file_provider = StaticFileProvider::read_write(static_file_provider.path()).unwrap();
331        assert!(matches!(
332            static_file_provider
333                .check_consistency(&db.factory.database_provider_ro().unwrap(), is_full_node,),
334            Ok(e) if e == expected
335        ));
336    }
337
338    /// Saves a checkpoint with `checkpoint_block_number` and compare the check consistency result
339    /// against the expected one.
340    fn save_checkpoint_and_check(
341        db: &TestStageDB,
342        stage_id: StageId,
343        checkpoint_block_number: BlockNumber,
344        expected: Option<PipelineTarget>,
345    ) {
346        let provider_rw = db.factory.provider_rw().unwrap();
347        provider_rw
348            .save_stage_checkpoint(stage_id, StageCheckpoint::new(checkpoint_block_number))
349            .unwrap();
350        provider_rw.commit().unwrap();
351
352        assert!(matches!(
353            db.factory
354                .static_file_provider()
355                .check_consistency(&db.factory.database_provider_ro().unwrap(), false,),
356            Ok(e) if e == expected
357        ));
358    }
359
360    /// Inserts a dummy value at key and compare the check consistency result against the expected
361    /// one.
362    fn update_db_and_check<T: Table<Key = u64>>(
363        db: &TestStageDB,
364        key: u64,
365        expected: Option<PipelineTarget>,
366    ) where
367        <T as Table>::Value: Default,
368    {
369        update_db_with_and_check::<T>(db, key, expected, &Default::default());
370    }
371
372    /// Inserts the given value at key and compare the check consistency result against the expected
373    /// one.
374    fn update_db_with_and_check<T: Table<Key = u64>>(
375        db: &TestStageDB,
376        key: u64,
377        expected: Option<PipelineTarget>,
378        value: &T::Value,
379    ) {
380        let provider_rw = db.factory.provider_rw().unwrap();
381        let mut cursor = provider_rw.tx_ref().cursor_write::<T>().unwrap();
382        cursor.insert(key, value).unwrap();
383        provider_rw.commit().unwrap();
384
385        assert!(matches!(
386            db.factory
387                .static_file_provider()
388                .check_consistency(&db.factory.database_provider_ro().unwrap(), false),
389            Ok(e) if e == expected
390        ));
391    }
392
393    #[test]
394    fn test_consistency() {
395        let db = seed_data(90).unwrap();
396        let db_provider = db.factory.database_provider_ro().unwrap();
397
398        assert!(matches!(
399            db.factory.static_file_provider().check_consistency(&db_provider, false),
400            Ok(None)
401        ));
402    }
403
404    #[test]
405    fn test_consistency_no_commit_prune() {
406        let db = seed_data(90).unwrap();
407        let full_node = true;
408        let archive_node = !full_node;
409
410        // Full node does not use receipts, therefore doesn't check for consistency on receipts
411        // segment
412        simulate_behind_checkpoint_corruption(&db, 1, StaticFileSegment::Receipts, full_node, None);
413
414        // there are 2 to 3 transactions per block. however, if we lose one tx, we need to unwind to
415        // the previous block.
416        simulate_behind_checkpoint_corruption(
417            &db,
418            1,
419            StaticFileSegment::Receipts,
420            archive_node,
421            Some(PipelineTarget::Unwind(88)),
422        );
423
424        simulate_behind_checkpoint_corruption(
425            &db,
426            3,
427            StaticFileSegment::Headers,
428            archive_node,
429            Some(PipelineTarget::Unwind(86)),
430        );
431    }
432
433    #[test]
434    fn test_consistency_checkpoints() {
435        let db = seed_data(90).unwrap();
436
437        // When a checkpoint is behind, we delete data from static files.
438        let block = 87;
439        save_checkpoint_and_check(&db, StageId::Bodies, block, None);
440        assert_eq!(
441            db.factory
442                .static_file_provider()
443                .get_highest_static_file_block(StaticFileSegment::Transactions),
444            Some(block)
445        );
446        assert_eq!(
447            db.factory
448                .static_file_provider()
449                .get_highest_static_file_tx(StaticFileSegment::Transactions),
450            db.factory.block_body_indices(block).unwrap().map(|b| b.last_tx_num())
451        );
452
453        let block = 86;
454        save_checkpoint_and_check(&db, StageId::Execution, block, None);
455        assert_eq!(
456            db.factory
457                .static_file_provider()
458                .get_highest_static_file_block(StaticFileSegment::Receipts),
459            Some(block)
460        );
461        assert_eq!(
462            db.factory
463                .static_file_provider()
464                .get_highest_static_file_tx(StaticFileSegment::Receipts),
465            db.factory.block_body_indices(block).unwrap().map(|b| b.last_tx_num())
466        );
467
468        let block = 80;
469        save_checkpoint_and_check(&db, StageId::Headers, block, None);
470        assert_eq!(
471            db.factory
472                .static_file_provider()
473                .get_highest_static_file_block(StaticFileSegment::Headers),
474            Some(block)
475        );
476
477        // When a checkpoint is ahead, we request a pipeline unwind.
478        save_checkpoint_and_check(&db, StageId::Headers, 91, Some(PipelineTarget::Unwind(block)));
479    }
480
481    #[test]
482    fn test_consistency_headers_gap() {
483        let db = seed_data(90).unwrap();
484        let current = db
485            .factory
486            .static_file_provider()
487            .get_highest_static_file_block(StaticFileSegment::Headers)
488            .unwrap();
489
490        // Creates a gap of one header: static_file <missing> db
491        update_db_and_check::<tables::Headers>(&db, current + 2, Some(PipelineTarget::Unwind(89)));
492
493        // Fill the gap, and ensure no unwind is necessary.
494        update_db_and_check::<tables::Headers>(&db, current + 1, None);
495    }
496
497    #[test]
498    fn test_consistency_tx_gap() {
499        let db = seed_data(90).unwrap();
500        let current = db
501            .factory
502            .static_file_provider()
503            .get_highest_static_file_tx(StaticFileSegment::Transactions)
504            .unwrap();
505
506        // Creates a gap of one transaction: static_file <missing> db
507        update_db_with_and_check::<tables::Transactions>(
508            &db,
509            current + 2,
510            Some(PipelineTarget::Unwind(89)),
511            &TxLegacy::default().into_signed(Signature::test_signature()).into(),
512        );
513
514        // Fill the gap, and ensure no unwind is necessary.
515        update_db_with_and_check::<tables::Transactions>(
516            &db,
517            current + 1,
518            None,
519            &TxLegacy::default().into_signed(Signature::test_signature()).into(),
520        );
521    }
522
523    #[test]
524    fn test_consistency_receipt_gap() {
525        let db = seed_data(90).unwrap();
526        let current = db
527            .factory
528            .static_file_provider()
529            .get_highest_static_file_tx(StaticFileSegment::Receipts)
530            .unwrap();
531
532        // Creates a gap of one receipt: static_file <missing> db
533        update_db_and_check::<tables::Receipts>(&db, current + 2, Some(PipelineTarget::Unwind(89)));
534
535        // Fill the gap, and ensure no unwind is necessary.
536        update_db_and_check::<tables::Receipts>(&db, current + 1, None);
537    }
538}