1mod bodies;
3mod execution;
5mod finish;
7mod hashing_account;
9mod hashing_storage;
11mod headers;
13mod index_account_history;
15mod index_storage_history;
17mod merkle;
19mod prune;
20mod s3;
22mod sender_recovery;
24mod tx_lookup;
26
27pub use bodies::*;
28pub use execution::*;
29pub use finish::*;
30pub use hashing_account::*;
31pub use hashing_storage::*;
32pub use headers::*;
33pub use index_account_history::*;
34pub use index_storage_history::*;
35pub use merkle::*;
36pub use prune::*;
37pub use s3::*;
38pub use sender_recovery::*;
39pub use tx_lookup::*;
40
41mod utils;
42use utils::*;
43
44#[cfg(test)]
45mod tests {
46 use super::*;
47 use crate::test_utils::{StorageKind, TestStageDB};
48 use alloy_consensus::{SignableTransaction, TxLegacy};
49 use alloy_primitives::{
50 address, hex_literal::hex, keccak256, BlockNumber, Signature, B256, U256,
51 };
52 use alloy_rlp::Decodable;
53 use reth_chainspec::ChainSpecBuilder;
54 use reth_db::mdbx::{cursor::Cursor, RW};
55 use reth_db_api::{
56 cursor::{DbCursorRO, DbCursorRW},
57 table::Table,
58 tables,
59 transaction::{DbTx, DbTxMut},
60 AccountsHistory,
61 };
62 use reth_ethereum_consensus::EthBeaconConsensus;
63 use reth_ethereum_primitives::Block;
64 use reth_evm_ethereum::execute::EthExecutorProvider;
65 use reth_exex::ExExManagerHandle;
66 use reth_primitives_traits::{Account, Bytecode, SealedBlock};
67 use reth_provider::{
68 providers::{StaticFileProvider, StaticFileWriter},
69 test_utils::MockNodeTypesWithDB,
70 AccountExtReader, BlockBodyIndicesProvider, DatabaseProviderFactory, ProviderFactory,
71 ProviderResult, ReceiptProvider, StageCheckpointWriter, StaticFileProviderFactory,
72 StorageReader,
73 };
74 use reth_prune_types::{PruneMode, PruneModes};
75 use reth_stages_api::{
76 ExecInput, ExecutionStageThresholds, PipelineTarget, Stage, StageCheckpoint, StageId,
77 };
78 use reth_static_file_types::StaticFileSegment;
79 use reth_testing_utils::generators::{
80 self, random_block, random_block_range, random_receipt, BlockRangeParams,
81 };
82 use std::{io::Write, sync::Arc};
83
84 #[tokio::test]
85 #[ignore]
86 async fn test_prune() {
87 let test_db = TestStageDB::default();
88
89 let provider_rw = test_db.factory.provider_rw().unwrap();
90 let tip = 66;
91 let input = ExecInput { target: Some(tip), checkpoint: None };
92 let mut genesis_rlp = hex!("f901faf901f5a00000000000000000000000000000000000000000000000000000000000000000a01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa045571b40ae66ca7480791bbb2887286e4e4c4b1b298b191c889d6959023a32eda056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421a056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421bbe400808000a00000000000000000000000000000000000000000000000000000000000000000880000000000000000c0c0").as_slice();
93 let genesis = SealedBlock::<Block>::decode(&mut genesis_rlp).unwrap();
94 let mut block_rlp = hex!("f90262f901f9a075c371ba45999d87f4542326910a11af515897aebce5265d3f6acd1f1161f82fa01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa098f2dcd87c8ae4083e7017a05456c14eea4b1db2032126e27b3b1563d57d7cc0a08151d548273f6683169524b66ca9fe338b9ce42bc3540046c828fd939ae23bcba03f4e5c2ec5b2170b711d97ee755c160457bb58d8daa338e835ec02ae6860bbabbbe40082a8798203e800a00000000000000000000000000000000000000000000000000000000000000000880000000000000000f863f861800a8405f5e10094100000000000000000000000000000000000000080801ba07e09e26678ed4fac08a249ebe8ed680bf9051a5e14ad223e4b2b9d26e0208f37a05f6e3f188e3e6eab7d7d3b6568f5eac7d687b08d307d3154ccd8c87b4630509bc0").as_slice();
95 let block = SealedBlock::<Block>::decode(&mut block_rlp).unwrap();
96 provider_rw.insert_historical_block(genesis.try_recover().unwrap()).unwrap();
97 provider_rw.insert_historical_block(block.clone().try_recover().unwrap()).unwrap();
98
99 let mut head = block.hash();
101 let mut rng = generators::rng();
102 for block_number in 2..=tip {
103 let nblock = random_block(
104 &mut rng,
105 block_number,
106 generators::BlockParams { parent: Some(head), ..Default::default() },
107 );
108 head = nblock.hash();
109 provider_rw.insert_historical_block(nblock.try_recover().unwrap()).unwrap();
110 }
111 provider_rw
112 .static_file_provider()
113 .latest_writer(StaticFileSegment::Headers)
114 .unwrap()
115 .commit()
116 .unwrap();
117 provider_rw.commit().unwrap();
118
119 let provider_rw = test_db.factory.provider_rw().unwrap();
121 let code = hex!("5a465a905090036002900360015500");
122 let code_hash = keccak256(hex!("5a465a905090036002900360015500"));
123 provider_rw
124 .tx_ref()
125 .put::<tables::PlainAccountState>(
126 address!("0x1000000000000000000000000000000000000000"),
127 Account { nonce: 0, balance: U256::ZERO, bytecode_hash: Some(code_hash) },
128 )
129 .unwrap();
130 provider_rw
131 .tx_ref()
132 .put::<tables::PlainAccountState>(
133 address!("0xa94f5374fce5edbc8e2a8697c15331677e6ebf0b"),
134 Account {
135 nonce: 0,
136 balance: U256::from(0x3635c9adc5dea00000u128),
137 bytecode_hash: None,
138 },
139 )
140 .unwrap();
141 provider_rw
142 .tx_ref()
143 .put::<tables::Bytecodes>(code_hash, Bytecode::new_raw(code.to_vec().into()))
144 .unwrap();
145 provider_rw.commit().unwrap();
146
147 let check_pruning = |factory: ProviderFactory<MockNodeTypesWithDB>,
148 prune_modes: PruneModes,
149 expect_num_receipts: usize,
150 expect_num_acc_changesets: usize,
151 expect_num_storage_changesets: usize| async move {
152 let provider = factory.database_provider_rw().unwrap();
153
154 let mut execution_stage = ExecutionStage::new(
157 EthExecutorProvider::ethereum(Arc::new(
158 ChainSpecBuilder::mainnet().berlin_activated().build(),
159 )),
160 Arc::new(EthBeaconConsensus::new(Arc::new(
161 ChainSpecBuilder::mainnet().berlin_activated().build(),
162 ))),
163 ExecutionStageThresholds {
164 max_blocks: Some(100),
165 max_changes: None,
166 max_cumulative_gas: None,
167 max_duration: None,
168 },
169 MERKLE_STAGE_DEFAULT_CLEAN_THRESHOLD,
170 ExExManagerHandle::empty(),
171 );
172
173 execution_stage.execute(&provider, input).unwrap();
174 assert_eq!(
175 provider.receipts_by_block(1.into()).unwrap().unwrap().len(),
176 expect_num_receipts
177 );
178
179 assert_eq!(
180 provider.changed_storages_and_blocks_with_range(0..=1000).unwrap().len(),
181 expect_num_storage_changesets
182 );
183
184 assert_eq!(
185 provider.changed_accounts_and_blocks_with_range(0..=1000).unwrap().len(),
186 expect_num_acc_changesets
187 );
188
189 let mut acc_indexing_stage = IndexAccountHistoryStage {
191 prune_mode: prune_modes.account_history,
192 ..Default::default()
193 };
194
195 if prune_modes.account_history == Some(PruneMode::Full) {
196 assert!(acc_indexing_stage.execute(&provider, input).is_err());
198 } else {
199 acc_indexing_stage.execute(&provider, input).unwrap();
200 let mut account_history: Cursor<RW, AccountsHistory> =
201 provider.tx_ref().cursor_read::<tables::AccountsHistory>().unwrap();
202 assert_eq!(account_history.walk(None).unwrap().count(), expect_num_acc_changesets);
203 }
204
205 let mut storage_indexing_stage = IndexStorageHistoryStage {
207 prune_mode: prune_modes.storage_history,
208 ..Default::default()
209 };
210
211 if prune_modes.storage_history == Some(PruneMode::Full) {
212 assert!(acc_indexing_stage.execute(&provider, input).is_err());
214 } else {
215 storage_indexing_stage.execute(&provider, input).unwrap();
216
217 let mut storage_history =
218 provider.tx_ref().cursor_read::<tables::StoragesHistory>().unwrap();
219 assert_eq!(
220 storage_history.walk(None).unwrap().count(),
221 expect_num_storage_changesets
222 );
223 }
224 };
225
226 let mut prune = PruneModes::none();
229 check_pruning(test_db.factory.clone(), prune.clone(), 1, 3, 1).await;
230
231 prune.receipts = Some(PruneMode::Full);
232 prune.account_history = Some(PruneMode::Full);
233 prune.storage_history = Some(PruneMode::Full);
234 check_pruning(test_db.factory.clone(), prune.clone(), 0, 0, 0).await;
236
237 prune.receipts = Some(PruneMode::Before(1));
238 prune.account_history = Some(PruneMode::Before(1));
239 prune.storage_history = Some(PruneMode::Before(1));
240 check_pruning(test_db.factory.clone(), prune.clone(), 1, 3, 1).await;
241
242 prune.receipts = Some(PruneMode::Before(2));
243 prune.account_history = Some(PruneMode::Before(2));
244 prune.storage_history = Some(PruneMode::Before(2));
245 check_pruning(test_db.factory.clone(), prune.clone(), 0, 1, 0).await;
247
248 prune.receipts = Some(PruneMode::Distance(66));
249 prune.account_history = Some(PruneMode::Distance(66));
250 prune.storage_history = Some(PruneMode::Distance(66));
251 check_pruning(test_db.factory.clone(), prune.clone(), 1, 3, 1).await;
252
253 prune.receipts = Some(PruneMode::Distance(64));
254 prune.account_history = Some(PruneMode::Distance(64));
255 prune.storage_history = Some(PruneMode::Distance(64));
256 check_pruning(test_db.factory.clone(), prune.clone(), 0, 1, 0).await;
258 }
259
260 fn seed_data(num_blocks: usize) -> ProviderResult<TestStageDB> {
263 let db = TestStageDB::default();
264 let mut rng = generators::rng();
265 let genesis_hash = B256::ZERO;
266 let tip = (num_blocks - 1) as u64;
267
268 let blocks = random_block_range(
269 &mut rng,
270 0..=tip,
271 BlockRangeParams { parent: Some(genesis_hash), tx_count: 2..3, ..Default::default() },
272 );
273 db.insert_blocks(blocks.iter(), StorageKind::Static)?;
274
275 let mut receipts = Vec::with_capacity(blocks.len());
276 let mut tx_num = 0u64;
277 for block in &blocks {
278 let mut block_receipts = Vec::with_capacity(block.transaction_count());
279 for transaction in &block.body().transactions {
280 block_receipts.push((tx_num, random_receipt(&mut rng, transaction, Some(0))));
281 tx_num += 1;
282 }
283 receipts.push((block.number, block_receipts));
284 }
285 db.insert_receipts_by_block(receipts, StorageKind::Static)?;
286
287 let provider_rw = db.factory.provider_rw()?;
289 for stage in StageId::ALL {
290 provider_rw.save_stage_checkpoint(stage, StageCheckpoint::new(tip))?;
291 }
292 provider_rw.commit()?;
293
294 Ok(db)
295 }
296
297 fn simulate_behind_checkpoint_corruption(
300 db: &TestStageDB,
301 prune_count: usize,
302 segment: StaticFileSegment,
303 is_full_node: bool,
304 expected: Option<PipelineTarget>,
305 ) {
306 let mut static_file_provider = db.factory.static_file_provider();
309 static_file_provider = StaticFileProvider::read_write(static_file_provider.path()).unwrap();
310
311 {
314 let mut headers_writer = static_file_provider.latest_writer(segment).unwrap();
315 let reader = headers_writer.inner().jar().open_data_reader().unwrap();
316 let columns = headers_writer.inner().jar().columns();
317 let data_file = headers_writer.inner().data_file();
318 let last_offset = reader.reverse_offset(prune_count * columns).unwrap();
319 data_file.get_mut().set_len(last_offset).unwrap();
320 data_file.flush().unwrap();
321 data_file.get_ref().sync_all().unwrap();
322 }
323
324 let mut static_file_provider = db.factory.static_file_provider();
327 static_file_provider = StaticFileProvider::read_write(static_file_provider.path()).unwrap();
328 assert!(matches!(
329 static_file_provider
330 .check_consistency(&db.factory.database_provider_ro().unwrap(), is_full_node,),
331 Ok(e) if e == expected
332 ));
333 }
334
335 fn save_checkpoint_and_check(
338 db: &TestStageDB,
339 stage_id: StageId,
340 checkpoint_block_number: BlockNumber,
341 expected: Option<PipelineTarget>,
342 ) {
343 let provider_rw = db.factory.provider_rw().unwrap();
344 provider_rw
345 .save_stage_checkpoint(stage_id, StageCheckpoint::new(checkpoint_block_number))
346 .unwrap();
347 provider_rw.commit().unwrap();
348
349 assert!(matches!(
350 db.factory
351 .static_file_provider()
352 .check_consistency(&db.factory.database_provider_ro().unwrap(), false,),
353 Ok(e) if e == expected
354 ));
355 }
356
357 fn update_db_and_check<T: Table<Key = u64>>(
360 db: &TestStageDB,
361 key: u64,
362 expected: Option<PipelineTarget>,
363 ) where
364 <T as Table>::Value: Default,
365 {
366 update_db_with_and_check::<T>(db, key, expected, &Default::default());
367 }
368
369 fn update_db_with_and_check<T: Table<Key = u64>>(
372 db: &TestStageDB,
373 key: u64,
374 expected: Option<PipelineTarget>,
375 value: &T::Value,
376 ) {
377 let provider_rw = db.factory.provider_rw().unwrap();
378 let mut cursor = provider_rw.tx_ref().cursor_write::<T>().unwrap();
379 cursor.insert(key, value).unwrap();
380 provider_rw.commit().unwrap();
381
382 assert!(matches!(
383 db.factory
384 .static_file_provider()
385 .check_consistency(&db.factory.database_provider_ro().unwrap(), false),
386 Ok(e) if e == expected
387 ));
388 }
389
390 #[test]
391 fn test_consistency() {
392 let db = seed_data(90).unwrap();
393 let db_provider = db.factory.database_provider_ro().unwrap();
394
395 assert!(matches!(
396 db.factory.static_file_provider().check_consistency(&db_provider, false),
397 Ok(None)
398 ));
399 }
400
401 #[test]
402 fn test_consistency_no_commit_prune() {
403 let db = seed_data(90).unwrap();
404 let full_node = true;
405 let archive_node = !full_node;
406
407 simulate_behind_checkpoint_corruption(&db, 1, StaticFileSegment::Receipts, full_node, None);
410
411 simulate_behind_checkpoint_corruption(
414 &db,
415 1,
416 StaticFileSegment::Receipts,
417 archive_node,
418 Some(PipelineTarget::Unwind(88)),
419 );
420
421 simulate_behind_checkpoint_corruption(
422 &db,
423 3,
424 StaticFileSegment::Headers,
425 archive_node,
426 Some(PipelineTarget::Unwind(86)),
427 );
428 }
429
430 #[test]
431 fn test_consistency_checkpoints() {
432 let db = seed_data(90).unwrap();
433
434 let block = 87;
436 save_checkpoint_and_check(&db, StageId::Bodies, block, None);
437 assert_eq!(
438 db.factory
439 .static_file_provider()
440 .get_highest_static_file_block(StaticFileSegment::Transactions),
441 Some(block)
442 );
443 assert_eq!(
444 db.factory
445 .static_file_provider()
446 .get_highest_static_file_tx(StaticFileSegment::Transactions),
447 db.factory.block_body_indices(block).unwrap().map(|b| b.last_tx_num())
448 );
449
450 let block = 86;
451 save_checkpoint_and_check(&db, StageId::Execution, block, None);
452 assert_eq!(
453 db.factory
454 .static_file_provider()
455 .get_highest_static_file_block(StaticFileSegment::Receipts),
456 Some(block)
457 );
458 assert_eq!(
459 db.factory
460 .static_file_provider()
461 .get_highest_static_file_tx(StaticFileSegment::Receipts),
462 db.factory.block_body_indices(block).unwrap().map(|b| b.last_tx_num())
463 );
464
465 let block = 80;
466 save_checkpoint_and_check(&db, StageId::Headers, block, None);
467 assert_eq!(
468 db.factory
469 .static_file_provider()
470 .get_highest_static_file_block(StaticFileSegment::Headers),
471 Some(block)
472 );
473
474 save_checkpoint_and_check(&db, StageId::Headers, 91, Some(PipelineTarget::Unwind(block)));
476 }
477
478 #[test]
479 fn test_consistency_headers_gap() {
480 let db = seed_data(90).unwrap();
481 let current = db
482 .factory
483 .static_file_provider()
484 .get_highest_static_file_block(StaticFileSegment::Headers)
485 .unwrap();
486
487 update_db_and_check::<tables::Headers>(&db, current + 2, Some(PipelineTarget::Unwind(89)));
489
490 update_db_and_check::<tables::Headers>(&db, current + 1, None);
492 }
493
494 #[test]
495 fn test_consistency_tx_gap() {
496 let db = seed_data(90).unwrap();
497 let current = db
498 .factory
499 .static_file_provider()
500 .get_highest_static_file_tx(StaticFileSegment::Transactions)
501 .unwrap();
502
503 update_db_with_and_check::<tables::Transactions>(
505 &db,
506 current + 2,
507 Some(PipelineTarget::Unwind(89)),
508 &TxLegacy::default().into_signed(Signature::test_signature()).into(),
509 );
510
511 update_db_with_and_check::<tables::Transactions>(
513 &db,
514 current + 1,
515 None,
516 &TxLegacy::default().into_signed(Signature::test_signature()).into(),
517 );
518 }
519
520 #[test]
521 fn test_consistency_receipt_gap() {
522 let db = seed_data(90).unwrap();
523 let current = db
524 .factory
525 .static_file_provider()
526 .get_highest_static_file_tx(StaticFileSegment::Receipts)
527 .unwrap();
528
529 update_db_and_check::<tables::Receipts>(&db, current + 2, Some(PipelineTarget::Unwind(89)));
531
532 update_db_and_check::<tables::Receipts>(&db, current + 1, None);
534 }
535}