// reth_stages/stages/era.rs

1use crate::{StageCheckpoint, StageId};
2use alloy_primitives::{BlockHash, BlockNumber};
3use futures_util::{Stream, StreamExt};
4use reqwest::{Client, Url};
5use reth_config::config::EtlConfig;
6use reth_db_api::{table::Value, transaction::DbTxMut};
7use reth_era::{era1_file::Era1Reader, era_file_ops::StreamReader};
8use reth_era_downloader::{read_dir, EraClient, EraMeta, EraStream, EraStreamConfig};
9use reth_era_utils as era;
10use reth_etl::Collector;
11use reth_primitives_traits::{FullBlockBody, FullBlockHeader, NodePrimitives};
12use reth_provider::{
13    BlockReader, BlockWriter, DBProvider, HeaderProvider, StageCheckpointWriter,
14    StaticFileProviderFactory, StaticFileWriter,
15};
16use reth_stages_api::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput};
17use reth_static_file_types::StaticFileSegment;
18use reth_storage_errors::ProviderError;
19use std::{
20    fmt::{Debug, Formatter},
21    iter,
22    path::Path,
23    task::{ready, Context, Poll},
24};
25
26type Item<Header, Body> =
27    Box<dyn Iterator<Item = eyre::Result<(Header, Body)>> + Send + Sync + Unpin>;
28type ThreadSafeEraStream<Header, Body> =
29    Box<dyn Stream<Item = eyre::Result<Item<Header, Body>>> + Send + Sync + Unpin>;
30
31/// The [ERA1](https://github.com/eth-clients/e2store-format-specs/blob/main/formats/era1.md)
32/// pre-merge history stage.
33///
34/// Imports block headers and bodies from genesis up to the last pre-merge block. Receipts are
35/// generated by execution. Execution is not done in this stage.
36pub struct EraStage<Header, Body, StreamFactory> {
37    /// The `source` creates `stream`.
38    source: Option<StreamFactory>,
39    /// A map of block hash to block height collected when processing headers and inserted into
40    /// database afterward.
41    hash_collector: Collector<BlockHash, BlockNumber>,
42    /// Last extracted iterator of block `Header` and `Body` pairs.
43    item: Option<Item<Header, Body>>,
44    /// A stream of [`Item`]s, i.e. iterators over block `Header` and `Body` pairs.
45    stream: Option<ThreadSafeEraStream<Header, Body>>,
46}
47
48trait EraStreamFactory<Header, Body> {
49    fn create(self, input: ExecInput) -> Result<ThreadSafeEraStream<Header, Body>, StageError>;
50}
51
52impl<Header, Body> EraStreamFactory<Header, Body> for EraImportSource
53where
54    Header: FullBlockHeader + Value,
55    Body: FullBlockBody<OmmerHeader = Header>,
56{
57    fn create(self, input: ExecInput) -> Result<ThreadSafeEraStream<Header, Body>, StageError> {
58        match self {
59            Self::Path(path) => Self::convert(
60                read_dir(path, input.next_block()).map_err(|e| StageError::Fatal(e.into()))?,
61            ),
62            Self::Url(url, folder) => {
63                let _ = reth_fs_util::create_dir_all(&folder);
64                let client = EraClient::new(Client::new(), url, folder);
65
66                Self::convert(EraStream::new(
67                    client,
68                    EraStreamConfig::default().start_from(input.next_block()),
69                ))
70            }
71        }
72    }
73}
74
75impl EraImportSource {
76    fn convert<Header, Body>(
77        stream: impl Stream<Item = eyre::Result<impl EraMeta + Send + Sync + 'static + Unpin>>
78            + Send
79            + Sync
80            + 'static
81            + Unpin,
82    ) -> Result<ThreadSafeEraStream<Header, Body>, StageError>
83    where
84        Header: FullBlockHeader + Value,
85        Body: FullBlockBody<OmmerHeader = Header>,
86    {
87        Ok(Box::new(Box::pin(stream.map(|meta| {
88            meta.and_then(|meta| {
89                let file = reth_fs_util::open(meta.path())?;
90                let reader = Era1Reader::new(file);
91                let iter = reader.iter();
92                let iter = iter.map(era::decode);
93                let iter = iter.chain(
94                    iter::once_with(move || match meta.mark_as_processed() {
95                        Ok(..) => None,
96                        Err(e) => Some(Err(e)),
97                    })
98                    .flatten(),
99                );
100
101                Ok(Box::new(iter) as Item<Header, Body>)
102            })
103        }))))
104    }
105}
106
107impl<Header: Debug, Body: Debug, F: Debug> Debug for EraStage<Header, Body, F> {
108    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
109        f.debug_struct("EraStage")
110            .field("source", &self.source)
111            .field("hash_collector", &self.hash_collector)
112            .field("item", &self.item.is_some())
113            .field("stream", &"dyn Stream")
114            .finish()
115    }
116}
117
118impl<Header, Body, F> EraStage<Header, Body, F> {
119    /// Creates a new [`EraStage`].
120    pub fn new(source: Option<F>, etl_config: EtlConfig) -> Self {
121        Self {
122            source,
123            item: None,
124            stream: None,
125            hash_collector: Collector::new(etl_config.file_size, etl_config.dir),
126        }
127    }
128}
129
130impl<Provider, N, F> Stage<Provider> for EraStage<N::BlockHeader, N::BlockBody, F>
131where
132    Provider: DBProvider<Tx: DbTxMut>
133        + StaticFileProviderFactory<Primitives = N>
134        + BlockWriter<Block = N::Block>
135        + BlockReader<Block = N::Block>
136        + StageCheckpointWriter,
137    F: EraStreamFactory<N::BlockHeader, N::BlockBody> + Send + Sync + Clone,
138    N: NodePrimitives<BlockHeader: Value>,
139{
140    fn id(&self) -> StageId {
141        StageId::Era
142    }
143
144    fn poll_execute_ready(
145        &mut self,
146        cx: &mut Context<'_>,
147        input: ExecInput,
148    ) -> Poll<Result<(), StageError>> {
149        if input.target_reached() || self.item.is_some() {
150            return Poll::Ready(Ok(()));
151        }
152
153        if self.stream.is_none() &&
154            let Some(source) = self.source.clone()
155        {
156            self.stream.replace(source.create(input)?);
157        }
158        if let Some(stream) = &mut self.stream &&
159            let Some(next) = ready!(stream.poll_next_unpin(cx))
160                .transpose()
161                .map_err(|e| StageError::Fatal(e.into()))?
162        {
163            self.item.replace(next);
164        }
165
166        Poll::Ready(Ok(()))
167    }
168
169    fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result<ExecOutput, StageError> {
170        let height = if let Some(era) = self.item.take() {
171            let static_file_provider = provider.static_file_provider();
172
173            // Consistency check of expected headers in static files vs DB is done on
174            // provider::sync_gap when poll_execute_ready is polled.
175            let last_header_number = static_file_provider
176                .get_highest_static_file_block(StaticFileSegment::Headers)
177                .unwrap_or_default();
178
179            // Find the latest total difficulty
180            let mut td = static_file_provider
181                .header_td_by_number(last_header_number)?
182                .ok_or(ProviderError::TotalDifficultyNotFound(last_header_number))?;
183
184            // Although headers were downloaded in reverse order, the collector iterates it in
185            // ascending order
186            let mut writer = static_file_provider.latest_writer(StaticFileSegment::Headers)?;
187
188            let height = era::process_iter(
189                era,
190                &mut writer,
191                provider,
192                &mut self.hash_collector,
193                &mut td,
194                last_header_number..=input.target(),
195            )
196            .map_err(|e| StageError::Fatal(e.into()))?;
197
198            if !self.hash_collector.is_empty() {
199                era::build_index(provider, &mut self.hash_collector)
200                    .map_err(|e| StageError::Recoverable(e.into()))?;
201                self.hash_collector.clear();
202            }
203
204            era::save_stage_checkpoints(
205                &provider,
206                input.checkpoint().block_number,
207                height,
208                height,
209                input.target(),
210            )?;
211
212            height
213        } else {
214            input.target()
215        };
216
217        Ok(ExecOutput { checkpoint: StageCheckpoint::new(height), done: height == input.target() })
218    }
219
220    fn unwind(
221        &mut self,
222        _provider: &Provider,
223        input: UnwindInput,
224    ) -> Result<UnwindOutput, StageError> {
225        Ok(UnwindOutput { checkpoint: input.checkpoint.with_block_number(input.unwind_to) })
226    }
227}
228
229/// Describes where to get the era files from.
230#[derive(Debug, Clone)]
231pub enum EraImportSource {
232    /// Remote HTTP accessible host.
233    Url(Url, Box<Path>),
234    /// Local directory.
235    Path(Box<Path>),
236}
237
238impl EraImportSource {
239    /// Maybe constructs a new `EraImportSource` depending on the arguments.
240    ///
241    /// Only one of `url` or `path` should be provided, but upholding this invariant is delegated
242    /// above so that both parameters can be accepted.
243    ///
244    /// # Arguments
245    /// * The `path` uses a directory as the import source. It and its contents must be readable.
246    /// * The `url` uses an HTTP client to list and download files.
247    /// * The `default` gives the default [`Url`] if none of the previous parameters are provided.
248    /// * For any [`Url`] the `folder` is used as the download directory for storing files
249    ///   temporarily. It and its contents must be readable and writable.
250    pub fn maybe_new(
251        path: Option<Box<Path>>,
252        url: Option<Url>,
253        default: impl FnOnce() -> Option<Url>,
254        folder: impl FnOnce() -> Box<Path>,
255    ) -> Option<Self> {
256        path.map(Self::Path).or_else(|| url.or_else(default).map(|url| Self::Url(url, folder())))
257    }
258}
259
/// Tests for [`EraStage`], driven by a stub era source instead of real era files.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::{
        stage_test_suite, ExecuteStageTestRunner, StageTestRunner, UnwindStageTestRunner,
    };
    use alloy_primitives::B256;
    use assert_matches::assert_matches;
    use reth_db_api::tables;
    use reth_provider::BlockHashReader;
    use reth_testing_utils::generators::{self, random_header};
    use test_runner::EraTestRunner;

    /// The era data ends (at `era_cap`) below the requested `target`.
    ///
    /// The first run imports the seeded blocks and stops at `era_cap` with `done: false`. The
    /// stub responses are then dropped, so the second run has no era source and reports the
    /// target as reached. This assumes the runner rebuilds the stage via `stage()` for each
    /// run.
    #[tokio::test]
    async fn test_era_range_ends_below_target() {
        let era_cap = 2;
        let target = 20000;

        let mut runner = EraTestRunner::default();

        // Seeds blocks 0..=era_cap as the single stub era item.
        let input = ExecInput { target: Some(era_cap), checkpoint: None };
        runner.seed_execution(input).unwrap();

        let input = ExecInput { target: Some(target), checkpoint: None };
        let output = runner.execute(input).await.unwrap();

        runner.commit();

        // Only the seeded range is available, so the stage stops at `era_cap`.
        assert_matches!(
            output,
            Ok(ExecOutput {
                checkpoint: StageCheckpoint { block_number, stage_checkpoint: None },
                done: false
            }) if block_number == era_cap
        );

        let output = output.unwrap();
        let validation_output = runner.validate_execution(input, Some(output.clone()));

        assert_matches!(validation_output, Ok(()));

        // Without responses the next stage instance has no source at all.
        runner.take_responses();

        let input = ExecInput { target: Some(target), checkpoint: Some(output.checkpoint) };
        let output = runner.execute(input).await.unwrap();

        runner.commit();

        // With no era item pending, the stage reports the target as reached.
        assert_matches!(
            output,
            Ok(ExecOutput {
                checkpoint: StageCheckpoint { block_number, stage_checkpoint: None },
                done: true
            }) if block_number == target
        );

        let validation_output = runner.validate_execution(input, output.ok());

        assert_matches!(validation_output, Ok(()));
    }

    /// Test runner and stub era source used by the tests above and by `stage_test_suite!`.
    mod test_runner {
        use super::*;
        use crate::test_utils::{TestRunnerError, TestStageDB};
        use alloy_consensus::{BlockBody, Header};
        use futures_util::stream;
        use reth_db_api::{
            cursor::DbCursorRO,
            models::{StoredBlockBodyIndices, StoredBlockOmmers},
            transaction::DbTx,
        };
        use reth_ethereum_primitives::TransactionSigned;
        use reth_primitives_traits::{SealedBlock, SealedHeader};
        use reth_provider::{BlockNumReader, TransactionsProvider};
        use reth_testing_utils::generators::{
            random_block_range, random_signed_tx, BlockRangeParams,
        };
        use tokio::sync::watch;

        /// Runner state: a tip channel, the test database and the `(header, body)` pairs handed
        /// to the stage as its single stub era item (`None` means the stage gets no source).
        pub(crate) struct EraTestRunner {
            channel: (watch::Sender<B256>, watch::Receiver<B256>),
            db: TestStageDB,
            responses: Option<Vec<(Header, BlockBody<TransactionSigned>)>>,
        }

        impl Default for EraTestRunner {
            fn default() -> Self {
                Self {
                    channel: watch::channel(B256::ZERO),
                    db: TestStageDB::default(),
                    responses: Default::default(),
                }
            }
        }

        impl StageTestRunner for EraTestRunner {
            type S = EraStage<Header, BlockBody<TransactionSigned>, StubResponses>;

            fn db(&self) -> &TestStageDB {
                &self.db
            }

            /// Builds a stage whose source is the current `responses`, if any.
            fn stage(&self) -> Self::S {
                EraStage::new(self.responses.clone().map(StubResponses), EtlConfig::default())
            }
        }

        impl ExecuteStageTestRunner for EraTestRunner {
            type Seed = Vec<SealedBlock<reth_ethereum_primitives::Block>>;

            /// Generates random blocks `0..=target` and inserts their headers with TD. The body
            /// data of the block at the checkpoint is written as prior progress. All generated
            /// blocks are stored as the stub era responses and returned as the seed.
            fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
                let start = input.checkpoint().block_number;
                let end = input.target();

                let static_file_provider = self.db.factory.static_file_provider();

                let mut rng = generators::rng();

                // Static files do not support gaps in headers, so we need to generate 0 to end
                let blocks = random_block_range(
                    &mut rng,
                    0..=end,
                    BlockRangeParams {
                        parent: Some(B256::ZERO),
                        tx_count: 0..2,
                        ..Default::default()
                    },
                );
                self.db.insert_headers_with_td(blocks.iter().map(|block| block.sealed_header()))?;
                if let Some(progress) = blocks.get(start as usize) {
                    // Insert last progress data
                    {
                        let tx = self.db.factory.provider_rw()?.into_tx();
                        let mut static_file_producer = static_file_provider
                            .get_writer(start, StaticFileSegment::Transactions)?;

                        // Body indices for the progress block, starting at tx number 0.
                        let body = StoredBlockBodyIndices {
                            first_tx_num: 0,
                            tx_count: progress.transaction_count() as u64,
                        };

                        static_file_producer.set_block_range(0..=progress.number);

                        // Fill the transactions static file with random txs for that range.
                        body.tx_num_range().try_for_each(|tx_num| {
                            let transaction = random_signed_tx(&mut rng);
                            static_file_producer.append_transaction(tx_num, &transaction).map(drop)
                        })?;

                        if body.tx_count != 0 {
                            tx.put::<tables::TransactionBlocks>(
                                body.last_tx_num(),
                                progress.number,
                            )?;
                        }

                        tx.put::<tables::BlockBodyIndices>(progress.number, body)?;

                        if !progress.ommers_hash_is_empty() {
                            tx.put::<tables::BlockOmmers>(
                                progress.number,
                                StoredBlockOmmers { ommers: progress.body().ommers.clone() },
                            )?;
                        }

                        static_file_producer.commit()?;
                        tx.commit()?;
                    }
                }
                // Every generated block becomes part of the single stub era item.
                self.responses.replace(
                    blocks.iter().map(|v| (v.header().clone(), v.body().clone())).collect(),
                );
                Ok(blocks)
            }

            /// Validate stored headers and bodies
            ///
            /// If the checkpoint advanced, this checks each block in
            /// `initial_checkpoint..min(checkpoint, responses.len())`:
            /// * the hash ↔ number mappings agree;
            /// * the header re-hashes to the stored hash;
            /// * the stored TD equals the running sum of difficulties.
            ///
            /// It then validates the body tables. Otherwise, it asserts that no header entries
            /// exist above the initial checkpoint.
            fn validate_execution(
                &self,
                input: ExecInput,
                output: Option<ExecOutput>,
            ) -> Result<(), TestRunnerError> {
                let initial_checkpoint = input.checkpoint().block_number;
                match output {
                    Some(output) if output.checkpoint.block_number > initial_checkpoint => {
                        let provider = self.db.factory.provider()?;
                        // NOTE(review): for a checkpoint of 0 this seeds `td` with block 0's TD
                        // and then adds block 0's difficulty again; correct only when that
                        // difficulty is zero — confirm this holds for the generated headers.
                        let mut td = provider
                            .header_td_by_number(initial_checkpoint.saturating_sub(1))?
                            .unwrap_or_default();

                        for block_num in initial_checkpoint..
                            output
                                .checkpoint
                                .block_number
                                .min(self.responses.as_ref().map(|v| v.len()).unwrap_or_default()
                                    as BlockNumber)
                        {
                            // look up the header hash
                            let hash = provider.block_hash(block_num)?.expect("no header hash");

                            // validate the header number
                            assert_eq!(provider.block_number(hash)?, Some(block_num));

                            // validate the header
                            let header = provider.header_by_number(block_num)?;
                            assert!(header.is_some());
                            let header = SealedHeader::seal_slow(header.unwrap());
                            assert_eq!(header.hash(), hash);

                            // validate the header total difficulty
                            td += header.difficulty;
                            assert_eq!(provider.header_td_by_number(block_num)?, Some(td));
                        }

                        self.validate_db_blocks(
                            output.checkpoint.block_number,
                            output.checkpoint.block_number,
                        )?;
                    }
                    _ => self.check_no_header_entry_above(initial_checkpoint)?,
                };
                Ok(())
            }

            /// Sends the hash of the last seeded block as the tip. When nothing was seeded, a
            /// random header with number 0 is inserted and its hash is sent instead.
            async fn after_execution(&self, headers: Self::Seed) -> Result<(), TestRunnerError> {
                let tip = if headers.is_empty() {
                    let tip = random_header(&mut generators::rng(), 0, None);
                    self.db.insert_headers(iter::once(&tip))?;
                    tip.hash()
                } else {
                    headers.last().unwrap().hash()
                };
                self.send_tip(tip);
                Ok(())
            }
        }

        impl UnwindStageTestRunner for EraTestRunner {
            /// The stage's unwind only moves the checkpoint, so there is nothing to validate.
            fn validate_unwind(&self, _input: UnwindInput) -> Result<(), TestRunnerError> {
                Ok(())
            }
        }

        impl EraTestRunner {
            /// Asserts that the header-related tables hold no entries above `block`.
            pub(crate) fn check_no_header_entry_above(
                &self,
                block: BlockNumber,
            ) -> Result<(), TestRunnerError> {
                self.db
                    .ensure_no_entry_above_by_value::<tables::HeaderNumbers, _>(block, |val| val)?;
                self.db.ensure_no_entry_above::<tables::CanonicalHeaders, _>(block, |key| key)?;
                self.db.ensure_no_entry_above::<tables::Headers, _>(block, |key| key)?;
                self.db.ensure_no_entry_above::<tables::HeaderTerminalDifficulties, _>(
                    block,
                    |num| num,
                )?;
                Ok(())
            }

            /// Publishes `tip` on the runner's watch channel; panics if sending fails.
            pub(crate) fn send_tip(&self, tip: B256) {
                self.channel.0.send(tip).expect("failed to send tip");
            }

            /// Validate that the inserted block data is valid
            ///
            /// For every `BlockBodyIndices` entry, checks that:
            /// * numbers are sequential above `prev_progress`;
            /// * no entry exceeds `highest_block`;
            /// * ommers are stored exactly when the header's ommers hash is non-empty;
            /// * `TransactionBlocks` maps the last tx to the block when the block has txs;
            /// * every referenced transaction exists in static files.
            pub(crate) fn validate_db_blocks(
                &self,
                prev_progress: BlockNumber,
                highest_block: BlockNumber,
            ) -> Result<(), TestRunnerError> {
                let static_file_provider = self.db.factory.static_file_provider();

                self.db.query(|tx| {
                    // Acquire cursors on body related tables
                    let mut bodies_cursor = tx.cursor_read::<tables::BlockBodyIndices>()?;
                    let mut ommers_cursor = tx.cursor_read::<tables::BlockOmmers>()?;
                    let mut tx_block_cursor = tx.cursor_read::<tables::TransactionBlocks>()?;

                    let first_body_key = match bodies_cursor.first()? {
                        Some((key, _)) => key,
                        None => return Ok(()),
                    };

                    let mut prev_number: Option<BlockNumber> = None;


                    for entry in bodies_cursor.walk(Some(first_body_key))? {
                        let (number, body) = entry?;

                        // Validate sequentiality only after prev progress,
                        // since the data before is mocked and can contain gaps
                        if number > prev_progress
                            && let Some(prev_key) = prev_number {
                                assert_eq!(prev_key + 1, number, "Body entries must be sequential");
                            }

                        // Validate that the current entry is below or equals to the highest allowed block
                        assert!(
                            number <= highest_block,
                            "We wrote a block body outside of our synced range. Found block with number {number}, highest block according to stage is {highest_block}",
                        );

                        let header = static_file_provider.header_by_number(number)?.expect("to be present");
                        // Validate that ommers exist if any
                        let stored_ommers =  ommers_cursor.seek_exact(number)?;
                        if header.ommers_hash_is_empty() {
                            assert!(stored_ommers.is_none(), "Unexpected ommers entry");
                        } else {
                            assert!(stored_ommers.is_some(), "Missing ommers entry");
                        }

                        // The last tx of a non-empty block must map back to that block.
                        let tx_block_id = tx_block_cursor.seek_exact(body.last_tx_num())?.map(|(_,b)| b);
                        if body.tx_count == 0 {
                            assert_ne!(tx_block_id,Some(number));
                        } else {
                            assert_eq!(tx_block_id, Some(number));
                        }

                        for tx_id in body.tx_num_range() {
                            assert!(static_file_provider.transaction_by_id(tx_id)?.is_some(), "Transaction is missing.");
                        }

                        prev_number = Some(number);
                    }
                    Ok(())
                })?;
                Ok(())
            }

            /// Drops the stub responses so that subsequently built stages have no source.
            pub(crate) fn take_responses(&mut self) {
                self.responses.take();
            }

            /// Commits pending static-file writes; panics on failure.
            pub(crate) fn commit(&self) {
                self.db.factory.static_file_provider().commit().unwrap();
            }
        }

        /// Stub era source: yields a single item over the wrapped `(header, body)` pairs.
        #[derive(Clone)]
        pub(crate) struct StubResponses(Vec<(Header, BlockBody<TransactionSigned>)>);

        impl EraStreamFactory<Header, BlockBody<TransactionSigned>> for StubResponses {
            /// Ignores `input` and always yields all stored pairs as one `Ok` item.
            fn create(
                self,
                _input: ExecInput,
            ) -> Result<ThreadSafeEraStream<Header, BlockBody<TransactionSigned>>, StageError>
            {
                let stream = stream::iter(vec![self.0]);

                Ok(Box::new(Box::pin(stream.map(|meta| {
                    Ok(Box::new(meta.into_iter().map(Ok))
                        as Item<Header, BlockBody<TransactionSigned>>)
                }))))
            }
        }
    }

    stage_test_suite!(EraTestRunner, era);
}