1use crate::{StageCheckpoint, StageId};
2use alloy_primitives::{BlockHash, BlockNumber};
3use futures_util::{Stream, StreamExt};
4use reqwest::{Client, Url};
5use reth_config::config::EtlConfig;
6use reth_db_api::{table::Value, transaction::DbTxMut};
7use reth_era::{
8 common::file_ops::{EraFileType, StreamReader},
9 era1::file::Era1Reader,
10 ere::file::EreReader,
11};
12use reth_era_downloader::{read_dir, EraClient, EraMeta, EraStream, EraStreamConfig};
13use reth_era_utils as era;
14use reth_etl::Collector;
15use reth_primitives_traits::{FullBlockBody, FullBlockHeader, NodePrimitives};
16use reth_provider::{
17 BlockReader, BlockWriter, DBProvider, StageCheckpointWriter, StaticFileProviderFactory,
18 StaticFileWriter,
19};
20use reth_stages_api::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput};
21use reth_static_file_types::StaticFileSegment;
22use std::{
23 fmt::{Debug, Formatter},
24 iter,
25 path::Path,
26 task::{ready, Context, Poll},
27};
28
29type Item<Header, Body> =
30 Box<dyn Iterator<Item = eyre::Result<(Header, Body)>> + Send + Sync + Unpin>;
31type ThreadSafeEraStream<Header, Body> =
32 Box<dyn Stream<Item = eyre::Result<Item<Header, Body>>> + Send + Sync + Unpin>;
33
34pub struct EraStage<Header, Body, StreamFactory> {
46 source: Option<StreamFactory>,
48 hash_collector: Collector<BlockHash, BlockNumber>,
51 item: Option<Item<Header, Body>>,
53 stream: Option<ThreadSafeEraStream<Header, Body>>,
55}
56
57trait EraStreamFactory<Header, Body> {
58 fn create(self, input: ExecInput) -> Result<ThreadSafeEraStream<Header, Body>, StageError>;
59}
60
61impl<Header, Body> EraStreamFactory<Header, Body> for EraImportSource
62where
63 Header: FullBlockHeader + Value,
64 Body: FullBlockBody<OmmerHeader = Header>,
65{
66 fn create(self, input: ExecInput) -> Result<ThreadSafeEraStream<Header, Body>, StageError> {
67 match self {
68 Self::Path(path) => Self::convert(
69 read_dir(path, input.next_block()).map_err(|e| StageError::Fatal(e.into()))?,
70 ),
71 Self::Url(url, folder) => {
72 let _ = reth_fs_util::create_dir_all(&folder);
73 let client = EraClient::new(Client::new(), url, folder);
74
75 Self::convert(EraStream::new(
76 client,
77 EraStreamConfig::default().start_from(input.next_block()),
78 ))
79 }
80 }
81 }
82}
83
84impl EraImportSource {
85 fn convert<Header, Body>(
86 stream: impl Stream<Item = eyre::Result<impl EraMeta + Send + Sync + 'static + Unpin>>
87 + Send
88 + Sync
89 + 'static
90 + Unpin,
91 ) -> Result<ThreadSafeEraStream<Header, Body>, StageError>
92 where
93 Header: FullBlockHeader + Value,
94 Body: FullBlockBody<OmmerHeader = Header>,
95 {
96 Ok(Box::new(Box::pin(stream.map(|meta| {
97 meta.and_then(|meta| {
98 let file = reth_fs_util::open(meta.path())?;
101 let iter = match meta
102 .path()
103 .file_name()
104 .and_then(|name| name.to_str())
105 .and_then(EraFileType::from_filename)
106 {
107 Some(EraFileType::Ere) => {
108 Box::new(EreReader::new(file).iter().map(era::Ere::decode))
109 as Item<Header, Body>
110 }
111 _ => Box::new(Era1Reader::new(file).iter().map(era::decode))
112 as Item<Header, Body>,
113 };
114
115 let iter = iter.chain(
116 iter::once_with(move || match meta.mark_as_processed() {
117 Ok(..) => None,
118 Err(e) => Some(Err(e)),
119 })
120 .flatten(),
121 );
122
123 Ok(Box::new(iter) as Item<Header, Body>)
124 })
125 }))))
126 }
127}
128
129impl<Header: Debug, Body: Debug, F: Debug> Debug for EraStage<Header, Body, F> {
130 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
131 f.debug_struct("EraStage")
132 .field("source", &self.source)
133 .field("hash_collector", &self.hash_collector)
134 .field("item", &self.item.is_some())
135 .field("stream", &"dyn Stream")
136 .finish()
137 }
138}
139
140impl<Header, Body, F> EraStage<Header, Body, F> {
141 pub fn new(source: Option<F>, etl_config: EtlConfig) -> Self {
143 Self {
144 source,
145 item: None,
146 stream: None,
147 hash_collector: Collector::new(etl_config.file_size, etl_config.dir),
148 }
149 }
150}
151
152impl<Provider, N, F> Stage<Provider> for EraStage<N::BlockHeader, N::BlockBody, F>
153where
154 Provider: DBProvider<Tx: DbTxMut>
155 + StaticFileProviderFactory<Primitives = N>
156 + BlockWriter<Block = N::Block>
157 + BlockReader<Block = N::Block>
158 + StageCheckpointWriter,
159 F: EraStreamFactory<N::BlockHeader, N::BlockBody> + Send + Sync + Clone,
160 N: NodePrimitives<BlockHeader: Value>,
161{
162 fn id(&self) -> StageId {
163 StageId::Era
164 }
165
166 fn poll_execute_ready(
167 &mut self,
168 cx: &mut Context<'_>,
169 input: ExecInput,
170 ) -> Poll<Result<(), StageError>> {
171 if input.target_reached() || self.item.is_some() {
172 return Poll::Ready(Ok(()));
173 }
174
175 if self.stream.is_none() &&
176 let Some(source) = self.source.clone()
177 {
178 self.stream.replace(source.create(input)?);
179 }
180 if let Some(stream) = &mut self.stream &&
181 let Some(next) = ready!(stream.poll_next_unpin(cx))
182 .transpose()
183 .map_err(|e| StageError::Fatal(e.into()))?
184 {
185 self.item.replace(next);
186 }
187
188 Poll::Ready(Ok(()))
189 }
190
191 fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result<ExecOutput, StageError> {
192 let height = if let Some(era) = self.item.take() {
193 let static_file_provider = provider.static_file_provider();
194
195 let last_header_number = static_file_provider
198 .get_highest_static_file_block(StaticFileSegment::Headers)
199 .unwrap_or_default();
200
201 let mut writer = static_file_provider.latest_writer(StaticFileSegment::Headers)?;
204
205 let height = era::process_iter(
206 era,
207 &mut writer,
208 provider,
209 &mut self.hash_collector,
210 last_header_number..=input.target(),
211 )
212 .map_err(|e| StageError::Fatal(e.into()))?;
213
214 if !self.hash_collector.is_empty() {
215 era::build_index(provider, &mut self.hash_collector)
216 .map_err(|e| StageError::Recoverable(e.into()))?;
217 self.hash_collector.clear();
218 }
219
220 era::save_stage_checkpoints(
221 provider,
222 input.checkpoint().block_number,
223 height,
224 height,
225 input.target(),
226 )?;
227
228 height
229 } else {
230 let highest_header = provider
240 .static_file_provider()
241 .get_highest_static_file_block(StaticFileSegment::Headers)
242 .unwrap_or_default();
243
244 let checkpoint = input.checkpoint().block_number;
245 let from_target = input.target.unwrap_or(checkpoint);
246
247 checkpoint.max(highest_header).max(from_target)
248 };
249
250 Ok(ExecOutput { checkpoint: StageCheckpoint::new(height), done: height >= input.target() })
251 }
252
253 fn unwind(
254 &mut self,
255 _provider: &Provider,
256 input: UnwindInput,
257 ) -> Result<UnwindOutput, StageError> {
258 Ok(UnwindOutput { checkpoint: input.checkpoint.with_block_number(input.unwind_to) })
259 }
260}
261
262#[derive(Debug, Clone)]
264pub enum EraImportSource {
265 Url(Url, Box<Path>),
267 Path(Box<Path>),
269}
270
271impl EraImportSource {
272 pub fn maybe_new(
284 path: Option<Box<Path>>,
285 url: Option<Url>,
286 default: impl FnOnce() -> Option<Url>,
287 folder: impl FnOnce() -> Box<Path>,
288 ) -> Option<Self> {
289 path.map(Self::Path).or_else(|| url.or_else(default).map(|url| Self::Url(url, folder())))
290 }
291}
292
293#[cfg(test)]
294mod tests {
295 use super::*;
296 use crate::test_utils::{
297 stage_test_suite, ExecuteStageTestRunner, StageTestRunner, UnwindStageTestRunner,
298 };
299 use alloy_consensus::{BlockBody, Header};
300 use alloy_primitives::B256;
301 use assert_matches::assert_matches;
302 use futures_util::stream;
303 use reth_db_api::tables;
304 use reth_era::{
305 common::file_ops::{EraFileFormat, StreamWriter},
306 ere::{
307 file::{EreFile, EreWriter},
308 types::{
309 execution::{BlockTuple, CompressedBody, CompressedHeader},
310 group::{DynamicBlockIndex, EreGroup, EreId},
311 },
312 },
313 };
314 use reth_ethereum_primitives::{Block, TransactionSigned};
315 use reth_primitives_traits::SealedBlock;
316 use reth_provider::BlockHashReader;
317 use reth_testing_utils::generators::{
318 self, random_block_range, random_header, BlockRangeParams,
319 };
320 use std::{fs::File, path::PathBuf};
321 use test_runner::EraTestRunner;
322
323 #[tokio::test]
324 async fn test_era_range_ends_below_target() {
325 let era_cap = 2;
326 let target = 20000;
327
328 let mut runner = EraTestRunner::default();
329
330 let input = ExecInput { target: Some(era_cap), checkpoint: None };
331 runner.seed_execution(input).unwrap();
332
333 let input = ExecInput { target: Some(target), checkpoint: None };
334 let output = runner.execute(input).await.unwrap();
335
336 runner.commit();
337
338 assert_matches!(
339 output,
340 Ok(ExecOutput {
341 checkpoint: StageCheckpoint { block_number, stage_checkpoint: None },
342 done: false
343 }) if block_number == era_cap
344 );
345
346 let output = output.unwrap();
347 let validation_output = runner.validate_execution(input, Some(output.clone()));
348
349 assert_matches!(validation_output, Ok(()));
350
351 runner.take_responses();
352
353 let input = ExecInput { target: Some(target), checkpoint: Some(output.checkpoint) };
354 let output = runner.execute(input).await.unwrap();
355
356 runner.commit();
357
358 assert_matches!(
359 output,
360 Ok(ExecOutput {
361 checkpoint: StageCheckpoint { block_number, stage_checkpoint: None },
362 done: true
363 }) if block_number == target
364 );
365
366 let validation_output = runner.validate_execution(input, output.ok());
367
368 assert_matches!(validation_output, Ok(()));
369 }
370
371 fn write_ere_file(path: &Path, blocks: &[SealedBlock<Block>]) {
373 let tuples = blocks
374 .iter()
375 .map(|block| {
376 BlockTuple::new(
377 CompressedHeader::from_header(block.header()).unwrap(),
378 CompressedBody::from_body(block.body()).unwrap(),
379 )
380 })
381 .collect::<Vec<_>>();
382
383 let start = blocks[0].number;
384 let component_count = 2;
387 let offsets = vec![0; blocks.len() * component_count as usize];
388 let index = DynamicBlockIndex::new(start, component_count, offsets);
389 let group = EreGroup::new(tuples, None, index);
390 let file = EreFile::new(group, EreId::new("mainnet", start, blocks.len() as u32));
391
392 EreWriter::new(File::create(path).unwrap()).write_file(&file).unwrap();
393 }
394
/// Minimal `EraMeta` implementation for tests: just the location of a file on disk.
#[derive(Debug)]
struct EreTestMeta {
    /// Path returned from `EraMeta::path`.
    path: PathBuf,
}
399
400 impl EraMeta for EreTestMeta {
401 fn mark_as_processed(&self) -> eyre::Result<()> {
402 Ok(())
403 }
404
405 fn path(&self) -> &Path {
406 &self.path
407 }
408 }
409
410 #[tokio::test]
411 async fn convert_decodes_ere_files() {
412 let mut rng = generators::rng();
413 let blocks = random_block_range(
414 &mut rng,
415 1..=3,
416 BlockRangeParams { tx_count: 1..3, ..Default::default() },
417 );
418
419 let dir = tempfile::tempdir().unwrap();
420
421 for ext in ["ere", "erae"] {
424 let path = dir.path().join(format!("mainnet-00000-abcd1234.{ext}"));
425 write_ere_file(&path, &blocks);
426
427 let stream =
428 stream::iter(vec![Ok::<_, eyre::Error>(EreTestMeta { path: path.clone() })]);
429 let mut stream =
430 EraImportSource::convert::<Header, BlockBody<TransactionSigned>>(stream).unwrap();
431
432 let item = stream.next().await.expect("a file to decode").expect("decoding to succeed");
433 let decoded = item.collect::<eyre::Result<Vec<_>>>().unwrap();
434
435 assert_eq!(
436 decoded.len(),
437 blocks.len(),
438 "ERE file with `.{ext}` extension should decode every block"
439 );
440 for ((header, body), block) in decoded.iter().zip(&blocks) {
441 assert_eq!(header, block.header());
442 assert_eq!(body, block.body());
443 }
444 }
445 }
446
447 mod test_runner {
448 use super::*;
449 use crate::test_utils::{TestRunnerError, TestStageDB};
450 use alloy_consensus::{BlockBody, Header};
451 use futures_util::stream;
452 use reth_db_api::{
453 cursor::DbCursorRO,
454 models::{StoredBlockBodyIndices, StoredBlockOmmers},
455 transaction::DbTx,
456 };
457 use reth_ethereum_primitives::TransactionSigned;
458 use reth_primitives_traits::{SealedBlock, SealedHeader};
459 use reth_provider::{BlockNumReader, HeaderProvider, TransactionsProvider};
460 use reth_testing_utils::generators::{
461 random_block_range, random_signed_tx, BlockRangeParams,
462 };
463 use tokio::sync::watch;
464
465 pub(crate) struct EraTestRunner {
466 channel: (watch::Sender<B256>, watch::Receiver<B256>),
467 db: TestStageDB,
468 responses: Option<Vec<(Header, BlockBody<TransactionSigned>)>>,
469 }
470
471 impl Default for EraTestRunner {
472 fn default() -> Self {
473 Self {
474 channel: watch::channel(B256::ZERO),
475 db: TestStageDB::default(),
476 responses: Default::default(),
477 }
478 }
479 }
480
481 impl StageTestRunner for EraTestRunner {
482 type S = EraStage<Header, BlockBody<TransactionSigned>, StubResponses>;
483
484 fn db(&self) -> &TestStageDB {
485 &self.db
486 }
487
488 fn stage(&self) -> Self::S {
489 EraStage::new(self.responses.clone().map(StubResponses), EtlConfig::default())
490 }
491 }
492
493 impl ExecuteStageTestRunner for EraTestRunner {
494 type Seed = Vec<SealedBlock<reth_ethereum_primitives::Block>>;
495
496 fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
497 let start = input.checkpoint().block_number;
498 let end = input.target();
499
500 let static_file_provider = self.db.factory.static_file_provider();
501
502 let mut rng = generators::rng();
503
504 let blocks = random_block_range(
506 &mut rng,
507 0..=end,
508 BlockRangeParams {
509 parent: Some(B256::ZERO),
510 tx_count: 0..2,
511 ..Default::default()
512 },
513 );
514 self.db.insert_headers(blocks.iter().map(|block| block.sealed_header()))?;
515 if let Some(progress) = blocks.get(start as usize) {
516 {
518 let tx = self.db.factory.provider_rw()?.into_tx();
519 let mut static_file_producer = static_file_provider
520 .get_writer(start, StaticFileSegment::Transactions)?;
521
522 let body = StoredBlockBodyIndices {
523 first_tx_num: 0,
524 tx_count: progress.transaction_count() as u64,
525 };
526
527 static_file_producer.set_block_range(0..=progress.number);
528
529 body.tx_num_range().try_for_each(|tx_num| {
530 let transaction = random_signed_tx(&mut rng);
531 static_file_producer.append_transaction(tx_num, &transaction).map(drop)
532 })?;
533
534 if body.tx_count != 0 {
535 tx.put::<tables::TransactionBlocks>(
536 body.last_tx_num(),
537 progress.number,
538 )?;
539 }
540
541 tx.put::<tables::BlockBodyIndices>(progress.number, body)?;
542
543 if !progress.ommers_hash_is_empty() {
544 tx.put::<tables::BlockOmmers>(
545 progress.number,
546 StoredBlockOmmers { ommers: progress.body().ommers.clone() },
547 )?;
548 }
549
550 static_file_producer.commit()?;
551 tx.commit()?;
552 }
553 }
554 self.responses.replace(
555 blocks.iter().map(|v| (v.header().clone(), v.body().clone())).collect(),
556 );
557 Ok(blocks)
558 }
559
560 fn validate_execution(
562 &self,
563 input: ExecInput,
564 output: Option<ExecOutput>,
565 ) -> Result<(), TestRunnerError> {
566 let initial_checkpoint = input.checkpoint().block_number;
567 match output {
568 Some(output) if output.checkpoint.block_number > initial_checkpoint => {
569 let provider = self.db.factory.provider()?;
570
571 for block_num in initial_checkpoint..
572 output
573 .checkpoint
574 .block_number
575 .min(self.responses.as_ref().map(|v| v.len()).unwrap_or_default()
576 as BlockNumber)
577 {
578 let hash = provider.block_hash(block_num)?.expect("no header hash");
580
581 assert_eq!(provider.block_number(hash)?, Some(block_num));
583
584 let header = provider.header_by_number(block_num)?;
586 assert!(header.is_some());
587 let header = SealedHeader::seal_slow(header.unwrap());
588 assert_eq!(header.hash(), hash);
589 }
590
591 self.validate_db_blocks(
592 output.checkpoint.block_number,
593 output.checkpoint.block_number,
594 )?;
595 }
596 _ => self.check_no_header_entry_above(initial_checkpoint)?,
597 };
598 Ok(())
599 }
600
601 async fn after_execution(&self, headers: Self::Seed) -> Result<(), TestRunnerError> {
602 let tip = if headers.is_empty() {
603 let tip = random_header(&mut generators::rng(), 0, None);
604 self.db.insert_headers(iter::once(&tip))?;
605 tip.hash()
606 } else {
607 headers.last().unwrap().hash()
608 };
609 self.send_tip(tip);
610 Ok(())
611 }
612 }
613
614 impl UnwindStageTestRunner for EraTestRunner {
615 fn validate_unwind(&self, _input: UnwindInput) -> Result<(), TestRunnerError> {
616 Ok(())
617 }
618 }
619
620 impl EraTestRunner {
621 pub(crate) fn check_no_header_entry_above(
622 &self,
623 block: BlockNumber,
624 ) -> Result<(), TestRunnerError> {
625 self.db
626 .ensure_no_entry_above_by_value::<tables::HeaderNumbers, _>(block, |val| val)?;
627 self.db.ensure_no_entry_above::<tables::CanonicalHeaders, _>(block, |key| key)?;
628 self.db.ensure_no_entry_above::<tables::Headers, _>(block, |key| key)?;
629 Ok(())
630 }
631
632 pub(crate) fn send_tip(&self, tip: B256) {
633 self.channel.0.send(tip).expect("failed to send tip");
634 }
635
636 pub(crate) fn validate_db_blocks(
638 &self,
639 prev_progress: BlockNumber,
640 highest_block: BlockNumber,
641 ) -> Result<(), TestRunnerError> {
642 let static_file_provider = self.db.factory.static_file_provider();
643
644 self.db.query(|tx| {
645 let mut bodies_cursor = tx.cursor_read::<tables::BlockBodyIndices>()?;
647 let mut ommers_cursor = tx.cursor_read::<tables::BlockOmmers>()?;
648 let mut tx_block_cursor = tx.cursor_read::<tables::TransactionBlocks>()?;
649
650 let first_body_key = match bodies_cursor.first()? {
651 Some((key, _)) => key,
652 None => return Ok(()),
653 };
654
655 let mut prev_number: Option<BlockNumber> = None;
656
657
658 for entry in bodies_cursor.walk(Some(first_body_key))? {
659 let (number, body) = entry?;
660
661 if number > prev_progress
664 && let Some(prev_key) = prev_number {
665 assert_eq!(prev_key + 1, number, "Body entries must be sequential");
666 }
667
668 assert!(
670 number <= highest_block,
671 "We wrote a block body outside of our synced range. Found block with number {number}, highest block according to stage is {highest_block}",
672 );
673
674 let header = static_file_provider.header_by_number(number)?.expect("to be present");
675 let stored_ommers = ommers_cursor.seek_exact(number)?;
677 if header.ommers_hash_is_empty() {
678 assert!(stored_ommers.is_none(), "Unexpected ommers entry");
679 } else {
680 assert!(stored_ommers.is_some(), "Missing ommers entry");
681 }
682
683 let tx_block_id = tx_block_cursor.seek_exact(body.last_tx_num())?.map(|(_,b)| b);
684 if body.tx_count == 0 {
685 assert_ne!(tx_block_id,Some(number));
686 } else {
687 assert_eq!(tx_block_id, Some(number));
688 }
689
690 for tx_id in body.tx_num_range() {
691 assert!(static_file_provider.transaction_by_id(tx_id)?.is_some(), "Transaction is missing.");
692 }
693
694 prev_number = Some(number);
695 }
696 Ok(())
697 })?;
698 Ok(())
699 }
700
701 pub(crate) fn take_responses(&mut self) {
702 self.responses.take();
703 }
704
705 pub(crate) fn commit(&self) {
706 self.db.factory.static_file_provider().commit().unwrap();
707 }
708 }
709
710 #[derive(Clone)]
711 pub(crate) struct StubResponses(Vec<(Header, BlockBody<TransactionSigned>)>);
712
713 impl EraStreamFactory<Header, BlockBody<TransactionSigned>> for StubResponses {
714 fn create(
715 self,
716 _input: ExecInput,
717 ) -> Result<ThreadSafeEraStream<Header, BlockBody<TransactionSigned>>, StageError>
718 {
719 let stream = stream::iter(vec![self.0]);
720
721 Ok(Box::new(Box::pin(stream.map(|meta| {
722 Ok(Box::new(meta.into_iter().map(Ok))
723 as Item<Header, BlockBody<TransactionSigned>>)
724 }))))
725 }
726 }
727 }
728
729 stage_test_suite!(EraTestRunner, era);
730}