Skip to main content

reth_stages/stages/
era.rs

1use crate::{StageCheckpoint, StageId};
2use alloy_primitives::{BlockHash, BlockNumber};
3use futures_util::{Stream, StreamExt};
4use reqwest::{Client, Url};
5use reth_config::config::EtlConfig;
6use reth_db_api::{table::Value, transaction::DbTxMut};
7use reth_era::{
8    common::file_ops::{EraFileType, StreamReader},
9    era1::file::Era1Reader,
10    ere::file::EreReader,
11};
12use reth_era_downloader::{read_dir, EraClient, EraMeta, EraStream, EraStreamConfig};
13use reth_era_utils as era;
14use reth_etl::Collector;
15use reth_primitives_traits::{FullBlockBody, FullBlockHeader, NodePrimitives};
16use reth_provider::{
17    BlockReader, BlockWriter, DBProvider, StageCheckpointWriter, StaticFileProviderFactory,
18    StaticFileWriter,
19};
20use reth_stages_api::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput};
21use reth_static_file_types::StaticFileSegment;
22use std::{
23    fmt::{Debug, Formatter},
24    iter,
25    path::Path,
26    task::{ready, Context, Poll},
27};
28
29type Item<Header, Body> =
30    Box<dyn Iterator<Item = eyre::Result<(Header, Body)>> + Send + Sync + Unpin>;
31type ThreadSafeEraStream<Header, Body> =
32    Box<dyn Stream<Item = eyre::Result<Item<Header, Body>>> + Send + Sync + Unpin>;
33
34/// The ERA history stage.
35///
36/// Imports block headers and bodies from
37/// [ERA1](https://github.com/eth-clients/e2store-format-specs/blob/main/formats/era1.md) files,
38/// which hold pre-merge execution blocks, and
39/// [ERE](https://github.com/eth-clients/e2store-format-specs/blob/main/formats/ere.md) files,
40/// which hold both pre-merge and post-merge execution blocks. The reader is selected per file from
41/// its extension (`.era1` vs `.ere`/`.erae`).
42///
43/// Imports block headers and bodies up to the configured target block. Receipts are generated by
44/// execution, which is not done in this stage.
45pub struct EraStage<Header, Body, StreamFactory> {
46    /// The `source` creates `stream`.
47    source: Option<StreamFactory>,
48    /// A map of block hash to block height collected when processing headers and inserted into
49    /// database afterward.
50    hash_collector: Collector<BlockHash, BlockNumber>,
51    /// Last extracted iterator of block `Header` and `Body` pairs.
52    item: Option<Item<Header, Body>>,
53    /// A stream of [`Item`]s, i.e. iterators over block `Header` and `Body` pairs.
54    stream: Option<ThreadSafeEraStream<Header, Body>>,
55}
56
57trait EraStreamFactory<Header, Body> {
58    fn create(self, input: ExecInput) -> Result<ThreadSafeEraStream<Header, Body>, StageError>;
59}
60
61impl<Header, Body> EraStreamFactory<Header, Body> for EraImportSource
62where
63    Header: FullBlockHeader + Value,
64    Body: FullBlockBody<OmmerHeader = Header>,
65{
66    fn create(self, input: ExecInput) -> Result<ThreadSafeEraStream<Header, Body>, StageError> {
67        match self {
68            Self::Path(path) => Self::convert(
69                read_dir(path, input.next_block()).map_err(|e| StageError::Fatal(e.into()))?,
70            ),
71            Self::Url(url, folder) => {
72                let _ = reth_fs_util::create_dir_all(&folder);
73                let client = EraClient::new(Client::new(), url, folder);
74
75                Self::convert(EraStream::new(
76                    client,
77                    EraStreamConfig::default().start_from(input.next_block()),
78                ))
79            }
80        }
81    }
82}
83
84impl EraImportSource {
85    fn convert<Header, Body>(
86        stream: impl Stream<Item = eyre::Result<impl EraMeta + Send + Sync + 'static + Unpin>>
87            + Send
88            + Sync
89            + 'static
90            + Unpin,
91    ) -> Result<ThreadSafeEraStream<Header, Body>, StageError>
92    where
93        Header: FullBlockHeader + Value,
94        Body: FullBlockBody<OmmerHeader = Header>,
95    {
96        Ok(Box::new(Box::pin(stream.map(|meta| {
97            meta.and_then(|meta| {
98                // ERE files may be published as `.ere` or `.erae`; all other execution history
99                // files are decoded as era1.
100                let file = reth_fs_util::open(meta.path())?;
101                let iter = match meta
102                    .path()
103                    .file_name()
104                    .and_then(|name| name.to_str())
105                    .and_then(EraFileType::from_filename)
106                {
107                    Some(EraFileType::Ere) => {
108                        Box::new(EreReader::new(file).iter().map(era::Ere::decode))
109                            as Item<Header, Body>
110                    }
111                    _ => Box::new(Era1Reader::new(file).iter().map(era::decode))
112                        as Item<Header, Body>,
113                };
114
115                let iter = iter.chain(
116                    iter::once_with(move || match meta.mark_as_processed() {
117                        Ok(..) => None,
118                        Err(e) => Some(Err(e)),
119                    })
120                    .flatten(),
121                );
122
123                Ok(Box::new(iter) as Item<Header, Body>)
124            })
125        }))))
126    }
127}
128
129impl<Header: Debug, Body: Debug, F: Debug> Debug for EraStage<Header, Body, F> {
130    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
131        f.debug_struct("EraStage")
132            .field("source", &self.source)
133            .field("hash_collector", &self.hash_collector)
134            .field("item", &self.item.is_some())
135            .field("stream", &"dyn Stream")
136            .finish()
137    }
138}
139
140impl<Header, Body, F> EraStage<Header, Body, F> {
141    /// Creates a new [`EraStage`].
142    pub fn new(source: Option<F>, etl_config: EtlConfig) -> Self {
143        Self {
144            source,
145            item: None,
146            stream: None,
147            hash_collector: Collector::new(etl_config.file_size, etl_config.dir),
148        }
149    }
150}
151
152impl<Provider, N, F> Stage<Provider> for EraStage<N::BlockHeader, N::BlockBody, F>
153where
154    Provider: DBProvider<Tx: DbTxMut>
155        + StaticFileProviderFactory<Primitives = N>
156        + BlockWriter<Block = N::Block>
157        + BlockReader<Block = N::Block>
158        + StageCheckpointWriter,
159    F: EraStreamFactory<N::BlockHeader, N::BlockBody> + Send + Sync + Clone,
160    N: NodePrimitives<BlockHeader: Value>,
161{
162    fn id(&self) -> StageId {
163        StageId::Era
164    }
165
166    fn poll_execute_ready(
167        &mut self,
168        cx: &mut Context<'_>,
169        input: ExecInput,
170    ) -> Poll<Result<(), StageError>> {
171        if input.target_reached() || self.item.is_some() {
172            return Poll::Ready(Ok(()));
173        }
174
175        if self.stream.is_none() &&
176            let Some(source) = self.source.clone()
177        {
178            self.stream.replace(source.create(input)?);
179        }
180        if let Some(stream) = &mut self.stream &&
181            let Some(next) = ready!(stream.poll_next_unpin(cx))
182                .transpose()
183                .map_err(|e| StageError::Fatal(e.into()))?
184        {
185            self.item.replace(next);
186        }
187
188        Poll::Ready(Ok(()))
189    }
190
191    fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result<ExecOutput, StageError> {
192        let height = if let Some(era) = self.item.take() {
193            let static_file_provider = provider.static_file_provider();
194
195            // Consistency check of expected headers in static files vs DB is done on
196            // provider::sync_gap when poll_execute_ready is polled.
197            let last_header_number = static_file_provider
198                .get_highest_static_file_block(StaticFileSegment::Headers)
199                .unwrap_or_default();
200
201            // Although headers were downloaded in reverse order, the collector iterates it in
202            // ascending order
203            let mut writer = static_file_provider.latest_writer(StaticFileSegment::Headers)?;
204
205            let height = era::process_iter(
206                era,
207                &mut writer,
208                provider,
209                &mut self.hash_collector,
210                last_header_number..=input.target(),
211            )
212            .map_err(|e| StageError::Fatal(e.into()))?;
213
214            if !self.hash_collector.is_empty() {
215                era::build_index(provider, &mut self.hash_collector)
216                    .map_err(|e| StageError::Recoverable(e.into()))?;
217                self.hash_collector.clear();
218            }
219
220            era::save_stage_checkpoints(
221                provider,
222                input.checkpoint().block_number,
223                height,
224                height,
225                input.target(),
226            )?;
227
228            height
229        } else {
230            // No era files to process. Return the highest block we're aware of to avoid
231            // limiting subsequent stages with an outdated checkpoint.
232            //
233            // This can happen when:
234            // 1. Era import is complete (all pre-merge blocks imported)
235            // 2. No era import source was configured
236            //
237            // We return max(checkpoint, highest_header, target) to ensure we don't return
238            // a stale checkpoint that could limit subsequent stages like Headers.
239            let highest_header = provider
240                .static_file_provider()
241                .get_highest_static_file_block(StaticFileSegment::Headers)
242                .unwrap_or_default();
243
244            let checkpoint = input.checkpoint().block_number;
245            let from_target = input.target.unwrap_or(checkpoint);
246
247            checkpoint.max(highest_header).max(from_target)
248        };
249
250        Ok(ExecOutput { checkpoint: StageCheckpoint::new(height), done: height >= input.target() })
251    }
252
253    fn unwind(
254        &mut self,
255        _provider: &Provider,
256        input: UnwindInput,
257    ) -> Result<UnwindOutput, StageError> {
258        Ok(UnwindOutput { checkpoint: input.checkpoint.with_block_number(input.unwind_to) })
259    }
260}
261
262/// Describes where to get the era files from.
263#[derive(Debug, Clone)]
264pub enum EraImportSource {
265    /// Remote HTTP accessible host.
266    Url(Url, Box<Path>),
267    /// Local directory.
268    Path(Box<Path>),
269}
270
271impl EraImportSource {
272    /// Maybe constructs a new `EraImportSource` depending on the arguments.
273    ///
274    /// Only one of `url` or `path` should be provided, but upholding this invariant is delegated
275    /// above so that both parameters can be accepted.
276    ///
277    /// # Arguments
278    /// * The `path` uses a directory as the import source. It and its contents must be readable.
279    /// * The `url` uses an HTTP client to list and download files.
280    /// * The `default` gives the default [`Url`] if none of the previous parameters are provided.
281    /// * For any [`Url`] the `folder` is used as the download directory for storing files
282    ///   temporarily. It and its contents must be readable and writable.
283    pub fn maybe_new(
284        path: Option<Box<Path>>,
285        url: Option<Url>,
286        default: impl FnOnce() -> Option<Url>,
287        folder: impl FnOnce() -> Box<Path>,
288    ) -> Option<Self> {
289        path.map(Self::Path).or_else(|| url.or_else(default).map(|url| Self::Url(url, folder())))
290    }
291}
292
293#[cfg(test)]
294mod tests {
295    use super::*;
296    use crate::test_utils::{
297        stage_test_suite, ExecuteStageTestRunner, StageTestRunner, UnwindStageTestRunner,
298    };
299    use alloy_consensus::{BlockBody, Header};
300    use alloy_primitives::B256;
301    use assert_matches::assert_matches;
302    use futures_util::stream;
303    use reth_db_api::tables;
304    use reth_era::{
305        common::file_ops::{EraFileFormat, StreamWriter},
306        ere::{
307            file::{EreFile, EreWriter},
308            types::{
309                execution::{BlockTuple, CompressedBody, CompressedHeader},
310                group::{DynamicBlockIndex, EreGroup, EreId},
311            },
312        },
313    };
314    use reth_ethereum_primitives::{Block, TransactionSigned};
315    use reth_primitives_traits::SealedBlock;
316    use reth_provider::BlockHashReader;
317    use reth_testing_utils::generators::{
318        self, random_block_range, random_header, BlockRangeParams,
319    };
320    use std::{fs::File, path::PathBuf};
321    use test_runner::EraTestRunner;
322
323    #[tokio::test]
324    async fn test_era_range_ends_below_target() {
325        let era_cap = 2;
326        let target = 20000;
327
328        let mut runner = EraTestRunner::default();
329
330        let input = ExecInput { target: Some(era_cap), checkpoint: None };
331        runner.seed_execution(input).unwrap();
332
333        let input = ExecInput { target: Some(target), checkpoint: None };
334        let output = runner.execute(input).await.unwrap();
335
336        runner.commit();
337
338        assert_matches!(
339            output,
340            Ok(ExecOutput {
341                checkpoint: StageCheckpoint { block_number, stage_checkpoint: None },
342                done: false
343            }) if block_number == era_cap
344        );
345
346        let output = output.unwrap();
347        let validation_output = runner.validate_execution(input, Some(output.clone()));
348
349        assert_matches!(validation_output, Ok(()));
350
351        runner.take_responses();
352
353        let input = ExecInput { target: Some(target), checkpoint: Some(output.checkpoint) };
354        let output = runner.execute(input).await.unwrap();
355
356        runner.commit();
357
358        assert_matches!(
359            output,
360            Ok(ExecOutput {
361                checkpoint: StageCheckpoint { block_number, stage_checkpoint: None },
362                done: true
363            }) if block_number == target
364        );
365
366        let validation_output = runner.validate_execution(input, output.ok());
367
368        assert_matches!(validation_output, Ok(()));
369    }
370
371    /// Writes `blocks` to a header+body-only ERE file at `path`.
372    fn write_ere_file(path: &Path, blocks: &[SealedBlock<Block>]) {
373        let tuples = blocks
374            .iter()
375            .map(|block| {
376                BlockTuple::new(
377                    CompressedHeader::from_header(block.header()).unwrap(),
378                    CompressedBody::from_body(block.body()).unwrap(),
379                )
380            })
381            .collect::<Vec<_>>();
382
383        let start = blocks[0].number;
384        // Header + body only is component-count 2. The index offsets are stored verbatim and not
385        // interpreted when assembling blocks, so placeholder values are fine.
386        let component_count = 2;
387        let offsets = vec![0; blocks.len() * component_count as usize];
388        let index = DynamicBlockIndex::new(start, component_count, offsets);
389        let group = EreGroup::new(tuples, None, index);
390        let file = EreFile::new(group, EreId::new("mainnet", start, blocks.len() as u32));
391
392        EreWriter::new(File::create(path).unwrap()).write_file(&file).unwrap();
393    }
394
395    #[derive(Debug)]
396    struct EreTestMeta {
397        path: PathBuf,
398    }
399
400    impl EraMeta for EreTestMeta {
401        fn mark_as_processed(&self) -> eyre::Result<()> {
402            Ok(())
403        }
404
405        fn path(&self) -> &Path {
406            &self.path
407        }
408    }
409
410    #[tokio::test]
411    async fn convert_decodes_ere_files() {
412        let mut rng = generators::rng();
413        let blocks = random_block_range(
414            &mut rng,
415            1..=3,
416            BlockRangeParams { tx_count: 1..3, ..Default::default() },
417        );
418
419        let dir = tempfile::tempdir().unwrap();
420
421        // ERE files are published with either the `.ere` or `.erae` extension; both must be
422        // recognized and routed to the ERE reader rather than decoded as era1.
423        for ext in ["ere", "erae"] {
424            let path = dir.path().join(format!("mainnet-00000-abcd1234.{ext}"));
425            write_ere_file(&path, &blocks);
426
427            let stream =
428                stream::iter(vec![Ok::<_, eyre::Error>(EreTestMeta { path: path.clone() })]);
429            let mut stream =
430                EraImportSource::convert::<Header, BlockBody<TransactionSigned>>(stream).unwrap();
431
432            let item = stream.next().await.expect("a file to decode").expect("decoding to succeed");
433            let decoded = item.collect::<eyre::Result<Vec<_>>>().unwrap();
434
435            assert_eq!(
436                decoded.len(),
437                blocks.len(),
438                "ERE file with `.{ext}` extension should decode every block"
439            );
440            for ((header, body), block) in decoded.iter().zip(&blocks) {
441                assert_eq!(header, block.header());
442                assert_eq!(body, block.body());
443            }
444        }
445    }
446
447    mod test_runner {
448        use super::*;
449        use crate::test_utils::{TestRunnerError, TestStageDB};
450        use alloy_consensus::{BlockBody, Header};
451        use futures_util::stream;
452        use reth_db_api::{
453            cursor::DbCursorRO,
454            models::{StoredBlockBodyIndices, StoredBlockOmmers},
455            transaction::DbTx,
456        };
457        use reth_ethereum_primitives::TransactionSigned;
458        use reth_primitives_traits::{SealedBlock, SealedHeader};
459        use reth_provider::{BlockNumReader, HeaderProvider, TransactionsProvider};
460        use reth_testing_utils::generators::{
461            random_block_range, random_signed_tx, BlockRangeParams,
462        };
463        use tokio::sync::watch;
464
465        pub(crate) struct EraTestRunner {
466            channel: (watch::Sender<B256>, watch::Receiver<B256>),
467            db: TestStageDB,
468            responses: Option<Vec<(Header, BlockBody<TransactionSigned>)>>,
469        }
470
471        impl Default for EraTestRunner {
472            fn default() -> Self {
473                Self {
474                    channel: watch::channel(B256::ZERO),
475                    db: TestStageDB::default(),
476                    responses: Default::default(),
477                }
478            }
479        }
480
481        impl StageTestRunner for EraTestRunner {
482            type S = EraStage<Header, BlockBody<TransactionSigned>, StubResponses>;
483
484            fn db(&self) -> &TestStageDB {
485                &self.db
486            }
487
488            fn stage(&self) -> Self::S {
489                EraStage::new(self.responses.clone().map(StubResponses), EtlConfig::default())
490            }
491        }
492
493        impl ExecuteStageTestRunner for EraTestRunner {
494            type Seed = Vec<SealedBlock<reth_ethereum_primitives::Block>>;
495
496            fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
497                let start = input.checkpoint().block_number;
498                let end = input.target();
499
500                let static_file_provider = self.db.factory.static_file_provider();
501
502                let mut rng = generators::rng();
503
504                // Static files do not support gaps in headers, so we need to generate 0 to end
505                let blocks = random_block_range(
506                    &mut rng,
507                    0..=end,
508                    BlockRangeParams {
509                        parent: Some(B256::ZERO),
510                        tx_count: 0..2,
511                        ..Default::default()
512                    },
513                );
514                self.db.insert_headers(blocks.iter().map(|block| block.sealed_header()))?;
515                if let Some(progress) = blocks.get(start as usize) {
516                    // Insert last progress data
517                    {
518                        let tx = self.db.factory.provider_rw()?.into_tx();
519                        let mut static_file_producer = static_file_provider
520                            .get_writer(start, StaticFileSegment::Transactions)?;
521
522                        let body = StoredBlockBodyIndices {
523                            first_tx_num: 0,
524                            tx_count: progress.transaction_count() as u64,
525                        };
526
527                        static_file_producer.set_block_range(0..=progress.number);
528
529                        body.tx_num_range().try_for_each(|tx_num| {
530                            let transaction = random_signed_tx(&mut rng);
531                            static_file_producer.append_transaction(tx_num, &transaction).map(drop)
532                        })?;
533
534                        if body.tx_count != 0 {
535                            tx.put::<tables::TransactionBlocks>(
536                                body.last_tx_num(),
537                                progress.number,
538                            )?;
539                        }
540
541                        tx.put::<tables::BlockBodyIndices>(progress.number, body)?;
542
543                        if !progress.ommers_hash_is_empty() {
544                            tx.put::<tables::BlockOmmers>(
545                                progress.number,
546                                StoredBlockOmmers { ommers: progress.body().ommers.clone() },
547                            )?;
548                        }
549
550                        static_file_producer.commit()?;
551                        tx.commit()?;
552                    }
553                }
554                self.responses.replace(
555                    blocks.iter().map(|v| (v.header().clone(), v.body().clone())).collect(),
556                );
557                Ok(blocks)
558            }
559
560            /// Validate stored headers and bodies
561            fn validate_execution(
562                &self,
563                input: ExecInput,
564                output: Option<ExecOutput>,
565            ) -> Result<(), TestRunnerError> {
566                let initial_checkpoint = input.checkpoint().block_number;
567                match output {
568                    Some(output) if output.checkpoint.block_number > initial_checkpoint => {
569                        let provider = self.db.factory.provider()?;
570
571                        for block_num in initial_checkpoint..
572                            output
573                                .checkpoint
574                                .block_number
575                                .min(self.responses.as_ref().map(|v| v.len()).unwrap_or_default()
576                                    as BlockNumber)
577                        {
578                            // look up the header hash
579                            let hash = provider.block_hash(block_num)?.expect("no header hash");
580
581                            // validate the header number
582                            assert_eq!(provider.block_number(hash)?, Some(block_num));
583
584                            // validate the header
585                            let header = provider.header_by_number(block_num)?;
586                            assert!(header.is_some());
587                            let header = SealedHeader::seal_slow(header.unwrap());
588                            assert_eq!(header.hash(), hash);
589                        }
590
591                        self.validate_db_blocks(
592                            output.checkpoint.block_number,
593                            output.checkpoint.block_number,
594                        )?;
595                    }
596                    _ => self.check_no_header_entry_above(initial_checkpoint)?,
597                };
598                Ok(())
599            }
600
601            async fn after_execution(&self, headers: Self::Seed) -> Result<(), TestRunnerError> {
602                let tip = if headers.is_empty() {
603                    let tip = random_header(&mut generators::rng(), 0, None);
604                    self.db.insert_headers(iter::once(&tip))?;
605                    tip.hash()
606                } else {
607                    headers.last().unwrap().hash()
608                };
609                self.send_tip(tip);
610                Ok(())
611            }
612        }
613
614        impl UnwindStageTestRunner for EraTestRunner {
615            fn validate_unwind(&self, _input: UnwindInput) -> Result<(), TestRunnerError> {
616                Ok(())
617            }
618        }
619
620        impl EraTestRunner {
621            pub(crate) fn check_no_header_entry_above(
622                &self,
623                block: BlockNumber,
624            ) -> Result<(), TestRunnerError> {
625                self.db
626                    .ensure_no_entry_above_by_value::<tables::HeaderNumbers, _>(block, |val| val)?;
627                self.db.ensure_no_entry_above::<tables::CanonicalHeaders, _>(block, |key| key)?;
628                self.db.ensure_no_entry_above::<tables::Headers, _>(block, |key| key)?;
629                Ok(())
630            }
631
632            pub(crate) fn send_tip(&self, tip: B256) {
633                self.channel.0.send(tip).expect("failed to send tip");
634            }
635
636            /// Validate that the inserted block data is valid
637            pub(crate) fn validate_db_blocks(
638                &self,
639                prev_progress: BlockNumber,
640                highest_block: BlockNumber,
641            ) -> Result<(), TestRunnerError> {
642                let static_file_provider = self.db.factory.static_file_provider();
643
644                self.db.query(|tx| {
645                    // Acquire cursors on body related tables
646                    let mut bodies_cursor = tx.cursor_read::<tables::BlockBodyIndices>()?;
647                    let mut ommers_cursor = tx.cursor_read::<tables::BlockOmmers>()?;
648                    let mut tx_block_cursor = tx.cursor_read::<tables::TransactionBlocks>()?;
649
650                    let first_body_key = match bodies_cursor.first()? {
651                        Some((key, _)) => key,
652                        None => return Ok(()),
653                    };
654
655                    let mut prev_number: Option<BlockNumber> = None;
656
657
658                    for entry in bodies_cursor.walk(Some(first_body_key))? {
659                        let (number, body) = entry?;
660
661                        // Validate sequentiality only after prev progress,
662                        // since the data before is mocked and can contain gaps
663                        if number > prev_progress
664                            && let Some(prev_key) = prev_number {
665                                assert_eq!(prev_key + 1, number, "Body entries must be sequential");
666                            }
667
668                        // Validate that the current entry is below or equals to the highest allowed block
669                        assert!(
670                            number <= highest_block,
671                            "We wrote a block body outside of our synced range. Found block with number {number}, highest block according to stage is {highest_block}",
672                        );
673
674                        let header = static_file_provider.header_by_number(number)?.expect("to be present");
675                        // Validate that ommers exist if any
676                        let stored_ommers =  ommers_cursor.seek_exact(number)?;
677                        if header.ommers_hash_is_empty() {
678                            assert!(stored_ommers.is_none(), "Unexpected ommers entry");
679                        } else {
680                            assert!(stored_ommers.is_some(), "Missing ommers entry");
681                        }
682
683                        let tx_block_id = tx_block_cursor.seek_exact(body.last_tx_num())?.map(|(_,b)| b);
684                        if body.tx_count == 0 {
685                            assert_ne!(tx_block_id,Some(number));
686                        } else {
687                            assert_eq!(tx_block_id, Some(number));
688                        }
689
690                        for tx_id in body.tx_num_range() {
691                            assert!(static_file_provider.transaction_by_id(tx_id)?.is_some(), "Transaction is missing.");
692                        }
693
694                        prev_number = Some(number);
695                    }
696                    Ok(())
697                })?;
698                Ok(())
699            }
700
701            pub(crate) fn take_responses(&mut self) {
702                self.responses.take();
703            }
704
705            pub(crate) fn commit(&self) {
706                self.db.factory.static_file_provider().commit().unwrap();
707            }
708        }
709
710        #[derive(Clone)]
711        pub(crate) struct StubResponses(Vec<(Header, BlockBody<TransactionSigned>)>);
712
713        impl EraStreamFactory<Header, BlockBody<TransactionSigned>> for StubResponses {
714            fn create(
715                self,
716                _input: ExecInput,
717            ) -> Result<ThreadSafeEraStream<Header, BlockBody<TransactionSigned>>, StageError>
718            {
719                let stream = stream::iter(vec![self.0]);
720
721                Ok(Box::new(Box::pin(stream.map(|meta| {
722                    Ok(Box::new(meta.into_iter().map(Ok))
723                        as Item<Header, BlockBody<TransactionSigned>>)
724                }))))
725            }
726        }
727    }
728
729    stage_test_suite!(EraTestRunner, era);
730}