// reth_stages/stages/tx_lookup.rs — the transaction lookup stage.

1use alloy_eips::eip2718::Encodable2718;
2use alloy_primitives::{TxHash, TxNumber};
3use num_traits::Zero;
4use reth_config::config::{EtlConfig, TransactionLookupConfig};
5#[cfg(all(unix, feature = "rocksdb"))]
6use reth_db_api::Tables;
7use reth_db_api::{
8    table::{Decode, Decompress, Value},
9    tables,
10    transaction::DbTxMut,
11};
12use reth_etl::Collector;
13use reth_primitives_traits::{NodePrimitives, SignedTransaction};
14use reth_provider::{
15    BlockReader, DBProvider, EitherWriter, PruneCheckpointReader, PruneCheckpointWriter,
16    RocksDBProviderFactory, StaticFileProviderFactory, StatsReader, StorageSettingsCache,
17    TransactionsProvider, TransactionsProviderExt,
18};
19use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment};
20use reth_stages_api::{
21    EntitiesCheckpoint, ExecInput, ExecOutput, Stage, StageCheckpoint, StageError, StageId,
22    UnwindInput, UnwindOutput,
23};
24use reth_storage_errors::provider::ProviderError;
25use tracing::*;
26
27/// The transaction lookup stage.
28///
29/// This stage walks over existing transactions, and sets the transaction hash of each transaction
30/// in a block to the corresponding `BlockNumber` at each block. This is written to the
31/// [`tables::TransactionHashNumbers`] This is used for looking up changesets via the transaction
32/// hash.
33///
34/// It uses [`reth_etl::Collector`] to collect all entries before finally writing them to disk.
35#[derive(Debug, Clone)]
36pub struct TransactionLookupStage {
37    /// The maximum number of lookup entries to hold in memory before pushing them to
38    /// [`reth_etl::Collector`].
39    chunk_size: u64,
40    etl_config: EtlConfig,
41    prune_mode: Option<PruneMode>,
42}
43
44impl Default for TransactionLookupStage {
45    fn default() -> Self {
46        Self { chunk_size: 5_000_000, etl_config: EtlConfig::default(), prune_mode: None }
47    }
48}
49
50impl TransactionLookupStage {
51    /// Create new instance of [`TransactionLookupStage`].
52    pub const fn new(
53        config: TransactionLookupConfig,
54        etl_config: EtlConfig,
55        prune_mode: Option<PruneMode>,
56    ) -> Self {
57        Self { chunk_size: config.chunk_size, etl_config, prune_mode }
58    }
59}
60
61impl<Provider> Stage<Provider> for TransactionLookupStage
62where
63    Provider: DBProvider<Tx: DbTxMut>
64        + PruneCheckpointWriter
65        + BlockReader
66        + PruneCheckpointReader
67        + StatsReader
68        + StaticFileProviderFactory<Primitives: NodePrimitives<SignedTx: Value + SignedTransaction>>
69        + TransactionsProviderExt
70        + StorageSettingsCache
71        + RocksDBProviderFactory,
72{
73    /// Return the id of the stage
74    fn id(&self) -> StageId {
75        StageId::TransactionLookup
76    }
77
78    /// Write transaction hash -> id entries
79    fn execute(
80        &mut self,
81        provider: &Provider,
82        mut input: ExecInput,
83    ) -> Result<ExecOutput, StageError> {
84        if let Some((target_prunable_block, prune_mode)) = self
85            .prune_mode
86            .map(|mode| {
87                mode.prune_target_block(
88                    input.target(),
89                    PruneSegment::TransactionLookup,
90                    PrunePurpose::User,
91                )
92            })
93            .transpose()?
94            .flatten() &&
95            target_prunable_block > input.checkpoint().block_number
96        {
97            input.checkpoint = Some(StageCheckpoint::new(target_prunable_block));
98
99            // Save prune checkpoint only if we don't have one already.
100            // Otherwise, pruner may skip the unpruned range of blocks.
101            if provider.get_prune_checkpoint(PruneSegment::TransactionLookup)?.is_none() {
102                let target_prunable_tx_number = provider
103                    .block_body_indices(target_prunable_block)?
104                    .ok_or(ProviderError::BlockBodyIndicesNotFound(target_prunable_block))?
105                    .last_tx_num();
106
107                provider.save_prune_checkpoint(
108                    PruneSegment::TransactionLookup,
109                    PruneCheckpoint {
110                        block_number: Some(target_prunable_block),
111                        tx_number: Some(target_prunable_tx_number),
112                        prune_mode,
113                    },
114                )?;
115            }
116        }
117        if input.target_reached() {
118            return Ok(ExecOutput::done(input.checkpoint()));
119        }
120
121        // 500MB temporary files
122        let mut hash_collector: Collector<TxHash, TxNumber> =
123            Collector::new(self.etl_config.file_size, self.etl_config.dir.clone());
124
125        info!(
126            target: "sync::stages::transaction_lookup",
127            tx_range = ?input.checkpoint().block_number..=input.target(),
128            "Updating transaction lookup"
129        );
130
131        loop {
132            let Some(range_output) =
133                input.next_block_range_with_transaction_threshold(provider, self.chunk_size)?
134            else {
135                input.checkpoint = Some(
136                    StageCheckpoint::new(input.target())
137                        .with_entities_stage_checkpoint(stage_checkpoint(provider)?),
138                );
139                break;
140            };
141
142            let end_block = *range_output.block_range.end();
143
144            info!(target: "sync::stages::transaction_lookup", tx_range = ?range_output.tx_range, "Calculating transaction hashes");
145
146            for (key, value) in provider.transaction_hashes_by_range(range_output.tx_range)? {
147                hash_collector.insert(key, value)?;
148            }
149
150            input.checkpoint = Some(
151                StageCheckpoint::new(end_block)
152                    .with_entities_stage_checkpoint(stage_checkpoint(provider)?),
153            );
154
155            if range_output.is_final_range {
156                let total_hashes = hash_collector.len();
157                let interval = (total_hashes / 10).max(1);
158
159                // Use append mode when table is empty (first sync) - significantly faster
160                let append_only =
161                    provider.count_entries::<tables::TransactionHashNumbers>()?.is_zero();
162
163                // Auto-commits on threshold; consistency check heals any crash.
164                provider.with_rocksdb_batch_auto_commit(|rocksdb_batch| {
165                    let mut writer =
166                        EitherWriter::new_transaction_hash_numbers(provider, rocksdb_batch)?;
167
168                    let iter = hash_collector
169                        .iter()
170                        .map_err(|e| ProviderError::other(Box::new(e)))?;
171
172                    for (index, hash_to_number) in iter.enumerate() {
173                        let (hash_bytes, number_bytes) =
174                            hash_to_number.map_err(|e| ProviderError::other(Box::new(e)))?;
175                        if index > 0 && index.is_multiple_of(interval) {
176                            info!(
177                                target: "sync::stages::transaction_lookup",
178                                ?append_only,
179                                progress = %format!("{:.2}%", (index as f64 / total_hashes as f64) * 100.0),
180                                "Inserting hashes"
181                            );
182                        }
183
184                        // Decode from raw ETL bytes
185                        let hash = TxHash::decode(&hash_bytes)?;
186                        let tx_num = TxNumber::decompress(&number_bytes)?;
187                        writer.put_transaction_hash_number(hash, tx_num, append_only)?;
188                    }
189
190                    Ok(((), writer.into_raw_rocksdb_batch()))
191                })?;
192
193                trace!(target: "sync::stages::transaction_lookup",
194                    total_hashes,
195                    "Transaction hashes inserted"
196                );
197
198                break;
199            }
200        }
201
202        #[cfg(all(unix, feature = "rocksdb"))]
203        if provider.cached_storage_settings().storage_v2 {
204            provider.commit_pending_rocksdb_batches()?;
205            provider.rocksdb_provider().flush(&[Tables::TransactionHashNumbers.name()])?;
206        }
207
208        Ok(ExecOutput {
209            checkpoint: StageCheckpoint::new(input.target())
210                .with_entities_stage_checkpoint(stage_checkpoint(provider)?),
211            done: true,
212        })
213    }
214
215    /// Unwind the stage.
216    fn unwind(
217        &mut self,
218        provider: &Provider,
219        input: UnwindInput,
220    ) -> Result<UnwindOutput, StageError> {
221        let (range, unwind_to, _) = input.unwind_block_range_with_threshold(self.chunk_size);
222
223        provider.with_rocksdb_batch(|rocksdb_batch| {
224            let mut writer = EitherWriter::new_transaction_hash_numbers(provider, rocksdb_batch)?;
225
226            let static_file_provider = provider.static_file_provider();
227            let rev_walker = provider
228                .block_body_indices_range(range.clone())?
229                .into_iter()
230                .rev()
231                .zip(range.rev());
232
233            for (body, number) in rev_walker {
234                if number <= unwind_to {
235                    break;
236                }
237
238                // Delete all transactions that belong to this block
239                for transaction in
240                    static_file_provider.transactions_by_tx_range(body.tx_num_range())?
241                {
242                    writer.delete_transaction_hash_number(transaction.trie_hash())?;
243                }
244            }
245
246            Ok(((), writer.into_raw_rocksdb_batch()))
247        })?;
248
249        Ok(UnwindOutput {
250            checkpoint: StageCheckpoint::new(unwind_to)
251                .with_entities_stage_checkpoint(stage_checkpoint(provider)?),
252        })
253    }
254}
255
256fn stage_checkpoint<Provider>(provider: &Provider) -> Result<EntitiesCheckpoint, StageError>
257where
258    Provider: PruneCheckpointReader + StaticFileProviderFactory + StatsReader,
259{
260    let pruned_entries = provider
261        .get_prune_checkpoint(PruneSegment::TransactionLookup)?
262        .and_then(|checkpoint| checkpoint.tx_number)
263        // `+1` is needed because `TxNumber` is 0-indexed
264        .map(|tx_number| tx_number + 1)
265        .unwrap_or_default();
266    Ok(EntitiesCheckpoint {
267        // If `TransactionHashNumbers` table was pruned, we will have a number of entries in it not
268        // matching the actual number of processed transactions. To fix that, we add the
269        // number of pruned `TransactionHashNumbers` entries.
270        processed: provider.count_entries::<tables::TransactionHashNumbers>()? as u64 +
271            pruned_entries,
272        // Count only static files entries. If we count the database entries too, we may have
273        // duplicates. We're sure that the static files have all entries that database has,
274        // because we run the `StaticFileProducer` before starting the pipeline.
275        total: provider.static_file_provider().count_entries::<tables::Transactions>()? as u64,
276    })
277}
278
279#[cfg(test)]
280mod tests {
281    use super::*;
282    use crate::test_utils::{
283        stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, StorageKind,
284        TestRunnerError, TestStageDB, UnwindStageTestRunner,
285    };
286    use alloy_primitives::{BlockNumber, B256};
287    use assert_matches::assert_matches;
288    use reth_db_api::{cursor::DbCursorRO, transaction::DbTx};
289    use reth_ethereum_primitives::Block;
290    use reth_primitives_traits::SealedBlock;
291    use reth_provider::{
292        providers::StaticFileWriter, BlockBodyIndicesProvider, DatabaseProviderFactory,
293    };
294    use reth_stages_api::StageUnitCheckpoint;
295    use reth_testing_utils::generators::{
296        self, random_block, random_block_range, BlockParams, BlockRangeParams,
297    };
298    use std::ops::Sub;
299
300    // Implement stage test suite.
301    stage_test_suite_ext!(TransactionLookupTestRunner, transaction_lookup);
302
303    #[tokio::test]
304    async fn execute_single_transaction_lookup() {
305        let (previous_stage, stage_progress) = (500, 100);
306        let mut rng = generators::rng();
307
308        // Set up the runner
309        let runner = TransactionLookupTestRunner::default();
310        let input = ExecInput {
311            target: Some(previous_stage),
312            checkpoint: Some(StageCheckpoint::new(stage_progress)),
313        };
314
315        // Insert blocks with a single transaction at block `stage_progress + 10`
316        let non_empty_block_number = stage_progress + 10;
317        let blocks = (stage_progress..=input.target())
318            .map(|number| {
319                random_block(
320                    &mut rng,
321                    number,
322                    BlockParams {
323                        tx_count: Some((number == non_empty_block_number) as u8),
324                        ..Default::default()
325                    },
326                )
327            })
328            .collect::<Vec<_>>();
329        runner
330            .db
331            .insert_blocks(blocks.iter(), StorageKind::Static)
332            .expect("failed to insert blocks");
333
334        let rx = runner.execute(input);
335
336        // Assert the successful result
337        let result = rx.await.unwrap();
338        assert_matches!(
339            result,
340            Ok(ExecOutput {
341                checkpoint: StageCheckpoint {
342                block_number,
343                stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
344                    processed,
345                    total
346                }))
347            }, done: true }) if block_number == previous_stage && processed == total &&
348                total == runner.db.count_entries::<tables::Transactions>().unwrap() as u64
349        );
350
351        // Validate the stage execution
352        assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
353    }
354
355    #[tokio::test]
356    async fn execute_pruned_transaction_lookup() {
357        let (previous_stage, prune_target, stage_progress) = (500, 400, 100);
358        let mut rng = generators::rng();
359
360        // Set up the runner
361        let mut runner = TransactionLookupTestRunner::default();
362        let input = ExecInput {
363            target: Some(previous_stage),
364            checkpoint: Some(StageCheckpoint::new(stage_progress)),
365        };
366
367        // Seed only once with full input range
368        let seed = random_block_range(
369            &mut rng,
370            stage_progress + 1..=previous_stage,
371            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..2, ..Default::default() },
372        );
373        runner
374            .db
375            .insert_blocks(seed.iter(), StorageKind::Static)
376            .expect("failed to seed execution");
377
378        runner.set_prune_mode(PruneMode::Before(prune_target));
379
380        let rx = runner.execute(input);
381
382        // Assert the successful result
383        let result = rx.await.unwrap();
384        assert_matches!(
385            result,
386            Ok(ExecOutput {
387                checkpoint: StageCheckpoint {
388                block_number,
389                stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
390                    processed,
391                    total
392                }))
393            }, done: true }) if block_number == previous_stage && processed == total &&
394                total == runner.db.count_entries::<tables::Transactions>().unwrap() as u64
395        );
396
397        // Validate the stage execution
398        assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
399    }
400
401    #[test]
402    fn stage_checkpoint_pruned() {
403        let db = TestStageDB::default();
404        let mut rng = generators::rng();
405
406        let blocks = random_block_range(
407            &mut rng,
408            0..=100,
409            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..10, ..Default::default() },
410        );
411        db.insert_blocks(blocks.iter(), StorageKind::Static).expect("insert blocks");
412
413        let max_pruned_block = 30;
414        let max_processed_block = 70;
415
416        let mut tx_hash_numbers = Vec::new();
417        let mut tx_hash_number = 0;
418        for block in &blocks[..=max_processed_block] {
419            for transaction in &block.body().transactions {
420                if block.number > max_pruned_block {
421                    tx_hash_numbers.push((*transaction.tx_hash(), tx_hash_number));
422                }
423                tx_hash_number += 1;
424            }
425        }
426        db.insert_tx_hash_numbers(tx_hash_numbers).expect("insert tx hash numbers");
427
428        let provider = db.factory.provider_rw().unwrap();
429        provider
430            .save_prune_checkpoint(
431                PruneSegment::TransactionLookup,
432                PruneCheckpoint {
433                    block_number: Some(max_pruned_block),
434                    tx_number: Some(
435                        blocks[..=max_pruned_block as usize]
436                            .iter()
437                            .map(|block| block.transaction_count() as u64)
438                            .sum::<u64>()
439                            .sub(1), // `TxNumber` is 0-indexed
440                    ),
441                    prune_mode: PruneMode::Full,
442                },
443            )
444            .expect("save stage checkpoint");
445        provider.commit().expect("commit");
446
447        let provider = db.factory.database_provider_rw().unwrap();
448        assert_eq!(
449            stage_checkpoint(&provider).expect("stage checkpoint"),
450            EntitiesCheckpoint {
451                processed: blocks[..=max_processed_block]
452                    .iter()
453                    .map(|block| block.transaction_count() as u64)
454                    .sum(),
455                total: blocks.iter().map(|block| block.transaction_count() as u64).sum()
456            }
457        );
458    }
459
460    struct TransactionLookupTestRunner {
461        db: TestStageDB,
462        chunk_size: u64,
463        etl_config: EtlConfig,
464        prune_mode: Option<PruneMode>,
465    }
466
467    impl Default for TransactionLookupTestRunner {
468        fn default() -> Self {
469            Self {
470                db: TestStageDB::default(),
471                chunk_size: 1000,
472                etl_config: EtlConfig::default(),
473                prune_mode: None,
474            }
475        }
476    }
477
478    impl TransactionLookupTestRunner {
479        fn set_prune_mode(&mut self, prune_mode: PruneMode) {
480            self.prune_mode = Some(prune_mode);
481        }
482
483        /// # Panics
484        ///
485        /// 1. If there are any entries in the [`tables::TransactionHashNumbers`] table above a
486        ///    given block number.
487        /// 2. If there is no requested block entry in the bodies table, but
488        ///    [`tables::TransactionHashNumbers`] is    not empty.
489        fn ensure_no_hash_by_block(&self, number: BlockNumber) -> Result<(), TestRunnerError> {
490            let body_result = self
491                .db
492                .factory
493                .provider_rw()?
494                .block_body_indices(number)?
495                .ok_or(ProviderError::BlockBodyIndicesNotFound(number));
496            match body_result {
497                Ok(body) => {
498                    self.db.ensure_no_entry_above_by_value::<tables::TransactionHashNumbers, _>(
499                        body.last_tx_num(),
500                        |key| key,
501                    )?
502                }
503                Err(_) => {
504                    assert!(self.db.table_is_empty::<tables::TransactionHashNumbers>()?);
505                }
506            };
507
508            Ok(())
509        }
510    }
511
512    impl StageTestRunner for TransactionLookupTestRunner {
513        type S = TransactionLookupStage;
514
515        fn db(&self) -> &TestStageDB {
516            &self.db
517        }
518
519        fn stage(&self) -> Self::S {
520            TransactionLookupStage {
521                chunk_size: self.chunk_size,
522                etl_config: self.etl_config.clone(),
523                prune_mode: self.prune_mode,
524            }
525        }
526    }
527
528    impl ExecuteStageTestRunner for TransactionLookupTestRunner {
529        type Seed = Vec<SealedBlock<Block>>;
530
531        fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
532            let stage_progress = input.checkpoint().block_number;
533            let end = input.target();
534            let mut rng = generators::rng();
535
536            let blocks = random_block_range(
537                &mut rng,
538                stage_progress + 1..=end,
539                BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..2, ..Default::default() },
540            );
541            self.db.insert_blocks(blocks.iter(), StorageKind::Static)?;
542            Ok(blocks)
543        }
544
545        fn validate_execution(
546            &self,
547            mut input: ExecInput,
548            output: Option<ExecOutput>,
549        ) -> Result<(), TestRunnerError> {
550            match output {
551                Some(output) => {
552                    let provider = self.db.factory.provider()?;
553
554                    if let Some((target_prunable_block, _)) = self
555                        .prune_mode
556                        .map(|mode| {
557                            mode.prune_target_block(
558                                input.target(),
559                                PruneSegment::TransactionLookup,
560                                PrunePurpose::User,
561                            )
562                        })
563                        .transpose()
564                        .expect("prune target block for transaction lookup")
565                        .flatten() &&
566                        target_prunable_block > input.checkpoint().block_number
567                    {
568                        input.checkpoint = Some(StageCheckpoint::new(target_prunable_block));
569                    }
570                    let start_block = input.next_block();
571                    let end_block = output.checkpoint.block_number;
572
573                    if start_block > end_block {
574                        return Ok(())
575                    }
576
577                    let mut body_cursor =
578                        provider.tx_ref().cursor_read::<tables::BlockBodyIndices>()?;
579                    body_cursor.seek_exact(start_block)?;
580
581                    while let Some((_, body)) = body_cursor.next()? {
582                        for tx_id in body.tx_num_range() {
583                            let transaction =
584                                provider.transaction_by_id(tx_id)?.expect("no transaction entry");
585                            assert_eq!(
586                                Some(tx_id),
587                                provider.transaction_id(*transaction.tx_hash())?
588                            );
589                        }
590                    }
591                }
592                None => self.ensure_no_hash_by_block(input.checkpoint().block_number)?,
593            };
594            Ok(())
595        }
596    }
597
598    impl UnwindStageTestRunner for TransactionLookupTestRunner {
599        fn validate_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError> {
600            self.ensure_no_hash_by_block(input.unwind_to)
601        }
602    }
603
604    #[cfg(all(unix, feature = "rocksdb"))]
605    mod rocksdb_tests {
606        use super::*;
607        use reth_provider::RocksDBProviderFactory;
608        use reth_storage_api::StorageSettings;
609
610        /// Test that when `transaction_hash_numbers_in_rocksdb` is enabled, the stage
611        /// writes transaction hash mappings to `RocksDB` instead of MDBX.
612        #[tokio::test]
613        async fn execute_writes_to_rocksdb_when_enabled() {
614            let (previous_stage, stage_progress) = (110, 100);
615            let mut rng = generators::rng();
616
617            // Set up the runner
618            let runner = TransactionLookupTestRunner::default();
619
620            // Enable RocksDB for transaction hash numbers
621            runner.db.factory.set_storage_settings_cache(StorageSettings::v2());
622
623            let input = ExecInput {
624                target: Some(previous_stage),
625                checkpoint: Some(StageCheckpoint::new(stage_progress)),
626            };
627
628            // Insert blocks with transactions
629            let blocks = random_block_range(
630                &mut rng,
631                stage_progress + 1..=previous_stage,
632                BlockRangeParams {
633                    parent: Some(B256::ZERO),
634                    tx_count: 1..3, // Ensure we have transactions
635                    ..Default::default()
636                },
637            );
638            runner
639                .db
640                .insert_blocks(blocks.iter(), StorageKind::Static)
641                .expect("failed to insert blocks");
642
643            // Count expected transactions
644            let expected_tx_count: usize = blocks.iter().map(|b| b.body().transactions.len()).sum();
645            assert!(expected_tx_count > 0, "test requires at least one transaction");
646
647            // Execute the stage
648            let rx = runner.execute(input);
649            let result = rx.await.unwrap();
650            assert!(result.is_ok(), "stage execution failed: {:?}", result);
651
652            // Verify MDBX table is empty (data should be in RocksDB)
653            let mdbx_count = runner.db.count_entries::<tables::TransactionHashNumbers>().unwrap();
654            assert_eq!(
655                mdbx_count, 0,
656                "MDBX TransactionHashNumbers should be empty when RocksDB is enabled"
657            );
658
659            // Verify RocksDB has the data
660            let rocksdb = runner.db.factory.rocksdb_provider();
661            let mut rocksdb_count = 0;
662            for block in &blocks {
663                for tx in &block.body().transactions {
664                    let hash = *tx.tx_hash();
665                    let result = rocksdb.get::<tables::TransactionHashNumbers>(hash).unwrap();
666                    assert!(result.is_some(), "Transaction hash {:?} not found in RocksDB", hash);
667                    rocksdb_count += 1;
668                }
669            }
670            assert_eq!(
671                rocksdb_count, expected_tx_count,
672                "RocksDB should contain all transaction hashes"
673            );
674        }
675
676        /// Test that when `transaction_hash_numbers_in_rocksdb` is enabled, the stage
677        /// unwind deletes transaction hash mappings from `RocksDB` instead of MDBX.
678        #[tokio::test]
679        async fn unwind_deletes_from_rocksdb_when_enabled() {
680            let (previous_stage, stage_progress) = (110, 100);
681            let mut rng = generators::rng();
682
683            // Set up the runner
684            let runner = TransactionLookupTestRunner::default();
685
686            // Enable RocksDB for transaction hash numbers
687            runner.db.factory.set_storage_settings_cache(StorageSettings::v2());
688
689            // Insert blocks with transactions
690            let blocks = random_block_range(
691                &mut rng,
692                stage_progress + 1..=previous_stage,
693                BlockRangeParams {
694                    parent: Some(B256::ZERO),
695                    tx_count: 1..3, // Ensure we have transactions
696                    ..Default::default()
697                },
698            );
699            runner
700                .db
701                .insert_blocks(blocks.iter(), StorageKind::Static)
702                .expect("failed to insert blocks");
703
704            // Count expected transactions
705            let expected_tx_count: usize = blocks.iter().map(|b| b.body().transactions.len()).sum();
706            assert!(expected_tx_count > 0, "test requires at least one transaction");
707
708            // Execute the stage first to populate RocksDB
709            let exec_input = ExecInput {
710                target: Some(previous_stage),
711                checkpoint: Some(StageCheckpoint::new(stage_progress)),
712            };
713            let rx = runner.execute(exec_input);
714            let result = rx.await.unwrap();
715            assert!(result.is_ok(), "stage execution failed: {:?}", result);
716
717            // Verify RocksDB has the data before unwind
718            let rocksdb = runner.db.factory.rocksdb_provider();
719            for block in &blocks {
720                for tx in &block.body().transactions {
721                    let hash = *tx.tx_hash();
722                    let result = rocksdb.get::<tables::TransactionHashNumbers>(hash).unwrap();
723                    assert!(
724                        result.is_some(),
725                        "Transaction hash {:?} should exist before unwind",
726                        hash
727                    );
728                }
729            }
730
731            // Now unwind to stage_progress (removing all the blocks we added)
732            let unwind_input = UnwindInput {
733                checkpoint: StageCheckpoint::new(previous_stage),
734                unwind_to: stage_progress,
735                bad_block: None,
736            };
737            let unwind_result = runner.unwind(unwind_input).await;
738            assert!(unwind_result.is_ok(), "stage unwind failed: {:?}", unwind_result);
739
740            // Verify RocksDB data is deleted after unwind
741            let rocksdb = runner.db.factory.rocksdb_provider();
742            for block in &blocks {
743                for tx in &block.body().transactions {
744                    let hash = *tx.tx_hash();
745                    let result = rocksdb.get::<tables::TransactionHashNumbers>(hash).unwrap();
746                    assert!(
747                        result.is_none(),
748                        "Transaction hash {:?} should be deleted from RocksDB after unwind",
749                        hash
750                    );
751                }
752            }
753        }
754    }
755}