1use alloy_eips::eip2718::Encodable2718;
2use alloy_primitives::{TxHash, TxNumber};
3use num_traits::Zero;
4use reth_config::config::{EtlConfig, TransactionLookupConfig};
5use reth_db_api::{
6 cursor::{DbCursorRO, DbCursorRW},
7 table::Value,
8 tables,
9 transaction::DbTxMut,
10 RawKey, RawValue,
11};
12use reth_etl::Collector;
13use reth_primitives_traits::{NodePrimitives, SignedTransaction};
14use reth_provider::{
15 BlockReader, DBProvider, PruneCheckpointReader, PruneCheckpointWriter,
16 StaticFileProviderFactory, StatsReader, TransactionsProvider, TransactionsProviderExt,
17};
18use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment};
19use reth_stages_api::{
20 EntitiesCheckpoint, ExecInput, ExecOutput, Stage, StageCheckpoint, StageError, StageId,
21 UnwindInput, UnwindOutput,
22};
23use reth_storage_errors::provider::ProviderError;
24use tracing::*;
25
/// The transaction lookup stage.
///
/// Maintains the [`tables::TransactionHashNumbers`] table: executing the stage writes
/// transaction hash -> transaction number entries, unwinding deletes them again.
#[derive(Debug, Clone)]
pub struct TransactionLookupStage {
    /// Transaction threshold handed to `next_block_range_with_transaction_threshold` on
    /// execution and to `unwind_block_range_with_threshold` on unwind.
    chunk_size: u64,
    /// ETL settings; `file_size` and `dir` are used to build the hash collector.
    etl_config: EtlConfig,
    /// Optional prune mode. When set, execution asks it for the prunable target block of the
    /// [`PruneSegment::TransactionLookup`] segment before collecting hashes.
    prune_mode: Option<PruneMode>,
}
42
43impl Default for TransactionLookupStage {
44 fn default() -> Self {
45 Self { chunk_size: 5_000_000, etl_config: EtlConfig::default(), prune_mode: None }
46 }
47}
48
49impl TransactionLookupStage {
50 pub const fn new(
52 config: TransactionLookupConfig,
53 etl_config: EtlConfig,
54 prune_mode: Option<PruneMode>,
55 ) -> Self {
56 Self { chunk_size: config.chunk_size, etl_config, prune_mode }
57 }
58}
59
60impl<Provider> Stage<Provider> for TransactionLookupStage
61where
62 Provider: DBProvider<Tx: DbTxMut>
63 + PruneCheckpointWriter
64 + BlockReader
65 + PruneCheckpointReader
66 + StatsReader
67 + StaticFileProviderFactory<Primitives: NodePrimitives<SignedTx: Value + SignedTransaction>>
68 + TransactionsProviderExt,
69{
70 fn id(&self) -> StageId {
72 StageId::TransactionLookup
73 }
74
75 fn execute(
77 &mut self,
78 provider: &Provider,
79 mut input: ExecInput,
80 ) -> Result<ExecOutput, StageError> {
81 if let Some((target_prunable_block, prune_mode)) = self
82 .prune_mode
83 .map(|mode| {
84 mode.prune_target_block(
85 input.target(),
86 PruneSegment::TransactionLookup,
87 PrunePurpose::User,
88 )
89 })
90 .transpose()?
91 .flatten() &&
92 target_prunable_block > input.checkpoint().block_number
93 {
94 input.checkpoint = Some(StageCheckpoint::new(target_prunable_block));
95
96 if provider.get_prune_checkpoint(PruneSegment::TransactionLookup)?.is_none() {
99 let target_prunable_tx_number = provider
100 .block_body_indices(target_prunable_block)?
101 .ok_or(ProviderError::BlockBodyIndicesNotFound(target_prunable_block))?
102 .last_tx_num();
103
104 provider.save_prune_checkpoint(
105 PruneSegment::TransactionLookup,
106 PruneCheckpoint {
107 block_number: Some(target_prunable_block),
108 tx_number: Some(target_prunable_tx_number),
109 prune_mode,
110 },
111 )?;
112 }
113 }
114 if input.target_reached() {
115 return Ok(ExecOutput::done(input.checkpoint()));
116 }
117
118 let mut hash_collector: Collector<TxHash, TxNumber> =
120 Collector::new(self.etl_config.file_size, self.etl_config.dir.clone());
121
122 info!(
123 target: "sync::stages::transaction_lookup",
124 tx_range = ?input.checkpoint().block_number..=input.target(),
125 "Updating transaction lookup"
126 );
127
128 loop {
129 let (tx_range, block_range, is_final_range) =
130 input.next_block_range_with_transaction_threshold(provider, self.chunk_size)?;
131
132 let end_block = *block_range.end();
133
134 info!(target: "sync::stages::transaction_lookup", ?tx_range, "Calculating transaction hashes");
135
136 for (key, value) in provider.transaction_hashes_by_range(tx_range)? {
137 hash_collector.insert(key, value)?;
138 }
139
140 input.checkpoint = Some(
141 StageCheckpoint::new(end_block)
142 .with_entities_stage_checkpoint(stage_checkpoint(provider)?),
143 );
144
145 if is_final_range {
146 let append_only =
147 provider.count_entries::<tables::TransactionHashNumbers>()?.is_zero();
148 let mut txhash_cursor = provider
149 .tx_ref()
150 .cursor_write::<tables::RawTable<tables::TransactionHashNumbers>>()?;
151
152 let total_hashes = hash_collector.len();
153 let interval = (total_hashes / 10).max(1);
154 for (index, hash_to_number) in hash_collector.iter()?.enumerate() {
155 let (hash, number) = hash_to_number?;
156 if index > 0 && index.is_multiple_of(interval) {
157 info!(
158 target: "sync::stages::transaction_lookup",
159 ?append_only,
160 progress = %format!("{:.2}%", (index as f64 / total_hashes as f64) * 100.0),
161 "Inserting hashes"
162 );
163 }
164
165 let key = RawKey::<TxHash>::from_vec(hash);
166 if append_only {
167 txhash_cursor.append(key, &RawValue::<TxNumber>::from_vec(number))?
168 } else {
169 txhash_cursor.insert(key, &RawValue::<TxNumber>::from_vec(number))?
170 }
171 }
172
173 trace!(target: "sync::stages::transaction_lookup",
174 total_hashes,
175 "Transaction hashes inserted"
176 );
177
178 break;
179 }
180 }
181
182 Ok(ExecOutput {
183 checkpoint: StageCheckpoint::new(input.target())
184 .with_entities_stage_checkpoint(stage_checkpoint(provider)?),
185 done: true,
186 })
187 }
188
189 fn unwind(
191 &mut self,
192 provider: &Provider,
193 input: UnwindInput,
194 ) -> Result<UnwindOutput, StageError> {
195 let tx = provider.tx_ref();
196 let (range, unwind_to, _) = input.unwind_block_range_with_threshold(self.chunk_size);
197
198 let mut tx_hash_number_cursor = tx.cursor_write::<tables::TransactionHashNumbers>()?;
200 let static_file_provider = provider.static_file_provider();
201 let rev_walker = provider
202 .block_body_indices_range(range.clone())?
203 .into_iter()
204 .zip(range.collect::<Vec<_>>())
205 .rev();
206
207 for (body, number) in rev_walker {
208 if number <= unwind_to {
209 break;
210 }
211
212 for tx_id in body.tx_num_range() {
214 if let Some(transaction) = static_file_provider.transaction_by_id(tx_id)? &&
216 tx_hash_number_cursor.seek_exact(transaction.trie_hash())?.is_some()
217 {
218 tx_hash_number_cursor.delete_current()?;
219 }
220 }
221 }
222
223 Ok(UnwindOutput {
224 checkpoint: StageCheckpoint::new(unwind_to)
225 .with_entities_stage_checkpoint(stage_checkpoint(provider)?),
226 })
227 }
228}
229
230fn stage_checkpoint<Provider>(provider: &Provider) -> Result<EntitiesCheckpoint, StageError>
231where
232 Provider: PruneCheckpointReader + StaticFileProviderFactory + StatsReader,
233{
234 let pruned_entries = provider
235 .get_prune_checkpoint(PruneSegment::TransactionLookup)?
236 .and_then(|checkpoint| checkpoint.tx_number)
237 .map(|tx_number| tx_number + 1)
239 .unwrap_or_default();
240 Ok(EntitiesCheckpoint {
241 processed: provider.count_entries::<tables::TransactionHashNumbers>()? as u64 +
245 pruned_entries,
246 total: provider.static_file_provider().count_entries::<tables::Transactions>()? as u64,
250 })
251}
252
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::{
        stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, StorageKind,
        TestRunnerError, TestStageDB, UnwindStageTestRunner,
    };
    use alloy_primitives::{BlockNumber, B256};
    use assert_matches::assert_matches;
    use reth_db_api::transaction::DbTx;
    use reth_ethereum_primitives::Block;
    use reth_primitives_traits::SealedBlock;
    use reth_provider::{
        providers::StaticFileWriter, BlockBodyIndicesProvider, DatabaseProviderFactory,
    };
    use reth_stages_api::StageUnitCheckpoint;
    use reth_testing_utils::generators::{
        self, random_block, random_block_range, BlockParams, BlockRangeParams,
    };
    use std::ops::Sub;

    // Instantiate the shared stage test suite for this runner (macro from `crate::test_utils`;
    // presumably it generates the common execute/unwind tests — confirm against the macro).
    stage_test_suite_ext!(TransactionLookupTestRunner, transaction_lookup);

    /// Executes the stage over a range in which exactly one block carries a transaction and
    /// checks that the stage finishes at the target with `processed == total`.
    #[tokio::test]
    async fn execute_single_transaction_lookup() {
        let (previous_stage, stage_progress) = (500, 100);
        let mut rng = generators::rng();

        // Set up the runner
        let runner = TransactionLookupTestRunner::default();
        let input = ExecInput {
            target: Some(previous_stage),
            checkpoint: Some(StageCheckpoint::new(stage_progress)),
        };

        // Insert blocks with a single transaction at block `stage_progress + 10`; every other
        // block gets a transaction count of 0.
        let non_empty_block_number = stage_progress + 10;
        let blocks = (stage_progress..=input.target())
            .map(|number| {
                random_block(
                    &mut rng,
                    number,
                    BlockParams {
                        tx_count: Some((number == non_empty_block_number) as u8),
                        ..Default::default()
                    },
                )
            })
            .collect::<Vec<_>>();
        runner
            .db
            .insert_blocks(blocks.iter(), StorageKind::Static)
            .expect("failed to insert blocks");

        let rx = runner.execute(input);

        // Assert the successful result
        let result = rx.await.unwrap();
        assert_matches!(
            result,
            Ok(ExecOutput {
                checkpoint: StageCheckpoint {
                    block_number,
                    stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
                        processed,
                        total
                    }))
                }, done: true }) if block_number == previous_stage && processed == total &&
                total == runner.db.count_entries::<tables::Transactions>().unwrap() as u64
        );

        // Validate the stage execution
        assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
    }

    /// Executes the stage with `PruneMode::Before(prune_target)` configured and checks that it
    /// still finishes at the target with `processed == total`.
    #[tokio::test]
    async fn execute_pruned_transaction_lookup() {
        let (previous_stage, prune_target, stage_progress) = (500, 400, 100);
        let mut rng = generators::rng();

        // Set up the runner
        let mut runner = TransactionLookupTestRunner::default();
        let input = ExecInput {
            target: Some(previous_stage),
            checkpoint: Some(StageCheckpoint::new(stage_progress)),
        };

        // Seed only once with the full input range
        let seed = random_block_range(
            &mut rng,
            stage_progress + 1..=previous_stage,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..2, ..Default::default() },
        );
        runner
            .db
            .insert_blocks(seed.iter(), StorageKind::Static)
            .expect("failed to seed execution");

        runner.set_prune_mode(PruneMode::Before(prune_target));

        let rx = runner.execute(input);

        // Assert the successful result
        let result = rx.await.unwrap();
        assert_matches!(
            result,
            Ok(ExecOutput {
                checkpoint: StageCheckpoint {
                    block_number,
                    stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
                        processed,
                        total
                    }))
                }, done: true }) if block_number == previous_stage && processed == total &&
                total == runner.db.count_entries::<tables::Transactions>().unwrap() as u64
        );

        // Validate the stage execution
        assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
    }

    /// Checks that `stage_checkpoint` adds the entries covered by the prune checkpoint to the
    /// entries actually present in the hash -> number table.
    #[test]
    fn stage_checkpoint_pruned() {
        let db = TestStageDB::default();
        let mut rng = generators::rng();

        let blocks = random_block_range(
            &mut rng,
            0..=100,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..10, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Static).expect("insert blocks");

        let max_pruned_block = 30;
        let max_processed_block = 70;

        // Number every transaction up to `max_processed_block`, but only keep the hash ->
        // number pairs of blocks above `max_pruned_block`.
        let mut tx_hash_numbers = Vec::new();
        let mut tx_hash_number = 0;
        for block in &blocks[..=max_processed_block] {
            for transaction in &block.body().transactions {
                if block.number > max_pruned_block {
                    tx_hash_numbers.push((*transaction.tx_hash(), tx_hash_number));
                }
                tx_hash_number += 1;
            }
        }
        db.insert_tx_hash_numbers(tx_hash_numbers).expect("insert tx hash numbers");

        let provider = db.factory.provider_rw().unwrap();
        provider
            .save_prune_checkpoint(
                PruneSegment::TransactionLookup,
                PruneCheckpoint {
                    block_number: Some(max_pruned_block),
                    tx_number: Some(
                        blocks[..=max_pruned_block as usize]
                            .iter()
                            .map(|block| block.transaction_count() as u64)
                            .sum::<u64>()
                            .sub(1), // count -> number of the last transaction (0-indexed)
                    ),
                    prune_mode: PruneMode::Full,
                },
            )
            .expect("save stage checkpoint");
        provider.commit().expect("commit");

        // `processed` must cover both the pruned and the stored entries.
        let provider = db.factory.database_provider_rw().unwrap();
        assert_eq!(
            stage_checkpoint(&provider).expect("stage checkpoint"),
            EntitiesCheckpoint {
                processed: blocks[..=max_processed_block]
                    .iter()
                    .map(|block| block.transaction_count() as u64)
                    .sum(),
                total: blocks.iter().map(|block| block.transaction_count() as u64).sum()
            }
        );
    }

    /// Test runner that builds [`TransactionLookupStage`] instances on top of a test database.
    struct TransactionLookupTestRunner {
        /// Test database the stage runs against.
        db: TestStageDB,
        /// Chunk size given to the stage.
        chunk_size: u64,
        /// ETL configuration given to the stage.
        etl_config: EtlConfig,
        /// Prune mode given to the stage.
        prune_mode: Option<PruneMode>,
    }

    impl Default for TransactionLookupTestRunner {
        /// Default runner: fresh test database, chunk size 1000, default ETL config, no
        /// pruning.
        fn default() -> Self {
            Self {
                db: TestStageDB::default(),
                chunk_size: 1000,
                etl_config: EtlConfig::default(),
                prune_mode: None,
            }
        }
    }

    impl TransactionLookupTestRunner {
        /// Sets the prune mode used by stages created from this runner.
        fn set_prune_mode(&mut self, prune_mode: PruneMode) {
            self.prune_mode = Some(prune_mode);
        }

        /// Checks that no lookup entries remain above the given block.
        ///
        /// If body indices exist for `number`, delegates to
        /// `ensure_no_entry_above_by_value` with the block's last transaction number
        /// (presumably verifying that no table value exceeds it — confirm against the helper).
        ///
        /// # Panics
        ///
        /// If there are no body indices for `number` but
        /// [`tables::TransactionHashNumbers`] is not empty.
        fn ensure_no_hash_by_block(&self, number: BlockNumber) -> Result<(), TestRunnerError> {
            let body_result = self
                .db
                .factory
                .provider_rw()?
                .block_body_indices(number)?
                .ok_or(ProviderError::BlockBodyIndicesNotFound(number));
            match body_result {
                Ok(body) => {
                    self.db.ensure_no_entry_above_by_value::<tables::TransactionHashNumbers, _>(
                        body.last_tx_num(),
                        |key| key,
                    )?
                }
                Err(_) => {
                    assert!(self.db.table_is_empty::<tables::TransactionHashNumbers>()?);
                }
            };

            Ok(())
        }
    }

    impl StageTestRunner for TransactionLookupTestRunner {
        type S = TransactionLookupStage;

        /// Returns the test database of this runner.
        fn db(&self) -> &TestStageDB {
            &self.db
        }

        /// Builds a stage from the runner's current settings.
        fn stage(&self) -> Self::S {
            TransactionLookupStage {
                chunk_size: self.chunk_size,
                etl_config: self.etl_config.clone(),
                prune_mode: self.prune_mode,
            }
        }
    }

    impl ExecuteStageTestRunner for TransactionLookupTestRunner {
        type Seed = Vec<SealedBlock<Block>>;

        /// Inserts random blocks (0 or 1 transactions each) for the range right after the
        /// input checkpoint up to the input target, and returns them.
        fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
            let stage_progress = input.checkpoint().block_number;
            let end = input.target();
            let mut rng = generators::rng();

            let blocks = random_block_range(
                &mut rng,
                stage_progress + 1..=end,
                BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..2, ..Default::default() },
            );
            self.db.insert_blocks(blocks.iter(), StorageKind::Static)?;
            Ok(blocks)
        }

        /// Validates the outcome of an execution.
        ///
        /// With an output, every transaction reached by the body cursor walk must resolve
        /// from its hash back to its own id. Without an output, no lookup entries may exist
        /// above the input checkpoint.
        fn validate_execution(
            &self,
            mut input: ExecInput,
            output: Option<ExecOutput>,
        ) -> Result<(), TestRunnerError> {
            match output {
                Some(output) => {
                    let provider = self.db.factory.provider()?;

                    // Mirror the stage: when pruning moves the start forward, validate from
                    // the prunable target block instead of the original checkpoint.
                    if let Some((target_prunable_block, _)) = self
                        .prune_mode
                        .map(|mode| {
                            mode.prune_target_block(
                                input.target(),
                                PruneSegment::TransactionLookup,
                                PrunePurpose::User,
                            )
                        })
                        .transpose()
                        .expect("prune target block for transaction lookup")
                        .flatten() &&
                        target_prunable_block > input.checkpoint().block_number
                    {
                        input.checkpoint = Some(StageCheckpoint::new(target_prunable_block));
                    }
                    let start_block = input.next_block();
                    let end_block = output.checkpoint.block_number;

                    // Nothing was processed.
                    if start_block > end_block {
                        return Ok(())
                    }

                    let mut body_cursor =
                        provider.tx_ref().cursor_read::<tables::BlockBodyIndices>()?;
                    body_cursor.seek_exact(start_block)?;

                    // NOTE(review): the walk begins with `next()` after the seek, so it
                    // presumably starts at the entry following `start_block`, and it is not
                    // bounded by `end_block` — confirm this is intended.
                    while let Some((_, body)) = body_cursor.next()? {
                        for tx_id in body.tx_num_range() {
                            let transaction =
                                provider.transaction_by_id(tx_id)?.expect("no transaction entry");
                            assert_eq!(
                                Some(tx_id),
                                provider.transaction_id(*transaction.tx_hash())?
                            );
                        }
                    }
                }
                None => self.ensure_no_hash_by_block(input.checkpoint().block_number)?,
            };
            Ok(())
        }
    }

    impl UnwindStageTestRunner for TransactionLookupTestRunner {
        /// After an unwind, no lookup entries may remain above the unwind target.
        fn validate_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError> {
            self.ensure_no_hash_by_block(input.unwind_to)
        }
    }
}