1use crate::{StageCheckpoint, StageId};
2use alloy_primitives::{BlockHash, BlockNumber};
3use futures_util::{Stream, StreamExt};
4use reqwest::{Client, Url};
5use reth_config::config::EtlConfig;
6use reth_db_api::{table::Value, transaction::DbTxMut};
7use reth_era::{era1_file::Era1Reader, era_file_ops::StreamReader};
8use reth_era_downloader::{read_dir, EraClient, EraMeta, EraStream, EraStreamConfig};
9use reth_era_utils as era;
10use reth_etl::Collector;
11use reth_primitives_traits::{FullBlockBody, FullBlockHeader, NodePrimitives};
12use reth_provider::{
13 BlockReader, BlockWriter, DBProvider, HeaderProvider, StageCheckpointWriter,
14 StaticFileProviderFactory, StaticFileWriter,
15};
16use reth_stages_api::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput};
17use reth_static_file_types::StaticFileSegment;
18use reth_storage_errors::ProviderError;
19use std::{
20 fmt::{Debug, Formatter},
21 iter,
22 path::Path,
23 task::{ready, Context, Poll},
24};
25
26type Item<Header, Body> =
27 Box<dyn Iterator<Item = eyre::Result<(Header, Body)>> + Send + Sync + Unpin>;
28type ThreadSafeEraStream<Header, Body> =
29 Box<dyn Stream<Item = eyre::Result<Item<Header, Body>>> + Send + Sync + Unpin>;
30
31pub struct EraStage<Header, Body, StreamFactory> {
37 source: Option<StreamFactory>,
39 hash_collector: Collector<BlockHash, BlockNumber>,
42 item: Option<Item<Header, Body>>,
44 stream: Option<ThreadSafeEraStream<Header, Body>>,
46}
47
48trait EraStreamFactory<Header, Body> {
49 fn create(self, input: ExecInput) -> Result<ThreadSafeEraStream<Header, Body>, StageError>;
50}
51
52impl<Header, Body> EraStreamFactory<Header, Body> for EraImportSource
53where
54 Header: FullBlockHeader + Value,
55 Body: FullBlockBody<OmmerHeader = Header>,
56{
57 fn create(self, input: ExecInput) -> Result<ThreadSafeEraStream<Header, Body>, StageError> {
58 match self {
59 Self::Path(path) => Self::convert(
60 read_dir(path, input.next_block()).map_err(|e| StageError::Fatal(e.into()))?,
61 ),
62 Self::Url(url, folder) => {
63 let _ = reth_fs_util::create_dir_all(&folder);
64 let client = EraClient::new(Client::new(), url, folder);
65
66 Self::convert(EraStream::new(
67 client,
68 EraStreamConfig::default().start_from(input.next_block()),
69 ))
70 }
71 }
72 }
73}
74
75impl EraImportSource {
76 fn convert<Header, Body>(
77 stream: impl Stream<Item = eyre::Result<impl EraMeta + Send + Sync + 'static + Unpin>>
78 + Send
79 + Sync
80 + 'static
81 + Unpin,
82 ) -> Result<ThreadSafeEraStream<Header, Body>, StageError>
83 where
84 Header: FullBlockHeader + Value,
85 Body: FullBlockBody<OmmerHeader = Header>,
86 {
87 Ok(Box::new(Box::pin(stream.map(|meta| {
88 meta.and_then(|meta| {
89 let file = reth_fs_util::open(meta.path())?;
90 let reader = Era1Reader::new(file);
91 let iter = reader.iter();
92 let iter = iter.map(era::decode);
93 let iter = iter.chain(
94 iter::once_with(move || match meta.mark_as_processed() {
95 Ok(..) => None,
96 Err(e) => Some(Err(e)),
97 })
98 .flatten(),
99 );
100
101 Ok(Box::new(iter) as Item<Header, Body>)
102 })
103 }))))
104 }
105}
106
107impl<Header: Debug, Body: Debug, F: Debug> Debug for EraStage<Header, Body, F> {
108 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
109 f.debug_struct("EraStage")
110 .field("source", &self.source)
111 .field("hash_collector", &self.hash_collector)
112 .field("item", &self.item.is_some())
113 .field("stream", &"dyn Stream")
114 .finish()
115 }
116}
117
118impl<Header, Body, F> EraStage<Header, Body, F> {
119 pub fn new(source: Option<F>, etl_config: EtlConfig) -> Self {
121 Self {
122 source,
123 item: None,
124 stream: None,
125 hash_collector: Collector::new(etl_config.file_size, etl_config.dir),
126 }
127 }
128}
129
130impl<Provider, N, F> Stage<Provider> for EraStage<N::BlockHeader, N::BlockBody, F>
131where
132 Provider: DBProvider<Tx: DbTxMut>
133 + StaticFileProviderFactory<Primitives = N>
134 + BlockWriter<Block = N::Block>
135 + BlockReader<Block = N::Block>
136 + StageCheckpointWriter,
137 F: EraStreamFactory<N::BlockHeader, N::BlockBody> + Send + Sync + Clone,
138 N: NodePrimitives<BlockHeader: Value>,
139{
140 fn id(&self) -> StageId {
141 StageId::Era
142 }
143
144 fn poll_execute_ready(
145 &mut self,
146 cx: &mut Context<'_>,
147 input: ExecInput,
148 ) -> Poll<Result<(), StageError>> {
149 if input.target_reached() || self.item.is_some() {
150 return Poll::Ready(Ok(()));
151 }
152
153 if self.stream.is_none() &&
154 let Some(source) = self.source.clone()
155 {
156 self.stream.replace(source.create(input)?);
157 }
158 if let Some(stream) = &mut self.stream &&
159 let Some(next) = ready!(stream.poll_next_unpin(cx))
160 .transpose()
161 .map_err(|e| StageError::Fatal(e.into()))?
162 {
163 self.item.replace(next);
164 }
165
166 Poll::Ready(Ok(()))
167 }
168
169 fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result<ExecOutput, StageError> {
170 let height = if let Some(era) = self.item.take() {
171 let static_file_provider = provider.static_file_provider();
172
173 let last_header_number = static_file_provider
176 .get_highest_static_file_block(StaticFileSegment::Headers)
177 .unwrap_or_default();
178
179 let mut td = static_file_provider
181 .header_td_by_number(last_header_number)?
182 .ok_or(ProviderError::TotalDifficultyNotFound(last_header_number))?;
183
184 let mut writer = static_file_provider.latest_writer(StaticFileSegment::Headers)?;
187
188 let height = era::process_iter(
189 era,
190 &mut writer,
191 provider,
192 &mut self.hash_collector,
193 &mut td,
194 last_header_number..=input.target(),
195 )
196 .map_err(|e| StageError::Fatal(e.into()))?;
197
198 if !self.hash_collector.is_empty() {
199 era::build_index(provider, &mut self.hash_collector)
200 .map_err(|e| StageError::Recoverable(e.into()))?;
201 self.hash_collector.clear();
202 }
203
204 era::save_stage_checkpoints(
205 &provider,
206 input.checkpoint().block_number,
207 height,
208 height,
209 input.target(),
210 )?;
211
212 height
213 } else {
214 input.target()
215 };
216
217 Ok(ExecOutput { checkpoint: StageCheckpoint::new(height), done: height == input.target() })
218 }
219
220 fn unwind(
221 &mut self,
222 _provider: &Provider,
223 input: UnwindInput,
224 ) -> Result<UnwindOutput, StageError> {
225 Ok(UnwindOutput { checkpoint: input.checkpoint.with_block_number(input.unwind_to) })
226 }
227}
228
229#[derive(Debug, Clone)]
231pub enum EraImportSource {
232 Url(Url, Box<Path>),
234 Path(Box<Path>),
236}
237
238impl EraImportSource {
239 pub fn maybe_new(
251 path: Option<Box<Path>>,
252 url: Option<Url>,
253 default: impl FnOnce() -> Option<Url>,
254 folder: impl FnOnce() -> Box<Path>,
255 ) -> Option<Self> {
256 path.map(Self::Path).or_else(|| url.or_else(default).map(|url| Self::Url(url, folder())))
257 }
258}
259
260#[cfg(test)]
261mod tests {
262 use super::*;
263 use crate::test_utils::{
264 stage_test_suite, ExecuteStageTestRunner, StageTestRunner, UnwindStageTestRunner,
265 };
266 use alloy_primitives::B256;
267 use assert_matches::assert_matches;
268 use reth_db_api::tables;
269 use reth_provider::BlockHashReader;
270 use reth_testing_utils::generators::{self, random_header};
271 use test_runner::EraTestRunner;
272
273 #[tokio::test]
274 async fn test_era_range_ends_below_target() {
275 let era_cap = 2;
276 let target = 20000;
277
278 let mut runner = EraTestRunner::default();
279
280 let input = ExecInput { target: Some(era_cap), checkpoint: None };
281 runner.seed_execution(input).unwrap();
282
283 let input = ExecInput { target: Some(target), checkpoint: None };
284 let output = runner.execute(input).await.unwrap();
285
286 runner.commit();
287
288 assert_matches!(
289 output,
290 Ok(ExecOutput {
291 checkpoint: StageCheckpoint { block_number, stage_checkpoint: None },
292 done: false
293 }) if block_number == era_cap
294 );
295
296 let output = output.unwrap();
297 let validation_output = runner.validate_execution(input, Some(output.clone()));
298
299 assert_matches!(validation_output, Ok(()));
300
301 runner.take_responses();
302
303 let input = ExecInput { target: Some(target), checkpoint: Some(output.checkpoint) };
304 let output = runner.execute(input).await.unwrap();
305
306 runner.commit();
307
308 assert_matches!(
309 output,
310 Ok(ExecOutput {
311 checkpoint: StageCheckpoint { block_number, stage_checkpoint: None },
312 done: true
313 }) if block_number == target
314 );
315
316 let validation_output = runner.validate_execution(input, output.ok());
317
318 assert_matches!(validation_output, Ok(()));
319 }
320
321 mod test_runner {
322 use super::*;
323 use crate::test_utils::{TestRunnerError, TestStageDB};
324 use alloy_consensus::{BlockBody, Header};
325 use futures_util::stream;
326 use reth_db_api::{
327 cursor::DbCursorRO,
328 models::{StoredBlockBodyIndices, StoredBlockOmmers},
329 transaction::DbTx,
330 };
331 use reth_ethereum_primitives::TransactionSigned;
332 use reth_primitives_traits::{SealedBlock, SealedHeader};
333 use reth_provider::{BlockNumReader, TransactionsProvider};
334 use reth_testing_utils::generators::{
335 random_block_range, random_signed_tx, BlockRangeParams,
336 };
337 use tokio::sync::watch;
338
339 pub(crate) struct EraTestRunner {
340 channel: (watch::Sender<B256>, watch::Receiver<B256>),
341 db: TestStageDB,
342 responses: Option<Vec<(Header, BlockBody<TransactionSigned>)>>,
343 }
344
345 impl Default for EraTestRunner {
346 fn default() -> Self {
347 Self {
348 channel: watch::channel(B256::ZERO),
349 db: TestStageDB::default(),
350 responses: Default::default(),
351 }
352 }
353 }
354
355 impl StageTestRunner for EraTestRunner {
356 type S = EraStage<Header, BlockBody<TransactionSigned>, StubResponses>;
357
358 fn db(&self) -> &TestStageDB {
359 &self.db
360 }
361
362 fn stage(&self) -> Self::S {
363 EraStage::new(self.responses.clone().map(StubResponses), EtlConfig::default())
364 }
365 }
366
367 impl ExecuteStageTestRunner for EraTestRunner {
368 type Seed = Vec<SealedBlock<reth_ethereum_primitives::Block>>;
369
370 fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
371 let start = input.checkpoint().block_number;
372 let end = input.target();
373
374 let static_file_provider = self.db.factory.static_file_provider();
375
376 let mut rng = generators::rng();
377
378 let blocks = random_block_range(
380 &mut rng,
381 0..=end,
382 BlockRangeParams {
383 parent: Some(B256::ZERO),
384 tx_count: 0..2,
385 ..Default::default()
386 },
387 );
388 self.db.insert_headers_with_td(blocks.iter().map(|block| block.sealed_header()))?;
389 if let Some(progress) = blocks.get(start as usize) {
390 {
392 let tx = self.db.factory.provider_rw()?.into_tx();
393 let mut static_file_producer = static_file_provider
394 .get_writer(start, StaticFileSegment::Transactions)?;
395
396 let body = StoredBlockBodyIndices {
397 first_tx_num: 0,
398 tx_count: progress.transaction_count() as u64,
399 };
400
401 static_file_producer.set_block_range(0..=progress.number);
402
403 body.tx_num_range().try_for_each(|tx_num| {
404 let transaction = random_signed_tx(&mut rng);
405 static_file_producer.append_transaction(tx_num, &transaction).map(drop)
406 })?;
407
408 if body.tx_count != 0 {
409 tx.put::<tables::TransactionBlocks>(
410 body.last_tx_num(),
411 progress.number,
412 )?;
413 }
414
415 tx.put::<tables::BlockBodyIndices>(progress.number, body)?;
416
417 if !progress.ommers_hash_is_empty() {
418 tx.put::<tables::BlockOmmers>(
419 progress.number,
420 StoredBlockOmmers { ommers: progress.body().ommers.clone() },
421 )?;
422 }
423
424 static_file_producer.commit()?;
425 tx.commit()?;
426 }
427 }
428 self.responses.replace(
429 blocks.iter().map(|v| (v.header().clone(), v.body().clone())).collect(),
430 );
431 Ok(blocks)
432 }
433
434 fn validate_execution(
436 &self,
437 input: ExecInput,
438 output: Option<ExecOutput>,
439 ) -> Result<(), TestRunnerError> {
440 let initial_checkpoint = input.checkpoint().block_number;
441 match output {
442 Some(output) if output.checkpoint.block_number > initial_checkpoint => {
443 let provider = self.db.factory.provider()?;
444 let mut td = provider
445 .header_td_by_number(initial_checkpoint.saturating_sub(1))?
446 .unwrap_or_default();
447
448 for block_num in initial_checkpoint..
449 output
450 .checkpoint
451 .block_number
452 .min(self.responses.as_ref().map(|v| v.len()).unwrap_or_default()
453 as BlockNumber)
454 {
455 let hash = provider.block_hash(block_num)?.expect("no header hash");
457
458 assert_eq!(provider.block_number(hash)?, Some(block_num));
460
461 let header = provider.header_by_number(block_num)?;
463 assert!(header.is_some());
464 let header = SealedHeader::seal_slow(header.unwrap());
465 assert_eq!(header.hash(), hash);
466
467 td += header.difficulty;
469 assert_eq!(provider.header_td_by_number(block_num)?, Some(td));
470 }
471
472 self.validate_db_blocks(
473 output.checkpoint.block_number,
474 output.checkpoint.block_number,
475 )?;
476 }
477 _ => self.check_no_header_entry_above(initial_checkpoint)?,
478 };
479 Ok(())
480 }
481
482 async fn after_execution(&self, headers: Self::Seed) -> Result<(), TestRunnerError> {
483 let tip = if headers.is_empty() {
484 let tip = random_header(&mut generators::rng(), 0, None);
485 self.db.insert_headers(iter::once(&tip))?;
486 tip.hash()
487 } else {
488 headers.last().unwrap().hash()
489 };
490 self.send_tip(tip);
491 Ok(())
492 }
493 }
494
495 impl UnwindStageTestRunner for EraTestRunner {
496 fn validate_unwind(&self, _input: UnwindInput) -> Result<(), TestRunnerError> {
497 Ok(())
498 }
499 }
500
501 impl EraTestRunner {
502 pub(crate) fn check_no_header_entry_above(
503 &self,
504 block: BlockNumber,
505 ) -> Result<(), TestRunnerError> {
506 self.db
507 .ensure_no_entry_above_by_value::<tables::HeaderNumbers, _>(block, |val| val)?;
508 self.db.ensure_no_entry_above::<tables::CanonicalHeaders, _>(block, |key| key)?;
509 self.db.ensure_no_entry_above::<tables::Headers, _>(block, |key| key)?;
510 self.db.ensure_no_entry_above::<tables::HeaderTerminalDifficulties, _>(
511 block,
512 |num| num,
513 )?;
514 Ok(())
515 }
516
517 pub(crate) fn send_tip(&self, tip: B256) {
518 self.channel.0.send(tip).expect("failed to send tip");
519 }
520
521 pub(crate) fn validate_db_blocks(
523 &self,
524 prev_progress: BlockNumber,
525 highest_block: BlockNumber,
526 ) -> Result<(), TestRunnerError> {
527 let static_file_provider = self.db.factory.static_file_provider();
528
529 self.db.query(|tx| {
530 let mut bodies_cursor = tx.cursor_read::<tables::BlockBodyIndices>()?;
532 let mut ommers_cursor = tx.cursor_read::<tables::BlockOmmers>()?;
533 let mut tx_block_cursor = tx.cursor_read::<tables::TransactionBlocks>()?;
534
535 let first_body_key = match bodies_cursor.first()? {
536 Some((key, _)) => key,
537 None => return Ok(()),
538 };
539
540 let mut prev_number: Option<BlockNumber> = None;
541
542
543 for entry in bodies_cursor.walk(Some(first_body_key))? {
544 let (number, body) = entry?;
545
546 if number > prev_progress
549 && let Some(prev_key) = prev_number {
550 assert_eq!(prev_key + 1, number, "Body entries must be sequential");
551 }
552
553 assert!(
555 number <= highest_block,
556 "We wrote a block body outside of our synced range. Found block with number {number}, highest block according to stage is {highest_block}",
557 );
558
559 let header = static_file_provider.header_by_number(number)?.expect("to be present");
560 let stored_ommers = ommers_cursor.seek_exact(number)?;
562 if header.ommers_hash_is_empty() {
563 assert!(stored_ommers.is_none(), "Unexpected ommers entry");
564 } else {
565 assert!(stored_ommers.is_some(), "Missing ommers entry");
566 }
567
568 let tx_block_id = tx_block_cursor.seek_exact(body.last_tx_num())?.map(|(_,b)| b);
569 if body.tx_count == 0 {
570 assert_ne!(tx_block_id,Some(number));
571 } else {
572 assert_eq!(tx_block_id, Some(number));
573 }
574
575 for tx_id in body.tx_num_range() {
576 assert!(static_file_provider.transaction_by_id(tx_id)?.is_some(), "Transaction is missing.");
577 }
578
579 prev_number = Some(number);
580 }
581 Ok(())
582 })?;
583 Ok(())
584 }
585
586 pub(crate) fn take_responses(&mut self) {
587 self.responses.take();
588 }
589
590 pub(crate) fn commit(&self) {
591 self.db.factory.static_file_provider().commit().unwrap();
592 }
593 }
594
595 #[derive(Clone)]
596 pub(crate) struct StubResponses(Vec<(Header, BlockBody<TransactionSigned>)>);
597
598 impl EraStreamFactory<Header, BlockBody<TransactionSigned>> for StubResponses {
599 fn create(
600 self,
601 _input: ExecInput,
602 ) -> Result<ThreadSafeEraStream<Header, BlockBody<TransactionSigned>>, StageError>
603 {
604 let stream = stream::iter(vec![self.0]);
605
606 Ok(Box::new(Box::pin(stream.map(|meta| {
607 Ok(Box::new(meta.into_iter().map(Ok))
608 as Item<Header, BlockBody<TransactionSigned>>)
609 }))))
610 }
611 }
612 }
613
614 stage_test_suite!(EraTestRunner, era);
615}