1mod bodies;
3mod execution;
5mod finish;
7mod hashing_account;
9mod hashing_storage;
11mod headers;
13mod index_account_history;
15mod index_storage_history;
17mod merkle;
19mod merkle_changesets;
21mod prune;
22mod sender_recovery;
24mod tx_lookup;
26
27pub use bodies::*;
28pub use era::*;
29pub use execution::*;
30pub use finish::*;
31pub use hashing_account::*;
32pub use hashing_storage::*;
33pub use headers::*;
34pub use index_account_history::*;
35pub use index_storage_history::*;
36pub use merkle::*;
37pub use merkle_changesets::*;
38pub use prune::*;
39pub use sender_recovery::*;
40pub use tx_lookup::*;
41
42mod era;
43mod utils;
44
45use utils::*;
46
47#[cfg(test)]
48mod tests {
49 use super::*;
50 use crate::test_utils::{StorageKind, TestStageDB};
51 use alloy_consensus::{SignableTransaction, TxLegacy};
52 use alloy_primitives::{
53 address, hex_literal::hex, keccak256, BlockNumber, Signature, B256, U256,
54 };
55 use alloy_rlp::Decodable;
56 use reth_chainspec::ChainSpecBuilder;
57 use reth_db::mdbx::{cursor::Cursor, RW};
58 use reth_db_api::{
59 cursor::{DbCursorRO, DbCursorRW},
60 table::Table,
61 tables,
62 transaction::{DbTx, DbTxMut},
63 AccountsHistory,
64 };
65 use reth_ethereum_consensus::EthBeaconConsensus;
66 use reth_ethereum_primitives::Block;
67 use reth_evm_ethereum::EthEvmConfig;
68 use reth_exex::ExExManagerHandle;
69 use reth_primitives_traits::{Account, Bytecode, SealedBlock};
70 use reth_provider::{
71 providers::{StaticFileProvider, StaticFileWriter},
72 test_utils::MockNodeTypesWithDB,
73 AccountExtReader, BlockBodyIndicesProvider, BlockWriter, DatabaseProviderFactory,
74 ProviderFactory, ProviderResult, ReceiptProvider, StageCheckpointWriter,
75 StaticFileProviderFactory, StorageReader,
76 };
77 use reth_prune_types::{PruneMode, PruneModes};
78 use reth_stages_api::{
79 ExecInput, ExecutionStageThresholds, PipelineTarget, Stage, StageCheckpoint, StageId,
80 };
81 use reth_static_file_types::StaticFileSegment;
82 use reth_testing_utils::generators::{
83 self, random_block, random_block_range, random_receipt, BlockRangeParams,
84 };
85 use std::{io::Write, sync::Arc};
86
87 #[tokio::test]
88 #[ignore]
89 async fn test_prune() {
90 let test_db = TestStageDB::default();
91
92 let provider_rw = test_db.factory.provider_rw().unwrap();
93 let tip = 66;
94 let input = ExecInput { target: Some(tip), checkpoint: None };
95 let mut genesis_rlp = hex!("f901faf901f5a00000000000000000000000000000000000000000000000000000000000000000a01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa045571b40ae66ca7480791bbb2887286e4e4c4b1b298b191c889d6959023a32eda056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421a056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421b901000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000083020000808502540be400808000a00000000000000000000000000000000000000000000000000000000000000000880000000000000000c0c0").as_slice();
96 let genesis = SealedBlock::<Block>::decode(&mut genesis_rlp).unwrap();
97 let mut block_rlp = hex!("f90262f901f9a075c371ba45999d87f4542326910a11af515897aebce5265d3f6acd1f1161f82fa01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa098f2dcd87c8ae4083e7017a05456c14eea4b1db2032126e27b3b1563d57d7cc0a08151d548273f6683169524b66ca9fe338b9ce42bc3540046c828fd939ae23bcba03f4e5c2ec5b2170b711d97ee755c160457bb58d8daa338e835ec02ae6860bbabb901000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000083020000018502540be40082a8798203e800a00000000000000000000000000000000000000000000000000000000000000000880000000000000000f863f861800a8405f5e10094100000000000000000000000000000000000000080801ba07e09e26678ed4fac08a249ebe8ed680bf9051a5e14ad223e4b2b9d26e0208f37a05f6e3f188e3e6eab7d7d3b6568f5eac7d687b08d307d3154ccd8c87b4630509bc0").as_slice();
98 let block = SealedBlock::<Block>::decode(&mut block_rlp).unwrap();
99 provider_rw.insert_block(genesis.try_recover().unwrap()).unwrap();
100 provider_rw.insert_block(block.clone().try_recover().unwrap()).unwrap();
101
102 let mut head = block.hash();
104 let mut rng = generators::rng();
105 for block_number in 2..=tip {
106 let nblock = random_block(
107 &mut rng,
108 block_number,
109 generators::BlockParams { parent: Some(head), ..Default::default() },
110 );
111 head = nblock.hash();
112 provider_rw.insert_block(nblock.try_recover().unwrap()).unwrap();
113 }
114 provider_rw
115 .static_file_provider()
116 .latest_writer(StaticFileSegment::Headers)
117 .unwrap()
118 .commit()
119 .unwrap();
120 provider_rw.commit().unwrap();
121
122 let provider_rw = test_db.factory.provider_rw().unwrap();
124 let code = hex!("5a465a905090036002900360015500");
125 let code_hash = keccak256(hex!("5a465a905090036002900360015500"));
126 provider_rw
127 .tx_ref()
128 .put::<tables::PlainAccountState>(
129 address!("0x1000000000000000000000000000000000000000"),
130 Account { nonce: 0, balance: U256::ZERO, bytecode_hash: Some(code_hash) },
131 )
132 .unwrap();
133 provider_rw
134 .tx_ref()
135 .put::<tables::PlainAccountState>(
136 address!("0xa94f5374fce5edbc8e2a8697c15331677e6ebf0b"),
137 Account {
138 nonce: 0,
139 balance: U256::from(0x3635c9adc5dea00000u128),
140 bytecode_hash: None,
141 },
142 )
143 .unwrap();
144 provider_rw
145 .tx_ref()
146 .put::<tables::Bytecodes>(code_hash, Bytecode::new_raw(code.to_vec().into()))
147 .unwrap();
148 provider_rw.commit().unwrap();
149
150 let check_pruning = |factory: ProviderFactory<MockNodeTypesWithDB>,
151 prune_modes: PruneModes,
152 expect_num_receipts: usize,
153 expect_num_acc_changesets: usize,
154 expect_num_storage_changesets: usize| async move {
155 let provider = factory.database_provider_rw().unwrap();
156
157 let mut execution_stage = ExecutionStage::new(
160 EthEvmConfig::ethereum(Arc::new(
161 ChainSpecBuilder::mainnet().berlin_activated().build(),
162 )),
163 Arc::new(EthBeaconConsensus::new(Arc::new(
164 ChainSpecBuilder::mainnet().berlin_activated().build(),
165 ))),
166 ExecutionStageThresholds {
167 max_blocks: Some(100),
168 max_changes: None,
169 max_cumulative_gas: None,
170 max_duration: None,
171 },
172 MERKLE_STAGE_DEFAULT_REBUILD_THRESHOLD,
173 ExExManagerHandle::empty(),
174 );
175
176 execution_stage.execute(&provider, input).unwrap();
177 assert_eq!(
178 provider.receipts_by_block(1.into()).unwrap().unwrap().len(),
179 expect_num_receipts
180 );
181
182 assert_eq!(
183 provider.changed_storages_and_blocks_with_range(0..=1000).unwrap().len(),
184 expect_num_storage_changesets
185 );
186
187 assert_eq!(
188 provider.changed_accounts_and_blocks_with_range(0..=1000).unwrap().len(),
189 expect_num_acc_changesets
190 );
191
192 let mut acc_indexing_stage = IndexAccountHistoryStage {
194 prune_mode: prune_modes.account_history,
195 ..Default::default()
196 };
197
198 if prune_modes.account_history == Some(PruneMode::Full) {
199 assert!(acc_indexing_stage.execute(&provider, input).is_err());
201 } else {
202 acc_indexing_stage.execute(&provider, input).unwrap();
203 let mut account_history: Cursor<RW, AccountsHistory> =
204 provider.tx_ref().cursor_read::<tables::AccountsHistory>().unwrap();
205 assert_eq!(account_history.walk(None).unwrap().count(), expect_num_acc_changesets);
206 }
207
208 let mut storage_indexing_stage = IndexStorageHistoryStage {
210 prune_mode: prune_modes.storage_history,
211 ..Default::default()
212 };
213
214 if prune_modes.storage_history == Some(PruneMode::Full) {
215 assert!(storage_indexing_stage.execute(&provider, input).is_err());
217 } else {
218 storage_indexing_stage.execute(&provider, input).unwrap();
219
220 let mut storage_history =
221 provider.tx_ref().cursor_read::<tables::StoragesHistory>().unwrap();
222 assert_eq!(
223 storage_history.walk(None).unwrap().count(),
224 expect_num_storage_changesets
225 );
226 }
227 };
228
229 let mut prune = PruneModes::default();
232 check_pruning(test_db.factory.clone(), prune.clone(), 1, 3, 1).await;
233
234 prune.receipts = Some(PruneMode::Full);
235 prune.account_history = Some(PruneMode::Full);
236 prune.storage_history = Some(PruneMode::Full);
237 check_pruning(test_db.factory.clone(), prune.clone(), 0, 0, 0).await;
239
240 prune.receipts = Some(PruneMode::Before(1));
241 prune.account_history = Some(PruneMode::Before(1));
242 prune.storage_history = Some(PruneMode::Before(1));
243 check_pruning(test_db.factory.clone(), prune.clone(), 1, 3, 1).await;
244
245 prune.receipts = Some(PruneMode::Before(2));
246 prune.account_history = Some(PruneMode::Before(2));
247 prune.storage_history = Some(PruneMode::Before(2));
248 check_pruning(test_db.factory.clone(), prune.clone(), 0, 1, 0).await;
250
251 prune.receipts = Some(PruneMode::Distance(66));
252 prune.account_history = Some(PruneMode::Distance(66));
253 prune.storage_history = Some(PruneMode::Distance(66));
254 check_pruning(test_db.factory.clone(), prune.clone(), 1, 3, 1).await;
255
256 prune.receipts = Some(PruneMode::Distance(64));
257 prune.account_history = Some(PruneMode::Distance(64));
258 prune.storage_history = Some(PruneMode::Distance(64));
259 check_pruning(test_db.factory.clone(), prune.clone(), 0, 1, 0).await;
261 }
262
263 fn seed_data(num_blocks: usize) -> ProviderResult<TestStageDB> {
266 let db = TestStageDB::default();
267 let mut rng = generators::rng();
268 let genesis_hash = B256::ZERO;
269 let tip = (num_blocks - 1) as u64;
270
271 let blocks = random_block_range(
272 &mut rng,
273 0..=tip,
274 BlockRangeParams { parent: Some(genesis_hash), tx_count: 2..3, ..Default::default() },
275 );
276 db.insert_blocks(blocks.iter(), StorageKind::Static)?;
277
278 let mut receipts = Vec::with_capacity(blocks.len());
279 let mut tx_num = 0u64;
280 for block in &blocks {
281 let mut block_receipts = Vec::with_capacity(block.transaction_count());
282 for transaction in &block.body().transactions {
283 block_receipts.push((tx_num, random_receipt(&mut rng, transaction, Some(0), None)));
284 tx_num += 1;
285 }
286 receipts.push((block.number, block_receipts));
287 }
288 db.insert_receipts_by_block(receipts, StorageKind::Static)?;
289
290 let provider_rw = db.factory.provider_rw()?;
292 for stage in StageId::ALL {
293 provider_rw.save_stage_checkpoint(stage, StageCheckpoint::new(tip))?;
294 }
295 provider_rw.commit()?;
296
297 Ok(db)
298 }
299
300 fn simulate_behind_checkpoint_corruption(
303 db: &TestStageDB,
304 prune_count: usize,
305 segment: StaticFileSegment,
306 expected: Option<PipelineTarget>,
307 ) {
308 let mut static_file_provider = db.factory.static_file_provider();
311 static_file_provider = StaticFileProvider::read_write(static_file_provider.path()).unwrap();
312
313 {
316 let mut headers_writer = static_file_provider.latest_writer(segment).unwrap();
317 let reader = headers_writer.inner().jar().open_data_reader().unwrap();
318 let columns = headers_writer.inner().jar().columns();
319 let data_file = headers_writer.inner().data_file();
320 let last_offset = reader.reverse_offset(prune_count * columns).unwrap();
321 data_file.get_mut().set_len(last_offset).unwrap();
322 data_file.flush().unwrap();
323 data_file.get_ref().sync_all().unwrap();
324 }
325
326 let mut static_file_provider = db.factory.static_file_provider();
329 static_file_provider = StaticFileProvider::read_write(static_file_provider.path()).unwrap();
330 assert!(matches!(
331 static_file_provider
332 .check_consistency(&db.factory.database_provider_ro().unwrap()),
333 Ok(e) if e == expected
334 ));
335 }
336
337 fn save_checkpoint_and_check(
340 db: &TestStageDB,
341 stage_id: StageId,
342 checkpoint_block_number: BlockNumber,
343 expected: Option<PipelineTarget>,
344 ) {
345 let provider_rw = db.factory.provider_rw().unwrap();
346 provider_rw
347 .save_stage_checkpoint(stage_id, StageCheckpoint::new(checkpoint_block_number))
348 .unwrap();
349 provider_rw.commit().unwrap();
350
351 assert!(matches!(
352 db.factory
353 .static_file_provider()
354 .check_consistency(&db.factory.database_provider_ro().unwrap(),),
355 Ok(e) if e == expected
356 ));
357 }
358
359 fn update_db_and_check<T: Table<Key = u64>>(
362 db: &TestStageDB,
363 key: u64,
364 expected: Option<PipelineTarget>,
365 ) where
366 <T as Table>::Value: Default,
367 {
368 update_db_with_and_check::<T>(db, key, expected, &Default::default());
369 }
370
371 fn update_db_with_and_check<T: Table<Key = u64>>(
374 db: &TestStageDB,
375 key: u64,
376 expected: Option<PipelineTarget>,
377 value: &T::Value,
378 ) {
379 let provider_rw = db.factory.provider_rw().unwrap();
380 let mut cursor = provider_rw.tx_ref().cursor_write::<T>().unwrap();
381 cursor.insert(key, value).unwrap();
382 provider_rw.commit().unwrap();
383
384 assert!(matches!(
385 db.factory
386 .static_file_provider()
387 .check_consistency(&db.factory.database_provider_ro().unwrap()),
388 Ok(e) if e == expected
389 ));
390 }
391
/// Freshly seeded data passes the consistency check with no pipeline target.
#[test]
fn test_consistency() {
    let db = seed_data(90).unwrap();
    let db_provider = db.factory.database_provider_ro().unwrap();

    let outcome = db.factory.static_file_provider().check_consistency(&db_provider);
    assert!(matches!(outcome, Ok(None)));
}
402
/// Truncates static file data and checks which unwind target the consistency check reports,
/// with and without receipt pruning configured on the factory.
#[test]
fn test_consistency_no_commit_prune() {
    // Factory configured with receipt pruning: dropping one Receipts row expects no target.
    let receipt_pruning =
        PruneModes { receipts: Some(PruneMode::Before(1)), ..Default::default() };
    let mut db_full = seed_data(90).unwrap();
    db_full.factory = db_full.factory.with_prune_modes(receipt_pruning);
    simulate_behind_checkpoint_corruption(&db_full, 1, StaticFileSegment::Receipts, None);

    // Factory left with its prune modes untouched: the same truncation expects an unwind to 88.
    let db_archive = seed_data(90).unwrap();
    let receipts_target = Some(PipelineTarget::Unwind(88));
    simulate_behind_checkpoint_corruption(
        &db_archive,
        1,
        StaticFileSegment::Receipts,
        receipts_target,
    );

    // Dropping three Headers rows on the same database expects an unwind to 86.
    let headers_target = Some(PipelineTarget::Unwind(86));
    simulate_behind_checkpoint_corruption(
        &db_archive,
        3,
        StaticFileSegment::Headers,
        headers_target,
    );
}
435
/// Lowers stage checkpoints below the seeded tip and asserts the highest static file block
/// (and transaction, where checked) of the matching segment afterwards; finally saves a
/// Headers checkpoint of 91 and expects an unwind to the last Headers checkpoint.
#[test]
fn test_consistency_checkpoints() {
    let db = seed_data(90).unwrap();

    // Each lookup fetches the static file provider from the factory again.
    let highest_block = |segment: StaticFileSegment| {
        db.factory.static_file_provider().get_highest_static_file_block(segment)
    };
    let highest_tx = |segment: StaticFileSegment| {
        db.factory.static_file_provider().get_highest_static_file_tx(segment)
    };
    let last_tx_of = |number: BlockNumber| {
        db.factory.block_body_indices(number).unwrap().map(|indices| indices.last_tx_num())
    };

    // Bodies checkpoint at 87 is checked against the Transactions segment.
    let bodies_checkpoint = 87;
    save_checkpoint_and_check(&db, StageId::Bodies, bodies_checkpoint, None);
    assert_eq!(highest_block(StaticFileSegment::Transactions), Some(bodies_checkpoint));
    assert_eq!(highest_tx(StaticFileSegment::Transactions), last_tx_of(bodies_checkpoint));

    // Execution checkpoint at 86 is checked against the Receipts segment.
    let execution_checkpoint = 86;
    save_checkpoint_and_check(&db, StageId::Execution, execution_checkpoint, None);
    assert_eq!(highest_block(StaticFileSegment::Receipts), Some(execution_checkpoint));
    assert_eq!(highest_tx(StaticFileSegment::Receipts), last_tx_of(execution_checkpoint));

    // Headers checkpoint at 80 is checked against the Headers segment.
    let headers_checkpoint = 80;
    save_checkpoint_and_check(&db, StageId::Headers, headers_checkpoint, None);
    assert_eq!(highest_block(StaticFileSegment::Headers), Some(headers_checkpoint));

    // A Headers checkpoint of 91 expects an unwind back to 80.
    let unwind_to_headers = Some(PipelineTarget::Unwind(headers_checkpoint));
    save_checkpoint_and_check(&db, StageId::Headers, 91, unwind_to_headers);
}
483
/// Inserts database headers past the highest static file header and checks the reported
/// pipeline target after each insert.
#[test]
fn test_consistency_headers_gap() {
    let db = seed_data(90).unwrap();
    let current = db
        .factory
        .static_file_provider()
        .get_highest_static_file_block(StaticFileSegment::Headers)
        .unwrap();

    // First insert skips `current + 1` and expects an unwind to 89; the second insert
    // supplies `current + 1` and expects no target.
    let steps = [(current + 2, Some(PipelineTarget::Unwind(89))), (current + 1, None)];
    for (key, expected) in steps {
        update_db_and_check::<tables::Headers>(&db, key, expected);
    }
}
499
/// Inserts database transactions past the highest static file transaction and checks the
/// reported pipeline target after each insert.
#[test]
fn test_consistency_tx_gap() {
    let db = seed_data(90).unwrap();
    let current = db
        .factory
        .static_file_provider()
        .get_highest_static_file_tx(StaticFileSegment::Transactions)
        .unwrap();

    // First insert skips `current + 1` and expects an unwind to 89; the second insert
    // supplies `current + 1` and expects no target.
    let steps = [(current + 2, Some(PipelineTarget::Unwind(89))), (current + 1, None)];
    for (key, expected) in steps {
        update_db_with_and_check::<tables::Transactions>(
            &db,
            key,
            expected,
            &TxLegacy::default().into_signed(Signature::test_signature()).into(),
        );
    }
}
525
/// Inserts database receipts past the highest static file receipt and checks the reported
/// pipeline target after each insert.
#[test]
fn test_consistency_receipt_gap() {
    let db = seed_data(90).unwrap();
    let current = db
        .factory
        .static_file_provider()
        .get_highest_static_file_tx(StaticFileSegment::Receipts)
        .unwrap();

    // First insert skips `current + 1` and expects an unwind to 89; the second insert
    // supplies `current + 1` and expects no target.
    let steps = [(current + 2, Some(PipelineTarget::Unwind(89))), (current + 1, None)];
    for (key, expected) in steps {
        update_db_and_check::<tables::Receipts>(&db, key, expected);
    }
}
541}