1use alloy_eips::eip2718::Encodable2718;
2use alloy_primitives::{TxHash, TxNumber};
3use num_traits::Zero;
4use reth_config::config::{EtlConfig, TransactionLookupConfig};
5use reth_db_api::{
6 table::{Decode, Decompress, Value},
7 tables,
8 transaction::DbTxMut,
9};
10use reth_etl::Collector;
11use reth_primitives_traits::{NodePrimitives, SignedTransaction};
12use reth_provider::{
13 BlockReader, DBProvider, EitherWriter, PruneCheckpointReader, PruneCheckpointWriter,
14 RocksDBProviderFactory, StaticFileProviderFactory, StatsReader, StorageSettingsCache,
15 TransactionsProvider, TransactionsProviderExt,
16};
17use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment};
18use reth_stages_api::{
19 EntitiesCheckpoint, ExecInput, ExecOutput, Stage, StageCheckpoint, StageError, StageId,
20 UnwindInput, UnwindOutput,
21};
22use reth_storage_errors::provider::ProviderError;
23use tracing::*;
24
/// Stage that indexes transactions by hash, writing `TransactionHashNumbers` entries
/// (`TxHash -> TxNumber`).
///
/// Hashes for the blocks being synced are buffered through an ETL [`Collector`] and written to
/// storage only after the final block range has been collected. When a prune mode is
/// configured, blocks up to its target are skipped instead of being indexed.
#[derive(Debug, Clone)]
pub struct TransactionLookupStage {
    /// Threshold used to split work into ranges: passed as the transaction threshold when
    /// picking execution ranges, and as the threshold when picking the unwind range.
    chunk_size: u64,
    /// ETL settings (`file_size`, `dir`) used to build the hash collector.
    etl_config: EtlConfig,
    /// Optional prune configuration for the `TransactionLookup` segment.
    prune_mode: Option<PruneMode>,
}
41
42impl Default for TransactionLookupStage {
43 fn default() -> Self {
44 Self { chunk_size: 5_000_000, etl_config: EtlConfig::default(), prune_mode: None }
45 }
46}
47
48impl TransactionLookupStage {
49 pub const fn new(
51 config: TransactionLookupConfig,
52 etl_config: EtlConfig,
53 prune_mode: Option<PruneMode>,
54 ) -> Self {
55 Self { chunk_size: config.chunk_size, etl_config, prune_mode }
56 }
57}
58
impl<Provider> Stage<Provider> for TransactionLookupStage
where
    Provider: DBProvider<Tx: DbTxMut>
        + PruneCheckpointWriter
        + BlockReader
        + PruneCheckpointReader
        + StatsReader
        + StaticFileProviderFactory<Primitives: NodePrimitives<SignedTx: Value + SignedTransaction>>
        + TransactionsProviderExt
        + StorageSettingsCache
        + RocksDBProviderFactory,
{
    /// Returns [`StageId::TransactionLookup`].
    fn id(&self) -> StageId {
        StageId::TransactionLookup
    }

    /// Writes `transaction hash -> transaction number` entries for the transactions of every
    /// block after the input checkpoint, up to the input target.
    ///
    /// If a prune mode is configured and its target block is ahead of the checkpoint, the
    /// checkpoint is first moved to that block (so those blocks are never indexed). A
    /// `TransactionLookup` prune checkpoint is saved if none exists yet.
    ///
    /// Block ranges are requested with `chunk_size` as the transaction threshold. Their hashes
    /// are inserted into an ETL collector and only written to storage once the final range has
    /// been collected.
    fn execute(
        &mut self,
        provider: &Provider,
        mut input: ExecInput,
    ) -> Result<ExecOutput, StageError> {
        // Skip the blocks the prune mode targets by moving the checkpoint past them.
        if let Some((target_prunable_block, prune_mode)) = self
            .prune_mode
            .map(|mode| {
                mode.prune_target_block(
                    input.target(),
                    PruneSegment::TransactionLookup,
                    PrunePurpose::User,
                )
            })
            .transpose()?
            .flatten() &&
            target_prunable_block > input.checkpoint().block_number
        {
            input.checkpoint = Some(StageCheckpoint::new(target_prunable_block));

            // Record the skipped range as pruned, up to the last transaction of the prune
            // target block, so that `stage_checkpoint` counts those transactions as processed.
            if provider.get_prune_checkpoint(PruneSegment::TransactionLookup)?.is_none() {
                let target_prunable_tx_number = provider
                    .block_body_indices(target_prunable_block)?
                    .ok_or(ProviderError::BlockBodyIndicesNotFound(target_prunable_block))?
                    .last_tx_num();

                provider.save_prune_checkpoint(
                    PruneSegment::TransactionLookup,
                    PruneCheckpoint {
                        block_number: Some(target_prunable_block),
                        tx_number: Some(target_prunable_tx_number),
                        prune_mode,
                    },
                )?;
            }
        }
        if input.target_reached() {
            return Ok(ExecOutput::done(input.checkpoint()));
        }

        // Hash -> tx number pairs are buffered here across all ranges and written at the end.
        let mut hash_collector: Collector<TxHash, TxNumber> =
            Collector::new(self.etl_config.file_size, self.etl_config.dir.clone());

        // NOTE(review): despite its name, `tx_range` here logs a block range.
        info!(
            target: "sync::stages::transaction_lookup",
            tx_range = ?input.checkpoint().block_number..=input.target(),
            "Updating transaction lookup"
        );

        loop {
            // No range left: the checkpoint is already at the target.
            let Some(range_output) =
                input.next_block_range_with_transaction_threshold(provider, self.chunk_size)?
            else {
                input.checkpoint = Some(
                    StageCheckpoint::new(input.target())
                        .with_entities_stage_checkpoint(stage_checkpoint(provider)?),
                );
                break;
            };

            let end_block = *range_output.block_range.end();

            info!(target: "sync::stages::transaction_lookup", tx_range = ?range_output.tx_range, "Calculating transaction hashes");

            for (key, value) in provider.transaction_hashes_by_range(range_output.tx_range)? {
                hash_collector.insert(key, value)?;
            }

            input.checkpoint = Some(
                StageCheckpoint::new(end_block)
                    .with_entities_stage_checkpoint(stage_checkpoint(provider)?),
            );

            // Only after the last range are the collected hashes flushed to storage.
            if range_output.is_final_range {
                let total_hashes = hash_collector.len();
                // Log progress roughly every 10% of the inserted hashes.
                let interval = (total_hashes / 10).max(1);

                // `append_only` is true only when the MDBX `TransactionHashNumbers` table is
                // empty. NOTE(review): appending presumably relies on the collector yielding
                // keys in sorted order — confirm against `reth_etl::Collector`.
                let append_only =
                    provider.count_entries::<tables::TransactionHashNumbers>()?.is_zero();

                // With the `rocksdb` feature on unix, the writer is given a RocksDB batch;
                // otherwise a unit placeholder is passed instead.
                #[cfg(all(unix, feature = "rocksdb"))]
                let rocksdb = provider.rocksdb_provider();
                #[cfg(all(unix, feature = "rocksdb"))]
                let rocksdb_batch = rocksdb.batch();
                #[cfg(not(all(unix, feature = "rocksdb")))]
                let rocksdb_batch = ();

                let mut writer =
                    EitherWriter::new_transaction_hash_numbers(provider, rocksdb_batch)?;

                for (index, hash_to_number) in hash_collector.iter()?.enumerate() {
                    let (hash_bytes, number_bytes) = hash_to_number?;
                    if index > 0 && index.is_multiple_of(interval) {
                        info!(
                            target: "sync::stages::transaction_lookup",
                            ?append_only,
                            progress = %format!("{:.2}%", (index as f64 / total_hashes as f64) * 100.0),
                            "Inserting hashes"
                        );
                    }

                    // The collector yields raw key/value bytes; decode them back to typed values.
                    let hash = TxHash::decode(&hash_bytes)?;
                    let tx_num = TxNumber::decompress(&number_bytes)?;
                    writer.put_transaction_hash_number(hash, tx_num, append_only)?;
                }

                // Hand any RocksDB batch to the provider rather than writing it here
                // (presumably it is committed together with the provider — confirm).
                #[cfg(all(unix, feature = "rocksdb"))]
                if let Some(batch) = writer.into_raw_rocksdb_batch() {
                    provider.set_pending_rocksdb_batch(batch);
                }

                trace!(target: "sync::stages::transaction_lookup",
                    total_hashes,
                    "Transaction hashes inserted"
                );

                break;
            }
        }

        Ok(ExecOutput {
            checkpoint: StageCheckpoint::new(input.target())
                .with_entities_stage_checkpoint(stage_checkpoint(provider)?),
            done: true,
        })
    }

    /// Deletes the hash entries of transactions in the blocks being unwound.
    ///
    /// The block range handled by one call comes from
    /// `unwind_block_range_with_threshold(self.chunk_size)`, and the returned checkpoint is the
    /// `unwind_to` block reported by that call. Blocks are processed from highest to lowest.
    fn unwind(
        &mut self,
        provider: &Provider,
        input: UnwindInput,
    ) -> Result<UnwindOutput, StageError> {
        let (range, unwind_to, _) = input.unwind_block_range_with_threshold(self.chunk_size);

        // Same storage selection as in `execute`: a RocksDB batch when the feature is enabled
        // on unix, a unit placeholder otherwise.
        #[cfg(all(unix, feature = "rocksdb"))]
        let rocksdb = provider.rocksdb_provider();
        #[cfg(all(unix, feature = "rocksdb"))]
        let rocksdb_batch = rocksdb.batch();
        #[cfg(not(all(unix, feature = "rocksdb")))]
        let rocksdb_batch = ();

        let mut writer = EitherWriter::new_transaction_hash_numbers(provider, rocksdb_batch)?;

        // Pair each body's indices with its block number, then walk them in reverse.
        let static_file_provider = provider.static_file_provider();
        let rev_walker = provider
            .block_body_indices_range(range.clone())?
            .into_iter()
            .zip(range.collect::<Vec<_>>())
            .rev();

        for (body, number) in rev_walker {
            if number <= unwind_to {
                break;
            }

            // Transactions are read from static files to recompute the hash to delete.
            for tx_id in body.tx_num_range() {
                if let Some(transaction) = static_file_provider.transaction_by_id(tx_id)? {
                    writer.delete_transaction_hash_number(transaction.trie_hash())?;
                }
            }
        }

        #[cfg(all(unix, feature = "rocksdb"))]
        if let Some(batch) = writer.into_raw_rocksdb_batch() {
            provider.set_pending_rocksdb_batch(batch);
        }

        Ok(UnwindOutput {
            checkpoint: StageCheckpoint::new(unwind_to)
                .with_entities_stage_checkpoint(stage_checkpoint(provider)?),
        })
    }
}
263
264fn stage_checkpoint<Provider>(provider: &Provider) -> Result<EntitiesCheckpoint, StageError>
265where
266 Provider: PruneCheckpointReader + StaticFileProviderFactory + StatsReader,
267{
268 let pruned_entries = provider
269 .get_prune_checkpoint(PruneSegment::TransactionLookup)?
270 .and_then(|checkpoint| checkpoint.tx_number)
271 .map(|tx_number| tx_number + 1)
273 .unwrap_or_default();
274 Ok(EntitiesCheckpoint {
275 processed: provider.count_entries::<tables::TransactionHashNumbers>()? as u64 +
279 pruned_entries,
280 total: provider.static_file_provider().count_entries::<tables::Transactions>()? as u64,
284 })
285}
286
287#[cfg(test)]
288mod tests {
289 use super::*;
290 use crate::test_utils::{
291 stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, StorageKind,
292 TestRunnerError, TestStageDB, UnwindStageTestRunner,
293 };
294 use alloy_primitives::{BlockNumber, B256};
295 use assert_matches::assert_matches;
296 use reth_db_api::{cursor::DbCursorRO, transaction::DbTx};
297 use reth_ethereum_primitives::Block;
298 use reth_primitives_traits::SealedBlock;
299 use reth_provider::{
300 providers::StaticFileWriter, BlockBodyIndicesProvider, DatabaseProviderFactory,
301 };
302 use reth_stages_api::StageUnitCheckpoint;
303 use reth_testing_utils::generators::{
304 self, random_block, random_block_range, BlockParams, BlockRangeParams,
305 };
306 use std::ops::Sub;
307
308 stage_test_suite_ext!(TransactionLookupTestRunner, transaction_lookup);
310
311 #[tokio::test]
312 async fn execute_single_transaction_lookup() {
313 let (previous_stage, stage_progress) = (500, 100);
314 let mut rng = generators::rng();
315
316 let runner = TransactionLookupTestRunner::default();
318 let input = ExecInput {
319 target: Some(previous_stage),
320 checkpoint: Some(StageCheckpoint::new(stage_progress)),
321 };
322
323 let non_empty_block_number = stage_progress + 10;
325 let blocks = (stage_progress..=input.target())
326 .map(|number| {
327 random_block(
328 &mut rng,
329 number,
330 BlockParams {
331 tx_count: Some((number == non_empty_block_number) as u8),
332 ..Default::default()
333 },
334 )
335 })
336 .collect::<Vec<_>>();
337 runner
338 .db
339 .insert_blocks(blocks.iter(), StorageKind::Static)
340 .expect("failed to insert blocks");
341
342 let rx = runner.execute(input);
343
344 let result = rx.await.unwrap();
346 assert_matches!(
347 result,
348 Ok(ExecOutput {
349 checkpoint: StageCheckpoint {
350 block_number,
351 stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
352 processed,
353 total
354 }))
355 }, done: true }) if block_number == previous_stage && processed == total &&
356 total == runner.db.count_entries::<tables::Transactions>().unwrap() as u64
357 );
358
359 assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
361 }
362
363 #[tokio::test]
364 async fn execute_pruned_transaction_lookup() {
365 let (previous_stage, prune_target, stage_progress) = (500, 400, 100);
366 let mut rng = generators::rng();
367
368 let mut runner = TransactionLookupTestRunner::default();
370 let input = ExecInput {
371 target: Some(previous_stage),
372 checkpoint: Some(StageCheckpoint::new(stage_progress)),
373 };
374
375 let seed = random_block_range(
377 &mut rng,
378 stage_progress + 1..=previous_stage,
379 BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..2, ..Default::default() },
380 );
381 runner
382 .db
383 .insert_blocks(seed.iter(), StorageKind::Static)
384 .expect("failed to seed execution");
385
386 runner.set_prune_mode(PruneMode::Before(prune_target));
387
388 let rx = runner.execute(input);
389
390 let result = rx.await.unwrap();
392 assert_matches!(
393 result,
394 Ok(ExecOutput {
395 checkpoint: StageCheckpoint {
396 block_number,
397 stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
398 processed,
399 total
400 }))
401 }, done: true }) if block_number == previous_stage && processed == total &&
402 total == runner.db.count_entries::<tables::Transactions>().unwrap() as u64
403 );
404
405 assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
407 }
408
409 #[test]
410 fn stage_checkpoint_pruned() {
411 let db = TestStageDB::default();
412 let mut rng = generators::rng();
413
414 let blocks = random_block_range(
415 &mut rng,
416 0..=100,
417 BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..10, ..Default::default() },
418 );
419 db.insert_blocks(blocks.iter(), StorageKind::Static).expect("insert blocks");
420
421 let max_pruned_block = 30;
422 let max_processed_block = 70;
423
424 let mut tx_hash_numbers = Vec::new();
425 let mut tx_hash_number = 0;
426 for block in &blocks[..=max_processed_block] {
427 for transaction in &block.body().transactions {
428 if block.number > max_pruned_block {
429 tx_hash_numbers.push((*transaction.tx_hash(), tx_hash_number));
430 }
431 tx_hash_number += 1;
432 }
433 }
434 db.insert_tx_hash_numbers(tx_hash_numbers).expect("insert tx hash numbers");
435
436 let provider = db.factory.provider_rw().unwrap();
437 provider
438 .save_prune_checkpoint(
439 PruneSegment::TransactionLookup,
440 PruneCheckpoint {
441 block_number: Some(max_pruned_block),
442 tx_number: Some(
443 blocks[..=max_pruned_block as usize]
444 .iter()
445 .map(|block| block.transaction_count() as u64)
446 .sum::<u64>()
447 .sub(1), ),
449 prune_mode: PruneMode::Full,
450 },
451 )
452 .expect("save stage checkpoint");
453 provider.commit().expect("commit");
454
455 let provider = db.factory.database_provider_rw().unwrap();
456 assert_eq!(
457 stage_checkpoint(&provider).expect("stage checkpoint"),
458 EntitiesCheckpoint {
459 processed: blocks[..=max_processed_block]
460 .iter()
461 .map(|block| block.transaction_count() as u64)
462 .sum(),
463 total: blocks.iter().map(|block| block.transaction_count() as u64).sum()
464 }
465 );
466 }
467
468 struct TransactionLookupTestRunner {
469 db: TestStageDB,
470 chunk_size: u64,
471 etl_config: EtlConfig,
472 prune_mode: Option<PruneMode>,
473 }
474
475 impl Default for TransactionLookupTestRunner {
476 fn default() -> Self {
477 Self {
478 db: TestStageDB::default(),
479 chunk_size: 1000,
480 etl_config: EtlConfig::default(),
481 prune_mode: None,
482 }
483 }
484 }
485
486 impl TransactionLookupTestRunner {
487 fn set_prune_mode(&mut self, prune_mode: PruneMode) {
488 self.prune_mode = Some(prune_mode);
489 }
490
491 fn ensure_no_hash_by_block(&self, number: BlockNumber) -> Result<(), TestRunnerError> {
498 let body_result = self
499 .db
500 .factory
501 .provider_rw()?
502 .block_body_indices(number)?
503 .ok_or(ProviderError::BlockBodyIndicesNotFound(number));
504 match body_result {
505 Ok(body) => {
506 self.db.ensure_no_entry_above_by_value::<tables::TransactionHashNumbers, _>(
507 body.last_tx_num(),
508 |key| key,
509 )?
510 }
511 Err(_) => {
512 assert!(self.db.table_is_empty::<tables::TransactionHashNumbers>()?);
513 }
514 };
515
516 Ok(())
517 }
518 }
519
520 impl StageTestRunner for TransactionLookupTestRunner {
521 type S = TransactionLookupStage;
522
523 fn db(&self) -> &TestStageDB {
524 &self.db
525 }
526
527 fn stage(&self) -> Self::S {
528 TransactionLookupStage {
529 chunk_size: self.chunk_size,
530 etl_config: self.etl_config.clone(),
531 prune_mode: self.prune_mode,
532 }
533 }
534 }
535
536 impl ExecuteStageTestRunner for TransactionLookupTestRunner {
537 type Seed = Vec<SealedBlock<Block>>;
538
539 fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
540 let stage_progress = input.checkpoint().block_number;
541 let end = input.target();
542 let mut rng = generators::rng();
543
544 let blocks = random_block_range(
545 &mut rng,
546 stage_progress + 1..=end,
547 BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..2, ..Default::default() },
548 );
549 self.db.insert_blocks(blocks.iter(), StorageKind::Static)?;
550 Ok(blocks)
551 }
552
553 fn validate_execution(
554 &self,
555 mut input: ExecInput,
556 output: Option<ExecOutput>,
557 ) -> Result<(), TestRunnerError> {
558 match output {
559 Some(output) => {
560 let provider = self.db.factory.provider()?;
561
562 if let Some((target_prunable_block, _)) = self
563 .prune_mode
564 .map(|mode| {
565 mode.prune_target_block(
566 input.target(),
567 PruneSegment::TransactionLookup,
568 PrunePurpose::User,
569 )
570 })
571 .transpose()
572 .expect("prune target block for transaction lookup")
573 .flatten() &&
574 target_prunable_block > input.checkpoint().block_number
575 {
576 input.checkpoint = Some(StageCheckpoint::new(target_prunable_block));
577 }
578 let start_block = input.next_block();
579 let end_block = output.checkpoint.block_number;
580
581 if start_block > end_block {
582 return Ok(())
583 }
584
585 let mut body_cursor =
586 provider.tx_ref().cursor_read::<tables::BlockBodyIndices>()?;
587 body_cursor.seek_exact(start_block)?;
588
589 while let Some((_, body)) = body_cursor.next()? {
590 for tx_id in body.tx_num_range() {
591 let transaction =
592 provider.transaction_by_id(tx_id)?.expect("no transaction entry");
593 assert_eq!(
594 Some(tx_id),
595 provider.transaction_id(*transaction.tx_hash())?
596 );
597 }
598 }
599 }
600 None => self.ensure_no_hash_by_block(input.checkpoint().block_number)?,
601 };
602 Ok(())
603 }
604 }
605
606 impl UnwindStageTestRunner for TransactionLookupTestRunner {
607 fn validate_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError> {
608 self.ensure_no_hash_by_block(input.unwind_to)
609 }
610 }
611
612 #[cfg(all(unix, feature = "rocksdb"))]
613 mod rocksdb_tests {
614 use super::*;
615 use reth_provider::RocksDBProviderFactory;
616 use reth_storage_api::StorageSettings;
617
618 #[tokio::test]
621 async fn execute_writes_to_rocksdb_when_enabled() {
622 let (previous_stage, stage_progress) = (110, 100);
623 let mut rng = generators::rng();
624
625 let runner = TransactionLookupTestRunner::default();
627
628 runner.db.factory.set_storage_settings_cache(
630 StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true),
631 );
632
633 let input = ExecInput {
634 target: Some(previous_stage),
635 checkpoint: Some(StageCheckpoint::new(stage_progress)),
636 };
637
638 let blocks = random_block_range(
640 &mut rng,
641 stage_progress + 1..=previous_stage,
642 BlockRangeParams {
643 parent: Some(B256::ZERO),
644 tx_count: 1..3, ..Default::default()
646 },
647 );
648 runner
649 .db
650 .insert_blocks(blocks.iter(), StorageKind::Static)
651 .expect("failed to insert blocks");
652
653 let expected_tx_count: usize = blocks.iter().map(|b| b.body().transactions.len()).sum();
655 assert!(expected_tx_count > 0, "test requires at least one transaction");
656
657 let rx = runner.execute(input);
659 let result = rx.await.unwrap();
660 assert!(result.is_ok(), "stage execution failed: {:?}", result);
661
662 let mdbx_count = runner.db.count_entries::<tables::TransactionHashNumbers>().unwrap();
664 assert_eq!(
665 mdbx_count, 0,
666 "MDBX TransactionHashNumbers should be empty when RocksDB is enabled"
667 );
668
669 let rocksdb = runner.db.factory.rocksdb_provider();
671 let mut rocksdb_count = 0;
672 for block in &blocks {
673 for tx in &block.body().transactions {
674 let hash = *tx.tx_hash();
675 let result = rocksdb.get::<tables::TransactionHashNumbers>(hash).unwrap();
676 assert!(result.is_some(), "Transaction hash {:?} not found in RocksDB", hash);
677 rocksdb_count += 1;
678 }
679 }
680 assert_eq!(
681 rocksdb_count, expected_tx_count,
682 "RocksDB should contain all transaction hashes"
683 );
684 }
685
686 #[tokio::test]
689 async fn unwind_deletes_from_rocksdb_when_enabled() {
690 let (previous_stage, stage_progress) = (110, 100);
691 let mut rng = generators::rng();
692
693 let runner = TransactionLookupTestRunner::default();
695
696 runner.db.factory.set_storage_settings_cache(
698 StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true),
699 );
700
701 let blocks = random_block_range(
703 &mut rng,
704 stage_progress + 1..=previous_stage,
705 BlockRangeParams {
706 parent: Some(B256::ZERO),
707 tx_count: 1..3, ..Default::default()
709 },
710 );
711 runner
712 .db
713 .insert_blocks(blocks.iter(), StorageKind::Static)
714 .expect("failed to insert blocks");
715
716 let expected_tx_count: usize = blocks.iter().map(|b| b.body().transactions.len()).sum();
718 assert!(expected_tx_count > 0, "test requires at least one transaction");
719
720 let exec_input = ExecInput {
722 target: Some(previous_stage),
723 checkpoint: Some(StageCheckpoint::new(stage_progress)),
724 };
725 let rx = runner.execute(exec_input);
726 let result = rx.await.unwrap();
727 assert!(result.is_ok(), "stage execution failed: {:?}", result);
728
729 let rocksdb = runner.db.factory.rocksdb_provider();
731 for block in &blocks {
732 for tx in &block.body().transactions {
733 let hash = *tx.tx_hash();
734 let result = rocksdb.get::<tables::TransactionHashNumbers>(hash).unwrap();
735 assert!(
736 result.is_some(),
737 "Transaction hash {:?} should exist before unwind",
738 hash
739 );
740 }
741 }
742
743 let unwind_input = UnwindInput {
745 checkpoint: StageCheckpoint::new(previous_stage),
746 unwind_to: stage_progress,
747 bad_block: None,
748 };
749 let unwind_result = runner.unwind(unwind_input).await;
750 assert!(unwind_result.is_ok(), "stage unwind failed: {:?}", unwind_result);
751
752 let rocksdb = runner.db.factory.rocksdb_provider();
754 for block in &blocks {
755 for tx in &block.body().transactions {
756 let hash = *tx.tx_hash();
757 let result = rocksdb.get::<tables::TransactionHashNumbers>(hash).unwrap();
758 assert!(
759 result.is_none(),
760 "Transaction hash {:?} should be deleted from RocksDB after unwind",
761 hash
762 );
763 }
764 }
765 }
766 }
767}