1mod bodies;
3mod execution;
5mod finish;
7mod hashing_account;
9mod hashing_storage;
11mod headers;
13mod index_account_history;
15mod index_storage_history;
17mod merkle;
19mod merkle_changesets;
21mod prune;
22mod sender_recovery;
24mod tx_lookup;
26
27pub use bodies::*;
28pub use era::*;
29pub use execution::*;
30pub use finish::*;
31pub use hashing_account::*;
32pub use hashing_storage::*;
33pub use headers::*;
34pub use index_account_history::*;
35pub use index_storage_history::*;
36pub use merkle::*;
37pub use merkle_changesets::*;
38pub use prune::*;
39pub use sender_recovery::*;
40pub use tx_lookup::*;
41
42mod era;
43mod utils;
44
45use utils::*;
46
47#[cfg(test)]
48mod tests {
49 use super::*;
50 use crate::test_utils::{StorageKind, TestStageDB};
51 use alloy_consensus::{SignableTransaction, TxLegacy};
52 use alloy_primitives::{
53 address, hex_literal::hex, keccak256, BlockNumber, Signature, B256, U256,
54 };
55 use alloy_rlp::Decodable;
56 use reth_chainspec::ChainSpecBuilder;
57 use reth_db::mdbx::{cursor::Cursor, RW};
58 use reth_db_api::{
59 cursor::{DbCursorRO, DbCursorRW},
60 table::Table,
61 tables,
62 transaction::{DbTx, DbTxMut},
63 AccountsHistory,
64 };
65 use reth_ethereum_consensus::EthBeaconConsensus;
66 use reth_ethereum_primitives::Block;
67 use reth_evm_ethereum::EthEvmConfig;
68 use reth_exex::ExExManagerHandle;
69 use reth_primitives_traits::{Account, Bytecode, SealedBlock};
70 use reth_provider::{
71 providers::{StaticFileProvider, StaticFileWriter},
72 test_utils::MockNodeTypesWithDB,
73 AccountExtReader, BlockBodyIndicesProvider, BlockWriter, DatabaseProviderFactory,
74 ProviderFactory, ProviderResult, ReceiptProvider, StageCheckpointWriter,
75 StaticFileProviderFactory, StorageReader,
76 };
77 use reth_prune_types::{PruneMode, PruneModes};
78 use reth_stages_api::{
79 ExecInput, ExecutionStageThresholds, PipelineTarget, Stage, StageCheckpoint, StageId,
80 };
81 use reth_static_file_types::StaticFileSegment;
82 use reth_testing_utils::generators::{
83 self, random_block, random_block_range, random_receipt, BlockRangeParams,
84 };
85 use std::{io::Write, sync::Arc};
86
87 #[tokio::test]
88 #[ignore]
89 async fn test_prune() {
90 let test_db = TestStageDB::default();
91
92 let provider_rw = test_db.factory.provider_rw().unwrap();
93 let tip = 66;
94 let input = ExecInput { target: Some(tip), checkpoint: None };
95 let mut genesis_rlp = hex!("f901faf901f5a00000000000000000000000000000000000000000000000000000000000000000a01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa045571b40ae66ca7480791bbb2887286e4e4c4b1b298b191c889d6959023a32eda056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421a056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421b901000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000083020000808502540be400808000a00000000000000000000000000000000000000000000000000000000000000000880000000000000000c0c0").as_slice();
96 let genesis = SealedBlock::<Block>::decode(&mut genesis_rlp).unwrap();
97 let mut block_rlp = hex!("f90262f901f9a075c371ba45999d87f4542326910a11af515897aebce5265d3f6acd1f1161f82fa01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa098f2dcd87c8ae4083e7017a05456c14eea4b1db2032126e27b3b1563d57d7cc0a08151d548273f6683169524b66ca9fe338b9ce42bc3540046c828fd939ae23bcba03f4e5c2ec5b2170b711d97ee755c160457bb58d8daa338e835ec02ae6860bbabb901000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000083020000018502540be40082a8798203e800a00000000000000000000000000000000000000000000000000000000000000000880000000000000000f863f861800a8405f5e10094100000000000000000000000000000000000000080801ba07e09e26678ed4fac08a249ebe8ed680bf9051a5e14ad223e4b2b9d26e0208f37a05f6e3f188e3e6eab7d7d3b6568f5eac7d687b08d307d3154ccd8c87b4630509bc0").as_slice();
98 let block = SealedBlock::<Block>::decode(&mut block_rlp).unwrap();
99 provider_rw.insert_block(genesis.try_recover().unwrap()).unwrap();
100 provider_rw.insert_block(block.clone().try_recover().unwrap()).unwrap();
101
102 let mut head = block.hash();
104 let mut rng = generators::rng();
105 for block_number in 2..=tip {
106 let nblock = random_block(
107 &mut rng,
108 block_number,
109 generators::BlockParams { parent: Some(head), ..Default::default() },
110 );
111 head = nblock.hash();
112 provider_rw.insert_block(nblock.try_recover().unwrap()).unwrap();
113 }
114 provider_rw
115 .static_file_provider()
116 .latest_writer(StaticFileSegment::Headers)
117 .unwrap()
118 .commit()
119 .unwrap();
120 provider_rw.commit().unwrap();
121
122 let provider_rw = test_db.factory.provider_rw().unwrap();
124 let code = hex!("5a465a905090036002900360015500");
125 let code_hash = keccak256(hex!("5a465a905090036002900360015500"));
126 provider_rw
127 .tx_ref()
128 .put::<tables::PlainAccountState>(
129 address!("0x1000000000000000000000000000000000000000"),
130 Account { nonce: 0, balance: U256::ZERO, bytecode_hash: Some(code_hash) },
131 )
132 .unwrap();
133 provider_rw
134 .tx_ref()
135 .put::<tables::PlainAccountState>(
136 address!("0xa94f5374fce5edbc8e2a8697c15331677e6ebf0b"),
137 Account {
138 nonce: 0,
139 balance: U256::from(0x3635c9adc5dea00000u128),
140 bytecode_hash: None,
141 },
142 )
143 .unwrap();
144 provider_rw
145 .tx_ref()
146 .put::<tables::Bytecodes>(code_hash, Bytecode::new_raw(code.to_vec().into()))
147 .unwrap();
148 provider_rw.commit().unwrap();
149
150 let check_pruning = |factory: ProviderFactory<MockNodeTypesWithDB>,
151 prune_modes: PruneModes,
152 expect_num_receipts: usize,
153 expect_num_acc_changesets: usize,
154 expect_num_storage_changesets: usize| async move {
155 let provider = factory.database_provider_rw().unwrap();
156
157 let mut execution_stage = ExecutionStage::new(
160 EthEvmConfig::ethereum(Arc::new(
161 ChainSpecBuilder::mainnet().berlin_activated().build(),
162 )),
163 Arc::new(EthBeaconConsensus::new(Arc::new(
164 ChainSpecBuilder::mainnet().berlin_activated().build(),
165 ))),
166 ExecutionStageThresholds {
167 max_blocks: Some(100),
168 max_changes: None,
169 max_cumulative_gas: None,
170 max_duration: None,
171 },
172 MERKLE_STAGE_DEFAULT_REBUILD_THRESHOLD,
173 ExExManagerHandle::empty(),
174 );
175
176 execution_stage.execute(&provider, input).unwrap();
177 assert_eq!(
178 provider.receipts_by_block(1.into()).unwrap().unwrap().len(),
179 expect_num_receipts
180 );
181
182 assert_eq!(
183 provider.changed_storages_and_blocks_with_range(0..=1000).unwrap().len(),
184 expect_num_storage_changesets
185 );
186
187 assert_eq!(
188 provider.changed_accounts_and_blocks_with_range(0..=1000).unwrap().len(),
189 expect_num_acc_changesets
190 );
191
192 let mut acc_indexing_stage = IndexAccountHistoryStage {
194 prune_mode: prune_modes.account_history,
195 ..Default::default()
196 };
197
198 if prune_modes.account_history == Some(PruneMode::Full) {
199 assert!(acc_indexing_stage.execute(&provider, input).is_err());
201 } else {
202 acc_indexing_stage.execute(&provider, input).unwrap();
203 let mut account_history: Cursor<RW, AccountsHistory> =
204 provider.tx_ref().cursor_read::<tables::AccountsHistory>().unwrap();
205 assert_eq!(account_history.walk(None).unwrap().count(), expect_num_acc_changesets);
206 }
207
208 let mut storage_indexing_stage = IndexStorageHistoryStage {
210 prune_mode: prune_modes.storage_history,
211 ..Default::default()
212 };
213
214 if prune_modes.storage_history == Some(PruneMode::Full) {
215 assert!(storage_indexing_stage.execute(&provider, input).is_err());
217 } else {
218 storage_indexing_stage.execute(&provider, input).unwrap();
219
220 let mut storage_history =
221 provider.tx_ref().cursor_read::<tables::StoragesHistory>().unwrap();
222 assert_eq!(
223 storage_history.walk(None).unwrap().count(),
224 expect_num_storage_changesets
225 );
226 }
227 };
228
229 let mut prune = PruneModes::default();
232 check_pruning(test_db.factory.clone(), prune.clone(), 1, 3, 1).await;
233
234 prune.receipts = Some(PruneMode::Full);
235 prune.account_history = Some(PruneMode::Full);
236 prune.storage_history = Some(PruneMode::Full);
237 check_pruning(test_db.factory.clone(), prune.clone(), 0, 0, 0).await;
239
240 prune.receipts = Some(PruneMode::Before(1));
241 prune.account_history = Some(PruneMode::Before(1));
242 prune.storage_history = Some(PruneMode::Before(1));
243 check_pruning(test_db.factory.clone(), prune.clone(), 1, 3, 1).await;
244
245 prune.receipts = Some(PruneMode::Before(2));
246 prune.account_history = Some(PruneMode::Before(2));
247 prune.storage_history = Some(PruneMode::Before(2));
248 check_pruning(test_db.factory.clone(), prune.clone(), 0, 1, 0).await;
250
251 prune.receipts = Some(PruneMode::Distance(66));
252 prune.account_history = Some(PruneMode::Distance(66));
253 prune.storage_history = Some(PruneMode::Distance(66));
254 check_pruning(test_db.factory.clone(), prune.clone(), 1, 3, 1).await;
255
256 prune.receipts = Some(PruneMode::Distance(64));
257 prune.account_history = Some(PruneMode::Distance(64));
258 prune.storage_history = Some(PruneMode::Distance(64));
259 check_pruning(test_db.factory.clone(), prune.clone(), 0, 1, 0).await;
261 }
262
263 fn seed_data(num_blocks: usize) -> ProviderResult<TestStageDB> {
266 let db = TestStageDB::default();
267 let mut rng = generators::rng();
268 let genesis_hash = B256::ZERO;
269 let tip = (num_blocks - 1) as u64;
270
271 let blocks = random_block_range(
272 &mut rng,
273 0..=tip,
274 BlockRangeParams { parent: Some(genesis_hash), tx_count: 2..3, ..Default::default() },
275 );
276 db.insert_blocks(blocks.iter(), StorageKind::Static)?;
277
278 let mut receipts = Vec::with_capacity(blocks.len());
279 let mut tx_num = 0u64;
280 for block in &blocks {
281 let mut block_receipts = Vec::with_capacity(block.transaction_count());
282 for transaction in &block.body().transactions {
283 block_receipts.push((tx_num, random_receipt(&mut rng, transaction, Some(0), None)));
284 tx_num += 1;
285 }
286 receipts.push((block.number, block_receipts));
287 }
288 db.insert_receipts_by_block(receipts, StorageKind::Static)?;
289
290 let provider_rw = db.factory.provider_rw()?;
292 for stage in StageId::ALL {
293 provider_rw.save_stage_checkpoint(stage, StageCheckpoint::new(tip))?;
294 }
295 provider_rw.commit()?;
296
297 Ok(db)
298 }
299
300 fn simulate_behind_checkpoint_corruption(
303 db: &TestStageDB,
304 prune_count: usize,
305 segment: StaticFileSegment,
306 is_full_node: bool,
307 expected: Option<PipelineTarget>,
308 ) {
309 let mut static_file_provider = db.factory.static_file_provider();
312 static_file_provider = StaticFileProvider::read_write(static_file_provider.path()).unwrap();
313
314 {
317 let mut headers_writer = static_file_provider.latest_writer(segment).unwrap();
318 let reader = headers_writer.inner().jar().open_data_reader().unwrap();
319 let columns = headers_writer.inner().jar().columns();
320 let data_file = headers_writer.inner().data_file();
321 let last_offset = reader.reverse_offset(prune_count * columns).unwrap();
322 data_file.get_mut().set_len(last_offset).unwrap();
323 data_file.flush().unwrap();
324 data_file.get_ref().sync_all().unwrap();
325 }
326
327 let mut static_file_provider = db.factory.static_file_provider();
330 static_file_provider = StaticFileProvider::read_write(static_file_provider.path()).unwrap();
331 assert!(matches!(
332 static_file_provider
333 .check_consistency(&db.factory.database_provider_ro().unwrap(), is_full_node,),
334 Ok(e) if e == expected
335 ));
336 }
337
338 fn save_checkpoint_and_check(
341 db: &TestStageDB,
342 stage_id: StageId,
343 checkpoint_block_number: BlockNumber,
344 expected: Option<PipelineTarget>,
345 ) {
346 let provider_rw = db.factory.provider_rw().unwrap();
347 provider_rw
348 .save_stage_checkpoint(stage_id, StageCheckpoint::new(checkpoint_block_number))
349 .unwrap();
350 provider_rw.commit().unwrap();
351
352 assert!(matches!(
353 db.factory
354 .static_file_provider()
355 .check_consistency(&db.factory.database_provider_ro().unwrap(), false,),
356 Ok(e) if e == expected
357 ));
358 }
359
360 fn update_db_and_check<T: Table<Key = u64>>(
363 db: &TestStageDB,
364 key: u64,
365 expected: Option<PipelineTarget>,
366 ) where
367 <T as Table>::Value: Default,
368 {
369 update_db_with_and_check::<T>(db, key, expected, &Default::default());
370 }
371
372 fn update_db_with_and_check<T: Table<Key = u64>>(
375 db: &TestStageDB,
376 key: u64,
377 expected: Option<PipelineTarget>,
378 value: &T::Value,
379 ) {
380 let provider_rw = db.factory.provider_rw().unwrap();
381 let mut cursor = provider_rw.tx_ref().cursor_write::<T>().unwrap();
382 cursor.insert(key, value).unwrap();
383 provider_rw.commit().unwrap();
384
385 assert!(matches!(
386 db.factory
387 .static_file_provider()
388 .check_consistency(&db.factory.database_provider_ro().unwrap(), false),
389 Ok(e) if e == expected
390 ));
391 }
392
/// Freshly seeded data must pass the consistency check with no pipeline target requested.
#[test]
fn test_consistency() {
    let db = seed_data(90).unwrap();
    let db_provider = db.factory.database_provider_ro().unwrap();

    let outcome = db.factory.static_file_provider().check_consistency(&db_provider, false);
    assert!(matches!(outcome, Ok(None)));
}
403
/// Truncates static-file data behind the stored checkpoints and checks the requested unwind.
///
/// The cases run in order against the same database, so each truncation builds on the last.
#[test]
fn test_consistency_no_commit_prune() {
    let db = seed_data(90).unwrap();
    let full_node = true;
    let archive_node = !full_node;

    // (prune_count, segment, is_full_node, expected pipeline target)
    let cases = [
        // With the full-node flag set, truncated receipts request no unwind.
        (1, StaticFileSegment::Receipts, full_node, None),
        // Without it, the same segment requests an unwind to block 88.
        (1, StaticFileSegment::Receipts, archive_node, Some(PipelineTarget::Unwind(88))),
        // Truncating headers by three requests an unwind to block 86.
        (3, StaticFileSegment::Headers, archive_node, Some(PipelineTarget::Unwind(86))),
    ];

    for (prune_count, segment, is_full_node, expected) in cases {
        simulate_behind_checkpoint_corruption(&db, prune_count, segment, is_full_node, expected);
    }
}
432
/// Moves stage checkpoints below and above the static-file tip and checks the outcome.
///
/// A lower checkpoint yields `Ok(None)`, after which the highest static-file block (and
/// transaction, where applicable) of the associated segment equals that checkpoint. A Headers
/// checkpoint of 91 yields an unwind to the previously saved Headers checkpoint.
#[test]
fn test_consistency_checkpoints() {
    let db = seed_data(90).unwrap();

    // Small query helpers so the assertions below read as one line each.
    let highest_block = |segment: StaticFileSegment| {
        db.factory.static_file_provider().get_highest_static_file_block(segment)
    };
    let highest_tx = |segment: StaticFileSegment| {
        db.factory.static_file_provider().get_highest_static_file_tx(segment)
    };
    let last_tx_in = |number: BlockNumber| {
        db.factory.block_body_indices(number).unwrap().map(|indices| indices.last_tx_num())
    };

    // Bodies checkpoint <-> Transactions segment.
    let bodies_checkpoint = 87;
    save_checkpoint_and_check(&db, StageId::Bodies, bodies_checkpoint, None);
    assert_eq!(highest_block(StaticFileSegment::Transactions), Some(bodies_checkpoint));
    assert_eq!(highest_tx(StaticFileSegment::Transactions), last_tx_in(bodies_checkpoint));

    // Execution checkpoint <-> Receipts segment.
    let execution_checkpoint = 86;
    save_checkpoint_and_check(&db, StageId::Execution, execution_checkpoint, None);
    assert_eq!(highest_block(StaticFileSegment::Receipts), Some(execution_checkpoint));
    assert_eq!(highest_tx(StaticFileSegment::Receipts), last_tx_in(execution_checkpoint));

    // Headers checkpoint <-> Headers segment.
    let headers_checkpoint = 80;
    save_checkpoint_and_check(&db, StageId::Headers, headers_checkpoint, None);
    assert_eq!(highest_block(StaticFileSegment::Headers), Some(headers_checkpoint));

    // A checkpoint past the available headers must request an unwind.
    save_checkpoint_and_check(
        &db,
        StageId::Headers,
        91,
        Some(PipelineTarget::Unwind(headers_checkpoint)),
    );
}
480
/// Inserts header rows into the database relative to the highest static-file header block.
///
/// A row two past the highest block requests an unwind to block 89. Adding the row directly
/// after the highest block afterwards yields no pipeline target.
#[test]
fn test_consistency_headers_gap() {
    let db = seed_data(90).unwrap();
    let highest_header = db
        .factory
        .static_file_provider()
        .get_highest_static_file_block(StaticFileSegment::Headers)
        .unwrap();

    // (distance past the static-file tip, expected pipeline target), applied in order.
    let steps = [(2, Some(PipelineTarget::Unwind(89))), (1, None)];
    for (distance, expected) in steps {
        update_db_and_check::<tables::Headers>(&db, highest_header + distance, expected);
    }
}
496
/// Inserts transaction rows into the database relative to the highest static-file transaction.
///
/// A row two past the highest transaction requests an unwind to block 89. Adding the row
/// directly after the highest transaction afterwards yields no pipeline target.
#[test]
fn test_consistency_tx_gap() {
    let db = seed_data(90).unwrap();
    let highest_tx = db
        .factory
        .static_file_provider()
        .get_highest_static_file_tx(StaticFileSegment::Transactions)
        .unwrap();

    // The same placeholder transaction is written at both positions.
    let placeholder_tx: <tables::Transactions as Table>::Value =
        TxLegacy::default().into_signed(Signature::test_signature()).into();

    // (distance past the static-file tip, expected pipeline target), applied in order.
    let steps = [(2, Some(PipelineTarget::Unwind(89))), (1, None)];
    for (distance, expected) in steps {
        update_db_with_and_check::<tables::Transactions>(
            &db,
            highest_tx + distance,
            expected,
            &placeholder_tx,
        );
    }
}
522
/// Inserts receipt rows into the database relative to the highest static-file receipt.
///
/// A row two past the highest receipt requests an unwind to block 89. Adding the row directly
/// after the highest receipt afterwards yields no pipeline target.
#[test]
fn test_consistency_receipt_gap() {
    let db = seed_data(90).unwrap();
    let highest_receipt = db
        .factory
        .static_file_provider()
        .get_highest_static_file_tx(StaticFileSegment::Receipts)
        .unwrap();

    // (distance past the static-file tip, expected pipeline target), applied in order.
    let steps = [(2, Some(PipelineTarget::Unwind(89))), (1, None)];
    for (distance, expected) in steps {
        update_db_and_check::<tables::Receipts>(&db, highest_receipt + distance, expected);
    }
}
538}