1use alloy_eips::eip2718::Encodable2718;
2use alloy_primitives::{TxHash, TxNumber};
3use num_traits::Zero;
4use reth_config::config::{EtlConfig, TransactionLookupConfig};
5use reth_db_api::{
6 cursor::{DbCursorRO, DbCursorRW},
7 table::Value,
8 tables,
9 transaction::DbTxMut,
10 RawKey, RawValue,
11};
12use reth_etl::Collector;
13use reth_primitives_traits::{NodePrimitives, SignedTransaction};
14use reth_provider::{
15 BlockReader, DBProvider, PruneCheckpointReader, PruneCheckpointWriter,
16 StaticFileProviderFactory, StatsReader, TransactionsProvider, TransactionsProviderExt,
17};
18use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment};
19use reth_stages_api::{
20 EntitiesCheckpoint, ExecInput, ExecOutput, Stage, StageCheckpoint, StageError, StageId,
21 UnwindInput, UnwindOutput,
22};
23use reth_storage_errors::provider::ProviderError;
24use tracing::*;
25
/// The transaction lookup stage.
///
/// Maintains the [`tables::TransactionHashNumbers`] table, which maps a transaction hash to its
/// transaction number. Hashes are first gathered in a [`Collector`] and only written to the
/// table once the final block range of a run has been collected.
#[derive(Debug, Clone)]
pub struct TransactionLookupStage {
    /// Transaction threshold handed to the block-range helpers when splitting the work of
    /// `execute` and `unwind` into chunks.
    chunk_size: u64,
    /// ETL settings; `file_size` and `dir` are forwarded to the hash [`Collector`].
    etl_config: EtlConfig,
    /// Optional prune mode. When set, `execute` may skip ahead to the prune target block
    /// instead of indexing transactions that would be pruned.
    prune_mode: Option<PruneMode>,
}
42
43impl Default for TransactionLookupStage {
44 fn default() -> Self {
45 Self { chunk_size: 5_000_000, etl_config: EtlConfig::default(), prune_mode: None }
46 }
47}
48
49impl TransactionLookupStage {
50 pub const fn new(
52 config: TransactionLookupConfig,
53 etl_config: EtlConfig,
54 prune_mode: Option<PruneMode>,
55 ) -> Self {
56 Self { chunk_size: config.chunk_size, etl_config, prune_mode }
57 }
58}
59
/// [`Stage`] implementation that fills (on execute) and clears (on unwind) the
/// [`tables::TransactionHashNumbers`] table.
impl<Provider> Stage<Provider> for TransactionLookupStage
where
    Provider: DBProvider<Tx: DbTxMut>
        + PruneCheckpointWriter
        + BlockReader
        + PruneCheckpointReader
        + StatsReader
        + StaticFileProviderFactory<Primitives: NodePrimitives<SignedTx: Value + SignedTransaction>>
        + TransactionsProviderExt,
{
    /// Returns the id of the stage.
    fn id(&self) -> StageId {
        StageId::TransactionLookup
    }

    /// Writes the transaction hash -> transaction number entries for the blocks between the
    /// current checkpoint and the input target.
    ///
    /// Always reports `done: true` with a checkpoint at `input.target()` on success.
    ///
    /// # Errors
    ///
    /// Propagates any provider, database or collector error, including
    /// [`ProviderError::BlockBodyIndicesNotFound`] when the body indices of the prune target
    /// block are missing.
    fn execute(
        &mut self,
        provider: &Provider,
        mut input: ExecInput,
    ) -> Result<ExecOutput, StageError> {
        // If a prune mode is configured and its target block lies ahead of the checkpoint, skip
        // straight to that block: the blocks up to it are not indexed by this run.
        if let Some((target_prunable_block, prune_mode)) = self
            .prune_mode
            .map(|mode| {
                mode.prune_target_block(
                    input.target(),
                    PruneSegment::TransactionLookup,
                    PrunePurpose::User,
                )
            })
            .transpose()?
            .flatten() &&
            target_prunable_block > input.checkpoint().block_number
        {
            input.checkpoint = Some(StageCheckpoint::new(target_prunable_block));

            // Record a prune checkpoint only when none exists yet, so that an already stored
            // checkpoint is never overwritten from here.
            if provider.get_prune_checkpoint(PruneSegment::TransactionLookup)?.is_none() {
                let target_prunable_tx_number = provider
                    .block_body_indices(target_prunable_block)?
                    .ok_or(ProviderError::BlockBodyIndicesNotFound(target_prunable_block))?
                    .last_tx_num();

                provider.save_prune_checkpoint(
                    PruneSegment::TransactionLookup,
                    PruneCheckpoint {
                        block_number: Some(target_prunable_block),
                        tx_number: Some(target_prunable_tx_number),
                        prune_mode,
                    },
                )?;
            }
        }
        // Nothing left to do (possibly as a result of the skip above).
        if input.target_reached() {
            return Ok(ExecOutput::done(input.checkpoint()));
        }

        // Buffers the (hash, tx number) pairs of all chunks; file size and directory come from
        // the ETL configuration.
        let mut hash_collector: Collector<TxHash, TxNumber> =
            Collector::new(self.etl_config.file_size, self.etl_config.dir.clone());

        // NOTE(review): the field is named `tx_range` but the logged value is a block range.
        info!(
            target: "sync::stages::transaction_lookup",
            tx_range = ?input.checkpoint().block_number..=input.target(),
            "Updating transaction lookup"
        );

        loop {
            // Next block range, limited by `chunk_size` transactions. `None` means there is no
            // further range, so the checkpoint is moved to the target and the loop ends.
            let Some(range_output) =
                input.next_block_range_with_transaction_threshold(provider, self.chunk_size)?
            else {
                input.checkpoint = Some(
                    StageCheckpoint::new(input.target())
                        .with_entities_stage_checkpoint(stage_checkpoint(provider)?),
                );
                break;
            };

            let end_block = *range_output.block_range.end();

            info!(target: "sync::stages::transaction_lookup", tx_range = ?range_output.tx_range, "Calculating transaction hashes");

            // Collect the hashes of this chunk; nothing is written to the table yet.
            for (key, value) in provider.transaction_hashes_by_range(range_output.tx_range)? {
                hash_collector.insert(key, value)?;
            }

            // Advance the local checkpoint to the end of this chunk; presumably this is what
            // makes the next iteration continue after `end_block` (TODO confirm).
            input.checkpoint = Some(
                StageCheckpoint::new(end_block)
                    .with_entities_stage_checkpoint(stage_checkpoint(provider)?),
            );

            if range_output.is_final_range {
                // Appending is only used when the table is empty; otherwise entries are
                // inserted. Presumably the collector yields keys in sorted order, which is what
                // makes appending valid (TODO confirm).
                let append_only =
                    provider.count_entries::<tables::TransactionHashNumbers>()?.is_zero();
                let mut txhash_cursor = provider
                    .tx_ref()
                    .cursor_write::<tables::RawTable<tables::TransactionHashNumbers>>()?;

                let total_hashes = hash_collector.len();
                // Log progress roughly every 10% of the entries (at least every entry).
                let interval = (total_hashes / 10).max(1);
                for (index, hash_to_number) in hash_collector.iter()?.enumerate() {
                    let (hash, number) = hash_to_number?;
                    if index > 0 && index.is_multiple_of(interval) {
                        info!(
                            target: "sync::stages::transaction_lookup",
                            ?append_only,
                            progress = %format!("{:.2}%", (index as f64 / total_hashes as f64) * 100.0),
                            "Inserting hashes"
                        );
                    }

                    // Keys and values are written in their raw (already encoded) form.
                    let key = RawKey::<TxHash>::from_vec(hash);
                    if append_only {
                        txhash_cursor.append(key, &RawValue::<TxNumber>::from_vec(number))?
                    } else {
                        txhash_cursor.insert(key, &RawValue::<TxNumber>::from_vec(number))?
                    }
                }

                trace!(target: "sync::stages::transaction_lookup",
                    total_hashes,
                    "Transaction hashes inserted"
                );

                break;
            }
        }

        Ok(ExecOutput {
            checkpoint: StageCheckpoint::new(input.target())
                .with_entities_stage_checkpoint(stage_checkpoint(provider)?),
            done: true,
        })
    }

    /// Removes the hash -> number entries of the transactions in the unwound blocks.
    ///
    /// # Errors
    ///
    /// Propagates any provider or database error.
    fn unwind(
        &mut self,
        provider: &Provider,
        input: UnwindInput,
    ) -> Result<UnwindOutput, StageError> {
        let tx = provider.tx_ref();
        let (range, unwind_to, _) = input.unwind_block_range_with_threshold(self.chunk_size);

        // Cursor used to look up and delete the hash -> number entries.
        let mut tx_hash_number_cursor = tx.cursor_write::<tables::TransactionHashNumbers>()?;
        let static_file_provider = provider.static_file_provider();
        // Pair every body-indices entry with its block number and walk the pairs from the
        // highest block down.
        let rev_walker = provider
            .block_body_indices_range(range.clone())?
            .into_iter()
            .zip(range.collect::<Vec<_>>())
            .rev();

        for (body, number) in rev_walker {
            // Blocks at or below the unwind target keep their entries.
            if number <= unwind_to {
                break;
            }

            // Delete the entries of all transactions that belong to this block.
            for tx_id in body.tx_num_range() {
                // The transaction is read from the static files to obtain its hash; the entry
                // is deleted only if it is actually present in the table.
                if let Some(transaction) = static_file_provider.transaction_by_id(tx_id)? &&
                    tx_hash_number_cursor.seek_exact(transaction.trie_hash())?.is_some()
                {
                    tx_hash_number_cursor.delete_current()?;
                }
            }
        }

        Ok(UnwindOutput {
            checkpoint: StageCheckpoint::new(unwind_to)
                .with_entities_stage_checkpoint(stage_checkpoint(provider)?),
        })
    }
}
236
237fn stage_checkpoint<Provider>(provider: &Provider) -> Result<EntitiesCheckpoint, StageError>
238where
239 Provider: PruneCheckpointReader + StaticFileProviderFactory + StatsReader,
240{
241 let pruned_entries = provider
242 .get_prune_checkpoint(PruneSegment::TransactionLookup)?
243 .and_then(|checkpoint| checkpoint.tx_number)
244 .map(|tx_number| tx_number + 1)
246 .unwrap_or_default();
247 Ok(EntitiesCheckpoint {
248 processed: provider.count_entries::<tables::TransactionHashNumbers>()? as u64 +
252 pruned_entries,
253 total: provider.static_file_provider().count_entries::<tables::Transactions>()? as u64,
257 })
258}
259
260#[cfg(test)]
261mod tests {
262 use super::*;
263 use crate::test_utils::{
264 stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, StorageKind,
265 TestRunnerError, TestStageDB, UnwindStageTestRunner,
266 };
267 use alloy_primitives::{BlockNumber, B256};
268 use assert_matches::assert_matches;
269 use reth_db_api::transaction::DbTx;
270 use reth_ethereum_primitives::Block;
271 use reth_primitives_traits::SealedBlock;
272 use reth_provider::{
273 providers::StaticFileWriter, BlockBodyIndicesProvider, DatabaseProviderFactory,
274 };
275 use reth_stages_api::StageUnitCheckpoint;
276 use reth_testing_utils::generators::{
277 self, random_block, random_block_range, BlockParams, BlockRangeParams,
278 };
279 use std::ops::Sub;
280
281 stage_test_suite_ext!(TransactionLookupTestRunner, transaction_lookup);
283
284 #[tokio::test]
285 async fn execute_single_transaction_lookup() {
286 let (previous_stage, stage_progress) = (500, 100);
287 let mut rng = generators::rng();
288
289 let runner = TransactionLookupTestRunner::default();
291 let input = ExecInput {
292 target: Some(previous_stage),
293 checkpoint: Some(StageCheckpoint::new(stage_progress)),
294 };
295
296 let non_empty_block_number = stage_progress + 10;
298 let blocks = (stage_progress..=input.target())
299 .map(|number| {
300 random_block(
301 &mut rng,
302 number,
303 BlockParams {
304 tx_count: Some((number == non_empty_block_number) as u8),
305 ..Default::default()
306 },
307 )
308 })
309 .collect::<Vec<_>>();
310 runner
311 .db
312 .insert_blocks(blocks.iter(), StorageKind::Static)
313 .expect("failed to insert blocks");
314
315 let rx = runner.execute(input);
316
317 let result = rx.await.unwrap();
319 assert_matches!(
320 result,
321 Ok(ExecOutput {
322 checkpoint: StageCheckpoint {
323 block_number,
324 stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
325 processed,
326 total
327 }))
328 }, done: true }) if block_number == previous_stage && processed == total &&
329 total == runner.db.count_entries::<tables::Transactions>().unwrap() as u64
330 );
331
332 assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
334 }
335
336 #[tokio::test]
337 async fn execute_pruned_transaction_lookup() {
338 let (previous_stage, prune_target, stage_progress) = (500, 400, 100);
339 let mut rng = generators::rng();
340
341 let mut runner = TransactionLookupTestRunner::default();
343 let input = ExecInput {
344 target: Some(previous_stage),
345 checkpoint: Some(StageCheckpoint::new(stage_progress)),
346 };
347
348 let seed = random_block_range(
350 &mut rng,
351 stage_progress + 1..=previous_stage,
352 BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..2, ..Default::default() },
353 );
354 runner
355 .db
356 .insert_blocks(seed.iter(), StorageKind::Static)
357 .expect("failed to seed execution");
358
359 runner.set_prune_mode(PruneMode::Before(prune_target));
360
361 let rx = runner.execute(input);
362
363 let result = rx.await.unwrap();
365 assert_matches!(
366 result,
367 Ok(ExecOutput {
368 checkpoint: StageCheckpoint {
369 block_number,
370 stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
371 processed,
372 total
373 }))
374 }, done: true }) if block_number == previous_stage && processed == total &&
375 total == runner.db.count_entries::<tables::Transactions>().unwrap() as u64
376 );
377
378 assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
380 }
381
382 #[test]
383 fn stage_checkpoint_pruned() {
384 let db = TestStageDB::default();
385 let mut rng = generators::rng();
386
387 let blocks = random_block_range(
388 &mut rng,
389 0..=100,
390 BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..10, ..Default::default() },
391 );
392 db.insert_blocks(blocks.iter(), StorageKind::Static).expect("insert blocks");
393
394 let max_pruned_block = 30;
395 let max_processed_block = 70;
396
397 let mut tx_hash_numbers = Vec::new();
398 let mut tx_hash_number = 0;
399 for block in &blocks[..=max_processed_block] {
400 for transaction in &block.body().transactions {
401 if block.number > max_pruned_block {
402 tx_hash_numbers.push((*transaction.tx_hash(), tx_hash_number));
403 }
404 tx_hash_number += 1;
405 }
406 }
407 db.insert_tx_hash_numbers(tx_hash_numbers).expect("insert tx hash numbers");
408
409 let provider = db.factory.provider_rw().unwrap();
410 provider
411 .save_prune_checkpoint(
412 PruneSegment::TransactionLookup,
413 PruneCheckpoint {
414 block_number: Some(max_pruned_block),
415 tx_number: Some(
416 blocks[..=max_pruned_block as usize]
417 .iter()
418 .map(|block| block.transaction_count() as u64)
419 .sum::<u64>()
420 .sub(1), ),
422 prune_mode: PruneMode::Full,
423 },
424 )
425 .expect("save stage checkpoint");
426 provider.commit().expect("commit");
427
428 let provider = db.factory.database_provider_rw().unwrap();
429 assert_eq!(
430 stage_checkpoint(&provider).expect("stage checkpoint"),
431 EntitiesCheckpoint {
432 processed: blocks[..=max_processed_block]
433 .iter()
434 .map(|block| block.transaction_count() as u64)
435 .sum(),
436 total: blocks.iter().map(|block| block.transaction_count() as u64).sum()
437 }
438 );
439 }
440
    /// Test runner that builds a [`TransactionLookupStage`] on top of a [`TestStageDB`].
    struct TransactionLookupTestRunner {
        /// Test database the stage runs against.
        db: TestStageDB,
        /// Chunk size handed to the stage under test.
        chunk_size: u64,
        /// ETL configuration handed to the stage under test.
        etl_config: EtlConfig,
        /// Prune mode handed to the stage under test; also consulted when validating the
        /// execution result.
        prune_mode: Option<PruneMode>,
    }
447
448 impl Default for TransactionLookupTestRunner {
449 fn default() -> Self {
450 Self {
451 db: TestStageDB::default(),
452 chunk_size: 1000,
453 etl_config: EtlConfig::default(),
454 prune_mode: None,
455 }
456 }
457 }
458
459 impl TransactionLookupTestRunner {
460 fn set_prune_mode(&mut self, prune_mode: PruneMode) {
461 self.prune_mode = Some(prune_mode);
462 }
463
464 fn ensure_no_hash_by_block(&self, number: BlockNumber) -> Result<(), TestRunnerError> {
471 let body_result = self
472 .db
473 .factory
474 .provider_rw()?
475 .block_body_indices(number)?
476 .ok_or(ProviderError::BlockBodyIndicesNotFound(number));
477 match body_result {
478 Ok(body) => {
479 self.db.ensure_no_entry_above_by_value::<tables::TransactionHashNumbers, _>(
480 body.last_tx_num(),
481 |key| key,
482 )?
483 }
484 Err(_) => {
485 assert!(self.db.table_is_empty::<tables::TransactionHashNumbers>()?);
486 }
487 };
488
489 Ok(())
490 }
491 }
492
493 impl StageTestRunner for TransactionLookupTestRunner {
494 type S = TransactionLookupStage;
495
496 fn db(&self) -> &TestStageDB {
497 &self.db
498 }
499
500 fn stage(&self) -> Self::S {
501 TransactionLookupStage {
502 chunk_size: self.chunk_size,
503 etl_config: self.etl_config.clone(),
504 prune_mode: self.prune_mode,
505 }
506 }
507 }
508
509 impl ExecuteStageTestRunner for TransactionLookupTestRunner {
510 type Seed = Vec<SealedBlock<Block>>;
511
512 fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
513 let stage_progress = input.checkpoint().block_number;
514 let end = input.target();
515 let mut rng = generators::rng();
516
517 let blocks = random_block_range(
518 &mut rng,
519 stage_progress + 1..=end,
520 BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..2, ..Default::default() },
521 );
522 self.db.insert_blocks(blocks.iter(), StorageKind::Static)?;
523 Ok(blocks)
524 }
525
526 fn validate_execution(
527 &self,
528 mut input: ExecInput,
529 output: Option<ExecOutput>,
530 ) -> Result<(), TestRunnerError> {
531 match output {
532 Some(output) => {
533 let provider = self.db.factory.provider()?;
534
535 if let Some((target_prunable_block, _)) = self
536 .prune_mode
537 .map(|mode| {
538 mode.prune_target_block(
539 input.target(),
540 PruneSegment::TransactionLookup,
541 PrunePurpose::User,
542 )
543 })
544 .transpose()
545 .expect("prune target block for transaction lookup")
546 .flatten() &&
547 target_prunable_block > input.checkpoint().block_number
548 {
549 input.checkpoint = Some(StageCheckpoint::new(target_prunable_block));
550 }
551 let start_block = input.next_block();
552 let end_block = output.checkpoint.block_number;
553
554 if start_block > end_block {
555 return Ok(())
556 }
557
558 let mut body_cursor =
559 provider.tx_ref().cursor_read::<tables::BlockBodyIndices>()?;
560 body_cursor.seek_exact(start_block)?;
561
562 while let Some((_, body)) = body_cursor.next()? {
563 for tx_id in body.tx_num_range() {
564 let transaction =
565 provider.transaction_by_id(tx_id)?.expect("no transaction entry");
566 assert_eq!(
567 Some(tx_id),
568 provider.transaction_id(*transaction.tx_hash())?
569 );
570 }
571 }
572 }
573 None => self.ensure_no_hash_by_block(input.checkpoint().block_number)?,
574 };
575 Ok(())
576 }
577 }
578
    impl UnwindStageTestRunner for TransactionLookupTestRunner {
        /// Validates an unwind by checking, via `ensure_no_hash_by_block`, that no lookup entry
        /// remains above the block the stage was unwound to.
        fn validate_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError> {
            self.ensure_no_hash_by_block(input.unwind_to)
        }
    }
584}