1use crate::{StageCheckpoint, StageId};
2use alloy_primitives::{BlockHash, BlockNumber};
3use futures_util::{Stream, StreamExt};
4use reqwest::{Client, Url};
5use reth_config::config::EtlConfig;
6use reth_db_api::{table::Value, transaction::DbTxMut};
7use reth_era::{era1_file::Era1Reader, era_file_ops::StreamReader};
8use reth_era_downloader::{read_dir, EraClient, EraMeta, EraStream, EraStreamConfig};
9use reth_era_utils as era;
10use reth_etl::Collector;
11use reth_primitives_traits::{FullBlockBody, FullBlockHeader, NodePrimitives};
12use reth_provider::{
13 BlockReader, BlockWriter, DBProvider, StageCheckpointWriter, StaticFileProviderFactory,
14 StaticFileWriter,
15};
16use reth_stages_api::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput};
17use reth_static_file_types::StaticFileSegment;
18use std::{
19 fmt::{Debug, Formatter},
20 iter,
21 path::Path,
22 task::{ready, Context, Poll},
23};
24
/// Boxed iterator over fallible `(header, body)` pairs.
///
/// Within this file one such iterator is built per era file (see `EraImportSource::convert`).
type Item<Header, Body> =
    Box<dyn Iterator<Item = eyre::Result<(Header, Body)>> + Send + Sync + Unpin>;
/// Boxed stream whose elements are either an [`Item`] (an iterator of blocks) or an error.
type ThreadSafeEraStream<Header, Body> =
    Box<dyn Stream<Item = eyre::Result<Item<Header, Body>>> + Send + Sync + Unpin>;
29
/// Pipeline stage (reported as `StageId::Era`) that pulls block iterators from a stream and
/// hands them to `era::process_iter` for import.
///
/// `poll_execute_ready` fetches the next iterator from the stream into `item`; `execute` then
/// consumes it.
pub struct EraStage<Header, Body, StreamFactory> {
    /// Factory used to lazily create `stream` on the first poll. With `None` no stream is ever
    /// created.
    source: Option<StreamFactory>,
    /// Collector of block hash / block number pairs, filled while processing an item and fed to
    /// `era::build_index` afterwards.
    hash_collector: Collector<BlockHash, BlockNumber>,
    /// Block iterator most recently taken from `stream`, waiting to be consumed by `execute`.
    item: Option<Item<Header, Body>>,
    /// Stream of block iterators created from `source`.
    stream: Option<ThreadSafeEraStream<Header, Body>>,
}
46
/// Something that can be turned into a [`ThreadSafeEraStream`] for a given execution input.
trait EraStreamFactory<Header, Body> {
    /// Consumes the factory and builds the stream for `input`.
    fn create(self, input: ExecInput) -> Result<ThreadSafeEraStream<Header, Body>, StageError>;
}
50
51impl<Header, Body> EraStreamFactory<Header, Body> for EraImportSource
52where
53 Header: FullBlockHeader + Value,
54 Body: FullBlockBody<OmmerHeader = Header>,
55{
56 fn create(self, input: ExecInput) -> Result<ThreadSafeEraStream<Header, Body>, StageError> {
57 match self {
58 Self::Path(path) => Self::convert(
59 read_dir(path, input.next_block()).map_err(|e| StageError::Fatal(e.into()))?,
60 ),
61 Self::Url(url, folder) => {
62 let _ = reth_fs_util::create_dir_all(&folder);
63 let client = EraClient::new(Client::new(), url, folder);
64
65 Self::convert(EraStream::new(
66 client,
67 EraStreamConfig::default().start_from(input.next_block()),
68 ))
69 }
70 }
71 }
72}
73
74impl EraImportSource {
75 fn convert<Header, Body>(
76 stream: impl Stream<Item = eyre::Result<impl EraMeta + Send + Sync + 'static + Unpin>>
77 + Send
78 + Sync
79 + 'static
80 + Unpin,
81 ) -> Result<ThreadSafeEraStream<Header, Body>, StageError>
82 where
83 Header: FullBlockHeader + Value,
84 Body: FullBlockBody<OmmerHeader = Header>,
85 {
86 Ok(Box::new(Box::pin(stream.map(|meta| {
87 meta.and_then(|meta| {
88 let file = reth_fs_util::open(meta.path())?;
89 let reader = Era1Reader::new(file);
90 let iter = reader.iter();
91 let iter = iter.map(era::decode);
92 let iter = iter.chain(
93 iter::once_with(move || match meta.mark_as_processed() {
94 Ok(..) => None,
95 Err(e) => Some(Err(e)),
96 })
97 .flatten(),
98 );
99
100 Ok(Box::new(iter) as Item<Header, Body>)
101 })
102 }))))
103 }
104}
105
106impl<Header: Debug, Body: Debug, F: Debug> Debug for EraStage<Header, Body, F> {
107 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
108 f.debug_struct("EraStage")
109 .field("source", &self.source)
110 .field("hash_collector", &self.hash_collector)
111 .field("item", &self.item.is_some())
112 .field("stream", &"dyn Stream")
113 .finish()
114 }
115}
116
117impl<Header, Body, F> EraStage<Header, Body, F> {
118 pub fn new(source: Option<F>, etl_config: EtlConfig) -> Self {
120 Self {
121 source,
122 item: None,
123 stream: None,
124 hash_collector: Collector::new(etl_config.file_size, etl_config.dir),
125 }
126 }
127}
128
129impl<Provider, N, F> Stage<Provider> for EraStage<N::BlockHeader, N::BlockBody, F>
130where
131 Provider: DBProvider<Tx: DbTxMut>
132 + StaticFileProviderFactory<Primitives = N>
133 + BlockWriter<Block = N::Block>
134 + BlockReader<Block = N::Block>
135 + StageCheckpointWriter,
136 F: EraStreamFactory<N::BlockHeader, N::BlockBody> + Send + Sync + Clone,
137 N: NodePrimitives<BlockHeader: Value>,
138{
139 fn id(&self) -> StageId {
140 StageId::Era
141 }
142
143 fn poll_execute_ready(
144 &mut self,
145 cx: &mut Context<'_>,
146 input: ExecInput,
147 ) -> Poll<Result<(), StageError>> {
148 if input.target_reached() || self.item.is_some() {
149 return Poll::Ready(Ok(()));
150 }
151
152 if self.stream.is_none() &&
153 let Some(source) = self.source.clone()
154 {
155 self.stream.replace(source.create(input)?);
156 }
157 if let Some(stream) = &mut self.stream &&
158 let Some(next) = ready!(stream.poll_next_unpin(cx))
159 .transpose()
160 .map_err(|e| StageError::Fatal(e.into()))?
161 {
162 self.item.replace(next);
163 }
164
165 Poll::Ready(Ok(()))
166 }
167
168 fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result<ExecOutput, StageError> {
169 let height = if let Some(era) = self.item.take() {
170 let static_file_provider = provider.static_file_provider();
171
172 let last_header_number = static_file_provider
175 .get_highest_static_file_block(StaticFileSegment::Headers)
176 .unwrap_or_default();
177
178 let mut writer = static_file_provider.latest_writer(StaticFileSegment::Headers)?;
181
182 let height = era::process_iter(
183 era,
184 &mut writer,
185 provider,
186 &mut self.hash_collector,
187 last_header_number..=input.target(),
188 )
189 .map_err(|e| StageError::Fatal(e.into()))?;
190
191 if !self.hash_collector.is_empty() {
192 era::build_index(provider, &mut self.hash_collector)
193 .map_err(|e| StageError::Recoverable(e.into()))?;
194 self.hash_collector.clear();
195 }
196
197 era::save_stage_checkpoints(
198 &provider,
199 input.checkpoint().block_number,
200 height,
201 height,
202 input.target(),
203 )?;
204
205 height
206 } else {
207 let highest_header = provider
217 .static_file_provider()
218 .get_highest_static_file_block(StaticFileSegment::Headers)
219 .unwrap_or_default();
220
221 let checkpoint = input.checkpoint().block_number;
222 let from_target = input.target.unwrap_or(checkpoint);
223
224 checkpoint.max(highest_header).max(from_target)
225 };
226
227 Ok(ExecOutput { checkpoint: StageCheckpoint::new(height), done: height >= input.target() })
228 }
229
230 fn unwind(
231 &mut self,
232 _provider: &Provider,
233 input: UnwindInput,
234 ) -> Result<UnwindOutput, StageError> {
235 Ok(UnwindOutput { checkpoint: input.checkpoint.with_block_number(input.unwind_to) })
236 }
237}
238
/// Describes where era files are obtained from.
#[derive(Debug, Clone)]
pub enum EraImportSource {
    /// Remote URL together with the local folder that is passed to the `EraClient` alongside it.
    Url(Url, Box<Path>),
    /// Local directory that is listed with `read_dir`.
    Path(Box<Path>),
}
247
248impl EraImportSource {
249 pub fn maybe_new(
261 path: Option<Box<Path>>,
262 url: Option<Url>,
263 default: impl FnOnce() -> Option<Url>,
264 folder: impl FnOnce() -> Box<Path>,
265 ) -> Option<Self> {
266 path.map(Self::Path).or_else(|| url.or_else(default).map(|url| Self::Url(url, folder())))
267 }
268}
269
270#[cfg(test)]
271mod tests {
272 use super::*;
273 use crate::test_utils::{
274 stage_test_suite, ExecuteStageTestRunner, StageTestRunner, UnwindStageTestRunner,
275 };
276 use alloy_primitives::B256;
277 use assert_matches::assert_matches;
278 use reth_db_api::tables;
279 use reth_provider::BlockHashReader;
280 use reth_testing_utils::generators::{self, random_header};
281 use test_runner::EraTestRunner;
282
283 #[tokio::test]
284 async fn test_era_range_ends_below_target() {
285 let era_cap = 2;
286 let target = 20000;
287
288 let mut runner = EraTestRunner::default();
289
290 let input = ExecInput { target: Some(era_cap), checkpoint: None };
291 runner.seed_execution(input).unwrap();
292
293 let input = ExecInput { target: Some(target), checkpoint: None };
294 let output = runner.execute(input).await.unwrap();
295
296 runner.commit();
297
298 assert_matches!(
299 output,
300 Ok(ExecOutput {
301 checkpoint: StageCheckpoint { block_number, stage_checkpoint: None },
302 done: false
303 }) if block_number == era_cap
304 );
305
306 let output = output.unwrap();
307 let validation_output = runner.validate_execution(input, Some(output.clone()));
308
309 assert_matches!(validation_output, Ok(()));
310
311 runner.take_responses();
312
313 let input = ExecInput { target: Some(target), checkpoint: Some(output.checkpoint) };
314 let output = runner.execute(input).await.unwrap();
315
316 runner.commit();
317
318 assert_matches!(
319 output,
320 Ok(ExecOutput {
321 checkpoint: StageCheckpoint { block_number, stage_checkpoint: None },
322 done: true
323 }) if block_number == target
324 );
325
326 let validation_output = runner.validate_execution(input, output.ok());
327
328 assert_matches!(validation_output, Ok(()));
329 }
330
331 mod test_runner {
332 use super::*;
333 use crate::test_utils::{TestRunnerError, TestStageDB};
334 use alloy_consensus::{BlockBody, Header};
335 use futures_util::stream;
336 use reth_db_api::{
337 cursor::DbCursorRO,
338 models::{StoredBlockBodyIndices, StoredBlockOmmers},
339 transaction::DbTx,
340 };
341 use reth_ethereum_primitives::TransactionSigned;
342 use reth_primitives_traits::{SealedBlock, SealedHeader};
343 use reth_provider::{BlockNumReader, HeaderProvider, TransactionsProvider};
344 use reth_testing_utils::generators::{
345 random_block_range, random_signed_tx, BlockRangeParams,
346 };
347 use tokio::sync::watch;
348
/// Test harness that runs `EraStage` against a `TestStageDB` using stubbed responses.
pub(crate) struct EraTestRunner {
    /// Watch channel used by `send_tip`; both halves are stored on the runner.
    channel: (watch::Sender<B256>, watch::Receiver<B256>),
    /// Database the stage under test runs against.
    db: TestStageDB,
    /// Header/body pairs handed to the stage through `StubResponses`; `None` means the stage is
    /// created without a source.
    responses: Option<Vec<(Header, BlockBody<TransactionSigned>)>>,
}
354
355 impl Default for EraTestRunner {
356 fn default() -> Self {
357 Self {
358 channel: watch::channel(B256::ZERO),
359 db: TestStageDB::default(),
360 responses: Default::default(),
361 }
362 }
363 }
364
365 impl StageTestRunner for EraTestRunner {
366 type S = EraStage<Header, BlockBody<TransactionSigned>, StubResponses>;
367
368 fn db(&self) -> &TestStageDB {
369 &self.db
370 }
371
372 fn stage(&self) -> Self::S {
373 EraStage::new(self.responses.clone().map(StubResponses), EtlConfig::default())
374 }
375 }
376
377 impl ExecuteStageTestRunner for EraTestRunner {
378 type Seed = Vec<SealedBlock<reth_ethereum_primitives::Block>>;
379
380 fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
381 let start = input.checkpoint().block_number;
382 let end = input.target();
383
384 let static_file_provider = self.db.factory.static_file_provider();
385
386 let mut rng = generators::rng();
387
388 let blocks = random_block_range(
390 &mut rng,
391 0..=end,
392 BlockRangeParams {
393 parent: Some(B256::ZERO),
394 tx_count: 0..2,
395 ..Default::default()
396 },
397 );
398 self.db.insert_headers(blocks.iter().map(|block| block.sealed_header()))?;
399 if let Some(progress) = blocks.get(start as usize) {
400 {
402 let tx = self.db.factory.provider_rw()?.into_tx();
403 let mut static_file_producer = static_file_provider
404 .get_writer(start, StaticFileSegment::Transactions)?;
405
406 let body = StoredBlockBodyIndices {
407 first_tx_num: 0,
408 tx_count: progress.transaction_count() as u64,
409 };
410
411 static_file_producer.set_block_range(0..=progress.number);
412
413 body.tx_num_range().try_for_each(|tx_num| {
414 let transaction = random_signed_tx(&mut rng);
415 static_file_producer.append_transaction(tx_num, &transaction).map(drop)
416 })?;
417
418 if body.tx_count != 0 {
419 tx.put::<tables::TransactionBlocks>(
420 body.last_tx_num(),
421 progress.number,
422 )?;
423 }
424
425 tx.put::<tables::BlockBodyIndices>(progress.number, body)?;
426
427 if !progress.ommers_hash_is_empty() {
428 tx.put::<tables::BlockOmmers>(
429 progress.number,
430 StoredBlockOmmers { ommers: progress.body().ommers.clone() },
431 )?;
432 }
433
434 static_file_producer.commit()?;
435 tx.commit()?;
436 }
437 }
438 self.responses.replace(
439 blocks.iter().map(|v| (v.header().clone(), v.body().clone())).collect(),
440 );
441 Ok(blocks)
442 }
443
444 fn validate_execution(
446 &self,
447 input: ExecInput,
448 output: Option<ExecOutput>,
449 ) -> Result<(), TestRunnerError> {
450 let initial_checkpoint = input.checkpoint().block_number;
451 match output {
452 Some(output) if output.checkpoint.block_number > initial_checkpoint => {
453 let provider = self.db.factory.provider()?;
454
455 for block_num in initial_checkpoint..
456 output
457 .checkpoint
458 .block_number
459 .min(self.responses.as_ref().map(|v| v.len()).unwrap_or_default()
460 as BlockNumber)
461 {
462 let hash = provider.block_hash(block_num)?.expect("no header hash");
464
465 assert_eq!(provider.block_number(hash)?, Some(block_num));
467
468 let header = provider.header_by_number(block_num)?;
470 assert!(header.is_some());
471 let header = SealedHeader::seal_slow(header.unwrap());
472 assert_eq!(header.hash(), hash);
473 }
474
475 self.validate_db_blocks(
476 output.checkpoint.block_number,
477 output.checkpoint.block_number,
478 )?;
479 }
480 _ => self.check_no_header_entry_above(initial_checkpoint)?,
481 };
482 Ok(())
483 }
484
485 async fn after_execution(&self, headers: Self::Seed) -> Result<(), TestRunnerError> {
486 let tip = if headers.is_empty() {
487 let tip = random_header(&mut generators::rng(), 0, None);
488 self.db.insert_headers(iter::once(&tip))?;
489 tip.hash()
490 } else {
491 headers.last().unwrap().hash()
492 };
493 self.send_tip(tip);
494 Ok(())
495 }
496 }
497
impl UnwindStageTestRunner for EraTestRunner {
    /// Performs no checks and always returns `Ok(())`.
    fn validate_unwind(&self, _input: UnwindInput) -> Result<(), TestRunnerError> {
        Ok(())
    }
}
503
504 impl EraTestRunner {
505 pub(crate) fn check_no_header_entry_above(
506 &self,
507 block: BlockNumber,
508 ) -> Result<(), TestRunnerError> {
509 self.db
510 .ensure_no_entry_above_by_value::<tables::HeaderNumbers, _>(block, |val| val)?;
511 self.db.ensure_no_entry_above::<tables::CanonicalHeaders, _>(block, |key| key)?;
512 self.db.ensure_no_entry_above::<tables::Headers, _>(block, |key| key)?;
513 Ok(())
514 }
515
516 pub(crate) fn send_tip(&self, tip: B256) {
517 self.channel.0.send(tip).expect("failed to send tip");
518 }
519
520 pub(crate) fn validate_db_blocks(
522 &self,
523 prev_progress: BlockNumber,
524 highest_block: BlockNumber,
525 ) -> Result<(), TestRunnerError> {
526 let static_file_provider = self.db.factory.static_file_provider();
527
528 self.db.query(|tx| {
529 let mut bodies_cursor = tx.cursor_read::<tables::BlockBodyIndices>()?;
531 let mut ommers_cursor = tx.cursor_read::<tables::BlockOmmers>()?;
532 let mut tx_block_cursor = tx.cursor_read::<tables::TransactionBlocks>()?;
533
534 let first_body_key = match bodies_cursor.first()? {
535 Some((key, _)) => key,
536 None => return Ok(()),
537 };
538
539 let mut prev_number: Option<BlockNumber> = None;
540
541
542 for entry in bodies_cursor.walk(Some(first_body_key))? {
543 let (number, body) = entry?;
544
545 if number > prev_progress
548 && let Some(prev_key) = prev_number {
549 assert_eq!(prev_key + 1, number, "Body entries must be sequential");
550 }
551
552 assert!(
554 number <= highest_block,
555 "We wrote a block body outside of our synced range. Found block with number {number}, highest block according to stage is {highest_block}",
556 );
557
558 let header = static_file_provider.header_by_number(number)?.expect("to be present");
559 let stored_ommers = ommers_cursor.seek_exact(number)?;
561 if header.ommers_hash_is_empty() {
562 assert!(stored_ommers.is_none(), "Unexpected ommers entry");
563 } else {
564 assert!(stored_ommers.is_some(), "Missing ommers entry");
565 }
566
567 let tx_block_id = tx_block_cursor.seek_exact(body.last_tx_num())?.map(|(_,b)| b);
568 if body.tx_count == 0 {
569 assert_ne!(tx_block_id,Some(number));
570 } else {
571 assert_eq!(tx_block_id, Some(number));
572 }
573
574 for tx_id in body.tx_num_range() {
575 assert!(static_file_provider.transaction_by_id(tx_id)?.is_some(), "Transaction is missing.");
576 }
577
578 prev_number = Some(number);
579 }
580 Ok(())
581 })?;
582 Ok(())
583 }
584
585 pub(crate) fn take_responses(&mut self) {
586 self.responses.take();
587 }
588
589 pub(crate) fn commit(&self) {
590 self.db.factory.static_file_provider().commit().unwrap();
591 }
592 }
593
/// Stream factory for tests that serves a fixed list of header/body pairs.
#[derive(Clone)]
pub(crate) struct StubResponses(Vec<(Header, BlockBody<TransactionSigned>)>);
596
597 impl EraStreamFactory<Header, BlockBody<TransactionSigned>> for StubResponses {
598 fn create(
599 self,
600 _input: ExecInput,
601 ) -> Result<ThreadSafeEraStream<Header, BlockBody<TransactionSigned>>, StageError>
602 {
603 let stream = stream::iter(vec![self.0]);
604
605 Ok(Box::new(Box::pin(stream.map(|meta| {
606 Ok(Box::new(meta.into_iter().map(Ok))
607 as Item<Header, BlockBody<TransactionSigned>>)
608 }))))
609 }
610 }
611 }
612
613 stage_test_suite!(EraTestRunner, era);
614}