1use alloy_eips::eip2718::Encodable2718;
2use alloy_primitives::{TxHash, TxNumber};
3use num_traits::Zero;
4use reth_config::config::{EtlConfig, TransactionLookupConfig};
5use reth_db_api::{
6 cursor::{DbCursorRO, DbCursorRW},
7 table::Value,
8 tables,
9 transaction::DbTxMut,
10 RawKey, RawValue,
11};
12use reth_etl::Collector;
13use reth_primitives_traits::{NodePrimitives, SignedTransaction};
14use reth_provider::{
15 BlockReader, DBProvider, PruneCheckpointReader, PruneCheckpointWriter,
16 StaticFileProviderFactory, StatsReader, TransactionsProvider, TransactionsProviderExt,
17};
18use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment};
19use reth_stages_api::{
20 EntitiesCheckpoint, ExecInput, ExecOutput, Stage, StageCheckpoint, StageError, StageId,
21 UnwindInput, UnwindOutput,
22};
23use reth_storage_errors::provider::ProviderError;
24use tracing::*;
25
/// Stage that builds the [`tables::TransactionHashNumbers`] index, which maps each transaction
/// hash to its transaction number, for the blocks between the stage checkpoint and the target.
#[derive(Debug, Clone)]
pub struct TransactionLookupStage {
    /// Transaction-count threshold for each execution chunk (passed to
    /// `next_block_range_with_transaction_threshold`); also passed as the threshold to
    /// `unwind_block_range_with_threshold` when unwinding.
    chunk_size: u64,
    /// Configuration (file size, directory) of the ETL collector that buffers hashes before
    /// they are written to the database.
    etl_config: EtlConfig,
    /// Optional user prune mode. Blocks up to its prune target are skipped during execution.
    prune_mode: Option<PruneMode>,
}
42
43impl Default for TransactionLookupStage {
44 fn default() -> Self {
45 Self { chunk_size: 5_000_000, etl_config: EtlConfig::default(), prune_mode: None }
46 }
47}
48
49impl TransactionLookupStage {
50 pub const fn new(
52 config: TransactionLookupConfig,
53 etl_config: EtlConfig,
54 prune_mode: Option<PruneMode>,
55 ) -> Self {
56 Self { chunk_size: config.chunk_size, etl_config, prune_mode }
57 }
58}
59
60impl<Provider> Stage<Provider> for TransactionLookupStage
61where
62 Provider: DBProvider<Tx: DbTxMut>
63 + PruneCheckpointWriter
64 + BlockReader
65 + PruneCheckpointReader
66 + StatsReader
67 + StaticFileProviderFactory<Primitives: NodePrimitives<SignedTx: Value + SignedTransaction>>
68 + TransactionsProviderExt,
69{
70 fn id(&self) -> StageId {
72 StageId::TransactionLookup
73 }
74
75 fn execute(
77 &mut self,
78 provider: &Provider,
79 mut input: ExecInput,
80 ) -> Result<ExecOutput, StageError> {
81 if let Some((target_prunable_block, prune_mode)) = self
82 .prune_mode
83 .map(|mode| {
84 mode.prune_target_block(
85 input.target(),
86 PruneSegment::TransactionLookup,
87 PrunePurpose::User,
88 )
89 })
90 .transpose()?
91 .flatten()
92 {
93 if target_prunable_block > input.checkpoint().block_number {
94 input.checkpoint = Some(StageCheckpoint::new(target_prunable_block));
95
96 if provider.get_prune_checkpoint(PruneSegment::TransactionLookup)?.is_none() {
99 let target_prunable_tx_number = provider
100 .block_body_indices(target_prunable_block)?
101 .ok_or(ProviderError::BlockBodyIndicesNotFound(target_prunable_block))?
102 .last_tx_num();
103
104 provider.save_prune_checkpoint(
105 PruneSegment::TransactionLookup,
106 PruneCheckpoint {
107 block_number: Some(target_prunable_block),
108 tx_number: Some(target_prunable_tx_number),
109 prune_mode,
110 },
111 )?;
112 }
113 }
114 }
115 if input.target_reached() {
116 return Ok(ExecOutput::done(input.checkpoint()));
117 }
118
119 let mut hash_collector: Collector<TxHash, TxNumber> =
121 Collector::new(self.etl_config.file_size, self.etl_config.dir.clone());
122
123 info!(
124 target: "sync::stages::transaction_lookup",
125 tx_range = ?input.checkpoint().block_number..=input.target(),
126 "Updating transaction lookup"
127 );
128
129 loop {
130 let (tx_range, block_range, is_final_range) =
131 input.next_block_range_with_transaction_threshold(provider, self.chunk_size)?;
132
133 let end_block = *block_range.end();
134
135 info!(target: "sync::stages::transaction_lookup", ?tx_range, "Calculating transaction hashes");
136
137 for (key, value) in provider.transaction_hashes_by_range(tx_range)? {
138 hash_collector.insert(key, value)?;
139 }
140
141 input.checkpoint = Some(
142 StageCheckpoint::new(end_block)
143 .with_entities_stage_checkpoint(stage_checkpoint(provider)?),
144 );
145
146 if is_final_range {
147 let append_only =
148 provider.count_entries::<tables::TransactionHashNumbers>()?.is_zero();
149 let mut txhash_cursor = provider
150 .tx_ref()
151 .cursor_write::<tables::RawTable<tables::TransactionHashNumbers>>()?;
152
153 let total_hashes = hash_collector.len();
154 let interval = (total_hashes / 10).max(1);
155 for (index, hash_to_number) in hash_collector.iter()?.enumerate() {
156 let (hash, number) = hash_to_number?;
157 if index > 0 && index % interval == 0 {
158 info!(
159 target: "sync::stages::transaction_lookup",
160 ?append_only,
161 progress = %format!("{:.2}%", (index as f64 / total_hashes as f64) * 100.0),
162 "Inserting hashes"
163 );
164 }
165
166 let key = RawKey::<TxHash>::from_vec(hash);
167 if append_only {
168 txhash_cursor.append(key, &RawValue::<TxNumber>::from_vec(number))?
169 } else {
170 txhash_cursor.insert(key, &RawValue::<TxNumber>::from_vec(number))?
171 }
172 }
173
174 trace!(target: "sync::stages::transaction_lookup",
175 total_hashes,
176 "Transaction hashes inserted"
177 );
178
179 break;
180 }
181 }
182
183 Ok(ExecOutput {
184 checkpoint: StageCheckpoint::new(input.target())
185 .with_entities_stage_checkpoint(stage_checkpoint(provider)?),
186 done: true,
187 })
188 }
189
190 fn unwind(
192 &mut self,
193 provider: &Provider,
194 input: UnwindInput,
195 ) -> Result<UnwindOutput, StageError> {
196 let tx = provider.tx_ref();
197 let (range, unwind_to, _) = input.unwind_block_range_with_threshold(self.chunk_size);
198
199 let mut tx_hash_number_cursor = tx.cursor_write::<tables::TransactionHashNumbers>()?;
201 let static_file_provider = provider.static_file_provider();
202 let rev_walker = provider
203 .block_body_indices_range(range.clone())?
204 .into_iter()
205 .zip(range.collect::<Vec<_>>())
206 .rev();
207
208 for (body, number) in rev_walker {
209 if number <= unwind_to {
210 break;
211 }
212
213 for tx_id in body.tx_num_range() {
215 if let Some(transaction) = static_file_provider.transaction_by_id(tx_id)? {
217 if tx_hash_number_cursor.seek_exact(transaction.trie_hash())?.is_some() {
218 tx_hash_number_cursor.delete_current()?;
219 }
220 }
221 }
222 }
223
224 Ok(UnwindOutput {
225 checkpoint: StageCheckpoint::new(unwind_to)
226 .with_entities_stage_checkpoint(stage_checkpoint(provider)?),
227 })
228 }
229}
230
231fn stage_checkpoint<Provider>(provider: &Provider) -> Result<EntitiesCheckpoint, StageError>
232where
233 Provider: PruneCheckpointReader + StaticFileProviderFactory + StatsReader,
234{
235 let pruned_entries = provider
236 .get_prune_checkpoint(PruneSegment::TransactionLookup)?
237 .and_then(|checkpoint| checkpoint.tx_number)
238 .map(|tx_number| tx_number + 1)
240 .unwrap_or_default();
241 Ok(EntitiesCheckpoint {
242 processed: provider.count_entries::<tables::TransactionHashNumbers>()? as u64 +
246 pruned_entries,
247 total: provider.static_file_provider().count_entries::<tables::Transactions>()? as u64,
251 })
252}
253
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::{
        stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, StorageKind,
        TestRunnerError, TestStageDB, UnwindStageTestRunner,
    };
    use alloy_primitives::{BlockNumber, B256};
    use assert_matches::assert_matches;
    use reth_db_api::transaction::DbTx;
    use reth_ethereum_primitives::Block;
    use reth_primitives_traits::SealedBlock;
    use reth_provider::{
        providers::StaticFileWriter, BlockBodyIndicesProvider, DatabaseProviderFactory,
        StaticFileProviderFactory,
    };
    use reth_stages_api::StageUnitCheckpoint;
    use reth_testing_utils::generators::{
        self, random_block, random_block_range, BlockParams, BlockRangeParams,
    };
    use std::ops::Sub;

    // Generates the shared stage test suite (see `crate::test_utils`) for this runner.
    stage_test_suite_ext!(TransactionLookupTestRunner, transaction_lookup);

    /// Executes over a range where exactly one block carries a transaction.
    #[tokio::test]
    async fn execute_single_transaction_lookup() {
        let (previous_stage, stage_progress) = (500, 100);
        let mut rng = generators::rng();

        let runner = TransactionLookupTestRunner::default();
        let input = ExecInput {
            target: Some(previous_stage),
            checkpoint: Some(StageCheckpoint::new(stage_progress)),
        };

        // Only block `stage_progress + 10` gets a transaction; every other block is empty.
        let non_empty_block_number = stage_progress + 10;
        let blocks = (stage_progress..=input.target())
            .map(|number| {
                random_block(
                    &mut rng,
                    number,
                    BlockParams {
                        tx_count: Some((number == non_empty_block_number) as u8),
                        ..Default::default()
                    },
                )
            })
            .collect::<Vec<_>>();
        runner
            .db
            .insert_blocks(blocks.iter(), StorageKind::Static)
            .expect("failed to insert blocks");

        let rx = runner.execute(input);

        let result = rx.await.unwrap();
        assert_matches!(
            result,
            Ok(ExecOutput {
                checkpoint: StageCheckpoint {
                    block_number,
                    stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
                        processed,
                        total
                    }))
                }, done: true }) if block_number == previous_stage && processed == total &&
                total == runner.db.factory.static_file_provider().count_entries::<tables::Transactions>().unwrap() as u64
        );

        assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
    }

    /// Executes with `PruneMode::Before(400)`. Pruned transactions are counted as processed,
    /// so `processed` must still equal `total`.
    #[tokio::test]
    async fn execute_pruned_transaction_lookup() {
        let (previous_stage, prune_target, stage_progress) = (500, 400, 100);
        let mut rng = generators::rng();

        let mut runner = TransactionLookupTestRunner::default();
        let input = ExecInput {
            target: Some(previous_stage),
            checkpoint: Some(StageCheckpoint::new(stage_progress)),
        };

        let seed = random_block_range(
            &mut rng,
            stage_progress + 1..=previous_stage,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..2, ..Default::default() },
        );
        runner
            .db
            .insert_blocks(seed.iter(), StorageKind::Static)
            .expect("failed to seed execution");

        runner.set_prune_mode(PruneMode::Before(prune_target));

        let rx = runner.execute(input);

        let result = rx.await.unwrap();
        assert_matches!(
            result,
            Ok(ExecOutput {
                checkpoint: StageCheckpoint {
                    block_number,
                    stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
                        processed,
                        total
                    }))
                }, done: true }) if block_number == previous_stage && processed == total &&
                total == runner.db.factory.static_file_provider().count_entries::<tables::Transactions>().unwrap() as u64
        );

        assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
    }

    /// `stage_checkpoint` must add the pruned entries back to the table row count.
    #[test]
    fn stage_checkpoint_pruned() {
        let db = TestStageDB::default();
        let mut rng = generators::rng();

        let blocks = random_block_range(
            &mut rng,
            0..=100,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..10, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Static).expect("insert blocks");

        let max_pruned_block = 30;
        let max_processed_block = 70;

        // Index only the transactions of blocks (max_pruned_block, max_processed_block];
        // transaction numbers are still counted from block 0.
        let mut tx_hash_numbers = Vec::new();
        let mut tx_hash_number = 0;
        for block in &blocks[..=max_processed_block] {
            for transaction in &block.body().transactions {
                if block.number > max_pruned_block {
                    tx_hash_numbers.push((*transaction.tx_hash(), tx_hash_number));
                }
                tx_hash_number += 1;
            }
        }
        db.insert_tx_hash_numbers(tx_hash_numbers).expect("insert tx hash numbers");

        let provider = db.factory.provider_rw().unwrap();
        provider
            .save_prune_checkpoint(
                PruneSegment::TransactionLookup,
                PruneCheckpoint {
                    block_number: Some(max_pruned_block),
                    tx_number: Some(
                        blocks[..=max_pruned_block as usize]
                            .iter()
                            .map(|block| block.transaction_count() as u64)
                            .sum::<u64>()
                            .sub(1), // `TxNumber` is 0-indexed
                    ),
                    prune_mode: PruneMode::Full,
                },
            )
            .expect("save stage checkpoint");
        provider.commit().expect("commit");

        let provider = db.factory.database_provider_rw().unwrap();
        assert_eq!(
            stage_checkpoint(&provider).expect("stage checkpoint"),
            EntitiesCheckpoint {
                processed: blocks[..=max_processed_block]
                    .iter()
                    .map(|block| block.transaction_count() as u64)
                    .sum(),
                total: blocks.iter().map(|block| block.transaction_count() as u64).sum()
            }
        );
    }

    struct TransactionLookupTestRunner {
        db: TestStageDB,
        chunk_size: u64,
        etl_config: EtlConfig,
        prune_mode: Option<PruneMode>,
    }

    impl Default for TransactionLookupTestRunner {
        fn default() -> Self {
            Self {
                db: TestStageDB::default(),
                chunk_size: 1000,
                etl_config: EtlConfig::default(),
                prune_mode: None,
            }
        }
    }

    impl TransactionLookupTestRunner {
        fn set_prune_mode(&mut self, prune_mode: PruneMode) {
            self.prune_mode = Some(prune_mode);
        }

        /// Checks that [`tables::TransactionHashNumbers`] holds no entry whose transaction
        /// number is above the last transaction of block `number`.
        ///
        /// If block `number` has no body indices, asserts that the table is empty.
        fn ensure_no_hash_by_block(&self, number: BlockNumber) -> Result<(), TestRunnerError> {
            // A read-only provider is enough here; nothing is written.
            let body = self.db.factory.provider()?.block_body_indices(number)?;
            match body {
                Some(body) => self
                    .db
                    .ensure_no_entry_above_by_value::<tables::TransactionHashNumbers, _>(
                        body.last_tx_num(),
                        |key| key,
                    )?,
                None => {
                    assert!(self.db.table_is_empty::<tables::TransactionHashNumbers>()?);
                }
            };

            Ok(())
        }
    }

    impl StageTestRunner for TransactionLookupTestRunner {
        type S = TransactionLookupStage;

        fn db(&self) -> &TestStageDB {
            &self.db
        }

        fn stage(&self) -> Self::S {
            TransactionLookupStage {
                chunk_size: self.chunk_size,
                etl_config: self.etl_config.clone(),
                prune_mode: self.prune_mode,
            }
        }
    }

    impl ExecuteStageTestRunner for TransactionLookupTestRunner {
        type Seed = Vec<SealedBlock<Block>>;

        fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
            let stage_progress = input.checkpoint().block_number;
            let end = input.target();
            let mut rng = generators::rng();

            let blocks = random_block_range(
                &mut rng,
                stage_progress + 1..=end,
                BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..2, ..Default::default() },
            );
            self.db.insert_blocks(blocks.iter(), StorageKind::Static)?;
            Ok(blocks)
        }

        fn validate_execution(
            &self,
            mut input: ExecInput,
            output: Option<ExecOutput>,
        ) -> Result<(), TestRunnerError> {
            match output {
                Some(output) => {
                    let provider = self.db.factory.provider()?;

                    // Apply the same pruning adjustment as the stage, so blocks up to the prune
                    // target are not expected to be indexed.
                    if let Some((target_prunable_block, _)) = self
                        .prune_mode
                        .map(|mode| {
                            mode.prune_target_block(
                                input.target(),
                                PruneSegment::TransactionLookup,
                                PrunePurpose::User,
                            )
                        })
                        .transpose()
                        .expect("prune target block for transaction lookup")
                        .flatten()
                    {
                        if target_prunable_block > input.checkpoint().block_number {
                            input.checkpoint = Some(StageCheckpoint::new(target_prunable_block));
                        }
                    }
                    let start_block = input.next_block();
                    let end_block = output.checkpoint.block_number;

                    if start_block > end_block {
                        return Ok(())
                    }

                    // Walk exactly the executed range. `walk_range` includes `start_block`
                    // (a `seek_exact` followed by `next` skips it) and stops at `end_block`.
                    let mut body_cursor =
                        provider.tx_ref().cursor_read::<tables::BlockBodyIndices>()?;
                    for entry in body_cursor.walk_range(start_block..=end_block)? {
                        let (_, body) = entry?;
                        for tx_id in body.tx_num_range() {
                            let transaction =
                                provider.transaction_by_id(tx_id)?.expect("no transaction entry");
                            assert_eq!(
                                Some(tx_id),
                                provider.transaction_id(*transaction.tx_hash())?
                            );
                        }
                    }
                }
                None => self.ensure_no_hash_by_block(input.checkpoint().block_number)?,
            };
            Ok(())
        }
    }

    impl UnwindStageTestRunner for TransactionLookupTestRunner {
        fn validate_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError> {
            self.ensure_no_hash_by_block(input.unwind_to)
        }
    }
}