// reth_stages/stages/tx_lookup.rs — transaction lookup (hash -> tx number) pipeline stage.
1use alloy_eips::eip2718::Encodable2718;
2use alloy_primitives::{TxHash, TxNumber};
3use num_traits::Zero;
4use reth_config::config::{EtlConfig, TransactionLookupConfig};
5use reth_db_api::{
6    table::{Decode, Decompress, Value},
7    tables,
8    transaction::DbTxMut,
9    Tables,
10};
11use reth_etl::Collector;
12use reth_primitives_traits::{NodePrimitives, SignedTransaction};
13use reth_provider::{
14    BlockReader, DBProvider, EitherWriter, PruneCheckpointReader, PruneCheckpointWriter,
15    RocksDBProviderFactory, StaticFileProviderFactory, StatsReader, StorageSettingsCache,
16    TransactionsProvider, TransactionsProviderExt,
17};
18use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment};
19use reth_stages_api::{
20    EntitiesCheckpoint, ExecInput, ExecOutput, Stage, StageCheckpoint, StageError, StageId,
21    UnwindInput, UnwindOutput,
22};
23use reth_storage_errors::provider::ProviderError;
24use tracing::*;
25
/// The transaction lookup stage.
///
/// This stage walks over existing transactions and maps each transaction hash to its
/// sequential [`TxNumber`], writing the mapping to [`tables::TransactionHashNumbers`].
/// This index is used for looking up a transaction (and from it, its block) by hash.
///
/// It uses [`reth_etl::Collector`] to collect all entries before finally writing them to disk.
#[derive(Debug, Clone)]
pub struct TransactionLookupStage {
    /// The maximum number of lookup entries to hold in memory before pushing them to
    /// [`reth_etl::Collector`].
    chunk_size: u64,
    /// ETL configuration (temporary file size and spill directory) for the collector.
    etl_config: EtlConfig,
    /// Optional prune mode; when set, blocks at or below the prune target are skipped.
    prune_mode: Option<PruneMode>,
}
42
impl Default for TransactionLookupStage {
    /// Defaults: 5M-entry chunks, default ETL settings, pruning disabled.
    fn default() -> Self {
        Self { chunk_size: 5_000_000, etl_config: EtlConfig::default(), prune_mode: None }
    }
}
48
impl TransactionLookupStage {
    /// Create new instance of [`TransactionLookupStage`].
    ///
    /// * `config` — supplies the in-memory `chunk_size` threshold.
    /// * `etl_config` — temporary-file size and directory for the ETL collector.
    /// * `prune_mode` — when `Some`, execution fast-forwards past the prunable range.
    pub const fn new(
        config: TransactionLookupConfig,
        etl_config: EtlConfig,
        prune_mode: Option<PruneMode>,
    ) -> Self {
        Self { chunk_size: config.chunk_size, etl_config, prune_mode }
    }
}
59
impl<Provider> Stage<Provider> for TransactionLookupStage
where
    Provider: DBProvider<Tx: DbTxMut>
        + PruneCheckpointWriter
        + BlockReader
        + PruneCheckpointReader
        + StatsReader
        + StaticFileProviderFactory<Primitives: NodePrimitives<SignedTx: Value + SignedTransaction>>
        + TransactionsProviderExt
        + StorageSettingsCache
        + RocksDBProviderFactory,
{
    /// Return the id of the stage
    fn id(&self) -> StageId {
        StageId::TransactionLookup
    }

    /// Write transaction hash -> id entries
    fn execute(
        &mut self,
        provider: &Provider,
        mut input: ExecInput,
    ) -> Result<ExecOutput, StageError> {
        // If pruning is configured, fast-forward the checkpoint past the prunable range so
        // the stage never indexes transactions that the pruner would immediately remove.
        if let Some((target_prunable_block, prune_mode)) = self
            .prune_mode
            .map(|mode| {
                mode.prune_target_block(
                    input.target(),
                    PruneSegment::TransactionLookup,
                    PrunePurpose::User,
                )
            })
            .transpose()?
            .flatten() &&
            target_prunable_block > input.checkpoint().block_number
        {
            input.checkpoint = Some(StageCheckpoint::new(target_prunable_block));

            // Save prune checkpoint only if we don't have one already.
            // Otherwise, pruner may skip the unpruned range of blocks.
            if provider.get_prune_checkpoint(PruneSegment::TransactionLookup)?.is_none() {
                let target_prunable_tx_number = provider
                    .block_body_indices(target_prunable_block)?
                    .ok_or(ProviderError::BlockBodyIndicesNotFound(target_prunable_block))?
                    .last_tx_num();

                provider.save_prune_checkpoint(
                    PruneSegment::TransactionLookup,
                    PruneCheckpoint {
                        block_number: Some(target_prunable_block),
                        tx_number: Some(target_prunable_tx_number),
                        prune_mode,
                    },
                )?;
            }
        }
        if input.target_reached() {
            return Ok(ExecOutput::done(input.checkpoint()));
        }

        // ETL collector buffering (hash, tx number) pairs in memory; spills to temporary
        // files of `etl_config.file_size` bytes under `etl_config.dir` when full.
        let mut hash_collector: Collector<TxHash, TxNumber> =
            Collector::new(self.etl_config.file_size, self.etl_config.dir.clone());

        info!(
            target: "sync::stages::transaction_lookup",
            tx_range = ?input.checkpoint().block_number..=input.target(),
            "Updating transaction lookup"
        );

        // Process block ranges in chunks of at most `chunk_size` transactions, advancing the
        // checkpoint after each chunk is collected.
        loop {
            let Some(range_output) =
                input.next_block_range_with_transaction_threshold(provider, self.chunk_size)?
            else {
                // No more ranges: mark the target reached and finish.
                input.checkpoint = Some(
                    StageCheckpoint::new(input.target())
                        .with_entities_stage_checkpoint(stage_checkpoint(provider)?),
                );
                break;
            };

            let end_block = *range_output.block_range.end();

            info!(target: "sync::stages::transaction_lookup", tx_range = ?range_output.tx_range, "Calculating transaction hashes");

            for (key, value) in provider.transaction_hashes_by_range(range_output.tx_range)? {
                hash_collector.insert(key, value)?;
            }

            input.checkpoint = Some(
                StageCheckpoint::new(end_block)
                    .with_entities_stage_checkpoint(stage_checkpoint(provider)?),
            );

            // On the last chunk, drain the (sorted) collector into storage.
            if range_output.is_final_range {
                let total_hashes = hash_collector.len();
                // Log progress roughly every 10% of inserted entries.
                let interval = (total_hashes / 10).max(1);

                // Use append mode when table is empty (first sync) - significantly faster
                let append_only =
                    provider.count_entries::<tables::TransactionHashNumbers>()?.is_zero();

                // Auto-commits on threshold; consistency check heals any crash.
                provider.with_rocksdb_batch_auto_commit(|rocksdb_batch| {
                    let mut writer =
                        EitherWriter::new_transaction_hash_numbers(provider, rocksdb_batch)?;

                    let iter = hash_collector
                        .iter()
                        .map_err(|e| ProviderError::other(Box::new(e)))?;

                    for (index, hash_to_number) in iter.enumerate() {
                        let (hash_bytes, number_bytes) =
                            hash_to_number.map_err(|e| ProviderError::other(Box::new(e)))?;
                        if index > 0 && index.is_multiple_of(interval) {
                            info!(
                                target: "sync::stages::transaction_lookup",
                                ?append_only,
                                progress = %format!("{:.2}%", (index as f64 / total_hashes as f64) * 100.0),
                                "Inserting hashes"
                            );
                        }

                        // Decode from raw ETL bytes
                        let hash = TxHash::decode(&hash_bytes)?;
                        let tx_num = TxNumber::decompress(&number_bytes)?;
                        writer.put_transaction_hash_number(hash, tx_num, append_only)?;
                    }

                    Ok(((), writer.into_raw_rocksdb_batch()))
                })?;

                trace!(target: "sync::stages::transaction_lookup",
                    total_hashes,
                    "Transaction hashes inserted"
                );

                break;
            }
        }

        // Under storage v2, hashes went to RocksDB: commit pending batches and flush the
        // column family so the data is durable before the stage reports completion.
        if provider.cached_storage_settings().storage_v2 {
            provider.commit_pending_rocksdb_batches()?;
            provider.rocksdb_provider().flush(&[Tables::TransactionHashNumbers.name()])?;
        }

        Ok(ExecOutput {
            checkpoint: StageCheckpoint::new(input.target())
                .with_entities_stage_checkpoint(stage_checkpoint(provider)?),
            done: true,
        })
    }

    /// Unwind the stage.
    ///
    /// Walks block bodies in the unwind range in reverse and deletes the hash mapping of
    /// every transaction above `unwind_to`.
    fn unwind(
        &mut self,
        provider: &Provider,
        input: UnwindInput,
    ) -> Result<UnwindOutput, StageError> {
        let (range, unwind_to, _) = input.unwind_block_range_with_threshold(self.chunk_size);

        provider.with_rocksdb_batch(|rocksdb_batch| {
            let mut writer = EitherWriter::new_transaction_hash_numbers(provider, rocksdb_batch)?;

            let static_file_provider = provider.static_file_provider();
            // Pair each body with its block number, iterating newest-first.
            let rev_walker = provider
                .block_body_indices_range(range.clone())?
                .into_iter()
                .rev()
                .zip(range.rev());

            for (body, number) in rev_walker {
                if number <= unwind_to {
                    break;
                }

                // Delete all transactions that belong to this block
                for transaction in
                    static_file_provider.transactions_by_tx_range(body.tx_num_range())?
                {
                    writer.delete_transaction_hash_number(transaction.trie_hash())?;
                }
            }

            Ok(((), writer.into_raw_rocksdb_batch()))
        })?;

        Ok(UnwindOutput {
            checkpoint: StageCheckpoint::new(unwind_to)
                .with_entities_stage_checkpoint(stage_checkpoint(provider)?),
        })
    }
}
253
254fn stage_checkpoint<Provider>(provider: &Provider) -> Result<EntitiesCheckpoint, StageError>
255where
256    Provider: PruneCheckpointReader + StaticFileProviderFactory + StatsReader,
257{
258    let pruned_entries = provider
259        .get_prune_checkpoint(PruneSegment::TransactionLookup)?
260        .and_then(|checkpoint| checkpoint.tx_number)
261        // `+1` is needed because `TxNumber` is 0-indexed
262        .map(|tx_number| tx_number + 1)
263        .unwrap_or_default();
264    Ok(EntitiesCheckpoint {
265        // If `TransactionHashNumbers` table was pruned, we will have a number of entries in it not
266        // matching the actual number of processed transactions. To fix that, we add the
267        // number of pruned `TransactionHashNumbers` entries.
268        processed: provider.count_entries::<tables::TransactionHashNumbers>()? as u64 +
269            pruned_entries,
270        // Count only static files entries. If we count the database entries too, we may have
271        // duplicates. We're sure that the static files have all entries that database has,
272        // because we run the `StaticFileProducer` before starting the pipeline.
273        total: provider.static_file_provider().count_entries::<tables::Transactions>()? as u64,
274    })
275}
276
277#[cfg(test)]
278mod tests {
279    use super::*;
280    use crate::test_utils::{
281        stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, StorageKind,
282        TestRunnerError, TestStageDB, UnwindStageTestRunner,
283    };
284    use alloy_primitives::{BlockNumber, B256};
285    use assert_matches::assert_matches;
286    use reth_db_api::{cursor::DbCursorRO, transaction::DbTx};
287    use reth_ethereum_primitives::Block;
288    use reth_primitives_traits::SealedBlock;
289    use reth_provider::{
290        providers::StaticFileWriter, BlockBodyIndicesProvider, DatabaseProviderFactory,
291    };
292    use reth_stages_api::StageUnitCheckpoint;
293    use reth_testing_utils::generators::{
294        self, random_block, random_block_range, BlockParams, BlockRangeParams,
295    };
296    use std::ops::Sub;
297
    // Implement stage test suite. Generates the shared execute/unwind tests for this stage
    // via the `stage_test_suite_ext!` macro using `TransactionLookupTestRunner`.
    stage_test_suite_ext!(TransactionLookupTestRunner, transaction_lookup);
300
    /// Executes the stage over a range where exactly one block carries a transaction and
    /// asserts the final checkpoint reaches the target with matching entity counts.
    #[tokio::test]
    async fn execute_single_transaction_lookup() {
        let (previous_stage, stage_progress) = (500, 100);
        let mut rng = generators::rng();

        // Set up the runner
        let runner = TransactionLookupTestRunner::default();
        let input = ExecInput {
            target: Some(previous_stage),
            checkpoint: Some(StageCheckpoint::new(stage_progress)),
        };

        // Insert blocks with a single transaction at block `stage_progress + 10`
        let non_empty_block_number = stage_progress + 10;
        let blocks = (stage_progress..=input.target())
            .map(|number| {
                random_block(
                    &mut rng,
                    number,
                    BlockParams {
                        // `true as u8 == 1`: one tx at the chosen block, zero elsewhere.
                        tx_count: Some((number == non_empty_block_number) as u8),
                        ..Default::default()
                    },
                )
            })
            .collect::<Vec<_>>();
        runner
            .db
            .insert_blocks(blocks.iter(), StorageKind::Static)
            .expect("failed to insert blocks");

        let rx = runner.execute(input);

        // Assert the successful result
        let result = rx.await.unwrap();
        assert_matches!(
            result,
            Ok(ExecOutput {
                checkpoint: StageCheckpoint {
                block_number,
                stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
                    processed,
                    total
                }))
            }, done: true }) if block_number == previous_stage && processed == total &&
                total == runner.db.count_entries::<tables::Transactions>().unwrap() as u64
        );

        // Validate the stage execution
        assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
    }
352
    /// Executes the stage with a prune target inside the range and asserts the checkpoint
    /// still reaches the target and the entity counts remain consistent.
    #[tokio::test]
    async fn execute_pruned_transaction_lookup() {
        let (previous_stage, prune_target, stage_progress) = (500, 400, 100);
        let mut rng = generators::rng();

        // Set up the runner
        let mut runner = TransactionLookupTestRunner::default();
        let input = ExecInput {
            target: Some(previous_stage),
            checkpoint: Some(StageCheckpoint::new(stage_progress)),
        };

        // Seed only once with full input range
        let seed = random_block_range(
            &mut rng,
            stage_progress + 1..=previous_stage,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..2, ..Default::default() },
        );
        runner
            .db
            .insert_blocks(seed.iter(), StorageKind::Static)
            .expect("failed to seed execution");

        // Everything before `prune_target` is considered prunable by the stage.
        runner.set_prune_mode(PruneMode::Before(prune_target));

        let rx = runner.execute(input);

        // Assert the successful result
        let result = rx.await.unwrap();
        assert_matches!(
            result,
            Ok(ExecOutput {
                checkpoint: StageCheckpoint {
                block_number,
                stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
                    processed,
                    total
                }))
            }, done: true }) if block_number == previous_stage && processed == total &&
                total == runner.db.count_entries::<tables::Transactions>().unwrap() as u64
        );

        // Validate the stage execution
        assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
    }
398
    /// Verifies `stage_checkpoint` compensates for pruned entries: `processed` must equal
    /// the real number of processed transactions even though the table was pruned.
    #[test]
    fn stage_checkpoint_pruned() {
        let db = TestStageDB::default();
        let mut rng = generators::rng();

        let blocks = random_block_range(
            &mut rng,
            0..=100,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..10, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Static).expect("insert blocks");

        let max_pruned_block = 30;
        let max_processed_block = 70;

        // Simulate a pruned table: only hashes above `max_pruned_block` are inserted,
        // but `tx_hash_number` keeps counting across all processed blocks.
        let mut tx_hash_numbers = Vec::new();
        let mut tx_hash_number = 0;
        for block in &blocks[..=max_processed_block] {
            for transaction in &block.body().transactions {
                if block.number > max_pruned_block {
                    tx_hash_numbers.push((*transaction.tx_hash(), tx_hash_number));
                }
                tx_hash_number += 1;
            }
        }
        db.insert_tx_hash_numbers(tx_hash_numbers).expect("insert tx hash numbers");

        let provider = db.factory.provider_rw().unwrap();
        provider
            .save_prune_checkpoint(
                PruneSegment::TransactionLookup,
                PruneCheckpoint {
                    block_number: Some(max_pruned_block),
                    tx_number: Some(
                        blocks[..=max_pruned_block as usize]
                            .iter()
                            .map(|block| block.transaction_count() as u64)
                            .sum::<u64>()
                            .sub(1), // `TxNumber` is 0-indexed
                    ),
                    prune_mode: PruneMode::Full,
                },
            )
            .expect("save stage checkpoint");
        provider.commit().expect("commit");

        let provider = db.factory.database_provider_rw().unwrap();
        assert_eq!(
            stage_checkpoint(&provider).expect("stage checkpoint"),
            EntitiesCheckpoint {
                processed: blocks[..=max_processed_block]
                    .iter()
                    .map(|block| block.transaction_count() as u64)
                    .sum(),
                total: blocks.iter().map(|block| block.transaction_count() as u64).sum()
            }
        );
    }
457
    /// Test harness for [`TransactionLookupStage`], bundling a test database with the
    /// stage configuration used to construct it.
    struct TransactionLookupTestRunner {
        // Backing test database (MDBX + static files + RocksDB factory).
        db: TestStageDB,
        // Per-chunk transaction threshold passed to the stage.
        chunk_size: u64,
        // ETL settings passed to the stage.
        etl_config: EtlConfig,
        // Optional prune mode passed to the stage.
        prune_mode: Option<PruneMode>,
    }
464
    impl Default for TransactionLookupTestRunner {
        /// Fresh test DB, small 1000-tx chunks (to exercise chunking), no pruning.
        fn default() -> Self {
            Self {
                db: TestStageDB::default(),
                chunk_size: 1000,
                etl_config: EtlConfig::default(),
                prune_mode: None,
            }
        }
    }
475
    impl TransactionLookupTestRunner {
        /// Sets the prune mode the stage under test will be constructed with.
        fn set_prune_mode(&mut self, prune_mode: PruneMode) {
            self.prune_mode = Some(prune_mode);
        }

        /// # Panics
        ///
        /// 1. If there are any entries in the [`tables::TransactionHashNumbers`] table above a
        ///    given block number.
        /// 2. If there is no requested block entry in the bodies table, but
        ///    [`tables::TransactionHashNumbers`] is not empty.
        fn ensure_no_hash_by_block(&self, number: BlockNumber) -> Result<(), TestRunnerError> {
            let body_result = self
                .db
                .factory
                .provider_rw()?
                .block_body_indices(number)?
                .ok_or(ProviderError::BlockBodyIndicesNotFound(number));
            match body_result {
                // Block body exists: no hash entry may point past its last tx number.
                Ok(body) => {
                    self.db.ensure_no_entry_above_by_value::<tables::TransactionHashNumbers, _>(
                        body.last_tx_num(),
                        |key| key,
                    )?
                }
                // No block body: the lookup table must be completely empty.
                Err(_) => {
                    assert!(self.db.table_is_empty::<tables::TransactionHashNumbers>()?);
                }
            };

            Ok(())
        }
    }
509
    impl StageTestRunner for TransactionLookupTestRunner {
        type S = TransactionLookupStage;

        fn db(&self) -> &TestStageDB {
            &self.db
        }

        /// Builds the stage under test from the runner's current configuration.
        fn stage(&self) -> Self::S {
            TransactionLookupStage {
                chunk_size: self.chunk_size,
                etl_config: self.etl_config.clone(),
                prune_mode: self.prune_mode,
            }
        }
    }
525
    impl ExecuteStageTestRunner for TransactionLookupTestRunner {
        type Seed = Vec<SealedBlock<Block>>;

        /// Seeds the database with random blocks (0-1 txs each) covering the execution range.
        fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
            let stage_progress = input.checkpoint().block_number;
            let end = input.target();
            let mut rng = generators::rng();

            let blocks = random_block_range(
                &mut rng,
                stage_progress + 1..=end,
                BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..2, ..Default::default() },
            );
            self.db.insert_blocks(blocks.iter(), StorageKind::Static)?;
            Ok(blocks)
        }

        /// Verifies every transaction in the executed (non-pruned) range is retrievable by
        /// hash via `transaction_id`.
        fn validate_execution(
            &self,
            mut input: ExecInput,
            output: Option<ExecOutput>,
        ) -> Result<(), TestRunnerError> {
            match output {
                Some(output) => {
                    let provider = self.db.factory.provider()?;

                    // Mirror the stage's prune fast-forward so validation starts at the
                    // first block the stage actually indexed.
                    if let Some((target_prunable_block, _)) = self
                        .prune_mode
                        .map(|mode| {
                            mode.prune_target_block(
                                input.target(),
                                PruneSegment::TransactionLookup,
                                PrunePurpose::User,
                            )
                        })
                        .transpose()
                        .expect("prune target block for transaction lookup")
                        .flatten() &&
                        target_prunable_block > input.checkpoint().block_number
                    {
                        input.checkpoint = Some(StageCheckpoint::new(target_prunable_block));
                    }
                    let start_block = input.next_block();
                    let end_block = output.checkpoint.block_number;

                    // Nothing to validate if the stage had no blocks to process.
                    if start_block > end_block {
                        return Ok(())
                    }

                    let mut body_cursor =
                        provider.tx_ref().cursor_read::<tables::BlockBodyIndices>()?;
                    body_cursor.seek_exact(start_block)?;

                    while let Some((_, body)) = body_cursor.next()? {
                        for tx_id in body.tx_num_range() {
                            let transaction =
                                provider.transaction_by_id(tx_id)?.expect("no transaction entry");
                            assert_eq!(
                                Some(tx_id),
                                provider.transaction_id(*transaction.tx_hash())?
                            );
                        }
                    }
                }
                // No output means the stage did not run; the table must hold nothing past
                // the checkpoint.
                None => self.ensure_no_hash_by_block(input.checkpoint().block_number)?,
            };
            Ok(())
        }
    }
595
    impl UnwindStageTestRunner for TransactionLookupTestRunner {
        /// After an unwind, no hash entry may remain above the unwind target block.
        fn validate_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError> {
            self.ensure_no_hash_by_block(input.unwind_to)
        }
    }
601
    // Tests exercising the storage-v2 path, where hash mappings live in RocksDB.
    mod rocksdb_tests {
        use super::*;
        use reth_provider::RocksDBProviderFactory;
        use reth_storage_api::StorageSettings;

        /// Test that when `transaction_hash_numbers_in_rocksdb` is enabled, the stage
        /// writes transaction hash mappings to `RocksDB` instead of MDBX.
        #[tokio::test]
        async fn execute_writes_to_rocksdb_when_enabled() {
            let (previous_stage, stage_progress) = (110, 100);
            let mut rng = generators::rng();

            // Set up the runner
            let runner = TransactionLookupTestRunner::default();

            // Enable RocksDB for transaction hash numbers
            runner.db.factory.set_storage_settings_cache(StorageSettings::v2());

            let input = ExecInput {
                target: Some(previous_stage),
                checkpoint: Some(StageCheckpoint::new(stage_progress)),
            };

            // Insert blocks with transactions
            let blocks = random_block_range(
                &mut rng,
                stage_progress + 1..=previous_stage,
                BlockRangeParams {
                    parent: Some(B256::ZERO),
                    tx_count: 1..3, // Ensure we have transactions
                    ..Default::default()
                },
            );
            runner
                .db
                .insert_blocks(blocks.iter(), StorageKind::Static)
                .expect("failed to insert blocks");

            // Count expected transactions
            let expected_tx_count: usize = blocks.iter().map(|b| b.body().transactions.len()).sum();
            assert!(expected_tx_count > 0, "test requires at least one transaction");

            // Execute the stage
            let rx = runner.execute(input);
            let result = rx.await.unwrap();
            assert!(result.is_ok(), "stage execution failed: {:?}", result);

            // Verify MDBX table is empty (data should be in RocksDB)
            let mdbx_count = runner.db.count_entries::<tables::TransactionHashNumbers>().unwrap();
            assert_eq!(
                mdbx_count, 0,
                "MDBX TransactionHashNumbers should be empty when RocksDB is enabled"
            );

            // Verify RocksDB has the data: every seeded tx hash must resolve.
            let rocksdb = runner.db.factory.rocksdb_provider();
            let mut rocksdb_count = 0;
            for block in &blocks {
                for tx in &block.body().transactions {
                    let hash = *tx.tx_hash();
                    let result = rocksdb.get::<tables::TransactionHashNumbers>(hash).unwrap();
                    assert!(result.is_some(), "Transaction hash {:?} not found in RocksDB", hash);
                    rocksdb_count += 1;
                }
            }
            assert_eq!(
                rocksdb_count, expected_tx_count,
                "RocksDB should contain all transaction hashes"
            );
        }

        /// Test that when `transaction_hash_numbers_in_rocksdb` is enabled, the stage
        /// unwind deletes transaction hash mappings from `RocksDB` instead of MDBX.
        #[tokio::test]
        async fn unwind_deletes_from_rocksdb_when_enabled() {
            let (previous_stage, stage_progress) = (110, 100);
            let mut rng = generators::rng();

            // Set up the runner
            let runner = TransactionLookupTestRunner::default();

            // Enable RocksDB for transaction hash numbers
            runner.db.factory.set_storage_settings_cache(StorageSettings::v2());

            // Insert blocks with transactions
            let blocks = random_block_range(
                &mut rng,
                stage_progress + 1..=previous_stage,
                BlockRangeParams {
                    parent: Some(B256::ZERO),
                    tx_count: 1..3, // Ensure we have transactions
                    ..Default::default()
                },
            );
            runner
                .db
                .insert_blocks(blocks.iter(), StorageKind::Static)
                .expect("failed to insert blocks");

            // Count expected transactions
            let expected_tx_count: usize = blocks.iter().map(|b| b.body().transactions.len()).sum();
            assert!(expected_tx_count > 0, "test requires at least one transaction");

            // Execute the stage first to populate RocksDB
            let exec_input = ExecInput {
                target: Some(previous_stage),
                checkpoint: Some(StageCheckpoint::new(stage_progress)),
            };
            let rx = runner.execute(exec_input);
            let result = rx.await.unwrap();
            assert!(result.is_ok(), "stage execution failed: {:?}", result);

            // Verify RocksDB has the data before unwind
            let rocksdb = runner.db.factory.rocksdb_provider();
            for block in &blocks {
                for tx in &block.body().transactions {
                    let hash = *tx.tx_hash();
                    let result = rocksdb.get::<tables::TransactionHashNumbers>(hash).unwrap();
                    assert!(
                        result.is_some(),
                        "Transaction hash {:?} should exist before unwind",
                        hash
                    );
                }
            }

            // Now unwind to stage_progress (removing all the blocks we added)
            let unwind_input = UnwindInput {
                checkpoint: StageCheckpoint::new(previous_stage),
                unwind_to: stage_progress,
                bad_block: None,
            };
            let unwind_result = runner.unwind(unwind_input).await;
            assert!(unwind_result.is_ok(), "stage unwind failed: {:?}", unwind_result);

            // Verify RocksDB data is deleted after unwind
            let rocksdb = runner.db.factory.rocksdb_provider();
            for block in &blocks {
                for tx in &block.body().transactions {
                    let hash = *tx.tx_hash();
                    let result = rocksdb.get::<tables::TransactionHashNumbers>(hash).unwrap();
                    assert!(
                        result.is_none(),
                        "Transaction hash {:?} should be deleted from RocksDB after unwind",
                        hash
                    );
                }
            }
        }
    }
752}