reth_stages/stages/
mod.rs

1/// The bodies stage.
2mod bodies;
3/// The execution stage that generates state diff.
4mod execution;
5/// The finish stage
6mod finish;
7/// Account hashing stage.
8mod hashing_account;
9/// Storage hashing stage.
10mod hashing_storage;
11/// The headers stage.
12mod headers;
13/// Index history of account changes
14mod index_account_history;
15/// Index history of storage changes
16mod index_storage_history;
17/// Stage for computing state root.
18mod merkle;
19/// Stage for computing merkle changesets.
20mod merkle_changesets;
21mod prune;
22/// The sender recovery stage.
23mod sender_recovery;
24/// The transaction lookup stage
25mod tx_lookup;
26
27pub use bodies::*;
28pub use era::*;
29pub use execution::*;
30pub use finish::*;
31pub use hashing_account::*;
32pub use hashing_storage::*;
33pub use headers::*;
34pub use index_account_history::*;
35pub use index_storage_history::*;
36pub use merkle::*;
37pub use merkle_changesets::*;
38pub use prune::*;
39pub use sender_recovery::*;
40pub use tx_lookup::*;
41
42mod era;
43mod utils;
44
45use utils::*;
46
47#[cfg(test)]
48mod tests {
49    use super::*;
50    use crate::test_utils::{StorageKind, TestStageDB};
51    use alloy_consensus::{SignableTransaction, TxLegacy};
52    use alloy_primitives::{
53        address, hex_literal::hex, keccak256, BlockNumber, Signature, B256, U256,
54    };
55    use alloy_rlp::Decodable;
56    use reth_chainspec::ChainSpecBuilder;
57    use reth_db::mdbx::{cursor::Cursor, RW};
58    use reth_db_api::{
59        cursor::{DbCursorRO, DbCursorRW},
60        table::Table,
61        tables,
62        transaction::{DbTx, DbTxMut},
63        AccountsHistory,
64    };
65    use reth_ethereum_consensus::EthBeaconConsensus;
66    use reth_ethereum_primitives::Block;
67    use reth_evm_ethereum::EthEvmConfig;
68    use reth_exex::ExExManagerHandle;
69    use reth_primitives_traits::{Account, Bytecode, SealedBlock};
70    use reth_provider::{
71        providers::{StaticFileProvider, StaticFileWriter},
72        test_utils::MockNodeTypesWithDB,
73        AccountExtReader, BlockBodyIndicesProvider, BlockWriter, DatabaseProviderFactory,
74        ProviderFactory, ProviderResult, ReceiptProvider, StageCheckpointWriter,
75        StaticFileProviderFactory, StorageReader,
76    };
77    use reth_prune_types::{PruneMode, PruneModes};
78    use reth_stages_api::{
79        ExecInput, ExecutionStageThresholds, PipelineTarget, Stage, StageCheckpoint, StageId,
80    };
81    use reth_static_file_types::StaticFileSegment;
82    use reth_testing_utils::generators::{
83        self, random_block, random_block_range, random_receipt, BlockRangeParams,
84    };
85    use std::{io::Write, sync::Arc};
86
87    #[tokio::test]
88    #[ignore]
89    async fn test_prune() {
90        let test_db = TestStageDB::default();
91
92        let provider_rw = test_db.factory.provider_rw().unwrap();
93        let tip = 66;
94        let input = ExecInput { target: Some(tip), checkpoint: None };
95        let mut genesis_rlp = hex!("f901faf901f5a00000000000000000000000000000000000000000000000000000000000000000a01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa045571b40ae66ca7480791bbb2887286e4e4c4b1b298b191c889d6959023a32eda056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421a056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421b901000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000083020000808502540be400808000a00000000000000000000000000000000000000000000000000000000000000000880000000000000000c0c0").as_slice();
96        let genesis = SealedBlock::<Block>::decode(&mut genesis_rlp).unwrap();
97        let mut block_rlp = hex!("f90262f901f9a075c371ba45999d87f4542326910a11af515897aebce5265d3f6acd1f1161f82fa01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa098f2dcd87c8ae4083e7017a05456c14eea4b1db2032126e27b3b1563d57d7cc0a08151d548273f6683169524b66ca9fe338b9ce42bc3540046c828fd939ae23bcba03f4e5c2ec5b2170b711d97ee755c160457bb58d8daa338e835ec02ae6860bbabb901000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000083020000018502540be40082a8798203e800a00000000000000000000000000000000000000000000000000000000000000000880000000000000000f863f861800a8405f5e10094100000000000000000000000000000000000000080801ba07e09e26678ed4fac08a249ebe8ed680bf9051a5e14ad223e4b2b9d26e0208f37a05f6e3f188e3e6eab7d7d3b6568f5eac7d687b08d307d3154ccd8c87b4630509bc0").as_slice();
98        let block = SealedBlock::<Block>::decode(&mut block_rlp).unwrap();
99        provider_rw.insert_block(genesis.try_recover().unwrap()).unwrap();
100        provider_rw.insert_block(block.clone().try_recover().unwrap()).unwrap();
101
102        // Fill with bogus blocks to respect PruneMode distance.
103        let mut head = block.hash();
104        let mut rng = generators::rng();
105        for block_number in 2..=tip {
106            let nblock = random_block(
107                &mut rng,
108                block_number,
109                generators::BlockParams { parent: Some(head), ..Default::default() },
110            );
111            head = nblock.hash();
112            provider_rw.insert_block(nblock.try_recover().unwrap()).unwrap();
113        }
114        provider_rw
115            .static_file_provider()
116            .latest_writer(StaticFileSegment::Headers)
117            .unwrap()
118            .commit()
119            .unwrap();
120        provider_rw.commit().unwrap();
121
122        // insert pre state
123        let provider_rw = test_db.factory.provider_rw().unwrap();
124        let code = hex!("5a465a905090036002900360015500");
125        let code_hash = keccak256(hex!("5a465a905090036002900360015500"));
126        provider_rw
127            .tx_ref()
128            .put::<tables::PlainAccountState>(
129                address!("0x1000000000000000000000000000000000000000"),
130                Account { nonce: 0, balance: U256::ZERO, bytecode_hash: Some(code_hash) },
131            )
132            .unwrap();
133        provider_rw
134            .tx_ref()
135            .put::<tables::PlainAccountState>(
136                address!("0xa94f5374fce5edbc8e2a8697c15331677e6ebf0b"),
137                Account {
138                    nonce: 0,
139                    balance: U256::from(0x3635c9adc5dea00000u128),
140                    bytecode_hash: None,
141                },
142            )
143            .unwrap();
144        provider_rw
145            .tx_ref()
146            .put::<tables::Bytecodes>(code_hash, Bytecode::new_raw(code.to_vec().into()))
147            .unwrap();
148        provider_rw.commit().unwrap();
149
150        let check_pruning = |factory: ProviderFactory<MockNodeTypesWithDB>,
151                             prune_modes: PruneModes,
152                             expect_num_receipts: usize,
153                             expect_num_acc_changesets: usize,
154                             expect_num_storage_changesets: usize| async move {
155            let provider = factory.database_provider_rw().unwrap();
156
157            // Check execution and create receipts and changesets according to the pruning
158            // configuration
159            let mut execution_stage = ExecutionStage::new(
160                EthEvmConfig::ethereum(Arc::new(
161                    ChainSpecBuilder::mainnet().berlin_activated().build(),
162                )),
163                Arc::new(EthBeaconConsensus::new(Arc::new(
164                    ChainSpecBuilder::mainnet().berlin_activated().build(),
165                ))),
166                ExecutionStageThresholds {
167                    max_blocks: Some(100),
168                    max_changes: None,
169                    max_cumulative_gas: None,
170                    max_duration: None,
171                },
172                MERKLE_STAGE_DEFAULT_REBUILD_THRESHOLD,
173                ExExManagerHandle::empty(),
174            );
175
176            execution_stage.execute(&provider, input).unwrap();
177            assert_eq!(
178                provider.receipts_by_block(1.into()).unwrap().unwrap().len(),
179                expect_num_receipts
180            );
181
182            assert_eq!(
183                provider.changed_storages_and_blocks_with_range(0..=1000).unwrap().len(),
184                expect_num_storage_changesets
185            );
186
187            assert_eq!(
188                provider.changed_accounts_and_blocks_with_range(0..=1000).unwrap().len(),
189                expect_num_acc_changesets
190            );
191
192            // Check AccountHistory
193            let mut acc_indexing_stage = IndexAccountHistoryStage {
194                prune_mode: prune_modes.account_history,
195                ..Default::default()
196            };
197
198            if prune_modes.account_history == Some(PruneMode::Full) {
199                // Full is not supported
200                assert!(acc_indexing_stage.execute(&provider, input).is_err());
201            } else {
202                acc_indexing_stage.execute(&provider, input).unwrap();
203                let mut account_history: Cursor<RW, AccountsHistory> =
204                    provider.tx_ref().cursor_read::<tables::AccountsHistory>().unwrap();
205                assert_eq!(account_history.walk(None).unwrap().count(), expect_num_acc_changesets);
206            }
207
208            // Check StorageHistory
209            let mut storage_indexing_stage = IndexStorageHistoryStage {
210                prune_mode: prune_modes.storage_history,
211                ..Default::default()
212            };
213
214            if prune_modes.storage_history == Some(PruneMode::Full) {
215                // Full is not supported
216                assert!(storage_indexing_stage.execute(&provider, input).is_err());
217            } else {
218                storage_indexing_stage.execute(&provider, input).unwrap();
219
220                let mut storage_history =
221                    provider.tx_ref().cursor_read::<tables::StoragesHistory>().unwrap();
222                assert_eq!(
223                    storage_history.walk(None).unwrap().count(),
224                    expect_num_storage_changesets
225                );
226            }
227        };
228
229        // In an unpruned configuration there is 1 receipt, 3 changed accounts and 1 changed
230        // storage.
231        let mut prune = PruneModes::default();
232        check_pruning(test_db.factory.clone(), prune.clone(), 1, 3, 1).await;
233
234        prune.receipts = Some(PruneMode::Full);
235        prune.account_history = Some(PruneMode::Full);
236        prune.storage_history = Some(PruneMode::Full);
237        // This will result in error for account_history and storage_history, which is caught.
238        check_pruning(test_db.factory.clone(), prune.clone(), 0, 0, 0).await;
239
240        prune.receipts = Some(PruneMode::Before(1));
241        prune.account_history = Some(PruneMode::Before(1));
242        prune.storage_history = Some(PruneMode::Before(1));
243        check_pruning(test_db.factory.clone(), prune.clone(), 1, 3, 1).await;
244
245        prune.receipts = Some(PruneMode::Before(2));
246        prune.account_history = Some(PruneMode::Before(2));
247        prune.storage_history = Some(PruneMode::Before(2));
248        // The one account is the miner
249        check_pruning(test_db.factory.clone(), prune.clone(), 0, 1, 0).await;
250
251        prune.receipts = Some(PruneMode::Distance(66));
252        prune.account_history = Some(PruneMode::Distance(66));
253        prune.storage_history = Some(PruneMode::Distance(66));
254        check_pruning(test_db.factory.clone(), prune.clone(), 1, 3, 1).await;
255
256        prune.receipts = Some(PruneMode::Distance(64));
257        prune.account_history = Some(PruneMode::Distance(64));
258        prune.storage_history = Some(PruneMode::Distance(64));
259        // The one account is the miner
260        check_pruning(test_db.factory.clone(), prune.clone(), 0, 1, 0).await;
261    }
262
263    /// It will generate `num_blocks`, push them to static files and set all stage checkpoints to
264    /// `num_blocks - 1`.
265    fn seed_data(num_blocks: usize) -> ProviderResult<TestStageDB> {
266        let db = TestStageDB::default();
267        let mut rng = generators::rng();
268        let genesis_hash = B256::ZERO;
269        let tip = (num_blocks - 1) as u64;
270
271        let blocks = random_block_range(
272            &mut rng,
273            0..=tip,
274            BlockRangeParams { parent: Some(genesis_hash), tx_count: 2..3, ..Default::default() },
275        );
276        db.insert_blocks(blocks.iter(), StorageKind::Static)?;
277
278        let mut receipts = Vec::with_capacity(blocks.len());
279        let mut tx_num = 0u64;
280        for block in &blocks {
281            let mut block_receipts = Vec::with_capacity(block.transaction_count());
282            for transaction in &block.body().transactions {
283                block_receipts.push((tx_num, random_receipt(&mut rng, transaction, Some(0), None)));
284                tx_num += 1;
285            }
286            receipts.push((block.number, block_receipts));
287        }
288        db.insert_receipts_by_block(receipts, StorageKind::Static)?;
289
290        // simulate pipeline by setting all checkpoints to inserted height.
291        let provider_rw = db.factory.provider_rw()?;
292        for stage in StageId::ALL {
293            provider_rw.save_stage_checkpoint(stage, StageCheckpoint::new(tip))?;
294        }
295        provider_rw.commit()?;
296
297        Ok(db)
298    }
299
300    /// Simulates losing data to corruption and compare the check consistency result
301    /// against the expected one.
302    fn simulate_behind_checkpoint_corruption(
303        db: &TestStageDB,
304        prune_count: usize,
305        segment: StaticFileSegment,
306        expected: Option<PipelineTarget>,
307    ) {
308        // We recreate the static file provider, since consistency heals are done on fetching the
309        // writer for the first time.
310        let mut static_file_provider = db.factory.static_file_provider();
311        static_file_provider = StaticFileProvider::read_write(static_file_provider.path()).unwrap();
312
313        // Simulate corruption by removing `prune_count` rows from the data file without updating
314        // its offset list and configuration.
315        {
316            let mut headers_writer = static_file_provider.latest_writer(segment).unwrap();
317            let reader = headers_writer.inner().jar().open_data_reader().unwrap();
318            let columns = headers_writer.inner().jar().columns();
319            let data_file = headers_writer.inner().data_file();
320            let last_offset = reader.reverse_offset(prune_count * columns).unwrap();
321            data_file.get_mut().set_len(last_offset).unwrap();
322            data_file.flush().unwrap();
323            data_file.get_ref().sync_all().unwrap();
324        }
325
326        // We recreate the static file provider, since consistency heals are done on fetching the
327        // writer for the first time.
328        let mut static_file_provider = db.factory.static_file_provider();
329        static_file_provider = StaticFileProvider::read_write(static_file_provider.path()).unwrap();
330        assert!(matches!(
331            static_file_provider
332                .check_consistency(&db.factory.database_provider_ro().unwrap()),
333            Ok(e) if e == expected
334        ));
335    }
336
337    /// Saves a checkpoint with `checkpoint_block_number` and compare the check consistency result
338    /// against the expected one.
339    fn save_checkpoint_and_check(
340        db: &TestStageDB,
341        stage_id: StageId,
342        checkpoint_block_number: BlockNumber,
343        expected: Option<PipelineTarget>,
344    ) {
345        let provider_rw = db.factory.provider_rw().unwrap();
346        provider_rw
347            .save_stage_checkpoint(stage_id, StageCheckpoint::new(checkpoint_block_number))
348            .unwrap();
349        provider_rw.commit().unwrap();
350
351        assert!(matches!(
352            db.factory
353                .static_file_provider()
354                .check_consistency(&db.factory.database_provider_ro().unwrap(),),
355            Ok(e) if e == expected
356        ));
357    }
358
359    /// Inserts a dummy value at key and compare the check consistency result against the expected
360    /// one.
361    fn update_db_and_check<T: Table<Key = u64>>(
362        db: &TestStageDB,
363        key: u64,
364        expected: Option<PipelineTarget>,
365    ) where
366        <T as Table>::Value: Default,
367    {
368        update_db_with_and_check::<T>(db, key, expected, &Default::default());
369    }
370
371    /// Inserts the given value at key and compare the check consistency result against the expected
372    /// one.
373    fn update_db_with_and_check<T: Table<Key = u64>>(
374        db: &TestStageDB,
375        key: u64,
376        expected: Option<PipelineTarget>,
377        value: &T::Value,
378    ) {
379        let provider_rw = db.factory.provider_rw().unwrap();
380        let mut cursor = provider_rw.tx_ref().cursor_write::<T>().unwrap();
381        cursor.insert(key, value).unwrap();
382        provider_rw.commit().unwrap();
383
384        assert!(matches!(
385            db.factory
386                .static_file_provider()
387                .check_consistency(&db.factory.database_provider_ro().unwrap()),
388            Ok(e) if e == expected
389        ));
390    }
391
392    #[test]
393    fn test_consistency() {
394        let db = seed_data(90).unwrap();
395        let db_provider = db.factory.database_provider_ro().unwrap();
396
397        assert!(matches!(
398            db.factory.static_file_provider().check_consistency(&db_provider),
399            Ok(None)
400        ));
401    }
402
403    #[test]
404    fn test_consistency_no_commit_prune() {
405        // Test full node with receipt pruning
406        let mut db_full = seed_data(90).unwrap();
407        db_full.factory = db_full.factory.with_prune_modes(PruneModes {
408            receipts: Some(PruneMode::Before(1)),
409            ..Default::default()
410        });
411
412        // Full node does not use receipts, therefore doesn't check for consistency on receipts
413        // segment
414        simulate_behind_checkpoint_corruption(&db_full, 1, StaticFileSegment::Receipts, None);
415
416        // Test archive node without receipt pruning
417        let db_archive = seed_data(90).unwrap();
418
419        // there are 2 to 3 transactions per block. however, if we lose one tx, we need to unwind to
420        // the previous block.
421        simulate_behind_checkpoint_corruption(
422            &db_archive,
423            1,
424            StaticFileSegment::Receipts,
425            Some(PipelineTarget::Unwind(88)),
426        );
427
428        simulate_behind_checkpoint_corruption(
429            &db_archive,
430            3,
431            StaticFileSegment::Headers,
432            Some(PipelineTarget::Unwind(86)),
433        );
434    }
435
436    #[test]
437    fn test_consistency_checkpoints() {
438        let db = seed_data(90).unwrap();
439
440        // When a checkpoint is behind, we delete data from static files.
441        let block = 87;
442        save_checkpoint_and_check(&db, StageId::Bodies, block, None);
443        assert_eq!(
444            db.factory
445                .static_file_provider()
446                .get_highest_static_file_block(StaticFileSegment::Transactions),
447            Some(block)
448        );
449        assert_eq!(
450            db.factory
451                .static_file_provider()
452                .get_highest_static_file_tx(StaticFileSegment::Transactions),
453            db.factory.block_body_indices(block).unwrap().map(|b| b.last_tx_num())
454        );
455
456        let block = 86;
457        save_checkpoint_and_check(&db, StageId::Execution, block, None);
458        assert_eq!(
459            db.factory
460                .static_file_provider()
461                .get_highest_static_file_block(StaticFileSegment::Receipts),
462            Some(block)
463        );
464        assert_eq!(
465            db.factory
466                .static_file_provider()
467                .get_highest_static_file_tx(StaticFileSegment::Receipts),
468            db.factory.block_body_indices(block).unwrap().map(|b| b.last_tx_num())
469        );
470
471        let block = 80;
472        save_checkpoint_and_check(&db, StageId::Headers, block, None);
473        assert_eq!(
474            db.factory
475                .static_file_provider()
476                .get_highest_static_file_block(StaticFileSegment::Headers),
477            Some(block)
478        );
479
480        // When a checkpoint is ahead, we request a pipeline unwind.
481        save_checkpoint_and_check(&db, StageId::Headers, 91, Some(PipelineTarget::Unwind(block)));
482    }
483
484    #[test]
485    fn test_consistency_headers_gap() {
486        let db = seed_data(90).unwrap();
487        let current = db
488            .factory
489            .static_file_provider()
490            .get_highest_static_file_block(StaticFileSegment::Headers)
491            .unwrap();
492
493        // Creates a gap of one header: static_file <missing> db
494        update_db_and_check::<tables::Headers>(&db, current + 2, Some(PipelineTarget::Unwind(89)));
495
496        // Fill the gap, and ensure no unwind is necessary.
497        update_db_and_check::<tables::Headers>(&db, current + 1, None);
498    }
499
500    #[test]
501    fn test_consistency_tx_gap() {
502        let db = seed_data(90).unwrap();
503        let current = db
504            .factory
505            .static_file_provider()
506            .get_highest_static_file_tx(StaticFileSegment::Transactions)
507            .unwrap();
508
509        // Creates a gap of one transaction: static_file <missing> db
510        update_db_with_and_check::<tables::Transactions>(
511            &db,
512            current + 2,
513            Some(PipelineTarget::Unwind(89)),
514            &TxLegacy::default().into_signed(Signature::test_signature()).into(),
515        );
516
517        // Fill the gap, and ensure no unwind is necessary.
518        update_db_with_and_check::<tables::Transactions>(
519            &db,
520            current + 1,
521            None,
522            &TxLegacy::default().into_signed(Signature::test_signature()).into(),
523        );
524    }
525
526    #[test]
527    fn test_consistency_receipt_gap() {
528        let db = seed_data(90).unwrap();
529        let current = db
530            .factory
531            .static_file_provider()
532            .get_highest_static_file_tx(StaticFileSegment::Receipts)
533            .unwrap();
534
535        // Creates a gap of one receipt: static_file <missing> db
536        update_db_and_check::<tables::Receipts>(&db, current + 2, Some(PipelineTarget::Unwind(89)));
537
538        // Fill the gap, and ensure no unwind is necessary.
539        update_db_and_check::<tables::Receipts>(&db, current + 1, None);
540    }
541}