1use alloy_eips::eip2718::Encodable2718;
2use alloy_primitives::{TxHash, TxNumber};
3use num_traits::Zero;
4use reth_config::config::{EtlConfig, TransactionLookupConfig};
5use reth_db_api::{
6 table::{Decode, Decompress, Value},
7 tables,
8 transaction::DbTxMut,
9 Tables,
10};
11use reth_etl::Collector;
12use reth_primitives_traits::{NodePrimitives, SignedTransaction};
13use reth_provider::{
14 BlockReader, DBProvider, EitherWriter, PruneCheckpointReader, PruneCheckpointWriter,
15 RocksDBProviderFactory, StaticFileProviderFactory, StatsReader, StorageSettingsCache,
16 TransactionsProvider, TransactionsProviderExt,
17};
18use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment};
19use reth_stages_api::{
20 EntitiesCheckpoint, ExecInput, ExecOutput, Stage, StageCheckpoint, StageError, StageId,
21 UnwindInput, UnwindOutput,
22};
23use reth_storage_errors::provider::ProviderError;
24use tracing::*;
25
/// Stage that maintains the transaction hash -> transaction number index stored in
/// [`tables::TransactionHashNumbers`].
///
/// During execution the hashes are first gathered in a [`Collector`] and only written out once
/// the final block range has been collected.
#[derive(Debug, Clone)]
pub struct TransactionLookupStage {
    /// Transaction threshold handed to the block-range helpers when the work is split into
    /// chunks (used by both execution and unwinding).
    chunk_size: u64,
    /// ETL settings; `file_size` and `dir` are used to construct the hash [`Collector`].
    etl_config: EtlConfig,
    /// Optional prune mode. When set, execution may move the checkpoint forward to the prune
    /// target block instead of indexing the blocks before it.
    prune_mode: Option<PruneMode>,
}
42
43impl Default for TransactionLookupStage {
44 fn default() -> Self {
45 Self { chunk_size: 5_000_000, etl_config: EtlConfig::default(), prune_mode: None }
46 }
47}
48
49impl TransactionLookupStage {
50 pub const fn new(
52 config: TransactionLookupConfig,
53 etl_config: EtlConfig,
54 prune_mode: Option<PruneMode>,
55 ) -> Self {
56 Self { chunk_size: config.chunk_size, etl_config, prune_mode }
57 }
58}
59
60impl<Provider> Stage<Provider> for TransactionLookupStage
61where
62 Provider: DBProvider<Tx: DbTxMut>
63 + PruneCheckpointWriter
64 + BlockReader
65 + PruneCheckpointReader
66 + StatsReader
67 + StaticFileProviderFactory<Primitives: NodePrimitives<SignedTx: Value + SignedTransaction>>
68 + TransactionsProviderExt
69 + StorageSettingsCache
70 + RocksDBProviderFactory,
71{
72 fn id(&self) -> StageId {
74 StageId::TransactionLookup
75 }
76
77 fn execute(
79 &mut self,
80 provider: &Provider,
81 mut input: ExecInput,
82 ) -> Result<ExecOutput, StageError> {
83 if let Some((target_prunable_block, prune_mode)) = self
84 .prune_mode
85 .map(|mode| {
86 mode.prune_target_block(
87 input.target(),
88 PruneSegment::TransactionLookup,
89 PrunePurpose::User,
90 )
91 })
92 .transpose()?
93 .flatten() &&
94 target_prunable_block > input.checkpoint().block_number
95 {
96 input.checkpoint = Some(StageCheckpoint::new(target_prunable_block));
97
98 if provider.get_prune_checkpoint(PruneSegment::TransactionLookup)?.is_none() {
101 let target_prunable_tx_number = provider
102 .block_body_indices(target_prunable_block)?
103 .ok_or(ProviderError::BlockBodyIndicesNotFound(target_prunable_block))?
104 .last_tx_num();
105
106 provider.save_prune_checkpoint(
107 PruneSegment::TransactionLookup,
108 PruneCheckpoint {
109 block_number: Some(target_prunable_block),
110 tx_number: Some(target_prunable_tx_number),
111 prune_mode,
112 },
113 )?;
114 }
115 }
116 if input.target_reached() {
117 return Ok(ExecOutput::done(input.checkpoint()));
118 }
119
120 let mut hash_collector: Collector<TxHash, TxNumber> =
122 Collector::new(self.etl_config.file_size, self.etl_config.dir.clone());
123
124 info!(
125 target: "sync::stages::transaction_lookup",
126 tx_range = ?input.checkpoint().block_number..=input.target(),
127 "Updating transaction lookup"
128 );
129
130 loop {
131 let Some(range_output) =
132 input.next_block_range_with_transaction_threshold(provider, self.chunk_size)?
133 else {
134 input.checkpoint = Some(
135 StageCheckpoint::new(input.target())
136 .with_entities_stage_checkpoint(stage_checkpoint(provider)?),
137 );
138 break;
139 };
140
141 let end_block = *range_output.block_range.end();
142
143 info!(target: "sync::stages::transaction_lookup", tx_range = ?range_output.tx_range, "Calculating transaction hashes");
144
145 for (key, value) in provider.transaction_hashes_by_range(range_output.tx_range)? {
146 hash_collector.insert(key, value)?;
147 }
148
149 input.checkpoint = Some(
150 StageCheckpoint::new(end_block)
151 .with_entities_stage_checkpoint(stage_checkpoint(provider)?),
152 );
153
154 if range_output.is_final_range {
155 let total_hashes = hash_collector.len();
156 let interval = (total_hashes / 10).max(1);
157
158 let append_only =
160 provider.count_entries::<tables::TransactionHashNumbers>()?.is_zero();
161
162 provider.with_rocksdb_batch_auto_commit(|rocksdb_batch| {
164 let mut writer =
165 EitherWriter::new_transaction_hash_numbers(provider, rocksdb_batch)?;
166
167 let iter = hash_collector
168 .iter()
169 .map_err(|e| ProviderError::other(Box::new(e)))?;
170
171 for (index, hash_to_number) in iter.enumerate() {
172 let (hash_bytes, number_bytes) =
173 hash_to_number.map_err(|e| ProviderError::other(Box::new(e)))?;
174 if index > 0 && index.is_multiple_of(interval) {
175 info!(
176 target: "sync::stages::transaction_lookup",
177 ?append_only,
178 progress = %format!("{:.2}%", (index as f64 / total_hashes as f64) * 100.0),
179 "Inserting hashes"
180 );
181 }
182
183 let hash = TxHash::decode(&hash_bytes)?;
185 let tx_num = TxNumber::decompress(&number_bytes)?;
186 writer.put_transaction_hash_number(hash, tx_num, append_only)?;
187 }
188
189 Ok(((), writer.into_raw_rocksdb_batch()))
190 })?;
191
192 trace!(target: "sync::stages::transaction_lookup",
193 total_hashes,
194 "Transaction hashes inserted"
195 );
196
197 break;
198 }
199 }
200
201 if provider.cached_storage_settings().storage_v2 {
202 provider.commit_pending_rocksdb_batches()?;
203 provider.rocksdb_provider().flush(&[Tables::TransactionHashNumbers.name()])?;
204 }
205
206 Ok(ExecOutput {
207 checkpoint: StageCheckpoint::new(input.target())
208 .with_entities_stage_checkpoint(stage_checkpoint(provider)?),
209 done: true,
210 })
211 }
212
213 fn unwind(
215 &mut self,
216 provider: &Provider,
217 input: UnwindInput,
218 ) -> Result<UnwindOutput, StageError> {
219 let (range, unwind_to, _) = input.unwind_block_range_with_threshold(self.chunk_size);
220
221 provider.with_rocksdb_batch(|rocksdb_batch| {
222 let mut writer = EitherWriter::new_transaction_hash_numbers(provider, rocksdb_batch)?;
223
224 let static_file_provider = provider.static_file_provider();
225 let rev_walker = provider
226 .block_body_indices_range(range.clone())?
227 .into_iter()
228 .rev()
229 .zip(range.rev());
230
231 for (body, number) in rev_walker {
232 if number <= unwind_to {
233 break;
234 }
235
236 for transaction in
238 static_file_provider.transactions_by_tx_range(body.tx_num_range())?
239 {
240 writer.delete_transaction_hash_number(transaction.trie_hash())?;
241 }
242 }
243
244 Ok(((), writer.into_raw_rocksdb_batch()))
245 })?;
246
247 Ok(UnwindOutput {
248 checkpoint: StageCheckpoint::new(unwind_to)
249 .with_entities_stage_checkpoint(stage_checkpoint(provider)?),
250 })
251 }
252}
253
254fn stage_checkpoint<Provider>(provider: &Provider) -> Result<EntitiesCheckpoint, StageError>
255where
256 Provider: PruneCheckpointReader + StaticFileProviderFactory + StatsReader,
257{
258 let pruned_entries = provider
259 .get_prune_checkpoint(PruneSegment::TransactionLookup)?
260 .and_then(|checkpoint| checkpoint.tx_number)
261 .map(|tx_number| tx_number + 1)
263 .unwrap_or_default();
264 Ok(EntitiesCheckpoint {
265 processed: provider.count_entries::<tables::TransactionHashNumbers>()? as u64 +
269 pruned_entries,
270 total: provider.static_file_provider().count_entries::<tables::Transactions>()? as u64,
274 })
275}
276
277#[cfg(test)]
278mod tests {
279 use super::*;
280 use crate::test_utils::{
281 stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, StorageKind,
282 TestRunnerError, TestStageDB, UnwindStageTestRunner,
283 };
284 use alloy_primitives::{BlockNumber, B256};
285 use assert_matches::assert_matches;
286 use reth_db_api::{cursor::DbCursorRO, transaction::DbTx};
287 use reth_ethereum_primitives::Block;
288 use reth_primitives_traits::SealedBlock;
289 use reth_provider::{
290 providers::StaticFileWriter, BlockBodyIndicesProvider, DatabaseProviderFactory,
291 };
292 use reth_stages_api::StageUnitCheckpoint;
293 use reth_testing_utils::generators::{
294 self, random_block, random_block_range, BlockParams, BlockRangeParams,
295 };
296 use std::ops::Sub;
297
// Expands the shared stage test-suite macro for `TransactionLookupTestRunner`.
// NOTE(review): the second argument presumably names the generated tests; confirm against the
// macro definition in `crate::test_utils`.
stage_test_suite_ext!(TransactionLookupTestRunner, transaction_lookup);
300
301 #[tokio::test]
302 async fn execute_single_transaction_lookup() {
303 let (previous_stage, stage_progress) = (500, 100);
304 let mut rng = generators::rng();
305
306 let runner = TransactionLookupTestRunner::default();
308 let input = ExecInput {
309 target: Some(previous_stage),
310 checkpoint: Some(StageCheckpoint::new(stage_progress)),
311 };
312
313 let non_empty_block_number = stage_progress + 10;
315 let blocks = (stage_progress..=input.target())
316 .map(|number| {
317 random_block(
318 &mut rng,
319 number,
320 BlockParams {
321 tx_count: Some((number == non_empty_block_number) as u8),
322 ..Default::default()
323 },
324 )
325 })
326 .collect::<Vec<_>>();
327 runner
328 .db
329 .insert_blocks(blocks.iter(), StorageKind::Static)
330 .expect("failed to insert blocks");
331
332 let rx = runner.execute(input);
333
334 let result = rx.await.unwrap();
336 assert_matches!(
337 result,
338 Ok(ExecOutput {
339 checkpoint: StageCheckpoint {
340 block_number,
341 stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
342 processed,
343 total
344 }))
345 }, done: true }) if block_number == previous_stage && processed == total &&
346 total == runner.db.count_entries::<tables::Transactions>().unwrap() as u64
347 );
348
349 assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
351 }
352
353 #[tokio::test]
354 async fn execute_pruned_transaction_lookup() {
355 let (previous_stage, prune_target, stage_progress) = (500, 400, 100);
356 let mut rng = generators::rng();
357
358 let mut runner = TransactionLookupTestRunner::default();
360 let input = ExecInput {
361 target: Some(previous_stage),
362 checkpoint: Some(StageCheckpoint::new(stage_progress)),
363 };
364
365 let seed = random_block_range(
367 &mut rng,
368 stage_progress + 1..=previous_stage,
369 BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..2, ..Default::default() },
370 );
371 runner
372 .db
373 .insert_blocks(seed.iter(), StorageKind::Static)
374 .expect("failed to seed execution");
375
376 runner.set_prune_mode(PruneMode::Before(prune_target));
377
378 let rx = runner.execute(input);
379
380 let result = rx.await.unwrap();
382 assert_matches!(
383 result,
384 Ok(ExecOutput {
385 checkpoint: StageCheckpoint {
386 block_number,
387 stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
388 processed,
389 total
390 }))
391 }, done: true }) if block_number == previous_stage && processed == total &&
392 total == runner.db.count_entries::<tables::Transactions>().unwrap() as u64
393 );
394
395 assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
397 }
398
/// Checks that `stage_checkpoint` adds the pruned entries to the number of stored entries.
///
/// Lookup entries are inserted only for blocks in `(max_pruned_block, max_processed_block]`,
/// while the prune checkpoint covers every transaction up to `max_pruned_block`.
#[test]
fn stage_checkpoint_pruned() {
    let db = TestStageDB::default();
    let mut rng = generators::rng();

    let block_params =
        BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..10, ..Default::default() };
    let blocks = random_block_range(&mut rng, 0..=100, block_params);
    db.insert_blocks(blocks.iter(), StorageKind::Static).expect("insert blocks");

    let max_pruned_block = 30;
    let max_processed_block = 70;

    // Number every transaction of the processed blocks, but keep only the entries that belong
    // to blocks above the pruned height.
    let tx_hash_numbers = blocks[..=max_processed_block]
        .iter()
        .flat_map(|block| {
            block.body().transactions.iter().map(move |tx| (block.number, *tx.tx_hash()))
        })
        .enumerate()
        .filter(|(_, (block_number, _))| *block_number > max_pruned_block)
        .map(|(tx_number, (_, hash))| (hash, tx_number as u64))
        .collect::<Vec<_>>();
    db.insert_tx_hash_numbers(tx_hash_numbers).expect("insert tx hash numbers");

    // Transaction numbers are zero-based, so the last pruned number is `count - 1`.
    let pruned_tx_count = blocks[..=max_pruned_block as usize]
        .iter()
        .map(|block| block.transaction_count() as u64)
        .sum::<u64>();
    let prune_checkpoint = PruneCheckpoint {
        block_number: Some(max_pruned_block),
        tx_number: Some(pruned_tx_count.sub(1)),
        prune_mode: PruneMode::Full,
    };

    let provider = db.factory.provider_rw().unwrap();
    provider
        .save_prune_checkpoint(PruneSegment::TransactionLookup, prune_checkpoint)
        .expect("save stage checkpoint");
    provider.commit().expect("commit");

    let expected = EntitiesCheckpoint {
        processed: blocks[..=max_processed_block]
            .iter()
            .map(|block| block.transaction_count() as u64)
            .sum(),
        total: blocks.iter().map(|block| block.transaction_count() as u64).sum(),
    };

    let provider = db.factory.database_provider_rw().unwrap();
    assert_eq!(stage_checkpoint(&provider).expect("stage checkpoint"), expected);
}
457
/// Test harness for [`TransactionLookupStage`]: holds the test database together with the
/// settings used to build the stage under test.
struct TransactionLookupTestRunner {
    /// Test database the stage is executed against.
    db: TestStageDB,
    /// Copied into the stage's `chunk_size`.
    chunk_size: u64,
    /// Cloned into the stage's `etl_config`.
    etl_config: EtlConfig,
    /// Copied into the stage's `prune_mode`; also consulted when validating execution.
    prune_mode: Option<PruneMode>,
}
464
465 impl Default for TransactionLookupTestRunner {
466 fn default() -> Self {
467 Self {
468 db: TestStageDB::default(),
469 chunk_size: 1000,
470 etl_config: EtlConfig::default(),
471 prune_mode: None,
472 }
473 }
474 }
475
476 impl TransactionLookupTestRunner {
477 fn set_prune_mode(&mut self, prune_mode: PruneMode) {
478 self.prune_mode = Some(prune_mode);
479 }
480
481 fn ensure_no_hash_by_block(&self, number: BlockNumber) -> Result<(), TestRunnerError> {
488 let body_result = self
489 .db
490 .factory
491 .provider_rw()?
492 .block_body_indices(number)?
493 .ok_or(ProviderError::BlockBodyIndicesNotFound(number));
494 match body_result {
495 Ok(body) => {
496 self.db.ensure_no_entry_above_by_value::<tables::TransactionHashNumbers, _>(
497 body.last_tx_num(),
498 |key| key,
499 )?
500 }
501 Err(_) => {
502 assert!(self.db.table_is_empty::<tables::TransactionHashNumbers>()?);
503 }
504 };
505
506 Ok(())
507 }
508 }
509
510 impl StageTestRunner for TransactionLookupTestRunner {
511 type S = TransactionLookupStage;
512
513 fn db(&self) -> &TestStageDB {
514 &self.db
515 }
516
517 fn stage(&self) -> Self::S {
518 TransactionLookupStage {
519 chunk_size: self.chunk_size,
520 etl_config: self.etl_config.clone(),
521 prune_mode: self.prune_mode,
522 }
523 }
524 }
525
526 impl ExecuteStageTestRunner for TransactionLookupTestRunner {
527 type Seed = Vec<SealedBlock<Block>>;
528
529 fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
530 let stage_progress = input.checkpoint().block_number;
531 let end = input.target();
532 let mut rng = generators::rng();
533
534 let blocks = random_block_range(
535 &mut rng,
536 stage_progress + 1..=end,
537 BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..2, ..Default::default() },
538 );
539 self.db.insert_blocks(blocks.iter(), StorageKind::Static)?;
540 Ok(blocks)
541 }
542
543 fn validate_execution(
544 &self,
545 mut input: ExecInput,
546 output: Option<ExecOutput>,
547 ) -> Result<(), TestRunnerError> {
548 match output {
549 Some(output) => {
550 let provider = self.db.factory.provider()?;
551
552 if let Some((target_prunable_block, _)) = self
553 .prune_mode
554 .map(|mode| {
555 mode.prune_target_block(
556 input.target(),
557 PruneSegment::TransactionLookup,
558 PrunePurpose::User,
559 )
560 })
561 .transpose()
562 .expect("prune target block for transaction lookup")
563 .flatten() &&
564 target_prunable_block > input.checkpoint().block_number
565 {
566 input.checkpoint = Some(StageCheckpoint::new(target_prunable_block));
567 }
568 let start_block = input.next_block();
569 let end_block = output.checkpoint.block_number;
570
571 if start_block > end_block {
572 return Ok(())
573 }
574
575 let mut body_cursor =
576 provider.tx_ref().cursor_read::<tables::BlockBodyIndices>()?;
577 body_cursor.seek_exact(start_block)?;
578
579 while let Some((_, body)) = body_cursor.next()? {
580 for tx_id in body.tx_num_range() {
581 let transaction =
582 provider.transaction_by_id(tx_id)?.expect("no transaction entry");
583 assert_eq!(
584 Some(tx_id),
585 provider.transaction_id(*transaction.tx_hash())?
586 );
587 }
588 }
589 }
590 None => self.ensure_no_hash_by_block(input.checkpoint().block_number)?,
591 };
592 Ok(())
593 }
594 }
595
596 impl UnwindStageTestRunner for TransactionLookupTestRunner {
597 fn validate_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError> {
598 self.ensure_no_hash_by_block(input.unwind_to)
599 }
600 }
601
602 mod rocksdb_tests {
603 use super::*;
604 use reth_provider::RocksDBProviderFactory;
605 use reth_storage_api::StorageSettings;
606
607 #[tokio::test]
610 async fn execute_writes_to_rocksdb_when_enabled() {
611 let (previous_stage, stage_progress) = (110, 100);
612 let mut rng = generators::rng();
613
614 let runner = TransactionLookupTestRunner::default();
616
617 runner.db.factory.set_storage_settings_cache(StorageSettings::v2());
619
620 let input = ExecInput {
621 target: Some(previous_stage),
622 checkpoint: Some(StageCheckpoint::new(stage_progress)),
623 };
624
625 let blocks = random_block_range(
627 &mut rng,
628 stage_progress + 1..=previous_stage,
629 BlockRangeParams {
630 parent: Some(B256::ZERO),
631 tx_count: 1..3, ..Default::default()
633 },
634 );
635 runner
636 .db
637 .insert_blocks(blocks.iter(), StorageKind::Static)
638 .expect("failed to insert blocks");
639
640 let expected_tx_count: usize = blocks.iter().map(|b| b.body().transactions.len()).sum();
642 assert!(expected_tx_count > 0, "test requires at least one transaction");
643
644 let rx = runner.execute(input);
646 let result = rx.await.unwrap();
647 assert!(result.is_ok(), "stage execution failed: {:?}", result);
648
649 let mdbx_count = runner.db.count_entries::<tables::TransactionHashNumbers>().unwrap();
651 assert_eq!(
652 mdbx_count, 0,
653 "MDBX TransactionHashNumbers should be empty when RocksDB is enabled"
654 );
655
656 let rocksdb = runner.db.factory.rocksdb_provider();
658 let mut rocksdb_count = 0;
659 for block in &blocks {
660 for tx in &block.body().transactions {
661 let hash = *tx.tx_hash();
662 let result = rocksdb.get::<tables::TransactionHashNumbers>(hash).unwrap();
663 assert!(result.is_some(), "Transaction hash {:?} not found in RocksDB", hash);
664 rocksdb_count += 1;
665 }
666 }
667 assert_eq!(
668 rocksdb_count, expected_tx_count,
669 "RocksDB should contain all transaction hashes"
670 );
671 }
672
673 #[tokio::test]
676 async fn unwind_deletes_from_rocksdb_when_enabled() {
677 let (previous_stage, stage_progress) = (110, 100);
678 let mut rng = generators::rng();
679
680 let runner = TransactionLookupTestRunner::default();
682
683 runner.db.factory.set_storage_settings_cache(StorageSettings::v2());
685
686 let blocks = random_block_range(
688 &mut rng,
689 stage_progress + 1..=previous_stage,
690 BlockRangeParams {
691 parent: Some(B256::ZERO),
692 tx_count: 1..3, ..Default::default()
694 },
695 );
696 runner
697 .db
698 .insert_blocks(blocks.iter(), StorageKind::Static)
699 .expect("failed to insert blocks");
700
701 let expected_tx_count: usize = blocks.iter().map(|b| b.body().transactions.len()).sum();
703 assert!(expected_tx_count > 0, "test requires at least one transaction");
704
705 let exec_input = ExecInput {
707 target: Some(previous_stage),
708 checkpoint: Some(StageCheckpoint::new(stage_progress)),
709 };
710 let rx = runner.execute(exec_input);
711 let result = rx.await.unwrap();
712 assert!(result.is_ok(), "stage execution failed: {:?}", result);
713
714 let rocksdb = runner.db.factory.rocksdb_provider();
716 for block in &blocks {
717 for tx in &block.body().transactions {
718 let hash = *tx.tx_hash();
719 let result = rocksdb.get::<tables::TransactionHashNumbers>(hash).unwrap();
720 assert!(
721 result.is_some(),
722 "Transaction hash {:?} should exist before unwind",
723 hash
724 );
725 }
726 }
727
728 let unwind_input = UnwindInput {
730 checkpoint: StageCheckpoint::new(previous_stage),
731 unwind_to: stage_progress,
732 bad_block: None,
733 };
734 let unwind_result = runner.unwind(unwind_input).await;
735 assert!(unwind_result.is_ok(), "stage unwind failed: {:?}", unwind_result);
736
737 let rocksdb = runner.db.factory.rocksdb_provider();
739 for block in &blocks {
740 for tx in &block.body().transactions {
741 let hash = *tx.tx_hash();
742 let result = rocksdb.get::<tables::TransactionHashNumbers>(hash).unwrap();
743 assert!(
744 result.is_none(),
745 "Transaction hash {:?} should be deleted from RocksDB after unwind",
746 hash
747 );
748 }
749 }
750 }
751 }
752}