reth_stages/stages/
tx_lookup.rs

1use alloy_eips::eip2718::Encodable2718;
2use alloy_primitives::{TxHash, TxNumber};
3use num_traits::Zero;
4use reth_config::config::{EtlConfig, TransactionLookupConfig};
5use reth_db_api::{
6    cursor::{DbCursorRO, DbCursorRW},
7    table::Value,
8    tables,
9    transaction::DbTxMut,
10    RawKey, RawValue,
11};
12use reth_etl::Collector;
13use reth_primitives_traits::{NodePrimitives, SignedTransaction};
14use reth_provider::{
15    BlockReader, DBProvider, PruneCheckpointReader, PruneCheckpointWriter,
16    StaticFileProviderFactory, StatsReader, TransactionsProvider, TransactionsProviderExt,
17};
18use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment};
19use reth_stages_api::{
20    EntitiesCheckpoint, ExecInput, ExecOutput, Stage, StageCheckpoint, StageError, StageId,
21    UnwindInput, UnwindOutput,
22};
23use reth_storage_errors::provider::ProviderError;
24use tracing::*;
25
26/// The transaction lookup stage.
27///
28/// This stage walks over existing transactions, and sets the transaction hash of each transaction
29/// in a block to the corresponding `BlockNumber` at each block. This is written to the
30/// [`tables::TransactionHashNumbers`] This is used for looking up changesets via the transaction
31/// hash.
32///
33/// It uses [`reth_etl::Collector`] to collect all entries before finally writing them to disk.
34#[derive(Debug, Clone)]
35pub struct TransactionLookupStage {
36    /// The maximum number of lookup entries to hold in memory before pushing them to
37    /// [`reth_etl::Collector`].
38    chunk_size: u64,
39    etl_config: EtlConfig,
40    prune_mode: Option<PruneMode>,
41}
42
43impl Default for TransactionLookupStage {
44    fn default() -> Self {
45        Self { chunk_size: 5_000_000, etl_config: EtlConfig::default(), prune_mode: None }
46    }
47}
48
49impl TransactionLookupStage {
50    /// Create new instance of [`TransactionLookupStage`].
51    pub const fn new(
52        config: TransactionLookupConfig,
53        etl_config: EtlConfig,
54        prune_mode: Option<PruneMode>,
55    ) -> Self {
56        Self { chunk_size: config.chunk_size, etl_config, prune_mode }
57    }
58}
59
60impl<Provider> Stage<Provider> for TransactionLookupStage
61where
62    Provider: DBProvider<Tx: DbTxMut>
63        + PruneCheckpointWriter
64        + BlockReader
65        + PruneCheckpointReader
66        + StatsReader
67        + StaticFileProviderFactory<Primitives: NodePrimitives<SignedTx: Value + SignedTransaction>>
68        + TransactionsProviderExt,
69{
70    /// Return the id of the stage
71    fn id(&self) -> StageId {
72        StageId::TransactionLookup
73    }
74
75    /// Write transaction hash -> id entries
76    fn execute(
77        &mut self,
78        provider: &Provider,
79        mut input: ExecInput,
80    ) -> Result<ExecOutput, StageError> {
81        if let Some((target_prunable_block, prune_mode)) = self
82            .prune_mode
83            .map(|mode| {
84                mode.prune_target_block(
85                    input.target(),
86                    PruneSegment::TransactionLookup,
87                    PrunePurpose::User,
88                )
89            })
90            .transpose()?
91            .flatten()
92        {
93            if target_prunable_block > input.checkpoint().block_number {
94                input.checkpoint = Some(StageCheckpoint::new(target_prunable_block));
95
96                // Save prune checkpoint only if we don't have one already.
97                // Otherwise, pruner may skip the unpruned range of blocks.
98                if provider.get_prune_checkpoint(PruneSegment::TransactionLookup)?.is_none() {
99                    let target_prunable_tx_number = provider
100                        .block_body_indices(target_prunable_block)?
101                        .ok_or(ProviderError::BlockBodyIndicesNotFound(target_prunable_block))?
102                        .last_tx_num();
103
104                    provider.save_prune_checkpoint(
105                        PruneSegment::TransactionLookup,
106                        PruneCheckpoint {
107                            block_number: Some(target_prunable_block),
108                            tx_number: Some(target_prunable_tx_number),
109                            prune_mode,
110                        },
111                    )?;
112                }
113            }
114        }
115        if input.target_reached() {
116            return Ok(ExecOutput::done(input.checkpoint()));
117        }
118
119        // 500MB temporary files
120        let mut hash_collector: Collector<TxHash, TxNumber> =
121            Collector::new(self.etl_config.file_size, self.etl_config.dir.clone());
122
123        info!(
124            target: "sync::stages::transaction_lookup",
125            tx_range = ?input.checkpoint().block_number..=input.target(),
126            "Updating transaction lookup"
127        );
128
129        loop {
130            let (tx_range, block_range, is_final_range) =
131                input.next_block_range_with_transaction_threshold(provider, self.chunk_size)?;
132
133            let end_block = *block_range.end();
134
135            info!(target: "sync::stages::transaction_lookup", ?tx_range, "Calculating transaction hashes");
136
137            for (key, value) in provider.transaction_hashes_by_range(tx_range)? {
138                hash_collector.insert(key, value)?;
139            }
140
141            input.checkpoint = Some(
142                StageCheckpoint::new(end_block)
143                    .with_entities_stage_checkpoint(stage_checkpoint(provider)?),
144            );
145
146            if is_final_range {
147                let append_only =
148                    provider.count_entries::<tables::TransactionHashNumbers>()?.is_zero();
149                let mut txhash_cursor = provider
150                    .tx_ref()
151                    .cursor_write::<tables::RawTable<tables::TransactionHashNumbers>>()?;
152
153                let total_hashes = hash_collector.len();
154                let interval = (total_hashes / 10).max(1);
155                for (index, hash_to_number) in hash_collector.iter()?.enumerate() {
156                    let (hash, number) = hash_to_number?;
157                    if index > 0 && index % interval == 0 {
158                        info!(
159                            target: "sync::stages::transaction_lookup",
160                            ?append_only,
161                            progress = %format!("{:.2}%", (index as f64 / total_hashes as f64) * 100.0),
162                            "Inserting hashes"
163                        );
164                    }
165
166                    let key = RawKey::<TxHash>::from_vec(hash);
167                    if append_only {
168                        txhash_cursor.append(key, &RawValue::<TxNumber>::from_vec(number))?
169                    } else {
170                        txhash_cursor.insert(key, &RawValue::<TxNumber>::from_vec(number))?
171                    }
172                }
173
174                trace!(target: "sync::stages::transaction_lookup",
175                    total_hashes,
176                    "Transaction hashes inserted"
177                );
178
179                break;
180            }
181        }
182
183        Ok(ExecOutput {
184            checkpoint: StageCheckpoint::new(input.target())
185                .with_entities_stage_checkpoint(stage_checkpoint(provider)?),
186            done: true,
187        })
188    }
189
190    /// Unwind the stage.
191    fn unwind(
192        &mut self,
193        provider: &Provider,
194        input: UnwindInput,
195    ) -> Result<UnwindOutput, StageError> {
196        let tx = provider.tx_ref();
197        let (range, unwind_to, _) = input.unwind_block_range_with_threshold(self.chunk_size);
198
199        // Cursor to unwind tx hash to number
200        let mut tx_hash_number_cursor = tx.cursor_write::<tables::TransactionHashNumbers>()?;
201        let static_file_provider = provider.static_file_provider();
202        let rev_walker = provider
203            .block_body_indices_range(range.clone())?
204            .into_iter()
205            .zip(range.collect::<Vec<_>>())
206            .rev();
207
208        for (body, number) in rev_walker {
209            if number <= unwind_to {
210                break;
211            }
212
213            // Delete all transactions that belong to this block
214            for tx_id in body.tx_num_range() {
215                // First delete the transaction and hash to id mapping
216                if let Some(transaction) = static_file_provider.transaction_by_id(tx_id)? {
217                    if tx_hash_number_cursor.seek_exact(transaction.trie_hash())?.is_some() {
218                        tx_hash_number_cursor.delete_current()?;
219                    }
220                }
221            }
222        }
223
224        Ok(UnwindOutput {
225            checkpoint: StageCheckpoint::new(unwind_to)
226                .with_entities_stage_checkpoint(stage_checkpoint(provider)?),
227        })
228    }
229}
230
231fn stage_checkpoint<Provider>(provider: &Provider) -> Result<EntitiesCheckpoint, StageError>
232where
233    Provider: PruneCheckpointReader + StaticFileProviderFactory + StatsReader,
234{
235    let pruned_entries = provider
236        .get_prune_checkpoint(PruneSegment::TransactionLookup)?
237        .and_then(|checkpoint| checkpoint.tx_number)
238        // `+1` is needed because `TxNumber` is 0-indexed
239        .map(|tx_number| tx_number + 1)
240        .unwrap_or_default();
241    Ok(EntitiesCheckpoint {
242        // If `TransactionHashNumbers` table was pruned, we will have a number of entries in it not
243        // matching the actual number of processed transactions. To fix that, we add the
244        // number of pruned `TransactionHashNumbers` entries.
245        processed: provider.count_entries::<tables::TransactionHashNumbers>()? as u64 +
246            pruned_entries,
247        // Count only static files entries. If we count the database entries too, we may have
248        // duplicates. We're sure that the static files have all entries that database has,
249        // because we run the `StaticFileProducer` before starting the pipeline.
250        total: provider.static_file_provider().count_entries::<tables::Transactions>()? as u64,
251    })
252}
253
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::{
        stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, StorageKind,
        TestRunnerError, TestStageDB, UnwindStageTestRunner,
    };
    use alloy_primitives::{BlockNumber, B256};
    use assert_matches::assert_matches;
    use reth_db_api::transaction::DbTx;
    use reth_ethereum_primitives::Block;
    use reth_primitives_traits::SealedBlock;
    use reth_provider::{
        providers::StaticFileWriter, BlockBodyIndicesProvider, DatabaseProviderFactory,
        StaticFileProviderFactory,
    };
    use reth_stages_api::StageUnitCheckpoint;
    use reth_testing_utils::generators::{
        self, random_block, random_block_range, BlockParams, BlockRangeParams,
    };
    use std::ops::Sub;

    // Implement stage test suite.
    stage_test_suite_ext!(TransactionLookupTestRunner, transaction_lookup);

    #[tokio::test]
    async fn execute_single_transaction_lookup() {
        let (previous_stage, stage_progress) = (500, 100);
        let mut rng = generators::rng();

        // Set up the runner
        let runner = TransactionLookupTestRunner::default();
        let input = ExecInput {
            target: Some(previous_stage),
            checkpoint: Some(StageCheckpoint::new(stage_progress)),
        };

        // Insert blocks with a single transaction at block `stage_progress + 10`
        let non_empty_block_number = stage_progress + 10;
        let blocks = (stage_progress..=input.target())
            .map(|number| {
                random_block(
                    &mut rng,
                    number,
                    BlockParams {
                        tx_count: Some((number == non_empty_block_number) as u8),
                        ..Default::default()
                    },
                )
            })
            .collect::<Vec<_>>();
        runner
            .db
            .insert_blocks(blocks.iter(), StorageKind::Static)
            .expect("failed to insert blocks");

        let rx = runner.execute(input);

        // Assert the successful result
        let result = rx.await.unwrap();
        assert_matches!(
            result,
            Ok(ExecOutput {
                checkpoint: StageCheckpoint {
                block_number,
                stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
                    processed,
                    total
                }))
            }, done: true }) if block_number == previous_stage && processed == total &&
                total == runner.db.factory.static_file_provider().count_entries::<tables::Transactions>().unwrap() as u64
        );

        // Validate the stage execution
        assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
    }

    #[tokio::test]
    async fn execute_pruned_transaction_lookup() {
        let (previous_stage, prune_target, stage_progress) = (500, 400, 100);
        let mut rng = generators::rng();

        // Set up the runner
        let mut runner = TransactionLookupTestRunner::default();
        let input = ExecInput {
            target: Some(previous_stage),
            checkpoint: Some(StageCheckpoint::new(stage_progress)),
        };

        // Seed only once with full input range
        let seed = random_block_range(
            &mut rng,
            stage_progress + 1..=previous_stage,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..2, ..Default::default() },
        );
        runner
            .db
            .insert_blocks(seed.iter(), StorageKind::Static)
            .expect("failed to seed execution");

        runner.set_prune_mode(PruneMode::Before(prune_target));

        let rx = runner.execute(input);

        // Assert the successful result
        let result = rx.await.unwrap();
        assert_matches!(
            result,
            Ok(ExecOutput {
                checkpoint: StageCheckpoint {
                block_number,
                stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
                    processed,
                    total
                }))
            }, done: true }) if block_number == previous_stage && processed == total &&
                total == runner.db.factory.static_file_provider().count_entries::<tables::Transactions>().unwrap() as u64
        );

        // Validate the stage execution
        assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
    }

    #[test]
    fn stage_checkpoint_pruned() {
        let db = TestStageDB::default();
        let mut rng = generators::rng();

        let blocks = random_block_range(
            &mut rng,
            0..=100,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..10, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Static).expect("insert blocks");

        let max_pruned_block = 30;
        let max_processed_block = 70;

        let mut tx_hash_numbers = Vec::new();
        let mut tx_hash_number = 0;
        for block in &blocks[..=max_processed_block] {
            for transaction in &block.body().transactions {
                if block.number > max_pruned_block {
                    tx_hash_numbers.push((*transaction.tx_hash(), tx_hash_number));
                }
                tx_hash_number += 1;
            }
        }
        db.insert_tx_hash_numbers(tx_hash_numbers).expect("insert tx hash numbers");

        let provider = db.factory.provider_rw().unwrap();
        provider
            .save_prune_checkpoint(
                PruneSegment::TransactionLookup,
                PruneCheckpoint {
                    block_number: Some(max_pruned_block),
                    tx_number: Some(
                        blocks[..=max_pruned_block as usize]
                            .iter()
                            .map(|block| block.transaction_count() as u64)
                            .sum::<u64>()
                            .sub(1), // `TxNumber` is 0-indexed
                    ),
                    prune_mode: PruneMode::Full,
                },
            )
            .expect("save stage checkpoint");
        provider.commit().expect("commit");

        let provider = db.factory.database_provider_rw().unwrap();
        assert_eq!(
            stage_checkpoint(&provider).expect("stage checkpoint"),
            EntitiesCheckpoint {
                processed: blocks[..=max_processed_block]
                    .iter()
                    .map(|block| block.transaction_count() as u64)
                    .sum(),
                total: blocks.iter().map(|block| block.transaction_count() as u64).sum()
            }
        );
    }

    struct TransactionLookupTestRunner {
        db: TestStageDB,
        chunk_size: u64,
        etl_config: EtlConfig,
        prune_mode: Option<PruneMode>,
    }

    impl Default for TransactionLookupTestRunner {
        fn default() -> Self {
            Self {
                db: TestStageDB::default(),
                chunk_size: 1000,
                etl_config: EtlConfig::default(),
                prune_mode: None,
            }
        }
    }

    impl TransactionLookupTestRunner {
        fn set_prune_mode(&mut self, prune_mode: PruneMode) {
            self.prune_mode = Some(prune_mode);
        }

        /// Checks that no transaction hash entries remain beyond block `number`.
        ///
        /// # Panics
        ///
        /// 1. If there are any entries in the [`tables::TransactionHashNumbers`] table whose
        ///    transaction number is above the last transaction of block `number`.
        /// 2. If there is no requested block entry in the bodies table, but
        ///    [`tables::TransactionHashNumbers`] is not empty.
        fn ensure_no_hash_by_block(&self, number: BlockNumber) -> Result<(), TestRunnerError> {
            // A read-only provider is enough to look up the body indices. Binding the result
            // drops the provider (and its transaction) before the table checks below run.
            let body = self.db.factory.provider()?.block_body_indices(number)?;
            match body {
                Some(body) => {
                    self.db.ensure_no_entry_above_by_value::<tables::TransactionHashNumbers, _>(
                        body.last_tx_num(),
                        |key| key,
                    )?
                }
                None => {
                    assert!(self.db.table_is_empty::<tables::TransactionHashNumbers>()?);
                }
            };

            Ok(())
        }
    }

    impl StageTestRunner for TransactionLookupTestRunner {
        type S = TransactionLookupStage;

        fn db(&self) -> &TestStageDB {
            &self.db
        }

        fn stage(&self) -> Self::S {
            TransactionLookupStage {
                chunk_size: self.chunk_size,
                etl_config: self.etl_config.clone(),
                prune_mode: self.prune_mode,
            }
        }
    }

    impl ExecuteStageTestRunner for TransactionLookupTestRunner {
        type Seed = Vec<SealedBlock<Block>>;

        fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
            let stage_progress = input.checkpoint().block_number;
            let end = input.target();
            let mut rng = generators::rng();

            let blocks = random_block_range(
                &mut rng,
                stage_progress + 1..=end,
                BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..2, ..Default::default() },
            );
            self.db.insert_blocks(blocks.iter(), StorageKind::Static)?;
            Ok(blocks)
        }

        fn validate_execution(
            &self,
            mut input: ExecInput,
            output: Option<ExecOutput>,
        ) -> Result<(), TestRunnerError> {
            match output {
                Some(output) => {
                    let provider = self.db.factory.provider()?;

                    if let Some((target_prunable_block, _)) = self
                        .prune_mode
                        .map(|mode| {
                            mode.prune_target_block(
                                input.target(),
                                PruneSegment::TransactionLookup,
                                PrunePurpose::User,
                            )
                        })
                        .transpose()
                        .expect("prune target block for transaction lookup")
                        .flatten()
                    {
                        if target_prunable_block > input.checkpoint().block_number {
                            input.checkpoint = Some(StageCheckpoint::new(target_prunable_block));
                        }
                    }
                    let start_block = input.next_block();
                    let end_block = output.checkpoint.block_number;

                    if start_block > end_block {
                        return Ok(())
                    }

                    // Walk exactly `start_block..=end_block`. A `seek_exact` followed by `next()`
                    // would skip `start_block` itself and keep going past `end_block`.
                    let mut body_cursor =
                        provider.tx_ref().cursor_read::<tables::BlockBodyIndices>()?;

                    for entry in body_cursor.walk_range(start_block..=end_block)? {
                        let (_, body) = entry?;
                        for tx_id in body.tx_num_range() {
                            let transaction =
                                provider.transaction_by_id(tx_id)?.expect("no transaction entry");
                            assert_eq!(
                                Some(tx_id),
                                provider.transaction_id(*transaction.tx_hash())?
                            );
                        }
                    }
                }
                None => self.ensure_no_hash_by_block(input.checkpoint().block_number)?,
            };
            Ok(())
        }
    }

    impl UnwindStageTestRunner for TransactionLookupTestRunner {
        fn validate_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError> {
            self.ensure_no_hash_by_block(input.unwind_to)
        }
    }
}