1use alloy_consensus::BlockHeader;
2use alloy_primitives::{BlockHash, BlockNumber, U256};
3use futures_util::{Stream, StreamExt};
4use reth_db_api::{
5 cursor::{DbCursorRO, DbCursorRW},
6 table::Value,
7 tables,
8 transaction::{DbTx, DbTxMut},
9 RawKey, RawTable, RawValue,
10};
11use reth_era::{
12 e2s_types::E2sError,
13 era1_file::{BlockTupleIterator, Era1Reader},
14 era_file_ops::StreamReader,
15 execution_types::BlockTuple,
16 DecodeCompressed,
17};
18use reth_era_downloader::EraMeta;
19use reth_etl::Collector;
20use reth_fs_util as fs;
21use reth_primitives_traits::{Block, FullBlockBody, FullBlockHeader, NodePrimitives};
22use reth_provider::{
23 providers::StaticFileProviderRWRefMut, BlockReader, BlockWriter, StaticFileProviderFactory,
24 StaticFileSegment, StaticFileWriter,
25};
26use reth_stages_types::{
27 CheckpointBlockRange, EntitiesCheckpoint, HeadersCheckpoint, StageCheckpoint, StageId,
28};
29use reth_storage_api::{
30 errors::ProviderResult, DBProvider, DatabaseProviderFactory, NodePrimitivesProvider,
31 StageCheckpointWriter,
32};
33use std::{
34 collections::Bound,
35 error::Error,
36 fmt::{Display, Formatter},
37 io::{Read, Seek},
38 iter::Map,
39 ops::RangeBounds,
40 sync::mpsc,
41};
42use tracing::info;
43
44pub fn import<Downloader, Era, PF, B, BB, BH>(
48 mut downloader: Downloader,
49 provider_factory: &PF,
50 hash_collector: &mut Collector<BlockHash, BlockNumber>,
51) -> eyre::Result<BlockNumber>
52where
53 B: Block<Header = BH, Body = BB>,
54 BH: FullBlockHeader + Value,
55 BB: FullBlockBody<
56 Transaction = <<<PF as DatabaseProviderFactory>::ProviderRW as NodePrimitivesProvider>::Primitives as NodePrimitives>::SignedTx,
57 OmmerHeader = BH,
58 >,
59 Downloader: Stream<Item = eyre::Result<Era>> + Send + 'static + Unpin,
60 Era: EraMeta + Send + 'static,
61 PF: DatabaseProviderFactory<
62 ProviderRW: BlockWriter<Block = B>
63 + DBProvider
64 + StaticFileProviderFactory<Primitives: NodePrimitives<Block = B, BlockHeader = BH, BlockBody = BB>>
65 + StageCheckpointWriter,
66 > + StaticFileProviderFactory<Primitives = <<PF as DatabaseProviderFactory>::ProviderRW as NodePrimitivesProvider>::Primitives>,
67{
68 let (tx, rx) = mpsc::channel();
69
70 tokio::spawn(async move {
72 while let Some(file) = downloader.next().await {
73 tx.send(Some(file))?;
74 }
75 tx.send(None)
76 });
77
78 let static_file_provider = provider_factory.static_file_provider();
79
80 let mut height = static_file_provider
83 .get_highest_static_file_block(StaticFileSegment::Headers)
84 .unwrap_or_default();
85
86 while let Some(meta) = rx.recv()? {
87 let from = height;
88 let provider = provider_factory.database_provider_rw()?;
89
90 height = process(
91 &meta?,
92 &mut static_file_provider.latest_writer(StaticFileSegment::Headers)?,
93 &provider,
94 hash_collector,
95 height..,
96 )?;
97
98 save_stage_checkpoints(&provider, from, height, height, height)?;
99
100 provider.commit()?;
101 }
102
103 let provider = provider_factory.database_provider_rw()?;
104
105 build_index(&provider, hash_collector)?;
106
107 provider.commit()?;
108
109 Ok(height)
110}
111
112pub fn save_stage_checkpoints<P>(
118 provider: &P,
119 from: BlockNumber,
120 to: BlockNumber,
121 processed: u64,
122 total: u64,
123) -> ProviderResult<()>
124where
125 P: StageCheckpointWriter,
126{
127 provider.save_stage_checkpoint(
128 StageId::Headers,
129 StageCheckpoint::new(to).with_headers_stage_checkpoint(HeadersCheckpoint {
130 block_range: CheckpointBlockRange { from, to },
131 progress: EntitiesCheckpoint { processed, total },
132 }),
133 )?;
134 provider.save_stage_checkpoint(
135 StageId::Bodies,
136 StageCheckpoint::new(to)
137 .with_entities_stage_checkpoint(EntitiesCheckpoint { processed, total }),
138 )?;
139 Ok(())
140}
141
142pub fn process<Era, P, B, BB, BH>(
154 meta: &Era,
155 writer: &mut StaticFileProviderRWRefMut<'_, <P as NodePrimitivesProvider>::Primitives>,
156 provider: &P,
157 hash_collector: &mut Collector<BlockHash, BlockNumber>,
158 block_numbers: impl RangeBounds<BlockNumber>,
159) -> eyre::Result<BlockNumber>
160where
161 B: Block<Header = BH, Body = BB>,
162 BH: FullBlockHeader + Value,
163 BB: FullBlockBody<
164 Transaction = <<P as NodePrimitivesProvider>::Primitives as NodePrimitives>::SignedTx,
165 OmmerHeader = BH,
166 >,
167 Era: EraMeta + ?Sized,
168 P: DBProvider<Tx: DbTxMut> + NodePrimitivesProvider + BlockWriter<Block = B>,
169 <P as NodePrimitivesProvider>::Primitives: NodePrimitives<BlockHeader = BH, BlockBody = BB>,
170{
171 let reader = open(meta)?;
172 let iter =
173 reader
174 .iter()
175 .map(Box::new(decode)
176 as Box<dyn Fn(Result<BlockTuple, E2sError>) -> eyre::Result<(BH, BB)>>);
177 let iter = ProcessIter { iter, era: meta };
178
179 process_iter(iter, writer, provider, hash_collector, block_numbers)
180}
181
182type ProcessInnerIter<R, BH, BB> =
183 Map<BlockTupleIterator<R>, Box<dyn Fn(Result<BlockTuple, E2sError>) -> eyre::Result<(BH, BB)>>>;
184
185#[derive(Debug)]
188pub struct ProcessIter<'a, Era: ?Sized, R: Read, BH, BB>
189where
190 BH: FullBlockHeader + Value,
191 BB: FullBlockBody<OmmerHeader = BH>,
192{
193 iter: ProcessInnerIter<R, BH, BB>,
194 era: &'a Era,
195}
196
197impl<'a, Era: EraMeta + ?Sized, R: Read, BH, BB> Display for ProcessIter<'a, Era, R, BH, BB>
198where
199 BH: FullBlockHeader + Value,
200 BB: FullBlockBody<OmmerHeader = BH>,
201{
202 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
203 Display::fmt(&self.era.path().to_string_lossy(), f)
204 }
205}
206
207impl<'a, Era, R, BH, BB> Iterator for ProcessIter<'a, Era, R, BH, BB>
208where
209 R: Read + Seek,
210 Era: EraMeta + ?Sized,
211 BH: FullBlockHeader + Value,
212 BB: FullBlockBody<OmmerHeader = BH>,
213{
214 type Item = eyre::Result<(BH, BB)>;
215
216 fn next(&mut self) -> Option<Self::Item> {
217 match self.iter.next() {
218 Some(item) => Some(item),
219 None => match self.era.mark_as_processed() {
220 Ok(..) => None,
221 Err(e) => Some(Err(e)),
222 },
223 }
224 }
225}
226
227pub fn open<Era>(meta: &Era) -> eyre::Result<Era1Reader<std::fs::File>>
229where
230 Era: EraMeta + ?Sized,
231{
232 let file = fs::open(meta.path())?;
233 let reader = Era1Reader::new(file);
234
235 Ok(reader)
236}
237
238pub fn decode<BH, BB, E>(block: Result<BlockTuple, E>) -> eyre::Result<(BH, BB)>
240where
241 BH: FullBlockHeader + Value,
242 BB: FullBlockBody<OmmerHeader = BH>,
243 E: From<E2sError> + Error + Send + Sync + 'static,
244{
245 let block = block?;
246 let header: BH = block.header.decode()?;
247 let body: BB = block.body.decode()?;
248
249 Ok((header, body))
250}
251
252pub fn process_iter<P, B, BB, BH>(
264 mut iter: impl Iterator<Item = eyre::Result<(BH, BB)>>,
265 writer: &mut StaticFileProviderRWRefMut<'_, <P as NodePrimitivesProvider>::Primitives>,
266 provider: &P,
267 hash_collector: &mut Collector<BlockHash, BlockNumber>,
268 block_numbers: impl RangeBounds<BlockNumber>,
269) -> eyre::Result<BlockNumber>
270where
271 B: Block<Header = BH, Body = BB>,
272 BH: FullBlockHeader + Value,
273 BB: FullBlockBody<
274 Transaction = <<P as NodePrimitivesProvider>::Primitives as NodePrimitives>::SignedTx,
275 OmmerHeader = BH,
276 >,
277 P: DBProvider<Tx: DbTxMut> + NodePrimitivesProvider + BlockWriter<Block = B>,
278 <P as NodePrimitivesProvider>::Primitives: NodePrimitives<BlockHeader = BH, BlockBody = BB>,
279{
280 let mut last_header_number = match block_numbers.start_bound() {
281 Bound::Included(&number) => number,
282 Bound::Excluded(&number) => number.saturating_add(1),
283 Bound::Unbounded => 0,
284 };
285 let target = match block_numbers.end_bound() {
286 Bound::Included(&number) => Some(number),
287 Bound::Excluded(&number) => Some(number.saturating_sub(1)),
288 Bound::Unbounded => None,
289 };
290
291 for block in &mut iter {
292 let (header, body) = block?;
293 let number = header.number();
294
295 if number <= last_header_number {
296 continue;
297 }
298 if let Some(target) = target &&
299 number > target
300 {
301 break;
302 }
303
304 let hash = header.hash_slow();
305 last_header_number = number;
306
307 writer.append_header(&header, &hash)?;
309
310 provider.append_block_bodies(vec![(header.number(), Some(body))])?;
312
313 hash_collector.insert(hash, number)?;
314 }
315
316 Ok(last_header_number)
317}
318
319pub fn build_index<P, B, BB, BH>(
321 provider: &P,
322 hash_collector: &mut Collector<BlockHash, BlockNumber>,
323) -> eyre::Result<()>
324where
325 B: Block<Header = BH, Body = BB>,
326 BH: FullBlockHeader + Value,
327 BB: FullBlockBody<
328 Transaction = <<P as NodePrimitivesProvider>::Primitives as NodePrimitives>::SignedTx,
329 OmmerHeader = BH,
330 >,
331 P: DBProvider<Tx: DbTxMut> + NodePrimitivesProvider + BlockWriter<Block = B>,
332 <P as NodePrimitivesProvider>::Primitives: NodePrimitives<BlockHeader = BH, BlockBody = BB>,
333{
334 let total_headers = hash_collector.len();
335 info!(target: "era::history::import", total = total_headers, "Writing headers hash index");
336
337 let mut cursor_header_numbers =
339 provider.tx_ref().cursor_write::<RawTable<tables::HeaderNumbers>>()?;
340 let first_sync = if provider.tx_ref().entries::<RawTable<tables::HeaderNumbers>>()? == 1 &&
343 let Some((hash, block_number)) = cursor_header_numbers.last()? &&
344 block_number.value()? == 0
345 {
346 hash_collector.insert(hash.key()?, 0)?;
347 cursor_header_numbers.delete_current()?;
348 true
349 } else {
350 false
351 };
352
353 let interval = (total_headers / 10).max(8192);
354
355 for (index, hash_to_number) in hash_collector.iter()?.enumerate() {
357 let (hash, number) = hash_to_number?;
358
359 if index != 0 && index.is_multiple_of(interval) {
360 info!(target: "era::history::import", progress = %format!("{:.2}%", (index as f64 / total_headers as f64) * 100.0), "Writing headers hash index");
361 }
362
363 let hash = RawKey::<BlockHash>::from_vec(hash);
364 let number = RawValue::<BlockNumber>::from_vec(number);
365
366 if first_sync {
367 cursor_header_numbers.append(hash, &number)?;
368 } else {
369 cursor_header_numbers.upsert(hash, &number)?;
370 }
371 }
372
373 Ok(())
374}
375
376pub fn calculate_td_by_number<P>(provider: &P, num: BlockNumber) -> eyre::Result<U256>
383where
384 P: BlockReader,
385{
386 let mut total_difficulty = U256::ZERO;
387 let mut start = 0;
388
389 while start <= num {
390 let end = (start + 1000 - 1).min(num);
391
392 total_difficulty +=
393 provider.headers_range(start..=end)?.iter().map(|h| h.difficulty()).sum::<U256>();
394
395 start = end + 1;
396 }
397
398 Ok(total_difficulty)
399}