1use alloy_eips::eip2718::Encodable2718;
2use alloy_primitives::{TxHash, TxNumber};
3use num_traits::Zero;
4use reth_config::config::{EtlConfig, TransactionLookupConfig};
5#[cfg(all(unix, feature = "rocksdb"))]
6use reth_db_api::Tables;
7use reth_db_api::{
8 table::{Decode, Decompress, Value},
9 tables,
10 transaction::DbTxMut,
11};
12use reth_etl::Collector;
13use reth_primitives_traits::{NodePrimitives, SignedTransaction};
14use reth_provider::{
15 BlockReader, DBProvider, EitherWriter, PruneCheckpointReader, PruneCheckpointWriter,
16 RocksDBProviderFactory, StaticFileProviderFactory, StatsReader, StorageSettingsCache,
17 TransactionsProvider, TransactionsProviderExt,
18};
19use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment};
20use reth_stages_api::{
21 EntitiesCheckpoint, ExecInput, ExecOutput, Stage, StageCheckpoint, StageError, StageId,
22 UnwindInput, UnwindOutput,
23};
24use reth_storage_errors::provider::ProviderError;
25use tracing::*;
26
/// The transaction lookup stage.
///
/// Builds the [`tables::TransactionHashNumbers`] index, mapping each transaction hash
/// ([`TxHash`]) to its sequential transaction number ([`TxNumber`]).
#[derive(Debug, Clone)]
pub struct TransactionLookupStage {
    /// Maximum number of transactions processed per block-range chunk before the
    /// checkpoint is advanced (passed to `next_block_range_with_transaction_threshold`).
    chunk_size: u64,
    /// ETL settings (scratch file size and directory) used by the hash [`Collector`].
    etl_config: EtlConfig,
    /// Optional prune configuration; when set, the stage skips blocks at or below the
    /// prune target and records a prune checkpoint instead.
    prune_mode: Option<PruneMode>,
}
43
44impl Default for TransactionLookupStage {
45 fn default() -> Self {
46 Self { chunk_size: 5_000_000, etl_config: EtlConfig::default(), prune_mode: None }
47 }
48}
49
impl TransactionLookupStage {
    /// Creates a new transaction lookup stage.
    ///
    /// `config.chunk_size` bounds how many transactions are processed per block range,
    /// `etl_config` configures the external-sort collector, and `prune_mode` (when set)
    /// lets the stage skip the prunable portion of the chain.
    pub const fn new(
        config: TransactionLookupConfig,
        etl_config: EtlConfig,
        prune_mode: Option<PruneMode>,
    ) -> Self {
        Self { chunk_size: config.chunk_size, etl_config, prune_mode }
    }
}
60
impl<Provider> Stage<Provider> for TransactionLookupStage
where
    Provider: DBProvider<Tx: DbTxMut>
        + PruneCheckpointWriter
        + BlockReader
        + PruneCheckpointReader
        + StatsReader
        + StaticFileProviderFactory<Primitives: NodePrimitives<SignedTx: Value + SignedTransaction>>
        + TransactionsProviderExt
        + StorageSettingsCache
        + RocksDBProviderFactory,
{
    /// Returns the unique id of the stage.
    fn id(&self) -> StageId {
        StageId::TransactionLookup
    }

    /// Writes transaction hash -> transaction number entries for all blocks up to the
    /// target.
    ///
    /// Hashes are first accumulated in an ETL [`Collector`] (sorted, disk-backed), and
    /// only written out once the final range has been collected, so inserts happen in
    /// sorted key order.
    fn execute(
        &mut self,
        provider: &Provider,
        mut input: ExecInput,
    ) -> Result<ExecOutput, StageError> {
        // If pruning is configured and the prune target is ahead of our checkpoint,
        // fast-forward the checkpoint past the prunable range: those blocks never need
        // lookup entries.
        if let Some((target_prunable_block, prune_mode)) = self
            .prune_mode
            .map(|mode| {
                mode.prune_target_block(
                    input.target(),
                    PruneSegment::TransactionLookup,
                    PrunePurpose::User,
                )
            })
            .transpose()?
            .flatten() &&
            target_prunable_block > input.checkpoint().block_number
        {
            input.checkpoint = Some(StageCheckpoint::new(target_prunable_block));

            // Record a prune checkpoint (once) so progress accounting in
            // `stage_checkpoint` can count the skipped transactions as processed.
            if provider.get_prune_checkpoint(PruneSegment::TransactionLookup)?.is_none() {
                let target_prunable_tx_number = provider
                    .block_body_indices(target_prunable_block)?
                    .ok_or(ProviderError::BlockBodyIndicesNotFound(target_prunable_block))?
                    .last_tx_num();

                provider.save_prune_checkpoint(
                    PruneSegment::TransactionLookup,
                    PruneCheckpoint {
                        block_number: Some(target_prunable_block),
                        tx_number: Some(target_prunable_tx_number),
                        prune_mode,
                    },
                )?;
            }
        }
        if input.target_reached() {
            return Ok(ExecOutput::done(input.checkpoint()));
        }

        // Disk-backed collector that keeps (hash, tx number) pairs sorted by hash.
        let mut hash_collector: Collector<TxHash, TxNumber> =
            Collector::new(self.etl_config.file_size, self.etl_config.dir.clone());

        info!(
            target: "sync::stages::transaction_lookup",
            tx_range = ?input.checkpoint().block_number..=input.target(),
            "Updating transaction lookup"
        );

        // Collect hashes chunk by chunk; each chunk covers at most `self.chunk_size`
        // transactions.
        loop {
            let Some(range_output) =
                input.next_block_range_with_transaction_threshold(provider, self.chunk_size)?
            else {
                // No more ranges to process: mark the target as reached and stop.
                input.checkpoint = Some(
                    StageCheckpoint::new(input.target())
                        .with_entities_stage_checkpoint(stage_checkpoint(provider)?),
                );
                break;
            };

            let end_block = *range_output.block_range.end();

            info!(target: "sync::stages::transaction_lookup", tx_range = ?range_output.tx_range, "Calculating transaction hashes");

            for (key, value) in provider.transaction_hashes_by_range(range_output.tx_range)? {
                hash_collector.insert(key, value)?;
            }

            input.checkpoint = Some(
                StageCheckpoint::new(end_block)
                    .with_entities_stage_checkpoint(stage_checkpoint(provider)?),
            );

            // On the final range, drain the collector into storage in one pass.
            if range_output.is_final_range {
                let total_hashes = hash_collector.len();
                // Log progress roughly every 10% of the entries.
                let interval = (total_hashes / 10).max(1);

                // An empty target table means we can append in sorted order instead of
                // upserting each entry.
                let append_only =
                    provider.count_entries::<tables::TransactionHashNumbers>()?.is_zero();

                provider.with_rocksdb_batch_auto_commit(|rocksdb_batch| {
                    // Writer that targets either the database table or the RocksDB
                    // batch, depending on the configured storage backend.
                    let mut writer =
                        EitherWriter::new_transaction_hash_numbers(provider, rocksdb_batch)?;

                    let iter = hash_collector
                        .iter()
                        .map_err(|e| ProviderError::other(Box::new(e)))?;

                    for (index, hash_to_number) in iter.enumerate() {
                        // Collector yields raw encoded bytes; decode below.
                        let (hash_bytes, number_bytes) =
                            hash_to_number.map_err(|e| ProviderError::other(Box::new(e)))?;
                        if index > 0 && index.is_multiple_of(interval) {
                            info!(
                                target: "sync::stages::transaction_lookup",
                                ?append_only,
                                progress = %format!("{:.2}%", (index as f64 / total_hashes as f64) * 100.0),
                                "Inserting hashes"
                            );
                        }

                        let hash = TxHash::decode(&hash_bytes)?;
                        let tx_num = TxNumber::decompress(&number_bytes)?;
                        writer.put_transaction_hash_number(hash, tx_num, append_only)?;
                    }

                    Ok(((), writer.into_raw_rocksdb_batch()))
                })?;

                trace!(target: "sync::stages::transaction_lookup",
                    total_hashes,
                    "Transaction hashes inserted"
                );

                break;
            }
        }

        // With the v2 storage layout, persist pending RocksDB batches and flush the
        // TransactionHashNumbers column family before reporting completion.
        #[cfg(all(unix, feature = "rocksdb"))]
        if provider.cached_storage_settings().storage_v2 {
            provider.commit_pending_rocksdb_batches()?;
            provider.rocksdb_provider().flush(&[Tables::TransactionHashNumbers.name()])?;
        }

        Ok(ExecOutput {
            checkpoint: StageCheckpoint::new(input.target())
                .with_entities_stage_checkpoint(stage_checkpoint(provider)?),
            done: true,
        })
    }

    /// Unwinds the stage by deleting the hash -> number entry of every transaction in
    /// the unwound block range.
    fn unwind(
        &mut self,
        provider: &Provider,
        input: UnwindInput,
    ) -> Result<UnwindOutput, StageError> {
        let (range, unwind_to, _) = input.unwind_block_range_with_threshold(self.chunk_size);

        provider.with_rocksdb_batch(|rocksdb_batch| {
            let mut writer = EitherWriter::new_transaction_hash_numbers(provider, rocksdb_batch)?;

            let static_file_provider = provider.static_file_provider();
            // Walk block body indices from the tip backwards, paired with their block
            // numbers.
            let rev_walker = provider
                .block_body_indices_range(range.clone())?
                .into_iter()
                .rev()
                .zip(range.rev());

            for (body, number) in rev_walker {
                // Stop once we reach the block we are unwinding to (it stays intact).
                if number <= unwind_to {
                    break;
                }

                // Delete lookup entries for every transaction in the block, reading the
                // transactions back from static files to recover their hashes.
                for transaction in
                    static_file_provider.transactions_by_tx_range(body.tx_num_range())?
                {
                    writer.delete_transaction_hash_number(transaction.trie_hash())?;
                }
            }

            Ok(((), writer.into_raw_rocksdb_batch()))
        })?;

        Ok(UnwindOutput {
            checkpoint: StageCheckpoint::new(unwind_to)
                .with_entities_stage_checkpoint(stage_checkpoint(provider)?),
        })
    }
}
255
256fn stage_checkpoint<Provider>(provider: &Provider) -> Result<EntitiesCheckpoint, StageError>
257where
258 Provider: PruneCheckpointReader + StaticFileProviderFactory + StatsReader,
259{
260 let pruned_entries = provider
261 .get_prune_checkpoint(PruneSegment::TransactionLookup)?
262 .and_then(|checkpoint| checkpoint.tx_number)
263 .map(|tx_number| tx_number + 1)
265 .unwrap_or_default();
266 Ok(EntitiesCheckpoint {
267 processed: provider.count_entries::<tables::TransactionHashNumbers>()? as u64 +
271 pruned_entries,
272 total: provider.static_file_provider().count_entries::<tables::Transactions>()? as u64,
276 })
277}
278
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::{
        stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, StorageKind,
        TestRunnerError, TestStageDB, UnwindStageTestRunner,
    };
    use alloy_primitives::{BlockNumber, B256};
    use assert_matches::assert_matches;
    use reth_db_api::{cursor::DbCursorRO, transaction::DbTx};
    use reth_ethereum_primitives::Block;
    use reth_primitives_traits::SealedBlock;
    use reth_provider::{
        providers::StaticFileWriter, BlockBodyIndicesProvider, DatabaseProviderFactory,
    };
    use reth_stages_api::StageUnitCheckpoint;
    use reth_testing_utils::generators::{
        self, random_block, random_block_range, BlockParams, BlockRangeParams,
    };
    use std::ops::Sub;

    // Runs the shared stage test suite against this stage's test runner.
    stage_test_suite_ext!(TransactionLookupTestRunner, transaction_lookup);

    /// Executes the stage over a range where only one block carries a transaction and
    /// checks that the resulting checkpoint accounts for every stored transaction.
    #[tokio::test]
    async fn execute_single_transaction_lookup() {
        let (previous_stage, stage_progress) = (500, 100);
        let mut rng = generators::rng();

        let runner = TransactionLookupTestRunner::default();
        let input = ExecInput {
            target: Some(previous_stage),
            checkpoint: Some(StageCheckpoint::new(stage_progress)),
        };

        // Seed blocks; `(number == non_empty_block_number) as u8` gives exactly one
        // transaction in that block and zero everywhere else.
        let non_empty_block_number = stage_progress + 10;
        let blocks = (stage_progress..=input.target())
            .map(|number| {
                random_block(
                    &mut rng,
                    number,
                    BlockParams {
                        tx_count: Some((number == non_empty_block_number) as u8),
                        ..Default::default()
                    },
                )
            })
            .collect::<Vec<_>>();
        runner
            .db
            .insert_blocks(blocks.iter(), StorageKind::Static)
            .expect("failed to insert blocks");

        let rx = runner.execute(input);

        // The stage must finish at the target with processed == total entities.
        let result = rx.await.unwrap();
        assert_matches!(
            result,
            Ok(ExecOutput {
                checkpoint: StageCheckpoint {
                    block_number,
                    stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
                        processed,
                        total
                    }))
                }, done: true }) if block_number == previous_stage && processed == total &&
                total == runner.db.count_entries::<tables::Transactions>().unwrap() as u64
        );

        assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
    }

    /// Executes the stage with a prune mode configured and verifies that the pruned
    /// range is skipped yet still counted in the entities checkpoint.
    #[tokio::test]
    async fn execute_pruned_transaction_lookup() {
        let (previous_stage, prune_target, stage_progress) = (500, 400, 100);
        let mut rng = generators::rng();

        let mut runner = TransactionLookupTestRunner::default();
        let input = ExecInput {
            target: Some(previous_stage),
            checkpoint: Some(StageCheckpoint::new(stage_progress)),
        };

        // Seed blocks with 0..2 transactions each across the execution range.
        let seed = random_block_range(
            &mut rng,
            stage_progress + 1..=previous_stage,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..2, ..Default::default() },
        );
        runner
            .db
            .insert_blocks(seed.iter(), StorageKind::Static)
            .expect("failed to seed execution");

        runner.set_prune_mode(PruneMode::Before(prune_target));

        let rx = runner.execute(input);

        let result = rx.await.unwrap();
        assert_matches!(
            result,
            Ok(ExecOutput {
                checkpoint: StageCheckpoint {
                    block_number,
                    stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
                        processed,
                        total
                    }))
                }, done: true }) if block_number == previous_stage && processed == total &&
                total == runner.db.count_entries::<tables::Transactions>().unwrap() as u64
        );

        assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
    }

    /// Verifies `stage_checkpoint` accounting when part of the index was pruned:
    /// pruned transactions must still count towards `processed`.
    #[test]
    fn stage_checkpoint_pruned() {
        let db = TestStageDB::default();
        let mut rng = generators::rng();

        let blocks = random_block_range(
            &mut rng,
            0..=100,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..10, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Static).expect("insert blocks");

        let max_pruned_block = 30;
        let max_processed_block = 70;

        // Insert hash->number entries only for blocks above the pruned range,
        // simulating a partially pruned index.
        let mut tx_hash_numbers = Vec::new();
        let mut tx_hash_number = 0;
        for block in &blocks[..=max_processed_block] {
            for transaction in &block.body().transactions {
                if block.number > max_pruned_block {
                    tx_hash_numbers.push((*transaction.tx_hash(), tx_hash_number));
                }
                tx_hash_number += 1;
            }
        }
        db.insert_tx_hash_numbers(tx_hash_numbers).expect("insert tx hash numbers");

        // Persist a prune checkpoint whose tx_number is the last pruned transaction
        // (total tx count in blocks 0..=max_pruned_block, minus one for zero-basing).
        let provider = db.factory.provider_rw().unwrap();
        provider
            .save_prune_checkpoint(
                PruneSegment::TransactionLookup,
                PruneCheckpoint {
                    block_number: Some(max_pruned_block),
                    tx_number: Some(
                        blocks[..=max_pruned_block as usize]
                            .iter()
                            .map(|block| block.transaction_count() as u64)
                            .sum::<u64>()
                            .sub(1), ),
                    prune_mode: PruneMode::Full,
                },
            )
            .expect("save stage checkpoint");
        provider.commit().expect("commit");

        // processed must equal the tx count through max_processed_block (pruned +
        // stored), and total the tx count over all seeded blocks.
        let provider = db.factory.database_provider_rw().unwrap();
        assert_eq!(
            stage_checkpoint(&provider).expect("stage checkpoint"),
            EntitiesCheckpoint {
                processed: blocks[..=max_processed_block]
                    .iter()
                    .map(|block| block.transaction_count() as u64)
                    .sum(),
                total: blocks.iter().map(|block| block.transaction_count() as u64).sum()
            }
        );
    }

    /// Test harness wiring for [`TransactionLookupStage`].
    struct TransactionLookupTestRunner {
        // Test database (MDBX + static files + RocksDB, via project test utils).
        db: TestStageDB,
        // Stage chunk size under test.
        chunk_size: u64,
        // ETL settings forwarded to the stage.
        etl_config: EtlConfig,
        // Optional prune mode forwarded to the stage.
        prune_mode: Option<PruneMode>,
    }

    impl Default for TransactionLookupTestRunner {
        fn default() -> Self {
            Self {
                db: TestStageDB::default(),
                chunk_size: 1000,
                etl_config: EtlConfig::default(),
                prune_mode: None,
            }
        }
    }

    impl TransactionLookupTestRunner {
        /// Enables pruning for subsequent stage runs.
        fn set_prune_mode(&mut self, prune_mode: PruneMode) {
            self.prune_mode = Some(prune_mode);
        }

        /// Asserts that no hash->number entry exists above the last transaction of
        /// `number`; if the block has no body indices, the whole table must be empty.
        fn ensure_no_hash_by_block(&self, number: BlockNumber) -> Result<(), TestRunnerError> {
            let body_result = self
                .db
                .factory
                .provider_rw()?
                .block_body_indices(number)?
                .ok_or(ProviderError::BlockBodyIndicesNotFound(number));
            match body_result {
                Ok(body) => {
                    self.db.ensure_no_entry_above_by_value::<tables::TransactionHashNumbers, _>(
                        body.last_tx_num(),
                        |key| key,
                    )?
                }
                Err(_) => {
                    assert!(self.db.table_is_empty::<tables::TransactionHashNumbers>()?);
                }
            };

            Ok(())
        }
    }

    impl StageTestRunner for TransactionLookupTestRunner {
        type S = TransactionLookupStage;

        fn db(&self) -> &TestStageDB {
            &self.db
        }

        // Builds a fresh stage instance from the runner's configuration.
        fn stage(&self) -> Self::S {
            TransactionLookupStage {
                chunk_size: self.chunk_size,
                etl_config: self.etl_config.clone(),
                prune_mode: self.prune_mode,
            }
        }
    }

    impl ExecuteStageTestRunner for TransactionLookupTestRunner {
        type Seed = Vec<SealedBlock<Block>>;

        // Seeds random blocks (0..2 txs each) over the stage's execution range.
        fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
            let stage_progress = input.checkpoint().block_number;
            let end = input.target();
            let mut rng = generators::rng();

            let blocks = random_block_range(
                &mut rng,
                stage_progress + 1..=end,
                BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..2, ..Default::default() },
            );
            self.db.insert_blocks(blocks.iter(), StorageKind::Static)?;
            Ok(blocks)
        }

        // Validates that every transaction in the executed range can be looked up by
        // its hash; on failed/absent output, asserts no entries beyond the checkpoint.
        fn validate_execution(
            &self,
            mut input: ExecInput,
            output: Option<ExecOutput>,
        ) -> Result<(), TestRunnerError> {
            match output {
                Some(output) => {
                    let provider = self.db.factory.provider()?;

                    // Mirror the stage's prune fast-forward so validation starts at the
                    // first block that should actually have entries.
                    if let Some((target_prunable_block, _)) = self
                        .prune_mode
                        .map(|mode| {
                            mode.prune_target_block(
                                input.target(),
                                PruneSegment::TransactionLookup,
                                PrunePurpose::User,
                            )
                        })
                        .transpose()
                        .expect("prune target block for transaction lookup")
                        .flatten() &&
                        target_prunable_block > input.checkpoint().block_number
                    {
                        input.checkpoint = Some(StageCheckpoint::new(target_prunable_block));
                    }
                    let start_block = input.next_block();
                    let end_block = output.checkpoint.block_number;

                    if start_block > end_block {
                        return Ok(())
                    }

                    let mut body_cursor =
                        provider.tx_ref().cursor_read::<tables::BlockBodyIndices>()?;
                    body_cursor.seek_exact(start_block)?;

                    // Every tx id in range must round-trip: id -> transaction -> hash -> id.
                    while let Some((_, body)) = body_cursor.next()? {
                        for tx_id in body.tx_num_range() {
                            let transaction =
                                provider.transaction_by_id(tx_id)?.expect("no transaction entry");
                            assert_eq!(
                                Some(tx_id),
                                provider.transaction_id(*transaction.tx_hash())?
                            );
                        }
                    }
                }
                None => self.ensure_no_hash_by_block(input.checkpoint().block_number)?,
            };
            Ok(())
        }
    }

    impl UnwindStageTestRunner for TransactionLookupTestRunner {
        // After an unwind, no lookup entry may exist above the unwind target.
        fn validate_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError> {
            self.ensure_no_hash_by_block(input.unwind_to)
        }
    }

    /// Tests covering the RocksDB-backed (storage v2) write path; only compiled on
    /// unix with the `rocksdb` feature.
    #[cfg(all(unix, feature = "rocksdb"))]
    mod rocksdb_tests {
        use super::*;
        use reth_provider::RocksDBProviderFactory;
        use reth_storage_api::StorageSettings;

        /// With storage v2 enabled, execution must write all hash->number entries to
        /// RocksDB and leave the MDBX table empty.
        #[tokio::test]
        async fn execute_writes_to_rocksdb_when_enabled() {
            let (previous_stage, stage_progress) = (110, 100);
            let mut rng = generators::rng();

            let runner = TransactionLookupTestRunner::default();

            // Route TransactionHashNumbers writes to RocksDB.
            runner.db.factory.set_storage_settings_cache(StorageSettings::v2());

            let input = ExecInput {
                target: Some(previous_stage),
                checkpoint: Some(StageCheckpoint::new(stage_progress)),
            };

            // Seed blocks with at least one transaction each.
            let blocks = random_block_range(
                &mut rng,
                stage_progress + 1..=previous_stage,
                BlockRangeParams {
                    parent: Some(B256::ZERO),
                    tx_count: 1..3, ..Default::default()
                },
            );
            runner
                .db
                .insert_blocks(blocks.iter(), StorageKind::Static)
                .expect("failed to insert blocks");

            let expected_tx_count: usize = blocks.iter().map(|b| b.body().transactions.len()).sum();
            assert!(expected_tx_count > 0, "test requires at least one transaction");

            let rx = runner.execute(input);
            let result = rx.await.unwrap();
            assert!(result.is_ok(), "stage execution failed: {:?}", result);

            // MDBX must not have received any entries.
            let mdbx_count = runner.db.count_entries::<tables::TransactionHashNumbers>().unwrap();
            assert_eq!(
                mdbx_count, 0,
                "MDBX TransactionHashNumbers should be empty when RocksDB is enabled"
            );

            // Every seeded transaction hash must be resolvable from RocksDB.
            let rocksdb = runner.db.factory.rocksdb_provider();
            let mut rocksdb_count = 0;
            for block in &blocks {
                for tx in &block.body().transactions {
                    let hash = *tx.tx_hash();
                    let result = rocksdb.get::<tables::TransactionHashNumbers>(hash).unwrap();
                    assert!(result.is_some(), "Transaction hash {:?} not found in RocksDB", hash);
                    rocksdb_count += 1;
                }
            }
            assert_eq!(
                rocksdb_count, expected_tx_count,
                "RocksDB should contain all transaction hashes"
            );
        }

        /// With storage v2 enabled, unwinding must delete every entry the prior
        /// execution wrote to RocksDB.
        #[tokio::test]
        async fn unwind_deletes_from_rocksdb_when_enabled() {
            let (previous_stage, stage_progress) = (110, 100);
            let mut rng = generators::rng();

            let runner = TransactionLookupTestRunner::default();

            runner.db.factory.set_storage_settings_cache(StorageSettings::v2());

            let blocks = random_block_range(
                &mut rng,
                stage_progress + 1..=previous_stage,
                BlockRangeParams {
                    parent: Some(B256::ZERO),
                    tx_count: 1..3, ..Default::default()
                },
            );
            runner
                .db
                .insert_blocks(blocks.iter(), StorageKind::Static)
                .expect("failed to insert blocks");

            let expected_tx_count: usize = blocks.iter().map(|b| b.body().transactions.len()).sum();
            assert!(expected_tx_count > 0, "test requires at least one transaction");

            // Execute first so there is something to unwind.
            let exec_input = ExecInput {
                target: Some(previous_stage),
                checkpoint: Some(StageCheckpoint::new(stage_progress)),
            };
            let rx = runner.execute(exec_input);
            let result = rx.await.unwrap();
            assert!(result.is_ok(), "stage execution failed: {:?}", result);

            // Sanity check: all entries exist in RocksDB before unwinding.
            let rocksdb = runner.db.factory.rocksdb_provider();
            for block in &blocks {
                for tx in &block.body().transactions {
                    let hash = *tx.tx_hash();
                    let result = rocksdb.get::<tables::TransactionHashNumbers>(hash).unwrap();
                    assert!(
                        result.is_some(),
                        "Transaction hash {:?} should exist before unwind",
                        hash
                    );
                }
            }

            // Unwind back to the starting checkpoint.
            let unwind_input = UnwindInput {
                checkpoint: StageCheckpoint::new(previous_stage),
                unwind_to: stage_progress,
                bad_block: None,
            };
            let unwind_result = runner.unwind(unwind_input).await;
            assert!(unwind_result.is_ok(), "stage unwind failed: {:?}", unwind_result);

            // All previously written entries must now be gone.
            let rocksdb = runner.db.factory.rocksdb_provider();
            for block in &blocks {
                for tx in &block.body().transactions {
                    let hash = *tx.tx_hash();
                    let result = rocksdb.get::<tables::TransactionHashNumbers>(hash).unwrap();
                    assert!(
                        result.is_none(),
                        "Transaction hash {:?} should be deleted from RocksDB after unwind",
                        hash
                    );
                }
            }
        }
    }
}