reth_stages/stages/
era.rs

1use crate::{StageCheckpoint, StageId};
2use alloy_primitives::{BlockHash, BlockNumber};
3use futures_util::{Stream, StreamExt};
4use reqwest::{Client, Url};
5use reth_config::config::EtlConfig;
6use reth_db_api::{table::Value, transaction::DbTxMut};
7use reth_era::{era1_file::Era1Reader, era_file_ops::StreamReader};
8use reth_era_downloader::{read_dir, EraClient, EraMeta, EraStream, EraStreamConfig};
9use reth_era_utils as era;
10use reth_etl::Collector;
11use reth_primitives_traits::{FullBlockBody, FullBlockHeader, NodePrimitives};
12use reth_provider::{
13    BlockReader, BlockWriter, DBProvider, HeaderProvider, StageCheckpointWriter,
14    StaticFileProviderFactory, StaticFileWriter,
15};
16use reth_stages_api::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput};
17use reth_static_file_types::StaticFileSegment;
18use reth_storage_errors::ProviderError;
19use std::{
20    fmt::{Debug, Formatter},
21    iter,
22    path::Path,
23    task::{ready, Context, Poll},
24};
25
26type Item<Header, Body> =
27    Box<dyn Iterator<Item = eyre::Result<(Header, Body)>> + Send + Sync + Unpin>;
28type ThreadSafeEraStream<Header, Body> =
29    Box<dyn Stream<Item = eyre::Result<Item<Header, Body>>> + Send + Sync + Unpin>;
30
31/// The [ERA1](https://github.com/eth-clients/e2store-format-specs/blob/main/formats/era1.md)
32/// pre-merge history stage.
33///
34/// Imports block headers and bodies from genesis up to the last pre-merge block. Receipts are
35/// generated by execution. Execution is not done in this stage.
36pub struct EraStage<Header, Body, StreamFactory> {
37    /// The `source` creates `stream`.
38    source: Option<StreamFactory>,
39    /// A map of block hash to block height collected when processing headers and inserted into
40    /// database afterward.
41    hash_collector: Collector<BlockHash, BlockNumber>,
42    /// Last extracted iterator of block `Header` and `Body` pairs.
43    item: Option<Item<Header, Body>>,
44    /// A stream of [`Item`]s, i.e. iterators over block `Header` and `Body` pairs.
45    stream: Option<ThreadSafeEraStream<Header, Body>>,
46}
47
48trait EraStreamFactory<Header, Body> {
49    fn create(self, input: ExecInput) -> Result<ThreadSafeEraStream<Header, Body>, StageError>;
50}
51
52impl<Header, Body> EraStreamFactory<Header, Body> for EraImportSource
53where
54    Header: FullBlockHeader + Value,
55    Body: FullBlockBody<OmmerHeader = Header>,
56{
57    fn create(self, input: ExecInput) -> Result<ThreadSafeEraStream<Header, Body>, StageError> {
58        match self {
59            Self::Path(path) => Self::convert(
60                read_dir(path, input.next_block()).map_err(|e| StageError::Fatal(e.into()))?,
61            ),
62            Self::Url(url, folder) => {
63                let _ = reth_fs_util::create_dir_all(&folder);
64                let client = EraClient::new(Client::new(), url, folder);
65
66                Self::convert(EraStream::new(
67                    client,
68                    EraStreamConfig::default().start_from(input.next_block()),
69                ))
70            }
71        }
72    }
73}
74
75impl EraImportSource {
76    fn convert<Header, Body>(
77        stream: impl Stream<Item = eyre::Result<impl EraMeta + Send + Sync + 'static + Unpin>>
78            + Send
79            + Sync
80            + 'static
81            + Unpin,
82    ) -> Result<ThreadSafeEraStream<Header, Body>, StageError>
83    where
84        Header: FullBlockHeader + Value,
85        Body: FullBlockBody<OmmerHeader = Header>,
86    {
87        Ok(Box::new(Box::pin(stream.map(|meta| {
88            meta.and_then(|meta| {
89                let file = reth_fs_util::open(meta.path())?;
90                let reader = Era1Reader::new(file);
91                let iter = reader.iter();
92                let iter = iter.map(era::decode);
93                let iter = iter.chain(
94                    iter::once_with(move || match meta.mark_as_processed() {
95                        Ok(..) => None,
96                        Err(e) => Some(Err(e)),
97                    })
98                    .flatten(),
99                );
100
101                Ok(Box::new(iter) as Item<Header, Body>)
102            })
103        }))))
104    }
105}
106
107impl<Header: Debug, Body: Debug, F: Debug> Debug for EraStage<Header, Body, F> {
108    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
109        f.debug_struct("EraStage")
110            .field("source", &self.source)
111            .field("hash_collector", &self.hash_collector)
112            .field("item", &self.item.is_some())
113            .field("stream", &"dyn Stream")
114            .finish()
115    }
116}
117
118impl<Header, Body, F> EraStage<Header, Body, F> {
119    /// Creates a new [`EraStage`].
120    pub fn new(source: Option<F>, etl_config: EtlConfig) -> Self {
121        Self {
122            source,
123            item: None,
124            stream: None,
125            hash_collector: Collector::new(etl_config.file_size, etl_config.dir),
126        }
127    }
128}
129
130impl<Provider, N, F> Stage<Provider> for EraStage<N::BlockHeader, N::BlockBody, F>
131where
132    Provider: DBProvider<Tx: DbTxMut>
133        + StaticFileProviderFactory<Primitives = N>
134        + BlockWriter<Block = N::Block>
135        + BlockReader<Block = N::Block>
136        + StageCheckpointWriter,
137    F: EraStreamFactory<N::BlockHeader, N::BlockBody> + Send + Sync + Clone,
138    N: NodePrimitives<BlockHeader: Value>,
139{
140    fn id(&self) -> StageId {
141        StageId::Era
142    }
143
144    fn poll_execute_ready(
145        &mut self,
146        cx: &mut Context<'_>,
147        input: ExecInput,
148    ) -> Poll<Result<(), StageError>> {
149        if input.target_reached() || self.item.is_some() {
150            return Poll::Ready(Ok(()));
151        }
152
153        if self.stream.is_none() {
154            if let Some(source) = self.source.clone() {
155                self.stream.replace(source.create(input)?);
156            }
157        }
158        if let Some(stream) = &mut self.stream {
159            if let Some(next) = ready!(stream.poll_next_unpin(cx))
160                .transpose()
161                .map_err(|e| StageError::Fatal(e.into()))?
162            {
163                self.item.replace(next);
164            }
165        }
166
167        Poll::Ready(Ok(()))
168    }
169
170    fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result<ExecOutput, StageError> {
171        let height = if let Some(era) = self.item.take() {
172            let static_file_provider = provider.static_file_provider();
173
174            // Consistency check of expected headers in static files vs DB is done on
175            // provider::sync_gap when poll_execute_ready is polled.
176            let last_header_number = static_file_provider
177                .get_highest_static_file_block(StaticFileSegment::Headers)
178                .unwrap_or_default();
179
180            // Find the latest total difficulty
181            let mut td = static_file_provider
182                .header_td_by_number(last_header_number)?
183                .ok_or(ProviderError::TotalDifficultyNotFound(last_header_number))?;
184
185            // Although headers were downloaded in reverse order, the collector iterates it in
186            // ascending order
187            let mut writer = static_file_provider.latest_writer(StaticFileSegment::Headers)?;
188
189            let height = era::process_iter(
190                era,
191                &mut writer,
192                provider,
193                &mut self.hash_collector,
194                &mut td,
195                last_header_number..=input.target(),
196            )
197            .map_err(|e| StageError::Fatal(e.into()))?;
198
199            if !self.hash_collector.is_empty() {
200                era::build_index(provider, &mut self.hash_collector)
201                    .map_err(|e| StageError::Recoverable(e.into()))?;
202                self.hash_collector.clear();
203            }
204
205            era::save_stage_checkpoints(
206                &provider,
207                input.checkpoint().block_number,
208                height,
209                height,
210                input.target(),
211            )?;
212
213            height
214        } else {
215            input.target()
216        };
217
218        Ok(ExecOutput { checkpoint: StageCheckpoint::new(height), done: height == input.target() })
219    }
220
221    fn unwind(
222        &mut self,
223        _provider: &Provider,
224        input: UnwindInput,
225    ) -> Result<UnwindOutput, StageError> {
226        Ok(UnwindOutput { checkpoint: input.checkpoint.with_block_number(input.unwind_to) })
227    }
228}
229
230/// Describes where to get the era files from.
231#[derive(Debug, Clone)]
232pub enum EraImportSource {
233    /// Remote HTTP accessible host.
234    Url(Url, Box<Path>),
235    /// Local directory.
236    Path(Box<Path>),
237}
238
239impl EraImportSource {
240    /// Maybe constructs a new `EraImportSource` depending on the arguments.
241    ///
242    /// Only one of `url` or `path` should be provided, but upholding this invariant is delegated
243    /// above so that both parameters can be accepted.
244    ///
245    /// # Arguments
246    /// * The `path` uses a directory as the import source. It and its contents must be readable.
247    /// * The `url` uses an HTTP client to list and download files.
248    /// * The `default` gives the default [`Url`] if none of the previous parameters are provided.
249    /// * For any [`Url`] the `folder` is used as the download directory for storing files
250    ///   temporarily. It and its contents must be readable and writable.
251    pub fn maybe_new(
252        path: Option<Box<Path>>,
253        url: Option<Url>,
254        default: impl FnOnce() -> Option<Url>,
255        folder: impl FnOnce() -> Box<Path>,
256    ) -> Option<Self> {
257        path.map(Self::Path).or_else(|| url.or_else(default).map(|url| Self::Url(url, folder())))
258    }
259}
260
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::{
        stage_test_suite, ExecuteStageTestRunner, StageTestRunner, UnwindStageTestRunner,
    };
    use alloy_primitives::B256;
    use assert_matches::assert_matches;
    use reth_db_api::tables;
    use reth_provider::BlockHashReader;
    use reth_testing_utils::generators::{self, random_header};
    use test_runner::EraTestRunner;

    /// Executes the stage against a target that lies above the seeded blocks.
    ///
    /// The first run is expected to stop at the last seeded block without being done. The second
    /// run, made after the stubbed responses are removed, is expected to report the target and
    /// be done.
    #[tokio::test]
    async fn test_era_range_ends_below_target() {
        let era_cap = 2;
        let target = 20000;

        let mut runner = EraTestRunner::default();

        // Seed blocks up to `era_cap` only.
        let seed_input = ExecInput { target: Some(era_cap), checkpoint: None };
        runner.seed_execution(seed_input).unwrap();

        // First run: starts without a checkpoint.
        let first_input = ExecInput { target: Some(target), checkpoint: None };
        let first_result = runner.execute(first_input).await.unwrap();

        runner.commit();

        assert_matches!(
            first_result,
            Ok(ExecOutput {
                checkpoint: StageCheckpoint { block_number, stage_checkpoint: None },
                done: false
            }) if block_number == era_cap
        );

        let first_output = first_result.unwrap();
        let first_validation = runner.validate_execution(first_input, Some(first_output.clone()));

        assert_matches!(first_validation, Ok(()));

        runner.take_responses();

        // Second run: resumes from the checkpoint of the first run.
        let second_input =
            ExecInput { target: Some(target), checkpoint: Some(first_output.checkpoint) };
        let second_result = runner.execute(second_input).await.unwrap();

        runner.commit();

        assert_matches!(
            second_result,
            Ok(ExecOutput {
                checkpoint: StageCheckpoint { block_number, stage_checkpoint: None },
                done: true
            }) if block_number == target
        );

        let second_validation = runner.validate_execution(second_input, second_result.ok());

        assert_matches!(second_validation, Ok(()));
    }

    mod test_runner {
        use super::*;
        use crate::test_utils::{TestRunnerError, TestStageDB};
        use alloy_consensus::{BlockBody, Header};
        use futures_util::stream;
        use reth_db_api::{
            cursor::DbCursorRO,
            models::{StoredBlockBodyIndices, StoredBlockOmmers},
            transaction::DbTx,
        };
        use reth_ethereum_primitives::TransactionSigned;
        use reth_primitives_traits::{SealedBlock, SealedHeader};
        use reth_provider::{BlockNumReader, TransactionsProvider};
        use reth_testing_utils::generators::{
            random_block_range, random_signed_tx, BlockRangeParams,
        };
        use tokio::sync::watch;

        /// Test harness for [`EraStage`] whose era contents come from in-memory responses.
        pub(crate) struct EraTestRunner {
            /// Channel over which the tip hash is sent.
            channel: (watch::Sender<B256>, watch::Receiver<B256>),
            /// Database the stage under test runs against.
            db: TestStageDB,
            /// Header and body pairs served to the stage; `None` leaves it without a source.
            responses: Option<Vec<(Header, BlockBody<TransactionSigned>)>>,
        }

        impl Default for EraTestRunner {
            /// Creates a runner with a zero tip, a default database and no responses.
            fn default() -> Self {
                Self {
                    channel: watch::channel(B256::ZERO),
                    db: TestStageDB::default(),
                    responses: None,
                }
            }
        }

        impl StageTestRunner for EraTestRunner {
            type S = EraStage<Header, BlockBody<TransactionSigned>, StubResponses>;

            fn db(&self) -> &TestStageDB {
                &self.db
            }

            /// Builds a stage whose source serves a copy of the current responses.
            fn stage(&self) -> Self::S {
                let source = self.responses.clone().map(StubResponses);
                EraStage::new(source, EtlConfig::default())
            }
        }

        impl ExecuteStageTestRunner for EraTestRunner {
            type Seed = Vec<SealedBlock<reth_ethereum_primitives::Block>>;

            /// Generates random blocks `0..=target`, stores their headers, writes body data for
            /// the checkpoint block and keeps all blocks as the responses served to the stage.
            fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
                let start = input.checkpoint().block_number;
                let end = input.target();

                let static_file_provider = self.db.factory.static_file_provider();

                let mut rng = generators::rng();

                // Static files do not support gaps in headers, so we need to generate 0 to end
                let params = BlockRangeParams {
                    parent: Some(B256::ZERO),
                    tx_count: 0..2,
                    ..Default::default()
                };
                let blocks = random_block_range(&mut rng, 0..=end, params);
                self.db.insert_headers_with_td(blocks.iter().map(|b| b.sealed_header()))?;

                // Insert last progress data
                if let Some(progress) = blocks.get(start as usize) {
                    let tx = self.db.factory.provider_rw()?.into_tx();
                    let mut static_file_producer =
                        static_file_provider.get_writer(start, StaticFileSegment::Transactions)?;

                    let body = StoredBlockBodyIndices {
                        first_tx_num: 0,
                        tx_count: progress.transaction_count() as u64,
                    };

                    static_file_producer.set_block_range(0..=progress.number);

                    for tx_num in body.tx_num_range() {
                        let transaction = random_signed_tx(&mut rng);
                        static_file_producer.append_transaction(tx_num, &transaction)?;
                    }

                    if body.tx_count != 0 {
                        tx.put::<tables::TransactionBlocks>(body.last_tx_num(), progress.number)?;
                    }

                    tx.put::<tables::BlockBodyIndices>(progress.number, body)?;

                    if !progress.ommers_hash_is_empty() {
                        let ommers = StoredBlockOmmers { ommers: progress.body().ommers.clone() };
                        tx.put::<tables::BlockOmmers>(progress.number, ommers)?;
                    }

                    static_file_producer.commit()?;
                    tx.commit()?;
                }

                let responses =
                    blocks.iter().map(|b| (b.header().clone(), b.body().clone())).collect();
                self.responses = Some(responses);

                Ok(blocks)
            }

            /// Validate stored headers and bodies
            fn validate_execution(
                &self,
                input: ExecInput,
                output: Option<ExecOutput>,
            ) -> Result<(), TestRunnerError> {
                let initial_checkpoint = input.checkpoint().block_number;

                // Without progress past the initial checkpoint nothing may have been written.
                let Some(output) =
                    output.filter(|out| out.checkpoint.block_number > initial_checkpoint)
                else {
                    return self.check_no_header_entry_above(initial_checkpoint);
                };

                let provider = self.db.factory.provider()?;
                let mut td = provider
                    .header_td_by_number(initial_checkpoint.saturating_sub(1))?
                    .unwrap_or_default();

                // Only blocks that were part of the responses can be checked.
                let served = self.responses.as_ref().map_or(0, Vec::len) as BlockNumber;
                let end = output.checkpoint.block_number.min(served);

                for block_num in initial_checkpoint..end {
                    // look up the header hash
                    let hash = provider.block_hash(block_num)?.expect("no header hash");

                    // validate the header number
                    assert_eq!(provider.block_number(hash)?, Some(block_num));

                    // validate the header
                    let header = provider.header_by_number(block_num)?;
                    assert!(header.is_some());
                    let header = SealedHeader::seal_slow(header.unwrap());
                    assert_eq!(header.hash(), hash);

                    // validate the header total difficulty
                    td += header.difficulty;
                    assert_eq!(provider.header_td_by_number(block_num)?, Some(td));
                }

                let reached = output.checkpoint.block_number;
                self.validate_db_blocks(reached, reached)?;

                Ok(())
            }

            /// Sends the hash of the last seeded block as the tip, or of a freshly inserted
            /// random header when nothing was seeded.
            async fn after_execution(&self, headers: Self::Seed) -> Result<(), TestRunnerError> {
                let tip = match headers.last() {
                    Some(last) => last.hash(),
                    None => {
                        let tip = random_header(&mut generators::rng(), 0, None);
                        self.db.insert_headers(iter::once(&tip))?;
                        tip.hash()
                    }
                };
                self.send_tip(tip);
                Ok(())
            }
        }

        impl UnwindStageTestRunner for EraTestRunner {
            /// Accepts every unwind; nothing is checked.
            fn validate_unwind(&self, _input: UnwindInput) -> Result<(), TestRunnerError> {
                Ok(())
            }
        }

        impl EraTestRunner {
            /// Ensures that none of the header related tables has an entry above `block`.
            pub(crate) fn check_no_header_entry_above(
                &self,
                block: BlockNumber,
            ) -> Result<(), TestRunnerError> {
                let db = &self.db;
                db.ensure_no_entry_above_by_value::<tables::HeaderNumbers, _>(block, |val| val)?;
                db.ensure_no_entry_above::<tables::CanonicalHeaders, _>(block, |key| key)?;
                db.ensure_no_entry_above::<tables::Headers, _>(block, |key| key)?;
                db.ensure_no_entry_above::<tables::HeaderTerminalDifficulties, _>(
                    block,
                    |num| num,
                )?;
                Ok(())
            }

            /// Sends `tip` over the runner's channel.
            pub(crate) fn send_tip(&self, tip: B256) {
                self.channel.0.send(tip).expect("failed to send tip");
            }

            /// Validate that the inserted block data is valid
            pub(crate) fn validate_db_blocks(
                &self,
                prev_progress: BlockNumber,
                highest_block: BlockNumber,
            ) -> Result<(), TestRunnerError> {
                let static_file_provider = self.db.factory.static_file_provider();

                self.db.query(|tx| {
                    // Acquire cursors on body related tables
                    let mut bodies_cursor = tx.cursor_read::<tables::BlockBodyIndices>()?;
                    let mut ommers_cursor = tx.cursor_read::<tables::BlockOmmers>()?;
                    let mut tx_block_cursor = tx.cursor_read::<tables::TransactionBlocks>()?;

                    // Nothing to validate when no body was stored.
                    let Some((first_body_key, _)) = bodies_cursor.first()? else {
                        return Ok(());
                    };

                    let mut prev_number: Option<BlockNumber> = None;

                    for entry in bodies_cursor.walk(Some(first_body_key))? {
                        let (number, body) = entry?;

                        // Validate sequentiality only after prev progress,
                        // since the data before is mocked and can contain gaps
                        if let Some(prev_key) = prev_number.filter(|_| number > prev_progress) {
                            assert_eq!(prev_key + 1, number, "Body entries must be sequential");
                        }

                        // Validate that the current entry is below or equals to the highest
                        // allowed block
                        assert!(
                            number <= highest_block,
                            "We wrote a block body outside of our synced range. Found block with number {number}, highest block according to stage is {highest_block}",
                        );

                        let header =
                            static_file_provider.header_by_number(number)?.expect("to be present");

                        // Validate that ommers exist if any
                        let stored_ommers = ommers_cursor.seek_exact(number)?;
                        if header.ommers_hash_is_empty() {
                            assert!(stored_ommers.is_none(), "Unexpected ommers entry");
                        } else {
                            assert!(stored_ommers.is_some(), "Missing ommers entry");
                        }

                        // A block is recorded under its last transaction only if it has any.
                        let tx_block_id =
                            tx_block_cursor.seek_exact(body.last_tx_num())?.map(|(_, b)| b);
                        if body.tx_count == 0 {
                            assert_ne!(tx_block_id, Some(number));
                        } else {
                            assert_eq!(tx_block_id, Some(number));
                        }

                        for tx_id in body.tx_num_range() {
                            assert!(
                                static_file_provider.transaction_by_id(tx_id)?.is_some(),
                                "Transaction is missing."
                            );
                        }

                        prev_number = Some(number);
                    }
                    Ok(())
                })?;
                Ok(())
            }

            /// Drops the stored responses, so stages built afterward have no source.
            pub(crate) fn take_responses(&mut self) {
                self.responses = None;
            }

            /// Commits the static file provider of the test database.
            pub(crate) fn commit(&self) {
                self.db.factory.static_file_provider().commit().unwrap();
            }
        }

        /// Stream factory that serves a fixed list of header and body pairs as a single item.
        #[derive(Clone)]
        pub(crate) struct StubResponses(Vec<(Header, BlockBody<TransactionSigned>)>);

        impl EraStreamFactory<Header, BlockBody<TransactionSigned>> for StubResponses {
            /// Builds a stream with exactly one element: an iterator over all stored pairs.
            fn create(
                self,
                _input: ExecInput,
            ) -> Result<ThreadSafeEraStream<Header, BlockBody<TransactionSigned>>, StageError>
            {
                let eras = stream::iter(vec![self.0]).map(
                    |blocks| -> eyre::Result<Item<Header, BlockBody<TransactionSigned>>> {
                        Ok(Box::new(blocks.into_iter().map(Ok))
                            as Item<Header, BlockBody<TransactionSigned>>)
                    },
                );

                Ok(Box::new(Box::pin(eras)))
            }
        }
    }

    stage_test_suite!(EraTestRunner, era);
}
615
616    stage_test_suite!(EraTestRunner, era);
617}