1mod bodies;
3mod era;
4mod execution;
6mod finish;
8mod hashing_account;
10mod hashing_storage;
12mod headers;
14mod index_account_history;
16mod index_storage_history;
18mod merkle;
20mod prune;
21mod sender_recovery;
23mod tx_lookup;
25
26pub use bodies::*;
27pub use era::*;
28pub use execution::*;
29pub use finish::*;
30pub use hashing_account::*;
31pub use hashing_storage::*;
32pub use headers::*;
33pub use index_account_history::*;
34pub use index_storage_history::*;
35pub use merkle::*;
36pub use prune::*;
37pub use sender_recovery::*;
38pub use tx_lookup::*;
39
40mod utils;
41use utils::*;
42
43#[cfg(test)]
44mod tests {
45 use super::*;
46 use crate::test_utils::{StorageKind, TestStageDB};
47 use alloy_consensus::{SignableTransaction, TxLegacy};
48 use alloy_primitives::{
49 address, hex_literal::hex, keccak256, BlockNumber, Signature, B256, U256,
50 };
51 use alloy_rlp::Decodable;
52 use reth_chainspec::ChainSpecBuilder;
53 use reth_db::mdbx::{cursor::Cursor, RW};
54 use reth_db_api::{
55 cursor::{DbCursorRO, DbCursorRW},
56 table::Table,
57 tables,
58 transaction::{DbTx, DbTxMut},
59 AccountsHistory,
60 };
61 use reth_ethereum_consensus::EthBeaconConsensus;
62 use reth_ethereum_primitives::Block;
63 use reth_evm_ethereum::EthEvmConfig;
64 use reth_exex::ExExManagerHandle;
65 use reth_primitives_traits::{Account, Bytecode, SealedBlock};
66 use reth_provider::{
67 providers::{StaticFileProvider, StaticFileWriter},
68 test_utils::MockNodeTypesWithDB,
69 AccountExtReader, BlockBodyIndicesProvider, BlockWriter, DatabaseProviderFactory,
70 ProviderFactory, ProviderResult, ReceiptProvider, StageCheckpointWriter,
71 StaticFileProviderFactory, StorageReader,
72 };
73 use reth_prune_types::{PruneMode, PruneModes};
74 use reth_stages_api::{
75 ExecInput, ExecutionStageThresholds, PipelineTarget, Stage, StageCheckpoint, StageId,
76 };
77 use reth_static_file_types::StaticFileSegment;
78 use reth_testing_utils::generators::{
79 self, random_block, random_block_range, random_receipt, BlockRangeParams,
80 };
81 use std::{io::Write, sync::Arc};
82
83 #[tokio::test]
84 #[ignore]
85 async fn test_prune() {
86 let test_db = TestStageDB::default();
87
88 let provider_rw = test_db.factory.provider_rw().unwrap();
89 let tip = 66;
90 let input = ExecInput { target: Some(tip), checkpoint: None };
91 let mut genesis_rlp = hex!("f901faf901f5a00000000000000000000000000000000000000000000000000000000000000000a01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa045571b40ae66ca7480791bbb2887286e4e4c4b1b298b191c889d6959023a32eda056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421a056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421b901000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000083020000808502540be400808000a00000000000000000000000000000000000000000000000000000000000000000880000000000000000c0c0").as_slice();
92 let genesis = SealedBlock::<Block>::decode(&mut genesis_rlp).unwrap();
93 let mut block_rlp = hex!("f90262f901f9a075c371ba45999d87f4542326910a11af515897aebce5265d3f6acd1f1161f82fa01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa098f2dcd87c8ae4083e7017a05456c14eea4b1db2032126e27b3b1563d57d7cc0a08151d548273f6683169524b66ca9fe338b9ce42bc3540046c828fd939ae23bcba03f4e5c2ec5b2170b711d97ee755c160457bb58d8daa338e835ec02ae6860bbabb901000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000083020000018502540be40082a8798203e800a00000000000000000000000000000000000000000000000000000000000000000880000000000000000f863f861800a8405f5e10094100000000000000000000000000000000000000080801ba07e09e26678ed4fac08a249ebe8ed680bf9051a5e14ad223e4b2b9d26e0208f37a05f6e3f188e3e6eab7d7d3b6568f5eac7d687b08d307d3154ccd8c87b4630509bc0").as_slice();
94 let block = SealedBlock::<Block>::decode(&mut block_rlp).unwrap();
95 let mut head = block.hash();
96 provider_rw.insert_block(&genesis.try_recover().unwrap()).unwrap();
97 provider_rw.insert_block(&block.try_recover().unwrap()).unwrap();
98
99 let mut rng = generators::rng();
101 for block_number in 2..=tip {
102 let nblock = random_block(
103 &mut rng,
104 block_number,
105 generators::BlockParams { parent: Some(head), ..Default::default() },
106 );
107 head = nblock.hash();
108 provider_rw.insert_block(&nblock.try_recover().unwrap()).unwrap();
109 }
110 provider_rw
111 .static_file_provider()
112 .latest_writer(StaticFileSegment::Headers)
113 .unwrap()
114 .commit()
115 .unwrap();
116 provider_rw.commit().unwrap();
117
118 let provider_rw = test_db.factory.provider_rw().unwrap();
120 let code = hex!("5a465a905090036002900360015500");
121 let code_hash = keccak256(hex!("5a465a905090036002900360015500"));
122 provider_rw
123 .tx_ref()
124 .put::<tables::PlainAccountState>(
125 address!("0x1000000000000000000000000000000000000000"),
126 Account { nonce: 0, balance: U256::ZERO, bytecode_hash: Some(code_hash) },
127 )
128 .unwrap();
129 provider_rw
130 .tx_ref()
131 .put::<tables::PlainAccountState>(
132 address!("0xa94f5374fce5edbc8e2a8697c15331677e6ebf0b"),
133 Account {
134 nonce: 0,
135 balance: U256::from(0x3635c9adc5dea00000u128),
136 bytecode_hash: None,
137 },
138 )
139 .unwrap();
140 provider_rw
141 .tx_ref()
142 .put::<tables::Bytecodes>(code_hash, Bytecode::new_raw(code.to_vec().into()))
143 .unwrap();
144 provider_rw.commit().unwrap();
145
146 let check_pruning = |factory: ProviderFactory<MockNodeTypesWithDB>,
147 prune_modes: PruneModes,
148 expect_num_receipts: usize,
149 expect_num_acc_changesets: usize,
150 expect_num_storage_changesets: usize| async move {
151 let provider = factory.database_provider_rw().unwrap();
152
153 let mut execution_stage = ExecutionStage::new(
156 EthEvmConfig::ethereum(Arc::new(
157 ChainSpecBuilder::mainnet().berlin_activated().build(),
158 )),
159 Arc::new(EthBeaconConsensus::new(Arc::new(
160 ChainSpecBuilder::mainnet().berlin_activated().build(),
161 ))),
162 ExecutionStageThresholds {
163 max_blocks: Some(100),
164 max_changes: None,
165 max_cumulative_gas: None,
166 max_duration: None,
167 },
168 MERKLE_STAGE_DEFAULT_REBUILD_THRESHOLD,
169 ExExManagerHandle::empty(),
170 );
171
172 execution_stage.execute(&provider, input).unwrap();
173 assert_eq!(
174 provider.receipts_by_block(1.into()).unwrap().unwrap().len(),
175 expect_num_receipts
176 );
177
178 assert_eq!(
179 provider.changed_storages_and_blocks_with_range(0..=1000).unwrap().len(),
180 expect_num_storage_changesets
181 );
182
183 assert_eq!(
184 provider.changed_accounts_and_blocks_with_range(0..=1000).unwrap().len(),
185 expect_num_acc_changesets
186 );
187
188 let mut acc_indexing_stage = IndexAccountHistoryStage {
190 prune_mode: prune_modes.account_history,
191 ..Default::default()
192 };
193
194 if prune_modes.account_history == Some(PruneMode::Full) {
195 assert!(acc_indexing_stage.execute(&provider, input).is_err());
197 } else {
198 acc_indexing_stage.execute(&provider, input).unwrap();
199 let mut account_history: Cursor<RW, AccountsHistory> =
200 provider.tx_ref().cursor_read::<tables::AccountsHistory>().unwrap();
201 assert_eq!(account_history.walk(None).unwrap().count(), expect_num_acc_changesets);
202 }
203
204 let mut storage_indexing_stage = IndexStorageHistoryStage {
206 prune_mode: prune_modes.storage_history,
207 ..Default::default()
208 };
209
210 if prune_modes.storage_history == Some(PruneMode::Full) {
211 assert!(storage_indexing_stage.execute(&provider, input).is_err());
213 } else {
214 storage_indexing_stage.execute(&provider, input).unwrap();
215
216 let mut storage_history =
217 provider.tx_ref().cursor_read::<tables::StoragesHistory>().unwrap();
218 assert_eq!(
219 storage_history.walk(None).unwrap().count(),
220 expect_num_storage_changesets
221 );
222 }
223 };
224
225 let mut prune = PruneModes::default();
228 check_pruning(test_db.factory.clone(), prune.clone(), 1, 3, 1).await;
229
230 prune.receipts = Some(PruneMode::Full);
231 prune.account_history = Some(PruneMode::Full);
232 prune.storage_history = Some(PruneMode::Full);
233 check_pruning(test_db.factory.clone(), prune.clone(), 0, 0, 0).await;
235
236 prune.receipts = Some(PruneMode::Before(1));
237 prune.account_history = Some(PruneMode::Before(1));
238 prune.storage_history = Some(PruneMode::Before(1));
239 check_pruning(test_db.factory.clone(), prune.clone(), 1, 3, 1).await;
240
241 prune.receipts = Some(PruneMode::Before(2));
242 prune.account_history = Some(PruneMode::Before(2));
243 prune.storage_history = Some(PruneMode::Before(2));
244 check_pruning(test_db.factory.clone(), prune.clone(), 0, 1, 0).await;
246
247 prune.receipts = Some(PruneMode::Distance(66));
248 prune.account_history = Some(PruneMode::Distance(66));
249 prune.storage_history = Some(PruneMode::Distance(66));
250 check_pruning(test_db.factory.clone(), prune.clone(), 1, 3, 1).await;
251
252 prune.receipts = Some(PruneMode::Distance(64));
253 prune.account_history = Some(PruneMode::Distance(64));
254 prune.storage_history = Some(PruneMode::Distance(64));
255 check_pruning(test_db.factory.clone(), prune.clone(), 0, 1, 0).await;
257 }
258
259 fn seed_data(num_blocks: usize) -> ProviderResult<TestStageDB> {
262 let db = TestStageDB::default();
263 let mut rng = generators::rng();
264 let genesis_hash = B256::ZERO;
265 let tip = (num_blocks - 1) as u64;
266
267 let blocks = random_block_range(
268 &mut rng,
269 0..=tip,
270 BlockRangeParams { parent: Some(genesis_hash), tx_count: 2..3, ..Default::default() },
271 );
272 db.insert_blocks(blocks.iter(), StorageKind::Static)?;
273
274 let mut receipts = Vec::with_capacity(blocks.len());
275 let mut tx_num = 0u64;
276 for block in &blocks {
277 let mut block_receipts = Vec::with_capacity(block.transaction_count());
278 for transaction in &block.body().transactions {
279 block_receipts.push((tx_num, random_receipt(&mut rng, transaction, Some(0), None)));
280 tx_num += 1;
281 }
282 receipts.push((block.number, block_receipts));
283 }
284 db.insert_receipts_by_block(receipts, StorageKind::Static)?;
285
286 let provider_rw = db.factory.provider_rw()?;
288 for stage in StageId::ALL {
289 provider_rw.save_stage_checkpoint(stage, StageCheckpoint::new(tip))?;
290 }
291 provider_rw.commit()?;
292
293 Ok(db)
294 }
295
296 fn simulate_behind_checkpoint_corruption(
299 db: &TestStageDB,
300 prune_count: usize,
301 segment: StaticFileSegment,
302 expected: Option<PipelineTarget>,
303 ) {
304 let mut static_file_provider = db.factory.static_file_provider();
307 static_file_provider = StaticFileProvider::read_write(static_file_provider.path()).unwrap();
308
309 {
312 let mut headers_writer = static_file_provider.latest_writer(segment).unwrap();
313 let reader = headers_writer.inner().jar().open_data_reader().unwrap();
314 let columns = headers_writer.inner().jar().columns();
315 let data_file = headers_writer.inner().data_file();
316 let last_offset = reader.reverse_offset(prune_count * columns).unwrap();
317 data_file.get_mut().set_len(last_offset).unwrap();
318 data_file.flush().unwrap();
319 data_file.get_ref().sync_all().unwrap();
320 }
321
322 let mut static_file_provider = db.factory.static_file_provider();
325 static_file_provider = StaticFileProvider::read_write(static_file_provider.path()).unwrap();
326 assert!(matches!(
327 static_file_provider
328 .check_consistency(&db.factory.database_provider_ro().unwrap()),
329 Ok(e) if e == expected
330 ));
331 }
332
333 fn save_checkpoint_and_check(
336 db: &TestStageDB,
337 stage_id: StageId,
338 checkpoint_block_number: BlockNumber,
339 expected: Option<PipelineTarget>,
340 ) {
341 let provider_rw = db.factory.provider_rw().unwrap();
342 provider_rw
343 .save_stage_checkpoint(stage_id, StageCheckpoint::new(checkpoint_block_number))
344 .unwrap();
345 provider_rw.commit().unwrap();
346
347 assert!(matches!(
348 db.factory
349 .static_file_provider()
350 .check_consistency(&db.factory.database_provider_ro().unwrap(),),
351 Ok(e) if e == expected
352 ));
353 }
354
355 fn update_db_and_check<T: Table<Key = u64>>(
358 db: &TestStageDB,
359 key: u64,
360 expected: Option<PipelineTarget>,
361 ) where
362 <T as Table>::Value: Default,
363 {
364 update_db_with_and_check::<T>(db, key, expected, &Default::default());
365 }
366
367 fn update_db_with_and_check<T: Table<Key = u64>>(
370 db: &TestStageDB,
371 key: u64,
372 expected: Option<PipelineTarget>,
373 value: &T::Value,
374 ) {
375 let provider_rw = db.factory.provider_rw().unwrap();
376 let mut cursor = provider_rw.tx_ref().cursor_write::<T>().unwrap();
377 cursor.insert(key, value).unwrap();
378 provider_rw.commit().unwrap();
379
380 assert!(matches!(
381 db.factory
382 .static_file_provider()
383 .check_consistency(&db.factory.database_provider_ro().unwrap()),
384 Ok(e) if e == expected
385 ));
386 }
387
388 #[test]
389 fn test_consistency() {
390 let db = seed_data(90).unwrap();
391 let db_provider = db.factory.database_provider_ro().unwrap();
392
393 assert!(matches!(
394 db.factory.static_file_provider().check_consistency(&db_provider),
395 Ok(None)
396 ));
397 }
398
399 #[test]
400 fn test_consistency_no_commit_prune() {
401 let mut db_full = seed_data(90).unwrap();
403 db_full.factory = db_full.factory.with_prune_modes(PruneModes {
404 receipts: Some(PruneMode::Before(1)),
405 ..Default::default()
406 });
407
408 simulate_behind_checkpoint_corruption(&db_full, 1, StaticFileSegment::Receipts, None);
411
412 let db_archive = seed_data(90).unwrap();
414
415 simulate_behind_checkpoint_corruption(
418 &db_archive,
419 1,
420 StaticFileSegment::Receipts,
421 Some(PipelineTarget::Unwind(88)),
422 );
423
424 simulate_behind_checkpoint_corruption(
425 &db_archive,
426 3,
427 StaticFileSegment::Headers,
428 Some(PipelineTarget::Unwind(86)),
429 );
430 }
431
432 #[test]
433 fn test_consistency_checkpoints() {
434 let db = seed_data(90).unwrap();
435
436 let block = 87;
438 save_checkpoint_and_check(&db, StageId::Bodies, block, None);
439 assert_eq!(
440 db.factory
441 .static_file_provider()
442 .get_highest_static_file_block(StaticFileSegment::Transactions),
443 Some(block)
444 );
445 assert_eq!(
446 db.factory
447 .static_file_provider()
448 .get_highest_static_file_tx(StaticFileSegment::Transactions),
449 db.factory.block_body_indices(block).unwrap().map(|b| b.last_tx_num())
450 );
451
452 let block = 86;
453 save_checkpoint_and_check(&db, StageId::Execution, block, None);
454 assert_eq!(
455 db.factory
456 .static_file_provider()
457 .get_highest_static_file_block(StaticFileSegment::Receipts),
458 Some(block)
459 );
460 assert_eq!(
461 db.factory
462 .static_file_provider()
463 .get_highest_static_file_tx(StaticFileSegment::Receipts),
464 db.factory.block_body_indices(block).unwrap().map(|b| b.last_tx_num())
465 );
466
467 let block = 80;
468 save_checkpoint_and_check(&db, StageId::Headers, block, None);
469 assert_eq!(
470 db.factory
471 .static_file_provider()
472 .get_highest_static_file_block(StaticFileSegment::Headers),
473 Some(block)
474 );
475
476 save_checkpoint_and_check(&db, StageId::Headers, 91, Some(PipelineTarget::Unwind(block)));
478 }
479
480 #[test]
481 fn test_consistency_headers_gap() {
482 let db = seed_data(90).unwrap();
483 let current = db
484 .factory
485 .static_file_provider()
486 .get_highest_static_file_block(StaticFileSegment::Headers)
487 .unwrap();
488
489 update_db_and_check::<tables::Headers>(&db, current + 2, Some(PipelineTarget::Unwind(89)));
491
492 update_db_and_check::<tables::Headers>(&db, current + 1, None);
494 }
495
496 #[test]
497 fn test_consistency_tx_gap() {
498 let db = seed_data(90).unwrap();
499 let current = db
500 .factory
501 .static_file_provider()
502 .get_highest_static_file_tx(StaticFileSegment::Transactions)
503 .unwrap();
504
505 update_db_with_and_check::<tables::Transactions>(
507 &db,
508 current + 2,
509 Some(PipelineTarget::Unwind(89)),
510 &TxLegacy::default().into_signed(Signature::test_signature()).into(),
511 );
512
513 update_db_with_and_check::<tables::Transactions>(
515 &db,
516 current + 1,
517 None,
518 &TxLegacy::default().into_signed(Signature::test_signature()).into(),
519 );
520 }
521
522 #[test]
523 fn test_consistency_receipt_gap() {
524 let db = seed_data(90).unwrap();
525 let current = db
526 .factory
527 .static_file_provider()
528 .get_highest_static_file_tx(StaticFileSegment::Receipts)
529 .unwrap();
530
531 update_db_and_check::<tables::Receipts>(&db, current + 2, Some(PipelineTarget::Unwind(89)));
533
534 update_db_and_check::<tables::Receipts>(&db, current + 1, None);
536 }
537}