use alloy_consensus::BlockHeader;
use alloy_primitives::{BlockHash, BlockNumber, U256};
use futures_util::{Stream, StreamExt};
use reth_db_api::{
    cursor::{DbCursorRO, DbCursorRW},
    table::Value,
    tables,
    transaction::{DbTx, DbTxMut},
    RawKey, RawTable, RawValue,
};
use reth_era::{
    common::{decode::DecodeCompressedRlp, file_ops::StreamReader},
    e2s::error::E2sError,
    era1::{
        file::{BlockTupleIterator, Era1Reader},
        types::execution::BlockTuple,
    },
};
use reth_era_downloader::EraMeta;
use reth_etl::Collector;
use reth_fs_util as fs;
use reth_primitives_traits::{Block, FullBlockBody, FullBlockHeader, NodePrimitives};
use reth_provider::{
    providers::StaticFileProviderRWRefMut, BlockReader, BlockWriter, StaticFileProviderFactory,
    StaticFileSegment, StaticFileWriter,
};
use reth_stages_types::{
    CheckpointBlockRange, EntitiesCheckpoint, HeadersCheckpoint, StageCheckpoint, StageId,
};
use reth_storage_api::{
    errors::ProviderResult, DBProvider, DatabaseProviderFactory, NodePrimitivesProvider,
    StageCheckpointWriter,
};
use std::{
    collections::Bound,
    error::Error,
    fmt::{Display, Formatter},
    io::{Read, Seek},
    iter::Map,
    ops::RangeBounds,
    sync::mpsc,
};
use tracing::info;

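/// Imports block history from era files streamed by `downloader`, writing headers and bodies
/// through `provider_factory` and collecting header hashes into `hash_collector`.
///
/// Returns the height of the last imported block, or the previously highest block in the
/// headers static files if nothing new was imported.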
pub fn import<Downloader, Era, PF, B, BB, BH>(
    mut downloader: Downloader,
    provider_factory: &PF,
    hash_collector: &mut Collector<BlockHash, BlockNumber>,
) -> eyre::Result<BlockNumber>
where
    B: Block<Header = BH, Body = BB>,
    BH: FullBlockHeader + Value,
    BB: FullBlockBody<
        Transaction = <<<PF as DatabaseProviderFactory>::ProviderRW as NodePrimitivesProvider>::Primitives as NodePrimitives>::SignedTx,
        OmmerHeader = BH,
    >,
    Downloader: Stream<Item = eyre::Result<Era>> + Send + 'static + Unpin,
    Era: EraMeta + Send + 'static,
    PF: DatabaseProviderFactory<
        ProviderRW: BlockWriter<Block = B>
            + DBProvider
            + StaticFileProviderFactory<Primitives: NodePrimitives<Block = B, BlockHeader = BH, BlockBody = BB>>
            + StageCheckpointWriter,
    > + StaticFileProviderFactory<Primitives = <<PF as DatabaseProviderFactory>::ProviderRW as NodePrimitivesProvider>::Primitives>,
{
    let (tx, rx) = mpsc::channel();

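    // Forward downloaded era files from the async stream into the channel in a background task;
    // `None` marks the end of the stream.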
    tokio::spawn(async move {
        while let Some(file) = downloader.next().await {
            tx.send(Some(file))?;
        }
        tx.send(None)
    });

    let static_file_provider = provider_factory.static_file_provider();

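    // Resume from the highest block already present in the headers static files.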
    let mut height = static_file_provider
        .get_highest_static_file_block(StaticFileSegment::Headers)
        .unwrap_or_default();

    while let Some(meta) = rx.recv()? {
        let from = height;
        let provider = provider_factory.database_provider_rw()?;

        height = process(
            &meta?,
            &mut static_file_provider.latest_writer(StaticFileSegment::Headers)?,
            &provider,
            hash_collector,
            height..,
        )?;

        save_stage_checkpoints(&provider, from, height, height, height)?;

        provider.commit()?;
    }

    let provider = provider_factory.database_provider_rw()?;

    build_index(&provider, hash_collector)?;

    provider.commit()?;

    Ok(height)
}

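/// Saves the progress of the `Headers` and `Bodies` stages for the imported block range.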
pub fn save_stage_checkpoints<P>(
    provider: &P,
    from: BlockNumber,
    to: BlockNumber,
    processed: u64,
    total: u64,
) -> ProviderResult<()>
where
    P: StageCheckpointWriter,
{
    provider.save_stage_checkpoint(
        StageId::Headers,
        StageCheckpoint::new(to).with_headers_stage_checkpoint(HeadersCheckpoint {
            block_range: CheckpointBlockRange { from, to },
            progress: EntitiesCheckpoint { processed, total },
        }),
    )?;
    provider.save_stage_checkpoint(
        StageId::Bodies,
        StageCheckpoint::new(to)
            .with_entities_stage_checkpoint(EntitiesCheckpoint { processed, total }),
    )?;
    Ok(())
}

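/// Extracts blocks from the era file described by `meta` and appends those within
/// `block_numbers` to the static files and the database.
///
/// Returns the number of the last block processed.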
pub fn process<Era, P, B, BB, BH>(
    meta: &Era,
    writer: &mut StaticFileProviderRWRefMut<'_, <P as NodePrimitivesProvider>::Primitives>,
    provider: &P,
    hash_collector: &mut Collector<BlockHash, BlockNumber>,
    block_numbers: impl RangeBounds<BlockNumber>,
) -> eyre::Result<BlockNumber>
where
    B: Block<Header = BH, Body = BB>,
    BH: FullBlockHeader + Value,
    BB: FullBlockBody<
        Transaction = <<P as NodePrimitivesProvider>::Primitives as NodePrimitives>::SignedTx,
        OmmerHeader = BH,
    >,
    Era: EraMeta + ?Sized,
    P: DBProvider<Tx: DbTxMut> + NodePrimitivesProvider + BlockWriter<Block = B>,
    <P as NodePrimitivesProvider>::Primitives: NodePrimitives<BlockHeader = BH, BlockBody = BB>,
{
    let reader = open(meta)?;
    let iter = reader
        .iter()
        .map(Box::new(decode)
            as Box<dyn Fn(Result<BlockTuple, E2sError>) -> eyre::Result<(BH, BB)>>);
    let iter = ProcessIter { iter, era: meta };

    process_iter(iter, writer, provider, hash_collector, block_numbers)
}

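/// Iterator over era1 block tuples decoded into header/body pairs.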
type ProcessInnerIter<R, BH, BB> =
    Map<BlockTupleIterator<R>, Box<dyn Fn(Result<BlockTuple, E2sError>) -> eyre::Result<(BH, BB)>>>;

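/// Wraps the decoding iterator and marks the underlying era file as processed once it has been
/// fully consumed.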
#[derive(Debug)]
pub struct ProcessIter<'a, Era: ?Sized, R: Read, BH, BB>
where
    BH: FullBlockHeader + Value,
    BB: FullBlockBody<OmmerHeader = BH>,
{
    iter: ProcessInnerIter<R, BH, BB>,
    era: &'a Era,
}

impl<'a, Era: EraMeta + ?Sized, R: Read, BH, BB> Display for ProcessIter<'a, Era, R, BH, BB>
where
    BH: FullBlockHeader + Value,
    BB: FullBlockBody<OmmerHeader = BH>,
{
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        Display::fmt(&self.era.path().to_string_lossy(), f)
    }
}

impl<'a, Era, R, BH, BB> Iterator for ProcessIter<'a, Era, R, BH, BB>
where
    R: Read + Seek,
    Era: EraMeta + ?Sized,
    BH: FullBlockHeader + Value,
    BB: FullBlockBody<OmmerHeader = BH>,
{
    type Item = eyre::Result<(BH, BB)>;

    fn next(&mut self) -> Option<Self::Item> {
        match self.iter.next() {
            Some(item) => Some(item),
            None => match self.era.mark_as_processed() {
                Ok(..) => None,
                Err(e) => Some(Err(e)),
            },
        }
    }
}

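/// Opens the era1 file behind `meta` for reading.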
pub fn open<Era>(meta: &Era) -> eyre::Result<Era1Reader<std::fs::File>>
where
    Era: EraMeta + ?Sized,
{
    let file = fs::open(meta.path())?;
    let reader = Era1Reader::new(file);

    Ok(reader)
}

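/// Decodes a compressed-RLP block tuple into a header and body.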
pub fn decode<BH, BB, E>(block: Result<BlockTuple, E>) -> eyre::Result<(BH, BB)>
where
    BH: FullBlockHeader + Value,
    BB: FullBlockBody<OmmerHeader = BH>,
    E: From<E2sError> + Error + Send + Sync + 'static,
{
    let block = block?;
    let header: BH = block.header.decode()?;
    let body: BB = block.body.decode()?;

    Ok((header, body))
}

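/// Appends decoded headers to the static files, bodies to the database and header hashes to
/// `hash_collector`, restricted to `block_numbers`.
///
/// Returns the number of the last block written.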
pub fn process_iter<P, B, BB, BH>(
    mut iter: impl Iterator<Item = eyre::Result<(BH, BB)>>,
    writer: &mut StaticFileProviderRWRefMut<'_, <P as NodePrimitivesProvider>::Primitives>,
    provider: &P,
    hash_collector: &mut Collector<BlockHash, BlockNumber>,
    block_numbers: impl RangeBounds<BlockNumber>,
) -> eyre::Result<BlockNumber>
where
    B: Block<Header = BH, Body = BB>,
    BH: FullBlockHeader + Value,
    BB: FullBlockBody<
        Transaction = <<P as NodePrimitivesProvider>::Primitives as NodePrimitives>::SignedTx,
        OmmerHeader = BH,
    >,
    P: DBProvider<Tx: DbTxMut> + NodePrimitivesProvider + BlockWriter<Block = B>,
    <P as NodePrimitivesProvider>::Primitives: NodePrimitives<BlockHeader = BH, BlockBody = BB>,
{
    let mut last_header_number = match block_numbers.start_bound() {
        Bound::Included(&number) => number,
        Bound::Excluded(&number) => number.saturating_add(1),
        Bound::Unbounded => 0,
    };
    let target = match block_numbers.end_bound() {
        Bound::Included(&number) => Some(number),
        Bound::Excluded(&number) => Some(number.saturating_sub(1)),
        Bound::Unbounded => None,
    };

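    // Skip blocks at or below the starting height and stop once the end of the range is exceeded.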
    for block in &mut iter {
        let (header, body) = block?;
        let number = header.number();

        if number <= last_header_number {
            continue;
        }
        if let Some(target) = target &&
            number > target
        {
            break;
        }

        let hash = header.hash_slow();
        last_header_number = number;

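        // Append the header to the static file.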
        writer.append_header(&header, &hash)?;

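        // Write the block body to the database.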
        provider.append_block_bodies(vec![(header.number(), Some(body))])?;

        hash_collector.insert(hash, number)?;
    }

    Ok(last_header_number)
}

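/// Builds the `HeaderNumbers` hash-to-number index from the entries in `hash_collector`.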
pub fn build_index<P, B, BB, BH>(
    provider: &P,
    hash_collector: &mut Collector<BlockHash, BlockNumber>,
) -> eyre::Result<()>
where
    B: Block<Header = BH, Body = BB>,
    BH: FullBlockHeader + Value,
    BB: FullBlockBody<
        Transaction = <<P as NodePrimitivesProvider>::Primitives as NodePrimitives>::SignedTx,
        OmmerHeader = BH,
    >,
    P: DBProvider<Tx: DbTxMut> + NodePrimitivesProvider + BlockWriter<Block = B>,
    <P as NodePrimitivesProvider>::Primitives: NodePrimitives<BlockHeader = BH, BlockBody = BB>,
{
    let total_headers = hash_collector.len();
    info!(target: "era::history::import", total = total_headers, "Writing headers hash index");

    let mut cursor_header_numbers =
        provider.tx_ref().cursor_write::<RawTable<tables::HeaderNumbers>>()?;
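    // If only the genesis hash is stored, move it into the collector and clear the table so the
    // whole index can be written back in sorted order with `append`.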
    let first_sync = if provider.tx_ref().entries::<RawTable<tables::HeaderNumbers>>()? == 1 &&
        let Some((hash, block_number)) = cursor_header_numbers.last()? &&
        block_number.value()? == 0
    {
        hash_collector.insert(hash.key()?, 0)?;
        cursor_header_numbers.delete_current()?;
        true
    } else {
        false
    };

    let interval = (total_headers / 10).max(8192);

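    // The collector yields entries sorted by hash, so on a first sync the emptied table can be
    // filled with `append`; otherwise each entry is upserted. Progress is logged every
    // `interval` entries.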
    for (index, hash_to_number) in hash_collector.iter()?.enumerate() {
        let (hash, number) = hash_to_number?;

        if index != 0 && index.is_multiple_of(interval) {
            info!(target: "era::history::import", progress = %format!("{:.2}%", (index as f64 / total_headers as f64) * 100.0), "Writing headers hash index");
        }

        let hash = RawKey::<BlockHash>::from_vec(hash);
        let number = RawValue::<BlockNumber>::from_vec(number);

        if first_sync {
            cursor_header_numbers.append(hash, &number)?;
        } else {
            cursor_header_numbers.upsert(hash, &number)?;
        }
    }

    Ok(())
}

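/// Calculates the total difficulty up to and including block `num` by summing header
/// difficulties in batches of 1000 blocks.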
pub fn calculate_td_by_number<P>(provider: &P, num: BlockNumber) -> eyre::Result<U256>
where
    P: BlockReader,
{
    let mut total_difficulty = U256::ZERO;
    let mut start = 0;

    while start <= num {
        let end = (start + 1000 - 1).min(num);

        total_difficulty +=
            provider.headers_range(start..=end)?.iter().map(|h| h.difficulty()).sum::<U256>();

        start = end + 1;
    }

    Ok(total_difficulty)
}