1use crate::{StageCheckpoint, StageId};
2use alloy_primitives::{BlockHash, BlockNumber};
3use futures_util::{Stream, StreamExt};
4use reqwest::{Client, Url};
5use reth_config::config::EtlConfig;
6use reth_db_api::{table::Value, transaction::DbTxMut};
7use reth_era::{era1_file::Era1Reader, era_file_ops::StreamReader};
8use reth_era_downloader::{read_dir, EraClient, EraMeta, EraStream, EraStreamConfig};
9use reth_era_utils as era;
10use reth_etl::Collector;
11use reth_primitives_traits::{FullBlockBody, FullBlockHeader, NodePrimitives};
12use reth_provider::{
13 BlockReader, BlockWriter, DBProvider, HeaderProvider, StageCheckpointWriter,
14 StaticFileProviderFactory, StaticFileWriter,
15};
16use reth_stages_api::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput};
17use reth_static_file_types::StaticFileSegment;
18use reth_storage_errors::ProviderError;
19use std::{
20 fmt::{Debug, Formatter},
21 iter,
22 path::Path,
23 task::{ready, Context, Poll},
24};
25
26type Item<Header, Body> =
27 Box<dyn Iterator<Item = eyre::Result<(Header, Body)>> + Send + Sync + Unpin>;
28type ThreadSafeEraStream<Header, Body> =
29 Box<dyn Stream<Item = eyre::Result<Item<Header, Body>>> + Send + Sync + Unpin>;
30
31pub struct EraStage<Header, Body, StreamFactory> {
37 source: Option<StreamFactory>,
39 hash_collector: Collector<BlockHash, BlockNumber>,
42 item: Option<Item<Header, Body>>,
44 stream: Option<ThreadSafeEraStream<Header, Body>>,
46}
47
48trait EraStreamFactory<Header, Body> {
49 fn create(self, input: ExecInput) -> Result<ThreadSafeEraStream<Header, Body>, StageError>;
50}
51
52impl<Header, Body> EraStreamFactory<Header, Body> for EraImportSource
53where
54 Header: FullBlockHeader + Value,
55 Body: FullBlockBody<OmmerHeader = Header>,
56{
57 fn create(self, input: ExecInput) -> Result<ThreadSafeEraStream<Header, Body>, StageError> {
58 match self {
59 Self::Path(path) => Self::convert(
60 read_dir(path, input.next_block()).map_err(|e| StageError::Fatal(e.into()))?,
61 ),
62 Self::Url(url, folder) => {
63 let _ = reth_fs_util::create_dir_all(&folder);
64 let client = EraClient::new(Client::new(), url, folder);
65
66 Self::convert(EraStream::new(
67 client,
68 EraStreamConfig::default().start_from(input.next_block()),
69 ))
70 }
71 }
72 }
73}
74
75impl EraImportSource {
76 fn convert<Header, Body>(
77 stream: impl Stream<Item = eyre::Result<impl EraMeta + Send + Sync + 'static + Unpin>>
78 + Send
79 + Sync
80 + 'static
81 + Unpin,
82 ) -> Result<ThreadSafeEraStream<Header, Body>, StageError>
83 where
84 Header: FullBlockHeader + Value,
85 Body: FullBlockBody<OmmerHeader = Header>,
86 {
87 Ok(Box::new(Box::pin(stream.map(|meta| {
88 meta.and_then(|meta| {
89 let file = reth_fs_util::open(meta.path())?;
90 let reader = Era1Reader::new(file);
91 let iter = reader.iter();
92 let iter = iter.map(era::decode);
93 let iter = iter.chain(
94 iter::once_with(move || match meta.mark_as_processed() {
95 Ok(..) => None,
96 Err(e) => Some(Err(e)),
97 })
98 .flatten(),
99 );
100
101 Ok(Box::new(iter) as Item<Header, Body>)
102 })
103 }))))
104 }
105}
106
107impl<Header: Debug, Body: Debug, F: Debug> Debug for EraStage<Header, Body, F> {
108 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
109 f.debug_struct("EraStage")
110 .field("source", &self.source)
111 .field("hash_collector", &self.hash_collector)
112 .field("item", &self.item.is_some())
113 .field("stream", &"dyn Stream")
114 .finish()
115 }
116}
117
118impl<Header, Body, F> EraStage<Header, Body, F> {
119 pub fn new(source: Option<F>, etl_config: EtlConfig) -> Self {
121 Self {
122 source,
123 item: None,
124 stream: None,
125 hash_collector: Collector::new(etl_config.file_size, etl_config.dir),
126 }
127 }
128}
129
130impl<Provider, N, F> Stage<Provider> for EraStage<N::BlockHeader, N::BlockBody, F>
131where
132 Provider: DBProvider<Tx: DbTxMut>
133 + StaticFileProviderFactory<Primitives = N>
134 + BlockWriter<Block = N::Block>
135 + BlockReader<Block = N::Block>
136 + StageCheckpointWriter,
137 F: EraStreamFactory<N::BlockHeader, N::BlockBody> + Send + Sync + Clone,
138 N: NodePrimitives<BlockHeader: Value>,
139{
140 fn id(&self) -> StageId {
141 StageId::Era
142 }
143
144 fn poll_execute_ready(
145 &mut self,
146 cx: &mut Context<'_>,
147 input: ExecInput,
148 ) -> Poll<Result<(), StageError>> {
149 if input.target_reached() || self.item.is_some() {
150 return Poll::Ready(Ok(()));
151 }
152
153 if self.stream.is_none() {
154 if let Some(source) = self.source.clone() {
155 self.stream.replace(source.create(input)?);
156 }
157 }
158 if let Some(stream) = &mut self.stream {
159 if let Some(next) = ready!(stream.poll_next_unpin(cx))
160 .transpose()
161 .map_err(|e| StageError::Fatal(e.into()))?
162 {
163 self.item.replace(next);
164 }
165 }
166
167 Poll::Ready(Ok(()))
168 }
169
170 fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result<ExecOutput, StageError> {
171 let height = if let Some(era) = self.item.take() {
172 let static_file_provider = provider.static_file_provider();
173
174 let last_header_number = static_file_provider
177 .get_highest_static_file_block(StaticFileSegment::Headers)
178 .unwrap_or_default();
179
180 let mut td = static_file_provider
182 .header_td_by_number(last_header_number)?
183 .ok_or(ProviderError::TotalDifficultyNotFound(last_header_number))?;
184
185 let mut writer = static_file_provider.latest_writer(StaticFileSegment::Headers)?;
188
189 let height = era::process_iter(
190 era,
191 &mut writer,
192 provider,
193 &mut self.hash_collector,
194 &mut td,
195 last_header_number..=input.target(),
196 )
197 .map_err(|e| StageError::Fatal(e.into()))?;
198
199 if !self.hash_collector.is_empty() {
200 era::build_index(provider, &mut self.hash_collector)
201 .map_err(|e| StageError::Recoverable(e.into()))?;
202 self.hash_collector.clear();
203 }
204
205 era::save_stage_checkpoints(
206 &provider,
207 input.checkpoint().block_number,
208 height,
209 height,
210 input.target(),
211 )?;
212
213 height
214 } else {
215 input.target()
216 };
217
218 Ok(ExecOutput { checkpoint: StageCheckpoint::new(height), done: height == input.target() })
219 }
220
221 fn unwind(
222 &mut self,
223 _provider: &Provider,
224 input: UnwindInput,
225 ) -> Result<UnwindOutput, StageError> {
226 Ok(UnwindOutput { checkpoint: input.checkpoint.with_block_number(input.unwind_to) })
227 }
228}
229
230#[derive(Debug, Clone)]
232pub enum EraImportSource {
233 Url(Url, Box<Path>),
235 Path(Box<Path>),
237}
238
239impl EraImportSource {
240 pub fn maybe_new(
252 path: Option<Box<Path>>,
253 url: Option<Url>,
254 default: impl FnOnce() -> Option<Url>,
255 folder: impl FnOnce() -> Box<Path>,
256 ) -> Option<Self> {
257 path.map(Self::Path).or_else(|| url.or_else(default).map(|url| Self::Url(url, folder())))
258 }
259}
260
261#[cfg(test)]
262mod tests {
263 use super::*;
264 use crate::test_utils::{
265 stage_test_suite, ExecuteStageTestRunner, StageTestRunner, UnwindStageTestRunner,
266 };
267 use alloy_primitives::B256;
268 use assert_matches::assert_matches;
269 use reth_db_api::tables;
270 use reth_provider::BlockHashReader;
271 use reth_testing_utils::generators::{self, random_header};
272 use test_runner::EraTestRunner;
273
274 #[tokio::test]
275 async fn test_era_range_ends_below_target() {
276 let era_cap = 2;
277 let target = 20000;
278
279 let mut runner = EraTestRunner::default();
280
281 let input = ExecInput { target: Some(era_cap), checkpoint: None };
282 runner.seed_execution(input).unwrap();
283
284 let input = ExecInput { target: Some(target), checkpoint: None };
285 let output = runner.execute(input).await.unwrap();
286
287 runner.commit();
288
289 assert_matches!(
290 output,
291 Ok(ExecOutput {
292 checkpoint: StageCheckpoint { block_number, stage_checkpoint: None },
293 done: false
294 }) if block_number == era_cap
295 );
296
297 let output = output.unwrap();
298 let validation_output = runner.validate_execution(input, Some(output.clone()));
299
300 assert_matches!(validation_output, Ok(()));
301
302 runner.take_responses();
303
304 let input = ExecInput { target: Some(target), checkpoint: Some(output.checkpoint) };
305 let output = runner.execute(input).await.unwrap();
306
307 runner.commit();
308
309 assert_matches!(
310 output,
311 Ok(ExecOutput {
312 checkpoint: StageCheckpoint { block_number, stage_checkpoint: None },
313 done: true
314 }) if block_number == target
315 );
316
317 let validation_output = runner.validate_execution(input, output.ok());
318
319 assert_matches!(validation_output, Ok(()));
320 }
321
322 mod test_runner {
323 use super::*;
324 use crate::test_utils::{TestRunnerError, TestStageDB};
325 use alloy_consensus::{BlockBody, Header};
326 use futures_util::stream;
327 use reth_db_api::{
328 cursor::DbCursorRO,
329 models::{StoredBlockBodyIndices, StoredBlockOmmers},
330 transaction::DbTx,
331 };
332 use reth_ethereum_primitives::TransactionSigned;
333 use reth_primitives_traits::{SealedBlock, SealedHeader};
334 use reth_provider::{BlockNumReader, TransactionsProvider};
335 use reth_testing_utils::generators::{
336 random_block_range, random_signed_tx, BlockRangeParams,
337 };
338 use tokio::sync::watch;
339
340 pub(crate) struct EraTestRunner {
341 channel: (watch::Sender<B256>, watch::Receiver<B256>),
342 db: TestStageDB,
343 responses: Option<Vec<(Header, BlockBody<TransactionSigned>)>>,
344 }
345
346 impl Default for EraTestRunner {
347 fn default() -> Self {
348 Self {
349 channel: watch::channel(B256::ZERO),
350 db: TestStageDB::default(),
351 responses: Default::default(),
352 }
353 }
354 }
355
356 impl StageTestRunner for EraTestRunner {
357 type S = EraStage<Header, BlockBody<TransactionSigned>, StubResponses>;
358
359 fn db(&self) -> &TestStageDB {
360 &self.db
361 }
362
363 fn stage(&self) -> Self::S {
364 EraStage::new(self.responses.clone().map(StubResponses), EtlConfig::default())
365 }
366 }
367
368 impl ExecuteStageTestRunner for EraTestRunner {
369 type Seed = Vec<SealedBlock<reth_ethereum_primitives::Block>>;
370
371 fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
372 let start = input.checkpoint().block_number;
373 let end = input.target();
374
375 let static_file_provider = self.db.factory.static_file_provider();
376
377 let mut rng = generators::rng();
378
379 let blocks = random_block_range(
381 &mut rng,
382 0..=end,
383 BlockRangeParams {
384 parent: Some(B256::ZERO),
385 tx_count: 0..2,
386 ..Default::default()
387 },
388 );
389 self.db.insert_headers_with_td(blocks.iter().map(|block| block.sealed_header()))?;
390 if let Some(progress) = blocks.get(start as usize) {
391 {
393 let tx = self.db.factory.provider_rw()?.into_tx();
394 let mut static_file_producer = static_file_provider
395 .get_writer(start, StaticFileSegment::Transactions)?;
396
397 let body = StoredBlockBodyIndices {
398 first_tx_num: 0,
399 tx_count: progress.transaction_count() as u64,
400 };
401
402 static_file_producer.set_block_range(0..=progress.number);
403
404 body.tx_num_range().try_for_each(|tx_num| {
405 let transaction = random_signed_tx(&mut rng);
406 static_file_producer.append_transaction(tx_num, &transaction).map(drop)
407 })?;
408
409 if body.tx_count != 0 {
410 tx.put::<tables::TransactionBlocks>(
411 body.last_tx_num(),
412 progress.number,
413 )?;
414 }
415
416 tx.put::<tables::BlockBodyIndices>(progress.number, body)?;
417
418 if !progress.ommers_hash_is_empty() {
419 tx.put::<tables::BlockOmmers>(
420 progress.number,
421 StoredBlockOmmers { ommers: progress.body().ommers.clone() },
422 )?;
423 }
424
425 static_file_producer.commit()?;
426 tx.commit()?;
427 }
428 }
429 self.responses.replace(
430 blocks.iter().map(|v| (v.header().clone(), v.body().clone())).collect(),
431 );
432 Ok(blocks)
433 }
434
435 fn validate_execution(
437 &self,
438 input: ExecInput,
439 output: Option<ExecOutput>,
440 ) -> Result<(), TestRunnerError> {
441 let initial_checkpoint = input.checkpoint().block_number;
442 match output {
443 Some(output) if output.checkpoint.block_number > initial_checkpoint => {
444 let provider = self.db.factory.provider()?;
445 let mut td = provider
446 .header_td_by_number(initial_checkpoint.saturating_sub(1))?
447 .unwrap_or_default();
448
449 for block_num in initial_checkpoint..
450 output
451 .checkpoint
452 .block_number
453 .min(self.responses.as_ref().map(|v| v.len()).unwrap_or_default()
454 as BlockNumber)
455 {
456 let hash = provider.block_hash(block_num)?.expect("no header hash");
458
459 assert_eq!(provider.block_number(hash)?, Some(block_num));
461
462 let header = provider.header_by_number(block_num)?;
464 assert!(header.is_some());
465 let header = SealedHeader::seal_slow(header.unwrap());
466 assert_eq!(header.hash(), hash);
467
468 td += header.difficulty;
470 assert_eq!(provider.header_td_by_number(block_num)?, Some(td));
471 }
472
473 self.validate_db_blocks(
474 output.checkpoint.block_number,
475 output.checkpoint.block_number,
476 )?;
477 }
478 _ => self.check_no_header_entry_above(initial_checkpoint)?,
479 };
480 Ok(())
481 }
482
483 async fn after_execution(&self, headers: Self::Seed) -> Result<(), TestRunnerError> {
484 let tip = if headers.is_empty() {
485 let tip = random_header(&mut generators::rng(), 0, None);
486 self.db.insert_headers(iter::once(&tip))?;
487 tip.hash()
488 } else {
489 headers.last().unwrap().hash()
490 };
491 self.send_tip(tip);
492 Ok(())
493 }
494 }
495
496 impl UnwindStageTestRunner for EraTestRunner {
497 fn validate_unwind(&self, _input: UnwindInput) -> Result<(), TestRunnerError> {
498 Ok(())
499 }
500 }
501
502 impl EraTestRunner {
503 pub(crate) fn check_no_header_entry_above(
504 &self,
505 block: BlockNumber,
506 ) -> Result<(), TestRunnerError> {
507 self.db
508 .ensure_no_entry_above_by_value::<tables::HeaderNumbers, _>(block, |val| val)?;
509 self.db.ensure_no_entry_above::<tables::CanonicalHeaders, _>(block, |key| key)?;
510 self.db.ensure_no_entry_above::<tables::Headers, _>(block, |key| key)?;
511 self.db.ensure_no_entry_above::<tables::HeaderTerminalDifficulties, _>(
512 block,
513 |num| num,
514 )?;
515 Ok(())
516 }
517
518 pub(crate) fn send_tip(&self, tip: B256) {
519 self.channel.0.send(tip).expect("failed to send tip");
520 }
521
522 pub(crate) fn validate_db_blocks(
524 &self,
525 prev_progress: BlockNumber,
526 highest_block: BlockNumber,
527 ) -> Result<(), TestRunnerError> {
528 let static_file_provider = self.db.factory.static_file_provider();
529
530 self.db.query(|tx| {
531 let mut bodies_cursor = tx.cursor_read::<tables::BlockBodyIndices>()?;
533 let mut ommers_cursor = tx.cursor_read::<tables::BlockOmmers>()?;
534 let mut tx_block_cursor = tx.cursor_read::<tables::TransactionBlocks>()?;
535
536 let first_body_key = match bodies_cursor.first()? {
537 Some((key, _)) => key,
538 None => return Ok(()),
539 };
540
541 let mut prev_number: Option<BlockNumber> = None;
542
543
544 for entry in bodies_cursor.walk(Some(first_body_key))? {
545 let (number, body) = entry?;
546
547 if number > prev_progress {
550 if let Some(prev_key) = prev_number {
551 assert_eq!(prev_key + 1, number, "Body entries must be sequential");
552 }
553 }
554
555 assert!(
557 number <= highest_block,
558 "We wrote a block body outside of our synced range. Found block with number {number}, highest block according to stage is {highest_block}",
559 );
560
561 let header = static_file_provider.header_by_number(number)?.expect("to be present");
562 let stored_ommers = ommers_cursor.seek_exact(number)?;
564 if header.ommers_hash_is_empty() {
565 assert!(stored_ommers.is_none(), "Unexpected ommers entry");
566 } else {
567 assert!(stored_ommers.is_some(), "Missing ommers entry");
568 }
569
570 let tx_block_id = tx_block_cursor.seek_exact(body.last_tx_num())?.map(|(_,b)| b);
571 if body.tx_count == 0 {
572 assert_ne!(tx_block_id,Some(number));
573 } else {
574 assert_eq!(tx_block_id, Some(number));
575 }
576
577 for tx_id in body.tx_num_range() {
578 assert!(static_file_provider.transaction_by_id(tx_id)?.is_some(), "Transaction is missing.");
579 }
580
581 prev_number = Some(number);
582 }
583 Ok(())
584 })?;
585 Ok(())
586 }
587
588 pub(crate) fn take_responses(&mut self) {
589 self.responses.take();
590 }
591
592 pub(crate) fn commit(&self) {
593 self.db.factory.static_file_provider().commit().unwrap();
594 }
595 }
596
597 #[derive(Clone)]
598 pub(crate) struct StubResponses(Vec<(Header, BlockBody<TransactionSigned>)>);
599
600 impl EraStreamFactory<Header, BlockBody<TransactionSigned>> for StubResponses {
601 fn create(
602 self,
603 _input: ExecInput,
604 ) -> Result<ThreadSafeEraStream<Header, BlockBody<TransactionSigned>>, StageError>
605 {
606 let stream = stream::iter(vec![self.0]);
607
608 Ok(Box::new(Box::pin(stream.map(|meta| {
609 Ok(Box::new(meta.into_iter().map(Ok))
610 as Item<Header, BlockBody<TransactionSigned>>)
611 }))))
612 }
613 }
614 }
615
616 stage_test_suite!(EraTestRunner, era);
617}