reth_stages/stages/
tx_lookup.rs

1use alloy_eips::eip2718::Encodable2718;
2use alloy_primitives::{TxHash, TxNumber};
3use num_traits::Zero;
4use reth_config::config::{EtlConfig, TransactionLookupConfig};
5use reth_db_api::{
6    cursor::{DbCursorRO, DbCursorRW},
7    table::Value,
8    tables,
9    transaction::DbTxMut,
10    RawKey, RawValue,
11};
12use reth_etl::Collector;
13use reth_primitives_traits::{NodePrimitives, SignedTransaction};
14use reth_provider::{
15    BlockReader, DBProvider, PruneCheckpointReader, PruneCheckpointWriter,
16    StaticFileProviderFactory, StatsReader, TransactionsProvider, TransactionsProviderExt,
17};
18use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment};
19use reth_stages_api::{
20    EntitiesCheckpoint, ExecInput, ExecOutput, Stage, StageCheckpoint, StageError, StageId,
21    UnwindInput, UnwindOutput,
22};
23use reth_storage_errors::provider::ProviderError;
24use tracing::*;
25
/// The transaction lookup stage.
///
/// This stage walks over existing transactions and maps the hash of each transaction to its
/// `TxNumber`. The mapping is written to the [`tables::TransactionHashNumbers`] table. It is used
/// for lookups keyed by transaction hash (e.g. looking up changesets via the transaction hash).
///
/// It uses [`reth_etl::Collector`] to collect all entries before finally writing them to disk.
#[derive(Debug, Clone)]
pub struct TransactionLookupStage {
    /// The maximum number of lookup entries to hold in memory before pushing them to
    /// [`reth_etl::Collector`].
    ///
    /// Passed as the transaction threshold when selecting the next block range during execution,
    /// and as the threshold when selecting the block range to unwind.
    chunk_size: u64,
    /// ETL configuration; its `file_size` and `dir` are handed to the [`reth_etl::Collector`].
    etl_config: EtlConfig,
    /// Optional prune mode for the transaction lookup segment. When set, execution starts from
    /// the prune target block instead of the stage checkpoint if the target is further ahead.
    prune_mode: Option<PruneMode>,
}
42
43impl Default for TransactionLookupStage {
44    fn default() -> Self {
45        Self { chunk_size: 5_000_000, etl_config: EtlConfig::default(), prune_mode: None }
46    }
47}
48
49impl TransactionLookupStage {
50    /// Create new instance of [`TransactionLookupStage`].
51    pub const fn new(
52        config: TransactionLookupConfig,
53        etl_config: EtlConfig,
54        prune_mode: Option<PruneMode>,
55    ) -> Self {
56        Self { chunk_size: config.chunk_size, etl_config, prune_mode }
57    }
58}
59
60impl<Provider> Stage<Provider> for TransactionLookupStage
61where
62    Provider: DBProvider<Tx: DbTxMut>
63        + PruneCheckpointWriter
64        + BlockReader
65        + PruneCheckpointReader
66        + StatsReader
67        + StaticFileProviderFactory<Primitives: NodePrimitives<SignedTx: Value + SignedTransaction>>
68        + TransactionsProviderExt,
69{
70    /// Return the id of the stage
71    fn id(&self) -> StageId {
72        StageId::TransactionLookup
73    }
74
75    /// Write transaction hash -> id entries
76    fn execute(
77        &mut self,
78        provider: &Provider,
79        mut input: ExecInput,
80    ) -> Result<ExecOutput, StageError> {
81        if let Some((target_prunable_block, prune_mode)) = self
82            .prune_mode
83            .map(|mode| {
84                mode.prune_target_block(
85                    input.target(),
86                    PruneSegment::TransactionLookup,
87                    PrunePurpose::User,
88                )
89            })
90            .transpose()?
91            .flatten() &&
92            target_prunable_block > input.checkpoint().block_number
93        {
94            input.checkpoint = Some(StageCheckpoint::new(target_prunable_block));
95
96            // Save prune checkpoint only if we don't have one already.
97            // Otherwise, pruner may skip the unpruned range of blocks.
98            if provider.get_prune_checkpoint(PruneSegment::TransactionLookup)?.is_none() {
99                let target_prunable_tx_number = provider
100                    .block_body_indices(target_prunable_block)?
101                    .ok_or(ProviderError::BlockBodyIndicesNotFound(target_prunable_block))?
102                    .last_tx_num();
103
104                provider.save_prune_checkpoint(
105                    PruneSegment::TransactionLookup,
106                    PruneCheckpoint {
107                        block_number: Some(target_prunable_block),
108                        tx_number: Some(target_prunable_tx_number),
109                        prune_mode,
110                    },
111                )?;
112            }
113        }
114        if input.target_reached() {
115            return Ok(ExecOutput::done(input.checkpoint()));
116        }
117
118        // 500MB temporary files
119        let mut hash_collector: Collector<TxHash, TxNumber> =
120            Collector::new(self.etl_config.file_size, self.etl_config.dir.clone());
121
122        info!(
123            target: "sync::stages::transaction_lookup",
124            tx_range = ?input.checkpoint().block_number..=input.target(),
125            "Updating transaction lookup"
126        );
127
128        loop {
129            let (tx_range, block_range, is_final_range) =
130                input.next_block_range_with_transaction_threshold(provider, self.chunk_size)?;
131
132            let end_block = *block_range.end();
133
134            info!(target: "sync::stages::transaction_lookup", ?tx_range, "Calculating transaction hashes");
135
136            for (key, value) in provider.transaction_hashes_by_range(tx_range)? {
137                hash_collector.insert(key, value)?;
138            }
139
140            input.checkpoint = Some(
141                StageCheckpoint::new(end_block)
142                    .with_entities_stage_checkpoint(stage_checkpoint(provider)?),
143            );
144
145            if is_final_range {
146                let append_only =
147                    provider.count_entries::<tables::TransactionHashNumbers>()?.is_zero();
148                let mut txhash_cursor = provider
149                    .tx_ref()
150                    .cursor_write::<tables::RawTable<tables::TransactionHashNumbers>>()?;
151
152                let total_hashes = hash_collector.len();
153                let interval = (total_hashes / 10).max(1);
154                for (index, hash_to_number) in hash_collector.iter()?.enumerate() {
155                    let (hash, number) = hash_to_number?;
156                    if index > 0 && index.is_multiple_of(interval) {
157                        info!(
158                            target: "sync::stages::transaction_lookup",
159                            ?append_only,
160                            progress = %format!("{:.2}%", (index as f64 / total_hashes as f64) * 100.0),
161                            "Inserting hashes"
162                        );
163                    }
164
165                    let key = RawKey::<TxHash>::from_vec(hash);
166                    if append_only {
167                        txhash_cursor.append(key, &RawValue::<TxNumber>::from_vec(number))?
168                    } else {
169                        txhash_cursor.insert(key, &RawValue::<TxNumber>::from_vec(number))?
170                    }
171                }
172
173                trace!(target: "sync::stages::transaction_lookup",
174                    total_hashes,
175                    "Transaction hashes inserted"
176                );
177
178                break;
179            }
180        }
181
182        Ok(ExecOutput {
183            checkpoint: StageCheckpoint::new(input.target())
184                .with_entities_stage_checkpoint(stage_checkpoint(provider)?),
185            done: true,
186        })
187    }
188
189    /// Unwind the stage.
190    fn unwind(
191        &mut self,
192        provider: &Provider,
193        input: UnwindInput,
194    ) -> Result<UnwindOutput, StageError> {
195        let tx = provider.tx_ref();
196        let (range, unwind_to, _) = input.unwind_block_range_with_threshold(self.chunk_size);
197
198        // Cursor to unwind tx hash to number
199        let mut tx_hash_number_cursor = tx.cursor_write::<tables::TransactionHashNumbers>()?;
200        let static_file_provider = provider.static_file_provider();
201        let rev_walker = provider
202            .block_body_indices_range(range.clone())?
203            .into_iter()
204            .zip(range.collect::<Vec<_>>())
205            .rev();
206
207        for (body, number) in rev_walker {
208            if number <= unwind_to {
209                break;
210            }
211
212            // Delete all transactions that belong to this block
213            for tx_id in body.tx_num_range() {
214                // First delete the transaction and hash to id mapping
215                if let Some(transaction) = static_file_provider.transaction_by_id(tx_id)? &&
216                    tx_hash_number_cursor.seek_exact(transaction.trie_hash())?.is_some()
217                {
218                    tx_hash_number_cursor.delete_current()?;
219                }
220            }
221        }
222
223        Ok(UnwindOutput {
224            checkpoint: StageCheckpoint::new(unwind_to)
225                .with_entities_stage_checkpoint(stage_checkpoint(provider)?),
226        })
227    }
228}
229
230fn stage_checkpoint<Provider>(provider: &Provider) -> Result<EntitiesCheckpoint, StageError>
231where
232    Provider: PruneCheckpointReader + StaticFileProviderFactory + StatsReader,
233{
234    let pruned_entries = provider
235        .get_prune_checkpoint(PruneSegment::TransactionLookup)?
236        .and_then(|checkpoint| checkpoint.tx_number)
237        // `+1` is needed because `TxNumber` is 0-indexed
238        .map(|tx_number| tx_number + 1)
239        .unwrap_or_default();
240    Ok(EntitiesCheckpoint {
241        // If `TransactionHashNumbers` table was pruned, we will have a number of entries in it not
242        // matching the actual number of processed transactions. To fix that, we add the
243        // number of pruned `TransactionHashNumbers` entries.
244        processed: provider.count_entries::<tables::TransactionHashNumbers>()? as u64 +
245            pruned_entries,
246        // Count only static files entries. If we count the database entries too, we may have
247        // duplicates. We're sure that the static files have all entries that database has,
248        // because we run the `StaticFileProducer` before starting the pipeline.
249        total: provider.static_file_provider().count_entries::<tables::Transactions>()? as u64,
250    })
251}
252
253#[cfg(test)]
254mod tests {
255    use super::*;
256    use crate::test_utils::{
257        stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, StorageKind,
258        TestRunnerError, TestStageDB, UnwindStageTestRunner,
259    };
260    use alloy_primitives::{BlockNumber, B256};
261    use assert_matches::assert_matches;
262    use reth_db_api::transaction::DbTx;
263    use reth_ethereum_primitives::Block;
264    use reth_primitives_traits::SealedBlock;
265    use reth_provider::{
266        providers::StaticFileWriter, BlockBodyIndicesProvider, DatabaseProviderFactory,
267    };
268    use reth_stages_api::StageUnitCheckpoint;
269    use reth_testing_utils::generators::{
270        self, random_block, random_block_range, BlockParams, BlockRangeParams,
271    };
272    use std::ops::Sub;
273
274    // Implement stage test suite.
275    stage_test_suite_ext!(TransactionLookupTestRunner, transaction_lookup);
276
277    #[tokio::test]
278    async fn execute_single_transaction_lookup() {
279        let (previous_stage, stage_progress) = (500, 100);
280        let mut rng = generators::rng();
281
282        // Set up the runner
283        let runner = TransactionLookupTestRunner::default();
284        let input = ExecInput {
285            target: Some(previous_stage),
286            checkpoint: Some(StageCheckpoint::new(stage_progress)),
287        };
288
289        // Insert blocks with a single transaction at block `stage_progress + 10`
290        let non_empty_block_number = stage_progress + 10;
291        let blocks = (stage_progress..=input.target())
292            .map(|number| {
293                random_block(
294                    &mut rng,
295                    number,
296                    BlockParams {
297                        tx_count: Some((number == non_empty_block_number) as u8),
298                        ..Default::default()
299                    },
300                )
301            })
302            .collect::<Vec<_>>();
303        runner
304            .db
305            .insert_blocks(blocks.iter(), StorageKind::Static)
306            .expect("failed to insert blocks");
307
308        let rx = runner.execute(input);
309
310        // Assert the successful result
311        let result = rx.await.unwrap();
312        assert_matches!(
313            result,
314            Ok(ExecOutput {
315                checkpoint: StageCheckpoint {
316                block_number,
317                stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
318                    processed,
319                    total
320                }))
321            }, done: true }) if block_number == previous_stage && processed == total &&
322                total == runner.db.count_entries::<tables::Transactions>().unwrap() as u64
323        );
324
325        // Validate the stage execution
326        assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
327    }
328
329    #[tokio::test]
330    async fn execute_pruned_transaction_lookup() {
331        let (previous_stage, prune_target, stage_progress) = (500, 400, 100);
332        let mut rng = generators::rng();
333
334        // Set up the runner
335        let mut runner = TransactionLookupTestRunner::default();
336        let input = ExecInput {
337            target: Some(previous_stage),
338            checkpoint: Some(StageCheckpoint::new(stage_progress)),
339        };
340
341        // Seed only once with full input range
342        let seed = random_block_range(
343            &mut rng,
344            stage_progress + 1..=previous_stage,
345            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..2, ..Default::default() },
346        );
347        runner
348            .db
349            .insert_blocks(seed.iter(), StorageKind::Static)
350            .expect("failed to seed execution");
351
352        runner.set_prune_mode(PruneMode::Before(prune_target));
353
354        let rx = runner.execute(input);
355
356        // Assert the successful result
357        let result = rx.await.unwrap();
358        assert_matches!(
359            result,
360            Ok(ExecOutput {
361                checkpoint: StageCheckpoint {
362                block_number,
363                stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
364                    processed,
365                    total
366                }))
367            }, done: true }) if block_number == previous_stage && processed == total &&
368                total == runner.db.count_entries::<tables::Transactions>().unwrap() as u64
369        );
370
371        // Validate the stage execution
372        assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
373    }
374
375    #[test]
376    fn stage_checkpoint_pruned() {
377        let db = TestStageDB::default();
378        let mut rng = generators::rng();
379
380        let blocks = random_block_range(
381            &mut rng,
382            0..=100,
383            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..10, ..Default::default() },
384        );
385        db.insert_blocks(blocks.iter(), StorageKind::Static).expect("insert blocks");
386
387        let max_pruned_block = 30;
388        let max_processed_block = 70;
389
390        let mut tx_hash_numbers = Vec::new();
391        let mut tx_hash_number = 0;
392        for block in &blocks[..=max_processed_block] {
393            for transaction in &block.body().transactions {
394                if block.number > max_pruned_block {
395                    tx_hash_numbers.push((*transaction.tx_hash(), tx_hash_number));
396                }
397                tx_hash_number += 1;
398            }
399        }
400        db.insert_tx_hash_numbers(tx_hash_numbers).expect("insert tx hash numbers");
401
402        let provider = db.factory.provider_rw().unwrap();
403        provider
404            .save_prune_checkpoint(
405                PruneSegment::TransactionLookup,
406                PruneCheckpoint {
407                    block_number: Some(max_pruned_block),
408                    tx_number: Some(
409                        blocks[..=max_pruned_block as usize]
410                            .iter()
411                            .map(|block| block.transaction_count() as u64)
412                            .sum::<u64>()
413                            .sub(1), // `TxNumber` is 0-indexed
414                    ),
415                    prune_mode: PruneMode::Full,
416                },
417            )
418            .expect("save stage checkpoint");
419        provider.commit().expect("commit");
420
421        let provider = db.factory.database_provider_rw().unwrap();
422        assert_eq!(
423            stage_checkpoint(&provider).expect("stage checkpoint"),
424            EntitiesCheckpoint {
425                processed: blocks[..=max_processed_block]
426                    .iter()
427                    .map(|block| block.transaction_count() as u64)
428                    .sum(),
429                total: blocks.iter().map(|block| block.transaction_count() as u64).sum()
430            }
431        );
432    }
433
434    struct TransactionLookupTestRunner {
435        db: TestStageDB,
436        chunk_size: u64,
437        etl_config: EtlConfig,
438        prune_mode: Option<PruneMode>,
439    }
440
441    impl Default for TransactionLookupTestRunner {
442        fn default() -> Self {
443            Self {
444                db: TestStageDB::default(),
445                chunk_size: 1000,
446                etl_config: EtlConfig::default(),
447                prune_mode: None,
448            }
449        }
450    }
451
452    impl TransactionLookupTestRunner {
453        fn set_prune_mode(&mut self, prune_mode: PruneMode) {
454            self.prune_mode = Some(prune_mode);
455        }
456
457        /// # Panics
458        ///
459        /// 1. If there are any entries in the [`tables::TransactionHashNumbers`] table above a
460        ///    given block number.
461        /// 2. If there is no requested block entry in the bodies table, but
462        ///    [`tables::TransactionHashNumbers`] is    not empty.
463        fn ensure_no_hash_by_block(&self, number: BlockNumber) -> Result<(), TestRunnerError> {
464            let body_result = self
465                .db
466                .factory
467                .provider_rw()?
468                .block_body_indices(number)?
469                .ok_or(ProviderError::BlockBodyIndicesNotFound(number));
470            match body_result {
471                Ok(body) => {
472                    self.db.ensure_no_entry_above_by_value::<tables::TransactionHashNumbers, _>(
473                        body.last_tx_num(),
474                        |key| key,
475                    )?
476                }
477                Err(_) => {
478                    assert!(self.db.table_is_empty::<tables::TransactionHashNumbers>()?);
479                }
480            };
481
482            Ok(())
483        }
484    }
485
486    impl StageTestRunner for TransactionLookupTestRunner {
487        type S = TransactionLookupStage;
488
489        fn db(&self) -> &TestStageDB {
490            &self.db
491        }
492
493        fn stage(&self) -> Self::S {
494            TransactionLookupStage {
495                chunk_size: self.chunk_size,
496                etl_config: self.etl_config.clone(),
497                prune_mode: self.prune_mode,
498            }
499        }
500    }
501
502    impl ExecuteStageTestRunner for TransactionLookupTestRunner {
503        type Seed = Vec<SealedBlock<Block>>;
504
505        fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
506            let stage_progress = input.checkpoint().block_number;
507            let end = input.target();
508            let mut rng = generators::rng();
509
510            let blocks = random_block_range(
511                &mut rng,
512                stage_progress + 1..=end,
513                BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..2, ..Default::default() },
514            );
515            self.db.insert_blocks(blocks.iter(), StorageKind::Static)?;
516            Ok(blocks)
517        }
518
519        fn validate_execution(
520            &self,
521            mut input: ExecInput,
522            output: Option<ExecOutput>,
523        ) -> Result<(), TestRunnerError> {
524            match output {
525                Some(output) => {
526                    let provider = self.db.factory.provider()?;
527
528                    if let Some((target_prunable_block, _)) = self
529                        .prune_mode
530                        .map(|mode| {
531                            mode.prune_target_block(
532                                input.target(),
533                                PruneSegment::TransactionLookup,
534                                PrunePurpose::User,
535                            )
536                        })
537                        .transpose()
538                        .expect("prune target block for transaction lookup")
539                        .flatten() &&
540                        target_prunable_block > input.checkpoint().block_number
541                    {
542                        input.checkpoint = Some(StageCheckpoint::new(target_prunable_block));
543                    }
544                    let start_block = input.next_block();
545                    let end_block = output.checkpoint.block_number;
546
547                    if start_block > end_block {
548                        return Ok(())
549                    }
550
551                    let mut body_cursor =
552                        provider.tx_ref().cursor_read::<tables::BlockBodyIndices>()?;
553                    body_cursor.seek_exact(start_block)?;
554
555                    while let Some((_, body)) = body_cursor.next()? {
556                        for tx_id in body.tx_num_range() {
557                            let transaction =
558                                provider.transaction_by_id(tx_id)?.expect("no transaction entry");
559                            assert_eq!(
560                                Some(tx_id),
561                                provider.transaction_id(*transaction.tx_hash())?
562                            );
563                        }
564                    }
565                }
566                None => self.ensure_no_hash_by_block(input.checkpoint().block_number)?,
567            };
568            Ok(())
569        }
570    }
571
572    impl UnwindStageTestRunner for TransactionLookupTestRunner {
573        fn validate_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError> {
574            self.ensure_no_hash_by_block(input.unwind_to)
575        }
576    }
577}