reth_stages/stages/
tx_lookup.rs

1use alloy_eips::eip2718::Encodable2718;
2use alloy_primitives::{TxHash, TxNumber};
3use num_traits::Zero;
4use reth_config::config::{EtlConfig, TransactionLookupConfig};
5use reth_db_api::{
6    table::{Decode, Decompress, Value},
7    tables,
8    transaction::DbTxMut,
9};
10use reth_etl::Collector;
11use reth_primitives_traits::{NodePrimitives, SignedTransaction};
12use reth_provider::{
13    BlockReader, DBProvider, EitherWriter, PruneCheckpointReader, PruneCheckpointWriter,
14    RocksDBProviderFactory, StaticFileProviderFactory, StatsReader, StorageSettingsCache,
15    TransactionsProvider, TransactionsProviderExt,
16};
17use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment};
18use reth_stages_api::{
19    EntitiesCheckpoint, ExecInput, ExecOutput, Stage, StageCheckpoint, StageError, StageId,
20    UnwindInput, UnwindOutput,
21};
22use reth_storage_errors::provider::ProviderError;
23use tracing::*;
24
25/// The transaction lookup stage.
26///
27/// This stage walks over existing transactions, and sets the transaction hash of each transaction
28/// in a block to the corresponding `BlockNumber` at each block. This is written to the
29/// [`tables::TransactionHashNumbers`] This is used for looking up changesets via the transaction
30/// hash.
31///
32/// It uses [`reth_etl::Collector`] to collect all entries before finally writing them to disk.
33#[derive(Debug, Clone)]
34pub struct TransactionLookupStage {
35    /// The maximum number of lookup entries to hold in memory before pushing them to
36    /// [`reth_etl::Collector`].
37    chunk_size: u64,
38    etl_config: EtlConfig,
39    prune_mode: Option<PruneMode>,
40}
41
42impl Default for TransactionLookupStage {
43    fn default() -> Self {
44        Self { chunk_size: 5_000_000, etl_config: EtlConfig::default(), prune_mode: None }
45    }
46}
47
48impl TransactionLookupStage {
49    /// Create new instance of [`TransactionLookupStage`].
50    pub const fn new(
51        config: TransactionLookupConfig,
52        etl_config: EtlConfig,
53        prune_mode: Option<PruneMode>,
54    ) -> Self {
55        Self { chunk_size: config.chunk_size, etl_config, prune_mode }
56    }
57}
58
59impl<Provider> Stage<Provider> for TransactionLookupStage
60where
61    Provider: DBProvider<Tx: DbTxMut>
62        + PruneCheckpointWriter
63        + BlockReader
64        + PruneCheckpointReader
65        + StatsReader
66        + StaticFileProviderFactory<Primitives: NodePrimitives<SignedTx: Value + SignedTransaction>>
67        + TransactionsProviderExt
68        + StorageSettingsCache
69        + RocksDBProviderFactory,
70{
71    /// Return the id of the stage
72    fn id(&self) -> StageId {
73        StageId::TransactionLookup
74    }
75
76    /// Write transaction hash -> id entries
77    fn execute(
78        &mut self,
79        provider: &Provider,
80        mut input: ExecInput,
81    ) -> Result<ExecOutput, StageError> {
82        if let Some((target_prunable_block, prune_mode)) = self
83            .prune_mode
84            .map(|mode| {
85                mode.prune_target_block(
86                    input.target(),
87                    PruneSegment::TransactionLookup,
88                    PrunePurpose::User,
89                )
90            })
91            .transpose()?
92            .flatten() &&
93            target_prunable_block > input.checkpoint().block_number
94        {
95            input.checkpoint = Some(StageCheckpoint::new(target_prunable_block));
96
97            // Save prune checkpoint only if we don't have one already.
98            // Otherwise, pruner may skip the unpruned range of blocks.
99            if provider.get_prune_checkpoint(PruneSegment::TransactionLookup)?.is_none() {
100                let target_prunable_tx_number = provider
101                    .block_body_indices(target_prunable_block)?
102                    .ok_or(ProviderError::BlockBodyIndicesNotFound(target_prunable_block))?
103                    .last_tx_num();
104
105                provider.save_prune_checkpoint(
106                    PruneSegment::TransactionLookup,
107                    PruneCheckpoint {
108                        block_number: Some(target_prunable_block),
109                        tx_number: Some(target_prunable_tx_number),
110                        prune_mode,
111                    },
112                )?;
113            }
114        }
115        if input.target_reached() {
116            return Ok(ExecOutput::done(input.checkpoint()));
117        }
118
119        // 500MB temporary files
120        let mut hash_collector: Collector<TxHash, TxNumber> =
121            Collector::new(self.etl_config.file_size, self.etl_config.dir.clone());
122
123        info!(
124            target: "sync::stages::transaction_lookup",
125            tx_range = ?input.checkpoint().block_number..=input.target(),
126            "Updating transaction lookup"
127        );
128
129        loop {
130            let Some(range_output) =
131                input.next_block_range_with_transaction_threshold(provider, self.chunk_size)?
132            else {
133                input.checkpoint = Some(
134                    StageCheckpoint::new(input.target())
135                        .with_entities_stage_checkpoint(stage_checkpoint(provider)?),
136                );
137                break;
138            };
139
140            let end_block = *range_output.block_range.end();
141
142            info!(target: "sync::stages::transaction_lookup", tx_range = ?range_output.tx_range, "Calculating transaction hashes");
143
144            for (key, value) in provider.transaction_hashes_by_range(range_output.tx_range)? {
145                hash_collector.insert(key, value)?;
146            }
147
148            input.checkpoint = Some(
149                StageCheckpoint::new(end_block)
150                    .with_entities_stage_checkpoint(stage_checkpoint(provider)?),
151            );
152
153            if range_output.is_final_range {
154                let total_hashes = hash_collector.len();
155                let interval = (total_hashes / 10).max(1);
156
157                // Use append mode when table is empty (first sync) - significantly faster
158                let append_only =
159                    provider.count_entries::<tables::TransactionHashNumbers>()?.is_zero();
160
161                // Create RocksDB batch if feature is enabled
162                #[cfg(all(unix, feature = "rocksdb"))]
163                let rocksdb = provider.rocksdb_provider();
164                #[cfg(all(unix, feature = "rocksdb"))]
165                let rocksdb_batch = rocksdb.batch();
166                #[cfg(not(all(unix, feature = "rocksdb")))]
167                let rocksdb_batch = ();
168
169                // Create writer that routes to either MDBX or RocksDB based on settings
170                let mut writer =
171                    EitherWriter::new_transaction_hash_numbers(provider, rocksdb_batch)?;
172
173                for (index, hash_to_number) in hash_collector.iter()?.enumerate() {
174                    let (hash_bytes, number_bytes) = hash_to_number?;
175                    if index > 0 && index.is_multiple_of(interval) {
176                        info!(
177                            target: "sync::stages::transaction_lookup",
178                            ?append_only,
179                            progress = %format!("{:.2}%", (index as f64 / total_hashes as f64) * 100.0),
180                            "Inserting hashes"
181                        );
182                    }
183
184                    // Decode from raw ETL bytes
185                    let hash = TxHash::decode(&hash_bytes)?;
186                    let tx_num = TxNumber::decompress(&number_bytes)?;
187                    writer.put_transaction_hash_number(hash, tx_num, append_only)?;
188                }
189
190                // Extract and register RocksDB batch for commit at provider level
191                #[cfg(all(unix, feature = "rocksdb"))]
192                if let Some(batch) = writer.into_raw_rocksdb_batch() {
193                    provider.set_pending_rocksdb_batch(batch);
194                }
195
196                trace!(target: "sync::stages::transaction_lookup",
197                    total_hashes,
198                    "Transaction hashes inserted"
199                );
200
201                break;
202            }
203        }
204
205        Ok(ExecOutput {
206            checkpoint: StageCheckpoint::new(input.target())
207                .with_entities_stage_checkpoint(stage_checkpoint(provider)?),
208            done: true,
209        })
210    }
211
212    /// Unwind the stage.
213    fn unwind(
214        &mut self,
215        provider: &Provider,
216        input: UnwindInput,
217    ) -> Result<UnwindOutput, StageError> {
218        let (range, unwind_to, _) = input.unwind_block_range_with_threshold(self.chunk_size);
219
220        // Create RocksDB batch if feature is enabled
221        #[cfg(all(unix, feature = "rocksdb"))]
222        let rocksdb = provider.rocksdb_provider();
223        #[cfg(all(unix, feature = "rocksdb"))]
224        let rocksdb_batch = rocksdb.batch();
225        #[cfg(not(all(unix, feature = "rocksdb")))]
226        let rocksdb_batch = ();
227
228        // Create writer that routes to either MDBX or RocksDB based on settings
229        let mut writer = EitherWriter::new_transaction_hash_numbers(provider, rocksdb_batch)?;
230
231        let static_file_provider = provider.static_file_provider();
232        let rev_walker = provider
233            .block_body_indices_range(range.clone())?
234            .into_iter()
235            .zip(range.collect::<Vec<_>>())
236            .rev();
237
238        for (body, number) in rev_walker {
239            if number <= unwind_to {
240                break;
241            }
242
243            // Delete all transactions that belong to this block
244            for tx_id in body.tx_num_range() {
245                if let Some(transaction) = static_file_provider.transaction_by_id(tx_id)? {
246                    writer.delete_transaction_hash_number(transaction.trie_hash())?;
247                }
248            }
249        }
250
251        // Extract and register RocksDB batch for commit at provider level
252        #[cfg(all(unix, feature = "rocksdb"))]
253        if let Some(batch) = writer.into_raw_rocksdb_batch() {
254            provider.set_pending_rocksdb_batch(batch);
255        }
256
257        Ok(UnwindOutput {
258            checkpoint: StageCheckpoint::new(unwind_to)
259                .with_entities_stage_checkpoint(stage_checkpoint(provider)?),
260        })
261    }
262}
263
264fn stage_checkpoint<Provider>(provider: &Provider) -> Result<EntitiesCheckpoint, StageError>
265where
266    Provider: PruneCheckpointReader + StaticFileProviderFactory + StatsReader,
267{
268    let pruned_entries = provider
269        .get_prune_checkpoint(PruneSegment::TransactionLookup)?
270        .and_then(|checkpoint| checkpoint.tx_number)
271        // `+1` is needed because `TxNumber` is 0-indexed
272        .map(|tx_number| tx_number + 1)
273        .unwrap_or_default();
274    Ok(EntitiesCheckpoint {
275        // If `TransactionHashNumbers` table was pruned, we will have a number of entries in it not
276        // matching the actual number of processed transactions. To fix that, we add the
277        // number of pruned `TransactionHashNumbers` entries.
278        processed: provider.count_entries::<tables::TransactionHashNumbers>()? as u64 +
279            pruned_entries,
280        // Count only static files entries. If we count the database entries too, we may have
281        // duplicates. We're sure that the static files have all entries that database has,
282        // because we run the `StaticFileProducer` before starting the pipeline.
283        total: provider.static_file_provider().count_entries::<tables::Transactions>()? as u64,
284    })
285}
286
287#[cfg(test)]
288mod tests {
289    use super::*;
290    use crate::test_utils::{
291        stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, StorageKind,
292        TestRunnerError, TestStageDB, UnwindStageTestRunner,
293    };
294    use alloy_primitives::{BlockNumber, B256};
295    use assert_matches::assert_matches;
296    use reth_db_api::{cursor::DbCursorRO, transaction::DbTx};
297    use reth_ethereum_primitives::Block;
298    use reth_primitives_traits::SealedBlock;
299    use reth_provider::{
300        providers::StaticFileWriter, BlockBodyIndicesProvider, DatabaseProviderFactory,
301    };
302    use reth_stages_api::StageUnitCheckpoint;
303    use reth_testing_utils::generators::{
304        self, random_block, random_block_range, BlockParams, BlockRangeParams,
305    };
306    use std::ops::Sub;
307
308    // Implement stage test suite.
309    stage_test_suite_ext!(TransactionLookupTestRunner, transaction_lookup);
310
311    #[tokio::test]
312    async fn execute_single_transaction_lookup() {
313        let (previous_stage, stage_progress) = (500, 100);
314        let mut rng = generators::rng();
315
316        // Set up the runner
317        let runner = TransactionLookupTestRunner::default();
318        let input = ExecInput {
319            target: Some(previous_stage),
320            checkpoint: Some(StageCheckpoint::new(stage_progress)),
321        };
322
323        // Insert blocks with a single transaction at block `stage_progress + 10`
324        let non_empty_block_number = stage_progress + 10;
325        let blocks = (stage_progress..=input.target())
326            .map(|number| {
327                random_block(
328                    &mut rng,
329                    number,
330                    BlockParams {
331                        tx_count: Some((number == non_empty_block_number) as u8),
332                        ..Default::default()
333                    },
334                )
335            })
336            .collect::<Vec<_>>();
337        runner
338            .db
339            .insert_blocks(blocks.iter(), StorageKind::Static)
340            .expect("failed to insert blocks");
341
342        let rx = runner.execute(input);
343
344        // Assert the successful result
345        let result = rx.await.unwrap();
346        assert_matches!(
347            result,
348            Ok(ExecOutput {
349                checkpoint: StageCheckpoint {
350                block_number,
351                stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
352                    processed,
353                    total
354                }))
355            }, done: true }) if block_number == previous_stage && processed == total &&
356                total == runner.db.count_entries::<tables::Transactions>().unwrap() as u64
357        );
358
359        // Validate the stage execution
360        assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
361    }
362
363    #[tokio::test]
364    async fn execute_pruned_transaction_lookup() {
365        let (previous_stage, prune_target, stage_progress) = (500, 400, 100);
366        let mut rng = generators::rng();
367
368        // Set up the runner
369        let mut runner = TransactionLookupTestRunner::default();
370        let input = ExecInput {
371            target: Some(previous_stage),
372            checkpoint: Some(StageCheckpoint::new(stage_progress)),
373        };
374
375        // Seed only once with full input range
376        let seed = random_block_range(
377            &mut rng,
378            stage_progress + 1..=previous_stage,
379            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..2, ..Default::default() },
380        );
381        runner
382            .db
383            .insert_blocks(seed.iter(), StorageKind::Static)
384            .expect("failed to seed execution");
385
386        runner.set_prune_mode(PruneMode::Before(prune_target));
387
388        let rx = runner.execute(input);
389
390        // Assert the successful result
391        let result = rx.await.unwrap();
392        assert_matches!(
393            result,
394            Ok(ExecOutput {
395                checkpoint: StageCheckpoint {
396                block_number,
397                stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
398                    processed,
399                    total
400                }))
401            }, done: true }) if block_number == previous_stage && processed == total &&
402                total == runner.db.count_entries::<tables::Transactions>().unwrap() as u64
403        );
404
405        // Validate the stage execution
406        assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
407    }
408
409    #[test]
410    fn stage_checkpoint_pruned() {
411        let db = TestStageDB::default();
412        let mut rng = generators::rng();
413
414        let blocks = random_block_range(
415            &mut rng,
416            0..=100,
417            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..10, ..Default::default() },
418        );
419        db.insert_blocks(blocks.iter(), StorageKind::Static).expect("insert blocks");
420
421        let max_pruned_block = 30;
422        let max_processed_block = 70;
423
424        let mut tx_hash_numbers = Vec::new();
425        let mut tx_hash_number = 0;
426        for block in &blocks[..=max_processed_block] {
427            for transaction in &block.body().transactions {
428                if block.number > max_pruned_block {
429                    tx_hash_numbers.push((*transaction.tx_hash(), tx_hash_number));
430                }
431                tx_hash_number += 1;
432            }
433        }
434        db.insert_tx_hash_numbers(tx_hash_numbers).expect("insert tx hash numbers");
435
436        let provider = db.factory.provider_rw().unwrap();
437        provider
438            .save_prune_checkpoint(
439                PruneSegment::TransactionLookup,
440                PruneCheckpoint {
441                    block_number: Some(max_pruned_block),
442                    tx_number: Some(
443                        blocks[..=max_pruned_block as usize]
444                            .iter()
445                            .map(|block| block.transaction_count() as u64)
446                            .sum::<u64>()
447                            .sub(1), // `TxNumber` is 0-indexed
448                    ),
449                    prune_mode: PruneMode::Full,
450                },
451            )
452            .expect("save stage checkpoint");
453        provider.commit().expect("commit");
454
455        let provider = db.factory.database_provider_rw().unwrap();
456        assert_eq!(
457            stage_checkpoint(&provider).expect("stage checkpoint"),
458            EntitiesCheckpoint {
459                processed: blocks[..=max_processed_block]
460                    .iter()
461                    .map(|block| block.transaction_count() as u64)
462                    .sum(),
463                total: blocks.iter().map(|block| block.transaction_count() as u64).sum()
464            }
465        );
466    }
467
468    struct TransactionLookupTestRunner {
469        db: TestStageDB,
470        chunk_size: u64,
471        etl_config: EtlConfig,
472        prune_mode: Option<PruneMode>,
473    }
474
475    impl Default for TransactionLookupTestRunner {
476        fn default() -> Self {
477            Self {
478                db: TestStageDB::default(),
479                chunk_size: 1000,
480                etl_config: EtlConfig::default(),
481                prune_mode: None,
482            }
483        }
484    }
485
486    impl TransactionLookupTestRunner {
487        fn set_prune_mode(&mut self, prune_mode: PruneMode) {
488            self.prune_mode = Some(prune_mode);
489        }
490
491        /// # Panics
492        ///
493        /// 1. If there are any entries in the [`tables::TransactionHashNumbers`] table above a
494        ///    given block number.
495        /// 2. If there is no requested block entry in the bodies table, but
496        ///    [`tables::TransactionHashNumbers`] is    not empty.
497        fn ensure_no_hash_by_block(&self, number: BlockNumber) -> Result<(), TestRunnerError> {
498            let body_result = self
499                .db
500                .factory
501                .provider_rw()?
502                .block_body_indices(number)?
503                .ok_or(ProviderError::BlockBodyIndicesNotFound(number));
504            match body_result {
505                Ok(body) => {
506                    self.db.ensure_no_entry_above_by_value::<tables::TransactionHashNumbers, _>(
507                        body.last_tx_num(),
508                        |key| key,
509                    )?
510                }
511                Err(_) => {
512                    assert!(self.db.table_is_empty::<tables::TransactionHashNumbers>()?);
513                }
514            };
515
516            Ok(())
517        }
518    }
519
520    impl StageTestRunner for TransactionLookupTestRunner {
521        type S = TransactionLookupStage;
522
523        fn db(&self) -> &TestStageDB {
524            &self.db
525        }
526
527        fn stage(&self) -> Self::S {
528            TransactionLookupStage {
529                chunk_size: self.chunk_size,
530                etl_config: self.etl_config.clone(),
531                prune_mode: self.prune_mode,
532            }
533        }
534    }
535
536    impl ExecuteStageTestRunner for TransactionLookupTestRunner {
537        type Seed = Vec<SealedBlock<Block>>;
538
539        fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
540            let stage_progress = input.checkpoint().block_number;
541            let end = input.target();
542            let mut rng = generators::rng();
543
544            let blocks = random_block_range(
545                &mut rng,
546                stage_progress + 1..=end,
547                BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..2, ..Default::default() },
548            );
549            self.db.insert_blocks(blocks.iter(), StorageKind::Static)?;
550            Ok(blocks)
551        }
552
553        fn validate_execution(
554            &self,
555            mut input: ExecInput,
556            output: Option<ExecOutput>,
557        ) -> Result<(), TestRunnerError> {
558            match output {
559                Some(output) => {
560                    let provider = self.db.factory.provider()?;
561
562                    if let Some((target_prunable_block, _)) = self
563                        .prune_mode
564                        .map(|mode| {
565                            mode.prune_target_block(
566                                input.target(),
567                                PruneSegment::TransactionLookup,
568                                PrunePurpose::User,
569                            )
570                        })
571                        .transpose()
572                        .expect("prune target block for transaction lookup")
573                        .flatten() &&
574                        target_prunable_block > input.checkpoint().block_number
575                    {
576                        input.checkpoint = Some(StageCheckpoint::new(target_prunable_block));
577                    }
578                    let start_block = input.next_block();
579                    let end_block = output.checkpoint.block_number;
580
581                    if start_block > end_block {
582                        return Ok(())
583                    }
584
585                    let mut body_cursor =
586                        provider.tx_ref().cursor_read::<tables::BlockBodyIndices>()?;
587                    body_cursor.seek_exact(start_block)?;
588
589                    while let Some((_, body)) = body_cursor.next()? {
590                        for tx_id in body.tx_num_range() {
591                            let transaction =
592                                provider.transaction_by_id(tx_id)?.expect("no transaction entry");
593                            assert_eq!(
594                                Some(tx_id),
595                                provider.transaction_id(*transaction.tx_hash())?
596                            );
597                        }
598                    }
599                }
600                None => self.ensure_no_hash_by_block(input.checkpoint().block_number)?,
601            };
602            Ok(())
603        }
604    }
605
606    impl UnwindStageTestRunner for TransactionLookupTestRunner {
607        fn validate_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError> {
608            self.ensure_no_hash_by_block(input.unwind_to)
609        }
610    }
611
612    #[cfg(all(unix, feature = "rocksdb"))]
613    mod rocksdb_tests {
614        use super::*;
615        use reth_provider::RocksDBProviderFactory;
616        use reth_storage_api::StorageSettings;
617
618        /// Test that when `transaction_hash_numbers_in_rocksdb` is enabled, the stage
619        /// writes transaction hash mappings to `RocksDB` instead of MDBX.
620        #[tokio::test]
621        async fn execute_writes_to_rocksdb_when_enabled() {
622            let (previous_stage, stage_progress) = (110, 100);
623            let mut rng = generators::rng();
624
625            // Set up the runner
626            let runner = TransactionLookupTestRunner::default();
627
628            // Enable RocksDB for transaction hash numbers
629            runner.db.factory.set_storage_settings_cache(
630                StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true),
631            );
632
633            let input = ExecInput {
634                target: Some(previous_stage),
635                checkpoint: Some(StageCheckpoint::new(stage_progress)),
636            };
637
638            // Insert blocks with transactions
639            let blocks = random_block_range(
640                &mut rng,
641                stage_progress + 1..=previous_stage,
642                BlockRangeParams {
643                    parent: Some(B256::ZERO),
644                    tx_count: 1..3, // Ensure we have transactions
645                    ..Default::default()
646                },
647            );
648            runner
649                .db
650                .insert_blocks(blocks.iter(), StorageKind::Static)
651                .expect("failed to insert blocks");
652
653            // Count expected transactions
654            let expected_tx_count: usize = blocks.iter().map(|b| b.body().transactions.len()).sum();
655            assert!(expected_tx_count > 0, "test requires at least one transaction");
656
657            // Execute the stage
658            let rx = runner.execute(input);
659            let result = rx.await.unwrap();
660            assert!(result.is_ok(), "stage execution failed: {:?}", result);
661
662            // Verify MDBX table is empty (data should be in RocksDB)
663            let mdbx_count = runner.db.count_entries::<tables::TransactionHashNumbers>().unwrap();
664            assert_eq!(
665                mdbx_count, 0,
666                "MDBX TransactionHashNumbers should be empty when RocksDB is enabled"
667            );
668
669            // Verify RocksDB has the data
670            let rocksdb = runner.db.factory.rocksdb_provider();
671            let mut rocksdb_count = 0;
672            for block in &blocks {
673                for tx in &block.body().transactions {
674                    let hash = *tx.tx_hash();
675                    let result = rocksdb.get::<tables::TransactionHashNumbers>(hash).unwrap();
676                    assert!(result.is_some(), "Transaction hash {:?} not found in RocksDB", hash);
677                    rocksdb_count += 1;
678                }
679            }
680            assert_eq!(
681                rocksdb_count, expected_tx_count,
682                "RocksDB should contain all transaction hashes"
683            );
684        }
685
686        /// Test that when `transaction_hash_numbers_in_rocksdb` is enabled, the stage
687        /// unwind deletes transaction hash mappings from `RocksDB` instead of MDBX.
688        #[tokio::test]
689        async fn unwind_deletes_from_rocksdb_when_enabled() {
690            let (previous_stage, stage_progress) = (110, 100);
691            let mut rng = generators::rng();
692
693            // Set up the runner
694            let runner = TransactionLookupTestRunner::default();
695
696            // Enable RocksDB for transaction hash numbers
697            runner.db.factory.set_storage_settings_cache(
698                StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true),
699            );
700
701            // Insert blocks with transactions
702            let blocks = random_block_range(
703                &mut rng,
704                stage_progress + 1..=previous_stage,
705                BlockRangeParams {
706                    parent: Some(B256::ZERO),
707                    tx_count: 1..3, // Ensure we have transactions
708                    ..Default::default()
709                },
710            );
711            runner
712                .db
713                .insert_blocks(blocks.iter(), StorageKind::Static)
714                .expect("failed to insert blocks");
715
716            // Count expected transactions
717            let expected_tx_count: usize = blocks.iter().map(|b| b.body().transactions.len()).sum();
718            assert!(expected_tx_count > 0, "test requires at least one transaction");
719
720            // Execute the stage first to populate RocksDB
721            let exec_input = ExecInput {
722                target: Some(previous_stage),
723                checkpoint: Some(StageCheckpoint::new(stage_progress)),
724            };
725            let rx = runner.execute(exec_input);
726            let result = rx.await.unwrap();
727            assert!(result.is_ok(), "stage execution failed: {:?}", result);
728
729            // Verify RocksDB has the data before unwind
730            let rocksdb = runner.db.factory.rocksdb_provider();
731            for block in &blocks {
732                for tx in &block.body().transactions {
733                    let hash = *tx.tx_hash();
734                    let result = rocksdb.get::<tables::TransactionHashNumbers>(hash).unwrap();
735                    assert!(
736                        result.is_some(),
737                        "Transaction hash {:?} should exist before unwind",
738                        hash
739                    );
740                }
741            }
742
743            // Now unwind to stage_progress (removing all the blocks we added)
744            let unwind_input = UnwindInput {
745                checkpoint: StageCheckpoint::new(previous_stage),
746                unwind_to: stage_progress,
747                bad_block: None,
748            };
749            let unwind_result = runner.unwind(unwind_input).await;
750            assert!(unwind_result.is_ok(), "stage unwind failed: {:?}", unwind_result);
751
752            // Verify RocksDB data is deleted after unwind
753            let rocksdb = runner.db.factory.rocksdb_provider();
754            for block in &blocks {
755                for tx in &block.body().transactions {
756                    let hash = *tx.tx_hash();
757                    let result = rocksdb.get::<tables::TransactionHashNumbers>(hash).unwrap();
758                    assert!(
759                        result.is_none(),
760                        "Transaction hash {:?} should be deleted from RocksDB after unwind",
761                        hash
762                    );
763                }
764            }
765        }
766    }
767}