reth_stages/stages/era.rs

1use crate::{StageCheckpoint, StageId};
2use alloy_primitives::{BlockHash, BlockNumber};
3use futures_util::{Stream, StreamExt};
4use reqwest::{Client, Url};
5use reth_config::config::EtlConfig;
6use reth_db_api::{table::Value, transaction::DbTxMut};
7use reth_era::{era1_file::Era1Reader, era_file_ops::StreamReader};
8use reth_era_downloader::{read_dir, EraClient, EraMeta, EraStream, EraStreamConfig};
9use reth_era_utils as era;
10use reth_etl::Collector;
11use reth_primitives_traits::{FullBlockBody, FullBlockHeader, NodePrimitives};
12use reth_provider::{
13    BlockReader, BlockWriter, DBProvider, StageCheckpointWriter, StaticFileProviderFactory,
14    StaticFileWriter,
15};
16use reth_stages_api::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput};
17use reth_static_file_types::StaticFileSegment;
18use std::{
19    fmt::{Debug, Formatter},
20    iter,
21    path::Path,
22    task::{ready, Context, Poll},
23};
24
25type Item<Header, Body> =
26    Box<dyn Iterator<Item = eyre::Result<(Header, Body)>> + Send + Sync + Unpin>;
27type ThreadSafeEraStream<Header, Body> =
28    Box<dyn Stream<Item = eyre::Result<Item<Header, Body>>> + Send + Sync + Unpin>;
29
30/// The [ERA1](https://github.com/eth-clients/e2store-format-specs/blob/main/formats/era1.md)
31/// pre-merge history stage.
32///
33/// Imports block headers and bodies from genesis up to the last pre-merge block. Receipts are
34/// generated by execution. Execution is not done in this stage.
35pub struct EraStage<Header, Body, StreamFactory> {
36    /// The `source` creates `stream`.
37    source: Option<StreamFactory>,
38    /// A map of block hash to block height collected when processing headers and inserted into
39    /// database afterward.
40    hash_collector: Collector<BlockHash, BlockNumber>,
41    /// Last extracted iterator of block `Header` and `Body` pairs.
42    item: Option<Item<Header, Body>>,
43    /// A stream of [`Item`]s, i.e. iterators over block `Header` and `Body` pairs.
44    stream: Option<ThreadSafeEraStream<Header, Body>>,
45}
46
47trait EraStreamFactory<Header, Body> {
48    fn create(self, input: ExecInput) -> Result<ThreadSafeEraStream<Header, Body>, StageError>;
49}
50
51impl<Header, Body> EraStreamFactory<Header, Body> for EraImportSource
52where
53    Header: FullBlockHeader + Value,
54    Body: FullBlockBody<OmmerHeader = Header>,
55{
56    fn create(self, input: ExecInput) -> Result<ThreadSafeEraStream<Header, Body>, StageError> {
57        match self {
58            Self::Path(path) => Self::convert(
59                read_dir(path, input.next_block()).map_err(|e| StageError::Fatal(e.into()))?,
60            ),
61            Self::Url(url, folder) => {
62                let _ = reth_fs_util::create_dir_all(&folder);
63                let client = EraClient::new(Client::new(), url, folder);
64
65                Self::convert(EraStream::new(
66                    client,
67                    EraStreamConfig::default().start_from(input.next_block()),
68                ))
69            }
70        }
71    }
72}
73
74impl EraImportSource {
75    fn convert<Header, Body>(
76        stream: impl Stream<Item = eyre::Result<impl EraMeta + Send + Sync + 'static + Unpin>>
77            + Send
78            + Sync
79            + 'static
80            + Unpin,
81    ) -> Result<ThreadSafeEraStream<Header, Body>, StageError>
82    where
83        Header: FullBlockHeader + Value,
84        Body: FullBlockBody<OmmerHeader = Header>,
85    {
86        Ok(Box::new(Box::pin(stream.map(|meta| {
87            meta.and_then(|meta| {
88                let file = reth_fs_util::open(meta.path())?;
89                let reader = Era1Reader::new(file);
90                let iter = reader.iter();
91                let iter = iter.map(era::decode);
92                let iter = iter.chain(
93                    iter::once_with(move || match meta.mark_as_processed() {
94                        Ok(..) => None,
95                        Err(e) => Some(Err(e)),
96                    })
97                    .flatten(),
98                );
99
100                Ok(Box::new(iter) as Item<Header, Body>)
101            })
102        }))))
103    }
104}
105
106impl<Header: Debug, Body: Debug, F: Debug> Debug for EraStage<Header, Body, F> {
107    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
108        f.debug_struct("EraStage")
109            .field("source", &self.source)
110            .field("hash_collector", &self.hash_collector)
111            .field("item", &self.item.is_some())
112            .field("stream", &"dyn Stream")
113            .finish()
114    }
115}
116
117impl<Header, Body, F> EraStage<Header, Body, F> {
118    /// Creates a new [`EraStage`].
119    pub fn new(source: Option<F>, etl_config: EtlConfig) -> Self {
120        Self {
121            source,
122            item: None,
123            stream: None,
124            hash_collector: Collector::new(etl_config.file_size, etl_config.dir),
125        }
126    }
127}
128
129impl<Provider, N, F> Stage<Provider> for EraStage<N::BlockHeader, N::BlockBody, F>
130where
131    Provider: DBProvider<Tx: DbTxMut>
132        + StaticFileProviderFactory<Primitives = N>
133        + BlockWriter<Block = N::Block>
134        + BlockReader<Block = N::Block>
135        + StageCheckpointWriter,
136    F: EraStreamFactory<N::BlockHeader, N::BlockBody> + Send + Sync + Clone,
137    N: NodePrimitives<BlockHeader: Value>,
138{
139    fn id(&self) -> StageId {
140        StageId::Era
141    }
142
143    fn poll_execute_ready(
144        &mut self,
145        cx: &mut Context<'_>,
146        input: ExecInput,
147    ) -> Poll<Result<(), StageError>> {
148        if input.target_reached() || self.item.is_some() {
149            return Poll::Ready(Ok(()));
150        }
151
152        if self.stream.is_none() &&
153            let Some(source) = self.source.clone()
154        {
155            self.stream.replace(source.create(input)?);
156        }
157        if let Some(stream) = &mut self.stream &&
158            let Some(next) = ready!(stream.poll_next_unpin(cx))
159                .transpose()
160                .map_err(|e| StageError::Fatal(e.into()))?
161        {
162            self.item.replace(next);
163        }
164
165        Poll::Ready(Ok(()))
166    }
167
168    fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result<ExecOutput, StageError> {
169        let height = if let Some(era) = self.item.take() {
170            let static_file_provider = provider.static_file_provider();
171
172            // Consistency check of expected headers in static files vs DB is done on
173            // provider::sync_gap when poll_execute_ready is polled.
174            let last_header_number = static_file_provider
175                .get_highest_static_file_block(StaticFileSegment::Headers)
176                .unwrap_or_default();
177
178            // Although headers were downloaded in reverse order, the collector iterates it in
179            // ascending order
180            let mut writer = static_file_provider.latest_writer(StaticFileSegment::Headers)?;
181
182            let height = era::process_iter(
183                era,
184                &mut writer,
185                provider,
186                &mut self.hash_collector,
187                last_header_number..=input.target(),
188            )
189            .map_err(|e| StageError::Fatal(e.into()))?;
190
191            if !self.hash_collector.is_empty() {
192                era::build_index(provider, &mut self.hash_collector)
193                    .map_err(|e| StageError::Recoverable(e.into()))?;
194                self.hash_collector.clear();
195            }
196
197            era::save_stage_checkpoints(
198                &provider,
199                input.checkpoint().block_number,
200                height,
201                height,
202                input.target(),
203            )?;
204
205            height
206        } else {
207            // No era files to process. Return the highest block we're aware of to avoid
208            // limiting subsequent stages with an outdated checkpoint.
209            //
210            // This can happen when:
211            // 1. Era import is complete (all pre-merge blocks imported)
212            // 2. No era import source was configured
213            //
214            // We return max(checkpoint, highest_header, target) to ensure we don't return
215            // a stale checkpoint that could limit subsequent stages like Headers.
216            let highest_header = provider
217                .static_file_provider()
218                .get_highest_static_file_block(StaticFileSegment::Headers)
219                .unwrap_or_default();
220
221            let checkpoint = input.checkpoint().block_number;
222            let from_target = input.target.unwrap_or(checkpoint);
223
224            checkpoint.max(highest_header).max(from_target)
225        };
226
227        Ok(ExecOutput { checkpoint: StageCheckpoint::new(height), done: height >= input.target() })
228    }
229
230    fn unwind(
231        &mut self,
232        _provider: &Provider,
233        input: UnwindInput,
234    ) -> Result<UnwindOutput, StageError> {
235        Ok(UnwindOutput { checkpoint: input.checkpoint.with_block_number(input.unwind_to) })
236    }
237}
238
239/// Describes where to get the era files from.
240#[derive(Debug, Clone)]
241pub enum EraImportSource {
242    /// Remote HTTP accessible host.
243    Url(Url, Box<Path>),
244    /// Local directory.
245    Path(Box<Path>),
246}
247
248impl EraImportSource {
249    /// Maybe constructs a new `EraImportSource` depending on the arguments.
250    ///
251    /// Only one of `url` or `path` should be provided, but upholding this invariant is delegated
252    /// above so that both parameters can be accepted.
253    ///
254    /// # Arguments
255    /// * The `path` uses a directory as the import source. It and its contents must be readable.
256    /// * The `url` uses an HTTP client to list and download files.
257    /// * The `default` gives the default [`Url`] if none of the previous parameters are provided.
258    /// * For any [`Url`] the `folder` is used as the download directory for storing files
259    ///   temporarily. It and its contents must be readable and writable.
260    pub fn maybe_new(
261        path: Option<Box<Path>>,
262        url: Option<Url>,
263        default: impl FnOnce() -> Option<Url>,
264        folder: impl FnOnce() -> Box<Path>,
265    ) -> Option<Self> {
266        path.map(Self::Path).or_else(|| url.or_else(default).map(|url| Self::Url(url, folder())))
267    }
268}
269
270#[cfg(test)]
271mod tests {
272    use super::*;
273    use crate::test_utils::{
274        stage_test_suite, ExecuteStageTestRunner, StageTestRunner, UnwindStageTestRunner,
275    };
276    use alloy_primitives::B256;
277    use assert_matches::assert_matches;
278    use reth_db_api::tables;
279    use reth_provider::BlockHashReader;
280    use reth_testing_utils::generators::{self, random_header};
281    use test_runner::EraTestRunner;
282
283    #[tokio::test]
284    async fn test_era_range_ends_below_target() {
285        let era_cap = 2;
286        let target = 20000;
287
288        let mut runner = EraTestRunner::default();
289
290        let input = ExecInput { target: Some(era_cap), checkpoint: None };
291        runner.seed_execution(input).unwrap();
292
293        let input = ExecInput { target: Some(target), checkpoint: None };
294        let output = runner.execute(input).await.unwrap();
295
296        runner.commit();
297
298        assert_matches!(
299            output,
300            Ok(ExecOutput {
301                checkpoint: StageCheckpoint { block_number, stage_checkpoint: None },
302                done: false
303            }) if block_number == era_cap
304        );
305
306        let output = output.unwrap();
307        let validation_output = runner.validate_execution(input, Some(output.clone()));
308
309        assert_matches!(validation_output, Ok(()));
310
311        runner.take_responses();
312
313        let input = ExecInput { target: Some(target), checkpoint: Some(output.checkpoint) };
314        let output = runner.execute(input).await.unwrap();
315
316        runner.commit();
317
318        assert_matches!(
319            output,
320            Ok(ExecOutput {
321                checkpoint: StageCheckpoint { block_number, stage_checkpoint: None },
322                done: true
323            }) if block_number == target
324        );
325
326        let validation_output = runner.validate_execution(input, output.ok());
327
328        assert_matches!(validation_output, Ok(()));
329    }
330
331    mod test_runner {
332        use super::*;
333        use crate::test_utils::{TestRunnerError, TestStageDB};
334        use alloy_consensus::{BlockBody, Header};
335        use futures_util::stream;
336        use reth_db_api::{
337            cursor::DbCursorRO,
338            models::{StoredBlockBodyIndices, StoredBlockOmmers},
339            transaction::DbTx,
340        };
341        use reth_ethereum_primitives::TransactionSigned;
342        use reth_primitives_traits::{SealedBlock, SealedHeader};
343        use reth_provider::{BlockNumReader, HeaderProvider, TransactionsProvider};
344        use reth_testing_utils::generators::{
345            random_block_range, random_signed_tx, BlockRangeParams,
346        };
347        use tokio::sync::watch;
348
349        pub(crate) struct EraTestRunner {
350            channel: (watch::Sender<B256>, watch::Receiver<B256>),
351            db: TestStageDB,
352            responses: Option<Vec<(Header, BlockBody<TransactionSigned>)>>,
353        }
354
355        impl Default for EraTestRunner {
356            fn default() -> Self {
357                Self {
358                    channel: watch::channel(B256::ZERO),
359                    db: TestStageDB::default(),
360                    responses: Default::default(),
361                }
362            }
363        }
364
365        impl StageTestRunner for EraTestRunner {
366            type S = EraStage<Header, BlockBody<TransactionSigned>, StubResponses>;
367
368            fn db(&self) -> &TestStageDB {
369                &self.db
370            }
371
372            fn stage(&self) -> Self::S {
373                EraStage::new(self.responses.clone().map(StubResponses), EtlConfig::default())
374            }
375        }
376
377        impl ExecuteStageTestRunner for EraTestRunner {
378            type Seed = Vec<SealedBlock<reth_ethereum_primitives::Block>>;
379
380            fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
381                let start = input.checkpoint().block_number;
382                let end = input.target();
383
384                let static_file_provider = self.db.factory.static_file_provider();
385
386                let mut rng = generators::rng();
387
388                // Static files do not support gaps in headers, so we need to generate 0 to end
389                let blocks = random_block_range(
390                    &mut rng,
391                    0..=end,
392                    BlockRangeParams {
393                        parent: Some(B256::ZERO),
394                        tx_count: 0..2,
395                        ..Default::default()
396                    },
397                );
398                self.db.insert_headers(blocks.iter().map(|block| block.sealed_header()))?;
399                if let Some(progress) = blocks.get(start as usize) {
400                    // Insert last progress data
401                    {
402                        let tx = self.db.factory.provider_rw()?.into_tx();
403                        let mut static_file_producer = static_file_provider
404                            .get_writer(start, StaticFileSegment::Transactions)?;
405
406                        let body = StoredBlockBodyIndices {
407                            first_tx_num: 0,
408                            tx_count: progress.transaction_count() as u64,
409                        };
410
411                        static_file_producer.set_block_range(0..=progress.number);
412
413                        body.tx_num_range().try_for_each(|tx_num| {
414                            let transaction = random_signed_tx(&mut rng);
415                            static_file_producer.append_transaction(tx_num, &transaction).map(drop)
416                        })?;
417
418                        if body.tx_count != 0 {
419                            tx.put::<tables::TransactionBlocks>(
420                                body.last_tx_num(),
421                                progress.number,
422                            )?;
423                        }
424
425                        tx.put::<tables::BlockBodyIndices>(progress.number, body)?;
426
427                        if !progress.ommers_hash_is_empty() {
428                            tx.put::<tables::BlockOmmers>(
429                                progress.number,
430                                StoredBlockOmmers { ommers: progress.body().ommers.clone() },
431                            )?;
432                        }
433
434                        static_file_producer.commit()?;
435                        tx.commit()?;
436                    }
437                }
438                self.responses.replace(
439                    blocks.iter().map(|v| (v.header().clone(), v.body().clone())).collect(),
440                );
441                Ok(blocks)
442            }
443
444            /// Validate stored headers and bodies
445            fn validate_execution(
446                &self,
447                input: ExecInput,
448                output: Option<ExecOutput>,
449            ) -> Result<(), TestRunnerError> {
450                let initial_checkpoint = input.checkpoint().block_number;
451                match output {
452                    Some(output) if output.checkpoint.block_number > initial_checkpoint => {
453                        let provider = self.db.factory.provider()?;
454
455                        for block_num in initial_checkpoint..
456                            output
457                                .checkpoint
458                                .block_number
459                                .min(self.responses.as_ref().map(|v| v.len()).unwrap_or_default()
460                                    as BlockNumber)
461                        {
462                            // look up the header hash
463                            let hash = provider.block_hash(block_num)?.expect("no header hash");
464
465                            // validate the header number
466                            assert_eq!(provider.block_number(hash)?, Some(block_num));
467
468                            // validate the header
469                            let header = provider.header_by_number(block_num)?;
470                            assert!(header.is_some());
471                            let header = SealedHeader::seal_slow(header.unwrap());
472                            assert_eq!(header.hash(), hash);
473                        }
474
475                        self.validate_db_blocks(
476                            output.checkpoint.block_number,
477                            output.checkpoint.block_number,
478                        )?;
479                    }
480                    _ => self.check_no_header_entry_above(initial_checkpoint)?,
481                };
482                Ok(())
483            }
484
485            async fn after_execution(&self, headers: Self::Seed) -> Result<(), TestRunnerError> {
486                let tip = if headers.is_empty() {
487                    let tip = random_header(&mut generators::rng(), 0, None);
488                    self.db.insert_headers(iter::once(&tip))?;
489                    tip.hash()
490                } else {
491                    headers.last().unwrap().hash()
492                };
493                self.send_tip(tip);
494                Ok(())
495            }
496        }
497
498        impl UnwindStageTestRunner for EraTestRunner {
499            fn validate_unwind(&self, _input: UnwindInput) -> Result<(), TestRunnerError> {
500                Ok(())
501            }
502        }
503
504        impl EraTestRunner {
505            pub(crate) fn check_no_header_entry_above(
506                &self,
507                block: BlockNumber,
508            ) -> Result<(), TestRunnerError> {
509                self.db
510                    .ensure_no_entry_above_by_value::<tables::HeaderNumbers, _>(block, |val| val)?;
511                self.db.ensure_no_entry_above::<tables::CanonicalHeaders, _>(block, |key| key)?;
512                self.db.ensure_no_entry_above::<tables::Headers, _>(block, |key| key)?;
513                Ok(())
514            }
515
516            pub(crate) fn send_tip(&self, tip: B256) {
517                self.channel.0.send(tip).expect("failed to send tip");
518            }
519
520            /// Validate that the inserted block data is valid
521            pub(crate) fn validate_db_blocks(
522                &self,
523                prev_progress: BlockNumber,
524                highest_block: BlockNumber,
525            ) -> Result<(), TestRunnerError> {
526                let static_file_provider = self.db.factory.static_file_provider();
527
528                self.db.query(|tx| {
529                    // Acquire cursors on body related tables
530                    let mut bodies_cursor = tx.cursor_read::<tables::BlockBodyIndices>()?;
531                    let mut ommers_cursor = tx.cursor_read::<tables::BlockOmmers>()?;
532                    let mut tx_block_cursor = tx.cursor_read::<tables::TransactionBlocks>()?;
533
534                    let first_body_key = match bodies_cursor.first()? {
535                        Some((key, _)) => key,
536                        None => return Ok(()),
537                    };
538
539                    let mut prev_number: Option<BlockNumber> = None;
540
541
542                    for entry in bodies_cursor.walk(Some(first_body_key))? {
543                        let (number, body) = entry?;
544
545                        // Validate sequentiality only after prev progress,
546                        // since the data before is mocked and can contain gaps
547                        if number > prev_progress
548                            && let Some(prev_key) = prev_number {
549                                assert_eq!(prev_key + 1, number, "Body entries must be sequential");
550                            }
551
552                        // Validate that the current entry is below or equals to the highest allowed block
553                        assert!(
554                            number <= highest_block,
555                            "We wrote a block body outside of our synced range. Found block with number {number}, highest block according to stage is {highest_block}",
556                        );
557
558                        let header = static_file_provider.header_by_number(number)?.expect("to be present");
559                        // Validate that ommers exist if any
560                        let stored_ommers =  ommers_cursor.seek_exact(number)?;
561                        if header.ommers_hash_is_empty() {
562                            assert!(stored_ommers.is_none(), "Unexpected ommers entry");
563                        } else {
564                            assert!(stored_ommers.is_some(), "Missing ommers entry");
565                        }
566
567                        let tx_block_id = tx_block_cursor.seek_exact(body.last_tx_num())?.map(|(_,b)| b);
568                        if body.tx_count == 0 {
569                            assert_ne!(tx_block_id,Some(number));
570                        } else {
571                            assert_eq!(tx_block_id, Some(number));
572                        }
573
574                        for tx_id in body.tx_num_range() {
575                            assert!(static_file_provider.transaction_by_id(tx_id)?.is_some(), "Transaction is missing.");
576                        }
577
578                        prev_number = Some(number);
579                    }
580                    Ok(())
581                })?;
582                Ok(())
583            }
584
585            pub(crate) fn take_responses(&mut self) {
586                self.responses.take();
587            }
588
589            pub(crate) fn commit(&self) {
590                self.db.factory.static_file_provider().commit().unwrap();
591            }
592        }
593
594        #[derive(Clone)]
595        pub(crate) struct StubResponses(Vec<(Header, BlockBody<TransactionSigned>)>);
596
597        impl EraStreamFactory<Header, BlockBody<TransactionSigned>> for StubResponses {
598            fn create(
599                self,
600                _input: ExecInput,
601            ) -> Result<ThreadSafeEraStream<Header, BlockBody<TransactionSigned>>, StageError>
602            {
603                let stream = stream::iter(vec![self.0]);
604
605                Ok(Box::new(Box::pin(stream.map(|meta| {
606                    Ok(Box::new(meta.into_iter().map(Ok))
607                        as Item<Header, BlockBody<TransactionSigned>>)
608                }))))
609            }
610        }
611    }
612
613    stage_test_suite!(EraTestRunner, era);
614}