reth_stages/stages/
tx_lookup.rs

1use alloy_eips::eip2718::Encodable2718;
2use alloy_primitives::{TxHash, TxNumber};
3use num_traits::Zero;
4use reth_config::config::{EtlConfig, TransactionLookupConfig};
5use reth_db_api::{
6    cursor::{DbCursorRO, DbCursorRW},
7    table::Value,
8    tables,
9    transaction::DbTxMut,
10    RawKey, RawValue,
11};
12use reth_etl::Collector;
13use reth_primitives_traits::{NodePrimitives, SignedTransaction};
14use reth_provider::{
15    BlockReader, DBProvider, PruneCheckpointReader, PruneCheckpointWriter,
16    StaticFileProviderFactory, StatsReader, TransactionsProvider, TransactionsProviderExt,
17};
18use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment};
19use reth_stages_api::{
20    EntitiesCheckpoint, ExecInput, ExecOutput, Stage, StageCheckpoint, StageError, StageId,
21    UnwindInput, UnwindOutput,
22};
23use reth_storage_errors::provider::ProviderError;
24use tracing::*;
25
26/// The transaction lookup stage.
27///
28/// This stage walks over existing transactions, and sets the transaction hash of each transaction
29/// in a block to the corresponding `BlockNumber` at each block. This is written to the
30/// [`tables::TransactionHashNumbers`] This is used for looking up changesets via the transaction
31/// hash.
32///
33/// It uses [`reth_etl::Collector`] to collect all entries before finally writing them to disk.
34#[derive(Debug, Clone)]
35pub struct TransactionLookupStage {
36    /// The maximum number of lookup entries to hold in memory before pushing them to
37    /// [`reth_etl::Collector`].
38    chunk_size: u64,
39    etl_config: EtlConfig,
40    prune_mode: Option<PruneMode>,
41}
42
43impl Default for TransactionLookupStage {
44    fn default() -> Self {
45        Self { chunk_size: 5_000_000, etl_config: EtlConfig::default(), prune_mode: None }
46    }
47}
48
49impl TransactionLookupStage {
50    /// Create new instance of [`TransactionLookupStage`].
51    pub const fn new(
52        config: TransactionLookupConfig,
53        etl_config: EtlConfig,
54        prune_mode: Option<PruneMode>,
55    ) -> Self {
56        Self { chunk_size: config.chunk_size, etl_config, prune_mode }
57    }
58}
59
60impl<Provider> Stage<Provider> for TransactionLookupStage
61where
62    Provider: DBProvider<Tx: DbTxMut>
63        + PruneCheckpointWriter
64        + BlockReader
65        + PruneCheckpointReader
66        + StatsReader
67        + StaticFileProviderFactory<Primitives: NodePrimitives<SignedTx: Value + SignedTransaction>>
68        + TransactionsProviderExt,
69{
70    /// Return the id of the stage
71    fn id(&self) -> StageId {
72        StageId::TransactionLookup
73    }
74
75    /// Write transaction hash -> id entries
76    fn execute(
77        &mut self,
78        provider: &Provider,
79        mut input: ExecInput,
80    ) -> Result<ExecOutput, StageError> {
81        if let Some((target_prunable_block, prune_mode)) = self
82            .prune_mode
83            .map(|mode| {
84                mode.prune_target_block(
85                    input.target(),
86                    PruneSegment::TransactionLookup,
87                    PrunePurpose::User,
88                )
89            })
90            .transpose()?
91            .flatten() &&
92            target_prunable_block > input.checkpoint().block_number
93        {
94            input.checkpoint = Some(StageCheckpoint::new(target_prunable_block));
95
96            // Save prune checkpoint only if we don't have one already.
97            // Otherwise, pruner may skip the unpruned range of blocks.
98            if provider.get_prune_checkpoint(PruneSegment::TransactionLookup)?.is_none() {
99                let target_prunable_tx_number = provider
100                    .block_body_indices(target_prunable_block)?
101                    .ok_or(ProviderError::BlockBodyIndicesNotFound(target_prunable_block))?
102                    .last_tx_num();
103
104                provider.save_prune_checkpoint(
105                    PruneSegment::TransactionLookup,
106                    PruneCheckpoint {
107                        block_number: Some(target_prunable_block),
108                        tx_number: Some(target_prunable_tx_number),
109                        prune_mode,
110                    },
111                )?;
112            }
113        }
114        if input.target_reached() {
115            return Ok(ExecOutput::done(input.checkpoint()));
116        }
117
118        // 500MB temporary files
119        let mut hash_collector: Collector<TxHash, TxNumber> =
120            Collector::new(self.etl_config.file_size, self.etl_config.dir.clone());
121
122        info!(
123            target: "sync::stages::transaction_lookup",
124            tx_range = ?input.checkpoint().block_number..=input.target(),
125            "Updating transaction lookup"
126        );
127
128        loop {
129            let Some(range_output) =
130                input.next_block_range_with_transaction_threshold(provider, self.chunk_size)?
131            else {
132                input.checkpoint = Some(
133                    StageCheckpoint::new(input.target())
134                        .with_entities_stage_checkpoint(stage_checkpoint(provider)?),
135                );
136                break;
137            };
138
139            let end_block = *range_output.block_range.end();
140
141            info!(target: "sync::stages::transaction_lookup", tx_range = ?range_output.tx_range, "Calculating transaction hashes");
142
143            for (key, value) in provider.transaction_hashes_by_range(range_output.tx_range)? {
144                hash_collector.insert(key, value)?;
145            }
146
147            input.checkpoint = Some(
148                StageCheckpoint::new(end_block)
149                    .with_entities_stage_checkpoint(stage_checkpoint(provider)?),
150            );
151
152            if range_output.is_final_range {
153                let append_only =
154                    provider.count_entries::<tables::TransactionHashNumbers>()?.is_zero();
155                let mut txhash_cursor = provider
156                    .tx_ref()
157                    .cursor_write::<tables::RawTable<tables::TransactionHashNumbers>>()?;
158
159                let total_hashes = hash_collector.len();
160                let interval = (total_hashes / 10).max(1);
161                for (index, hash_to_number) in hash_collector.iter()?.enumerate() {
162                    let (hash, number) = hash_to_number?;
163                    if index > 0 && index.is_multiple_of(interval) {
164                        info!(
165                            target: "sync::stages::transaction_lookup",
166                            ?append_only,
167                            progress = %format!("{:.2}%", (index as f64 / total_hashes as f64) * 100.0),
168                            "Inserting hashes"
169                        );
170                    }
171
172                    let key = RawKey::<TxHash>::from_vec(hash);
173                    if append_only {
174                        txhash_cursor.append(key, &RawValue::<TxNumber>::from_vec(number))?
175                    } else {
176                        txhash_cursor.insert(key, &RawValue::<TxNumber>::from_vec(number))?
177                    }
178                }
179
180                trace!(target: "sync::stages::transaction_lookup",
181                    total_hashes,
182                    "Transaction hashes inserted"
183                );
184
185                break;
186            }
187        }
188
189        Ok(ExecOutput {
190            checkpoint: StageCheckpoint::new(input.target())
191                .with_entities_stage_checkpoint(stage_checkpoint(provider)?),
192            done: true,
193        })
194    }
195
196    /// Unwind the stage.
197    fn unwind(
198        &mut self,
199        provider: &Provider,
200        input: UnwindInput,
201    ) -> Result<UnwindOutput, StageError> {
202        let tx = provider.tx_ref();
203        let (range, unwind_to, _) = input.unwind_block_range_with_threshold(self.chunk_size);
204
205        // Cursor to unwind tx hash to number
206        let mut tx_hash_number_cursor = tx.cursor_write::<tables::TransactionHashNumbers>()?;
207        let static_file_provider = provider.static_file_provider();
208        let rev_walker = provider
209            .block_body_indices_range(range.clone())?
210            .into_iter()
211            .zip(range.collect::<Vec<_>>())
212            .rev();
213
214        for (body, number) in rev_walker {
215            if number <= unwind_to {
216                break;
217            }
218
219            // Delete all transactions that belong to this block
220            for tx_id in body.tx_num_range() {
221                // First delete the transaction and hash to id mapping
222                if let Some(transaction) = static_file_provider.transaction_by_id(tx_id)? &&
223                    tx_hash_number_cursor.seek_exact(transaction.trie_hash())?.is_some()
224                {
225                    tx_hash_number_cursor.delete_current()?;
226                }
227            }
228        }
229
230        Ok(UnwindOutput {
231            checkpoint: StageCheckpoint::new(unwind_to)
232                .with_entities_stage_checkpoint(stage_checkpoint(provider)?),
233        })
234    }
235}
236
237fn stage_checkpoint<Provider>(provider: &Provider) -> Result<EntitiesCheckpoint, StageError>
238where
239    Provider: PruneCheckpointReader + StaticFileProviderFactory + StatsReader,
240{
241    let pruned_entries = provider
242        .get_prune_checkpoint(PruneSegment::TransactionLookup)?
243        .and_then(|checkpoint| checkpoint.tx_number)
244        // `+1` is needed because `TxNumber` is 0-indexed
245        .map(|tx_number| tx_number + 1)
246        .unwrap_or_default();
247    Ok(EntitiesCheckpoint {
248        // If `TransactionHashNumbers` table was pruned, we will have a number of entries in it not
249        // matching the actual number of processed transactions. To fix that, we add the
250        // number of pruned `TransactionHashNumbers` entries.
251        processed: provider.count_entries::<tables::TransactionHashNumbers>()? as u64 +
252            pruned_entries,
253        // Count only static files entries. If we count the database entries too, we may have
254        // duplicates. We're sure that the static files have all entries that database has,
255        // because we run the `StaticFileProducer` before starting the pipeline.
256        total: provider.static_file_provider().count_entries::<tables::Transactions>()? as u64,
257    })
258}
259
260#[cfg(test)]
261mod tests {
262    use super::*;
263    use crate::test_utils::{
264        stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, StorageKind,
265        TestRunnerError, TestStageDB, UnwindStageTestRunner,
266    };
267    use alloy_primitives::{BlockNumber, B256};
268    use assert_matches::assert_matches;
269    use reth_db_api::transaction::DbTx;
270    use reth_ethereum_primitives::Block;
271    use reth_primitives_traits::SealedBlock;
272    use reth_provider::{
273        providers::StaticFileWriter, BlockBodyIndicesProvider, DatabaseProviderFactory,
274    };
275    use reth_stages_api::StageUnitCheckpoint;
276    use reth_testing_utils::generators::{
277        self, random_block, random_block_range, BlockParams, BlockRangeParams,
278    };
279    use std::ops::Sub;
280
281    // Implement stage test suite.
282    stage_test_suite_ext!(TransactionLookupTestRunner, transaction_lookup);
283
284    #[tokio::test]
285    async fn execute_single_transaction_lookup() {
286        let (previous_stage, stage_progress) = (500, 100);
287        let mut rng = generators::rng();
288
289        // Set up the runner
290        let runner = TransactionLookupTestRunner::default();
291        let input = ExecInput {
292            target: Some(previous_stage),
293            checkpoint: Some(StageCheckpoint::new(stage_progress)),
294        };
295
296        // Insert blocks with a single transaction at block `stage_progress + 10`
297        let non_empty_block_number = stage_progress + 10;
298        let blocks = (stage_progress..=input.target())
299            .map(|number| {
300                random_block(
301                    &mut rng,
302                    number,
303                    BlockParams {
304                        tx_count: Some((number == non_empty_block_number) as u8),
305                        ..Default::default()
306                    },
307                )
308            })
309            .collect::<Vec<_>>();
310        runner
311            .db
312            .insert_blocks(blocks.iter(), StorageKind::Static)
313            .expect("failed to insert blocks");
314
315        let rx = runner.execute(input);
316
317        // Assert the successful result
318        let result = rx.await.unwrap();
319        assert_matches!(
320            result,
321            Ok(ExecOutput {
322                checkpoint: StageCheckpoint {
323                block_number,
324                stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
325                    processed,
326                    total
327                }))
328            }, done: true }) if block_number == previous_stage && processed == total &&
329                total == runner.db.count_entries::<tables::Transactions>().unwrap() as u64
330        );
331
332        // Validate the stage execution
333        assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
334    }
335
336    #[tokio::test]
337    async fn execute_pruned_transaction_lookup() {
338        let (previous_stage, prune_target, stage_progress) = (500, 400, 100);
339        let mut rng = generators::rng();
340
341        // Set up the runner
342        let mut runner = TransactionLookupTestRunner::default();
343        let input = ExecInput {
344            target: Some(previous_stage),
345            checkpoint: Some(StageCheckpoint::new(stage_progress)),
346        };
347
348        // Seed only once with full input range
349        let seed = random_block_range(
350            &mut rng,
351            stage_progress + 1..=previous_stage,
352            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..2, ..Default::default() },
353        );
354        runner
355            .db
356            .insert_blocks(seed.iter(), StorageKind::Static)
357            .expect("failed to seed execution");
358
359        runner.set_prune_mode(PruneMode::Before(prune_target));
360
361        let rx = runner.execute(input);
362
363        // Assert the successful result
364        let result = rx.await.unwrap();
365        assert_matches!(
366            result,
367            Ok(ExecOutput {
368                checkpoint: StageCheckpoint {
369                block_number,
370                stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
371                    processed,
372                    total
373                }))
374            }, done: true }) if block_number == previous_stage && processed == total &&
375                total == runner.db.count_entries::<tables::Transactions>().unwrap() as u64
376        );
377
378        // Validate the stage execution
379        assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
380    }
381
382    #[test]
383    fn stage_checkpoint_pruned() {
384        let db = TestStageDB::default();
385        let mut rng = generators::rng();
386
387        let blocks = random_block_range(
388            &mut rng,
389            0..=100,
390            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..10, ..Default::default() },
391        );
392        db.insert_blocks(blocks.iter(), StorageKind::Static).expect("insert blocks");
393
394        let max_pruned_block = 30;
395        let max_processed_block = 70;
396
397        let mut tx_hash_numbers = Vec::new();
398        let mut tx_hash_number = 0;
399        for block in &blocks[..=max_processed_block] {
400            for transaction in &block.body().transactions {
401                if block.number > max_pruned_block {
402                    tx_hash_numbers.push((*transaction.tx_hash(), tx_hash_number));
403                }
404                tx_hash_number += 1;
405            }
406        }
407        db.insert_tx_hash_numbers(tx_hash_numbers).expect("insert tx hash numbers");
408
409        let provider = db.factory.provider_rw().unwrap();
410        provider
411            .save_prune_checkpoint(
412                PruneSegment::TransactionLookup,
413                PruneCheckpoint {
414                    block_number: Some(max_pruned_block),
415                    tx_number: Some(
416                        blocks[..=max_pruned_block as usize]
417                            .iter()
418                            .map(|block| block.transaction_count() as u64)
419                            .sum::<u64>()
420                            .sub(1), // `TxNumber` is 0-indexed
421                    ),
422                    prune_mode: PruneMode::Full,
423                },
424            )
425            .expect("save stage checkpoint");
426        provider.commit().expect("commit");
427
428        let provider = db.factory.database_provider_rw().unwrap();
429        assert_eq!(
430            stage_checkpoint(&provider).expect("stage checkpoint"),
431            EntitiesCheckpoint {
432                processed: blocks[..=max_processed_block]
433                    .iter()
434                    .map(|block| block.transaction_count() as u64)
435                    .sum(),
436                total: blocks.iter().map(|block| block.transaction_count() as u64).sum()
437            }
438        );
439    }
440
441    struct TransactionLookupTestRunner {
442        db: TestStageDB,
443        chunk_size: u64,
444        etl_config: EtlConfig,
445        prune_mode: Option<PruneMode>,
446    }
447
448    impl Default for TransactionLookupTestRunner {
449        fn default() -> Self {
450            Self {
451                db: TestStageDB::default(),
452                chunk_size: 1000,
453                etl_config: EtlConfig::default(),
454                prune_mode: None,
455            }
456        }
457    }
458
459    impl TransactionLookupTestRunner {
460        fn set_prune_mode(&mut self, prune_mode: PruneMode) {
461            self.prune_mode = Some(prune_mode);
462        }
463
464        /// # Panics
465        ///
466        /// 1. If there are any entries in the [`tables::TransactionHashNumbers`] table above a
467        ///    given block number.
468        /// 2. If there is no requested block entry in the bodies table, but
469        ///    [`tables::TransactionHashNumbers`] is    not empty.
470        fn ensure_no_hash_by_block(&self, number: BlockNumber) -> Result<(), TestRunnerError> {
471            let body_result = self
472                .db
473                .factory
474                .provider_rw()?
475                .block_body_indices(number)?
476                .ok_or(ProviderError::BlockBodyIndicesNotFound(number));
477            match body_result {
478                Ok(body) => {
479                    self.db.ensure_no_entry_above_by_value::<tables::TransactionHashNumbers, _>(
480                        body.last_tx_num(),
481                        |key| key,
482                    )?
483                }
484                Err(_) => {
485                    assert!(self.db.table_is_empty::<tables::TransactionHashNumbers>()?);
486                }
487            };
488
489            Ok(())
490        }
491    }
492
493    impl StageTestRunner for TransactionLookupTestRunner {
494        type S = TransactionLookupStage;
495
496        fn db(&self) -> &TestStageDB {
497            &self.db
498        }
499
500        fn stage(&self) -> Self::S {
501            TransactionLookupStage {
502                chunk_size: self.chunk_size,
503                etl_config: self.etl_config.clone(),
504                prune_mode: self.prune_mode,
505            }
506        }
507    }
508
509    impl ExecuteStageTestRunner for TransactionLookupTestRunner {
510        type Seed = Vec<SealedBlock<Block>>;
511
512        fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
513            let stage_progress = input.checkpoint().block_number;
514            let end = input.target();
515            let mut rng = generators::rng();
516
517            let blocks = random_block_range(
518                &mut rng,
519                stage_progress + 1..=end,
520                BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..2, ..Default::default() },
521            );
522            self.db.insert_blocks(blocks.iter(), StorageKind::Static)?;
523            Ok(blocks)
524        }
525
526        fn validate_execution(
527            &self,
528            mut input: ExecInput,
529            output: Option<ExecOutput>,
530        ) -> Result<(), TestRunnerError> {
531            match output {
532                Some(output) => {
533                    let provider = self.db.factory.provider()?;
534
535                    if let Some((target_prunable_block, _)) = self
536                        .prune_mode
537                        .map(|mode| {
538                            mode.prune_target_block(
539                                input.target(),
540                                PruneSegment::TransactionLookup,
541                                PrunePurpose::User,
542                            )
543                        })
544                        .transpose()
545                        .expect("prune target block for transaction lookup")
546                        .flatten() &&
547                        target_prunable_block > input.checkpoint().block_number
548                    {
549                        input.checkpoint = Some(StageCheckpoint::new(target_prunable_block));
550                    }
551                    let start_block = input.next_block();
552                    let end_block = output.checkpoint.block_number;
553
554                    if start_block > end_block {
555                        return Ok(())
556                    }
557
558                    let mut body_cursor =
559                        provider.tx_ref().cursor_read::<tables::BlockBodyIndices>()?;
560                    body_cursor.seek_exact(start_block)?;
561
562                    while let Some((_, body)) = body_cursor.next()? {
563                        for tx_id in body.tx_num_range() {
564                            let transaction =
565                                provider.transaction_by_id(tx_id)?.expect("no transaction entry");
566                            assert_eq!(
567                                Some(tx_id),
568                                provider.transaction_id(*transaction.tx_hash())?
569                            );
570                        }
571                    }
572                }
573                None => self.ensure_no_hash_by_block(input.checkpoint().block_number)?,
574            };
575            Ok(())
576        }
577    }
578
579    impl UnwindStageTestRunner for TransactionLookupTestRunner {
580        fn validate_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError> {
581            self.ensure_no_hash_by_block(input.unwind_to)
582        }
583    }
584}