1use alloy_consensus::BlockHeader;
2use alloy_primitives::{BlockHash, BlockNumber, U256};
3use futures_util::{Stream, StreamExt};
4use reth_db_api::{
5 cursor::{DbCursorRO, DbCursorRW},
6 table::Value,
7 tables,
8 transaction::{DbTx, DbTxMut},
9 RawKey, RawTable, RawValue,
10};
11use reth_era::{
12 common::{decode::DecodeCompressedRlp, file_ops::StreamReader},
13 e2s::error::E2sError,
14 era1::{file::Era1Reader, types::execution::BlockTuple},
15 ere::{file::EreReader, types::execution::BlockTuple as EreBlockTuple},
16};
17use reth_era_downloader::EraMeta;
18use reth_etl::Collector;
19use reth_fs_util as fs;
20use reth_primitives_traits::{Block, FullBlockBody, FullBlockHeader, NodePrimitives};
21use reth_provider::{
22 providers::StaticFileProviderRWRefMut, BlockReader, BlockWriter, StaticFileProviderFactory,
23 StaticFileSegment, StaticFileWriter,
24};
25use reth_stages_types::{
26 CheckpointBlockRange, EntitiesCheckpoint, HeadersCheckpoint, StageCheckpoint, StageId,
27};
28use reth_storage_api::{
29 errors::ProviderResult, DBProvider, DatabaseProviderFactory, NodePrimitivesProvider,
30 StageCheckpointWriter,
31};
32use std::{collections::Bound, error::Error, ops::RangeBounds, sync::mpsc};
33use tracing::info;
34
35pub trait EraBlockReader<BH, BB> {
39 fn blocks<M: EraMeta + ?Sized>(
41 meta: &M,
42 ) -> eyre::Result<impl Iterator<Item = eyre::Result<(BH, BB)>>>;
43}
44
45#[derive(Debug)]
47pub struct Era1;
48
49impl<BH, BB> EraBlockReader<BH, BB> for Era1
50where
51 BH: FullBlockHeader + Value,
52 BB: FullBlockBody<OmmerHeader = BH>,
53{
54 fn blocks<M: EraMeta + ?Sized>(
55 meta: &M,
56 ) -> eyre::Result<impl Iterator<Item = eyre::Result<(BH, BB)>>> {
57 let reader: Era1Reader<std::fs::File> = open(meta)?;
58 Ok(reader.iter().map(decode::<BH, BB, E2sError>))
59 }
60}
61
62impl<BH, BB> EraBlockReader<BH, BB> for Ere
63where
64 BH: FullBlockHeader + Value,
65 BB: FullBlockBody<OmmerHeader = BH>,
66{
67 fn blocks<M: EraMeta + ?Sized>(
68 meta: &M,
69 ) -> eyre::Result<impl Iterator<Item = eyre::Result<(BH, BB)>>> {
70 let reader: EreReader<std::fs::File> = open(meta)?;
71 Ok(reader.iter().map(Self::decode))
72 }
73}
74
75#[derive(Debug)]
77pub struct Ere;
78
79impl Ere {
80 pub fn decode<BH, BB, E>(block: Result<EreBlockTuple, E>) -> eyre::Result<(BH, BB)>
83 where
84 BH: FullBlockHeader + Value,
85 BB: FullBlockBody<OmmerHeader = BH>,
86 E: From<E2sError> + Error + Send + Sync + 'static,
87 {
88 let block = block?;
89 let header: BH = block.header.decode()?;
90 let body: BB = block.body.decode()?;
91 Ok((header, body))
92 }
93}
94
95pub fn open<Reader>(meta: &(impl EraMeta + ?Sized)) -> eyre::Result<Reader>
97where
98 Reader: StreamReader<std::fs::File>,
99{
100 Ok(Reader::new(fs::open(meta.path())?))
101}
102
103pub fn import<S, Downloader, Era, PF, B, BB, BH>(
110 mut downloader: Downloader,
111 provider_factory: &PF,
112 hash_collector: &mut Collector<BlockHash, BlockNumber>,
113 to_block: Option<BlockNumber>,
114) -> eyre::Result<BlockNumber>
115where
116 S: EraBlockReader<BH, BB>,
117 B: Block<Header = BH, Body = BB>,
118 BH: FullBlockHeader + Value,
119 BB: FullBlockBody<
120 Transaction = <<<PF as DatabaseProviderFactory>::ProviderRW as NodePrimitivesProvider>::Primitives as NodePrimitives>::SignedTx,
121 OmmerHeader = BH,
122 >,
123 Downloader: Stream<Item = eyre::Result<Era>> + Send + 'static + Unpin,
124 Era: EraMeta + Send + 'static,
125 PF: DatabaseProviderFactory<
126 ProviderRW: BlockWriter<Block = B>
127 + DBProvider
128 + StaticFileProviderFactory<Primitives: NodePrimitives<Block = B, BlockHeader = BH, BlockBody = BB>>
129 + StageCheckpointWriter,
130 > + StaticFileProviderFactory<Primitives = <<PF as DatabaseProviderFactory>::ProviderRW as NodePrimitivesProvider>::Primitives>,
131{
132 let (tx, rx) = mpsc::channel();
133
134 tokio::spawn(async move {
136 while let Some(file) = downloader.next().await {
137 tx.send(Some(file))?;
138 }
139 tx.send(None)
140 });
141
142 let static_file_provider = provider_factory.static_file_provider();
143
144 let mut height = static_file_provider
147 .get_highest_static_file_block(StaticFileSegment::Headers)
148 .unwrap_or_default();
149
150 let end = to_block.map_or(Bound::Unbounded, Bound::Included);
151
152 while let Some(meta) = rx.recv()? {
153 let meta = meta?;
154 let from = height;
155 let provider = provider_factory.database_provider_rw()?;
156
157 height = process::<S, _, _, _, _>(
158 &meta,
159 &mut static_file_provider.latest_writer(StaticFileSegment::Headers)?,
160 &provider,
161 hash_collector,
162 (Bound::Included(height), end),
163 )?;
164
165 save_stage_checkpoints(&provider, from, height, height, height)?;
166
167 provider.commit()?;
168
169 info!(target: "era::history::import", first = from, last = height, file = %meta.path().display(), "Imported ERA file");
170
171 if to_block.is_some_and(|to| height >= to) {
172 break;
173 }
174 }
175
176 let provider = provider_factory.database_provider_rw()?;
177
178 build_index(&provider, hash_collector)?;
179
180 provider.commit()?;
181
182 Ok(height)
183}
184
185pub fn save_stage_checkpoints<P>(
191 provider: P,
192 from: BlockNumber,
193 to: BlockNumber,
194 processed: u64,
195 total: u64,
196) -> ProviderResult<()>
197where
198 P: StageCheckpointWriter,
199{
200 provider.save_stage_checkpoint(
201 StageId::Headers,
202 StageCheckpoint::new(to).with_headers_stage_checkpoint(HeadersCheckpoint {
203 block_range: CheckpointBlockRange { from, to },
204 progress: EntitiesCheckpoint { processed, total },
205 }),
206 )?;
207 provider.save_stage_checkpoint(
208 StageId::Bodies,
209 StageCheckpoint::new(to)
210 .with_entities_stage_checkpoint(EntitiesCheckpoint { processed, total }),
211 )?;
212 Ok(())
213}
214
215pub fn process<S, P, B, BB, BH>(
218 meta: &(impl EraMeta + ?Sized),
219 writer: &mut StaticFileProviderRWRefMut<'_, <P as NodePrimitivesProvider>::Primitives>,
220 provider: &P,
221 hash_collector: &mut Collector<BlockHash, BlockNumber>,
222 block_numbers: impl RangeBounds<BlockNumber>,
223) -> eyre::Result<BlockNumber>
224where
225 S: EraBlockReader<BH, BB>,
226 B: Block<Header = BH, Body = BB>,
227 BH: FullBlockHeader + Value,
228 BB: FullBlockBody<
229 Transaction = <<P as NodePrimitivesProvider>::Primitives as NodePrimitives>::SignedTx,
230 OmmerHeader = BH,
231 >,
232 P: DBProvider<Tx: DbTxMut> + NodePrimitivesProvider + BlockWriter<Block = B>,
233 <P as NodePrimitivesProvider>::Primitives: NodePrimitives<BlockHeader = BH, BlockBody = BB>,
234{
235 let iter = S::blocks(meta)?
236 .map(Some)
237 .chain(std::iter::once_with(|| match meta.mark_as_processed() {
238 Ok(()) => None,
239 Err(error) => Some(Err(error)),
240 }))
241 .flatten();
242
243 process_iter(iter, writer, provider, hash_collector, block_numbers)
244}
245
246pub fn decode<BH, BB, E>(block: Result<BlockTuple, E>) -> eyre::Result<(BH, BB)>
248where
249 BH: FullBlockHeader + Value,
250 BB: FullBlockBody<OmmerHeader = BH>,
251 E: From<E2sError> + Error + Send + Sync + 'static,
252{
253 let block = block?;
254 let header: BH = block.header.decode()?;
255 let body: BB = block.body.decode()?;
256
257 Ok((header, body))
258}
259
260pub fn process_iter<P, B, BB, BH>(
272 mut iter: impl Iterator<Item = eyre::Result<(BH, BB)>>,
273 writer: &mut StaticFileProviderRWRefMut<'_, <P as NodePrimitivesProvider>::Primitives>,
274 provider: &P,
275 hash_collector: &mut Collector<BlockHash, BlockNumber>,
276 block_numbers: impl RangeBounds<BlockNumber>,
277) -> eyre::Result<BlockNumber>
278where
279 B: Block<Header = BH, Body = BB>,
280 BH: FullBlockHeader + Value,
281 BB: FullBlockBody<
282 Transaction = <<P as NodePrimitivesProvider>::Primitives as NodePrimitives>::SignedTx,
283 OmmerHeader = BH,
284 >,
285 P: DBProvider<Tx: DbTxMut> + NodePrimitivesProvider + BlockWriter<Block = B>,
286 <P as NodePrimitivesProvider>::Primitives: NodePrimitives<BlockHeader = BH, BlockBody = BB>,
287{
288 let mut last_header_number = match block_numbers.start_bound() {
289 Bound::Included(&number) => number,
290 Bound::Excluded(&number) => number.saturating_add(1),
291 Bound::Unbounded => 0,
292 };
293 let target = match block_numbers.end_bound() {
294 Bound::Included(&number) => Some(number),
295 Bound::Excluded(&number) => Some(number.saturating_sub(1)),
296 Bound::Unbounded => None,
297 };
298
299 for block in &mut iter {
300 let (header, body) = block?;
301 let number = header.number();
302
303 if number <= last_header_number {
304 continue;
305 }
306 if let Some(target) = target &&
307 number > target
308 {
309 break;
310 }
311
312 let hash = header.hash_slow();
313 last_header_number = number;
314
315 writer.append_header(&header, &hash)?;
317
318 provider.append_block_bodies(vec![(header.number(), Some(&body))])?;
320
321 hash_collector.insert(hash, number)?;
322 }
323
324 Ok(last_header_number)
325}
326
327pub fn build_index<P>(
329 provider: &P,
330 hash_collector: &mut Collector<BlockHash, BlockNumber>,
331) -> eyre::Result<()>
332where
333 P: DBProvider<Tx: DbTxMut>,
334{
335 let total_headers = hash_collector.len();
336 info!(target: "era::history::import", total = total_headers, "Writing headers hash index");
337
338 let mut cursor_header_numbers =
340 provider.tx_ref().cursor_write::<RawTable<tables::HeaderNumbers>>()?;
341 let first_sync = if provider.tx_ref().entries::<RawTable<tables::HeaderNumbers>>()? == 1 &&
344 let Some((hash, block_number)) = cursor_header_numbers.last()? &&
345 block_number.value()? == 0
346 {
347 hash_collector.insert(hash.key()?, 0)?;
348 cursor_header_numbers.delete_current()?;
349 true
350 } else {
351 false
352 };
353
354 let interval = (total_headers / 10).max(8192);
355
356 for (index, hash_to_number) in hash_collector.iter()?.enumerate() {
358 let (hash, number) = hash_to_number?;
359
360 if index != 0 && index.is_multiple_of(interval) {
361 info!(target: "era::history::import", progress = %format!("{:.2}%", (index as f64 / total_headers as f64) * 100.0), "Writing headers hash index");
362 }
363
364 let hash = RawKey::<BlockHash>::from_vec(hash);
365 let number = RawValue::<BlockNumber>::from_vec(number);
366
367 if first_sync {
368 cursor_header_numbers.append(hash, &number)?;
369 } else {
370 cursor_header_numbers.upsert(hash, &number)?;
371 }
372 }
373
374 Ok(())
375}
376
377pub fn calculate_td_by_number<P>(provider: &P, num: BlockNumber) -> eyre::Result<U256>
384where
385 P: BlockReader,
386{
387 let mut total_difficulty = U256::ZERO;
388 let mut start = 0;
389
390 while start <= num {
391 let end = (start + 1000 - 1).min(num);
392
393 total_difficulty +=
394 provider.headers_range(start..=end)?.iter().map(|h| h.difficulty()).sum::<U256>();
395
396 start = end + 1;
397 }
398
399 Ok(total_difficulty)
400}
401
402#[cfg(test)]
403mod tests {
404 use super::*;
405 use alloy_consensus::Header;
406 use reth_db_common::init::init_genesis;
407 use reth_ethereum_primitives::{Block, BlockBody};
408 use reth_provider::{
409 test_utils::create_test_provider_factory, DatabaseProviderFactory,
410 StaticFileProviderFactory, StaticFileSegment, StaticFileWriter,
411 };
412 use std::{cell::Cell, path::Path};
413 use tempfile::tempdir;
414
415 struct TestEra;
416
417 impl EraBlockReader<Header, BlockBody> for TestEra {
418 fn blocks<M: EraMeta + ?Sized>(
419 _meta: &M,
420 ) -> eyre::Result<impl Iterator<Item = eyre::Result<(Header, BlockBody)>>> {
421 Ok([1, 2]
422 .into_iter()
423 .map(|number| Ok((Header { number, ..Default::default() }, BlockBody::default()))))
424 }
425 }
426
427 #[derive(Debug)]
428 struct TestMeta {
429 marked: Cell<bool>,
430 }
431
432 impl EraMeta for TestMeta {
433 fn mark_as_processed(&self) -> eyre::Result<()> {
434 self.marked.set(true);
435 Ok(())
436 }
437
438 fn path(&self) -> &Path {
439 Path::new("test.era1")
440 }
441 }
442
443 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
444 async fn import_stops_at_to_block() {
445 let pf = create_test_provider_factory();
446 init_genesis(&pf).unwrap();
447
448 let folder = tempdir().unwrap();
449 let mut hash_collector = Collector::new(4096, Some(folder.path().to_owned()));
450
451 let stream = futures_util::stream::iter(vec![
453 Ok(TestMeta { marked: Cell::new(false) }),
454 Ok(TestMeta { marked: Cell::new(false) }),
455 ]);
456
457 let height =
458 import::<TestEra, _, _, _, Block, _, _>(stream, &pf, &mut hash_collector, Some(1))
459 .unwrap();
460
461 assert_eq!(height, 1);
462 }
463
464 #[test]
465 fn process_does_not_mark_partially_consumed_file_processed() {
466 let pf = create_test_provider_factory();
467 init_genesis(&pf).unwrap();
468
469 let static_file_provider = pf.static_file_provider();
470 let mut writer = static_file_provider.latest_writer(StaticFileSegment::Headers).unwrap();
471 let provider = pf.database_provider_rw().unwrap();
472 let folder = tempdir().unwrap();
473 let mut hash_collector = Collector::new(4096, Some(folder.path().to_owned()));
474 let meta = TestMeta { marked: Cell::new(false) };
475
476 let height = process::<TestEra, _, Block, _, _>(
477 &meta,
478 &mut writer,
479 &provider,
480 &mut hash_collector,
481 0..=1,
482 )
483 .unwrap();
484
485 assert_eq!(height, 1);
486 assert!(!meta.marked.get());
487 }
488}