use alloy_consensus::BlockHeader;
use alloy_primitives::{BlockHash, BlockNumber, U256};
use futures_util::{Stream, StreamExt};
use reth_db_api::{
    cursor::{DbCursorRO, DbCursorRW},
    table::Value,
    tables,
    transaction::{DbTx, DbTxMut},
    RawKey, RawTable, RawValue,
};
use reth_era::{
    common::{decode::DecodeCompressedRlp, file_ops::StreamReader},
    e2s::error::E2sError,
    era1::{
        file::{BlockTupleIterator, Era1Reader},
        types::execution::BlockTuple,
    },
};
use reth_era_downloader::EraMeta;
use reth_etl::Collector;
use reth_fs_util as fs;
use reth_primitives_traits::{Block, FullBlockBody, FullBlockHeader, NodePrimitives};
use reth_provider::{
    providers::StaticFileProviderRWRefMut, BlockReader, BlockWriter, StaticFileProviderFactory,
    StaticFileSegment, StaticFileWriter,
};
use reth_stages_types::{
    CheckpointBlockRange, EntitiesCheckpoint, HeadersCheckpoint, StageCheckpoint, StageId,
};
use reth_storage_api::{
    errors::ProviderResult, DBProvider, DatabaseProviderFactory, NodePrimitivesProvider,
    StageCheckpointWriter,
};
use std::{
    collections::Bound,
    error::Error,
    fmt::{Display, Formatter},
    io::{Read, Seek},
    iter::Map,
    ops::RangeBounds,
    sync::mpsc,
};
use tracing::info;

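/// Imports block history from ERA1 files streamed by `downloader`.
///
/// Headers are appended to the static file segment and bodies are written through the
/// database provider, while every block hash is collected into `hash_collector` for the
/// [`tables::HeaderNumbers`] index built at the end. Returns the height of the last
/// imported block.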
pub fn import<Downloader, Era, PF, B, BB, BH>(
    mut downloader: Downloader,
    provider_factory: &PF,
    hash_collector: &mut Collector<BlockHash, BlockNumber>,
) -> eyre::Result<BlockNumber>
where
    B: Block<Header = BH, Body = BB>,
    BH: FullBlockHeader + Value,
    BB: FullBlockBody<
        Transaction = <<<PF as DatabaseProviderFactory>::ProviderRW as NodePrimitivesProvider>::Primitives as NodePrimitives>::SignedTx,
        OmmerHeader = BH,
    >,
    Downloader: Stream<Item = eyre::Result<Era>> + Send + 'static + Unpin,
    Era: EraMeta + Send + 'static,
    PF: DatabaseProviderFactory<
        ProviderRW: BlockWriter<Block = B>
            + DBProvider
            + StaticFileProviderFactory<Primitives: NodePrimitives<Block = B, BlockHeader = BH, BlockBody = BB>>
            + StageCheckpointWriter,
    > + StaticFileProviderFactory<Primitives = <<PF as DatabaseProviderFactory>::ProviderRW as NodePrimitivesProvider>::Primitives>,
{
    let (tx, rx) = mpsc::channel();

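    // Download ERA1 files in a background task and forward each result over the channel;
    // `None` signals that the stream is exhausted.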
    tokio::spawn(async move {
        while let Some(file) = downloader.next().await {
            tx.send(Some(file))?;
        }
        tx.send(None)
    });

    let static_file_provider = provider_factory.static_file_provider();

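    // Resume from the highest header block already present in static files.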
    let mut height = static_file_provider
        .get_highest_static_file_block(StaticFileSegment::Headers)
        .unwrap_or_default();

    while let Some(meta) = rx.recv()? {
        let from = height;
        let provider = provider_factory.database_provider_rw()?;

        height = process(
            &meta?,
            &mut static_file_provider.latest_writer(StaticFileSegment::Headers)?,
            &provider,
            hash_collector,
            height..,
        )?;

        save_stage_checkpoints(&provider, from, height, height, height)?;

        provider.commit()?;
    }

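    // With all ERA1 files processed, build the block hash to number index in one pass.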
    let provider = provider_factory.database_provider_rw()?;

    build_index(&provider, hash_collector)?;

    provider.commit()?;

    Ok(height)
}

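/// Saves the progress of an ERA1 import as stage checkpoints.
///
/// Writes a `Headers` checkpoint covering `from..=to` and a matching `Bodies` checkpoint,
/// both carrying the `processed` / `total` entity counts, so a subsequent pipeline run can
/// pick up after the imported range.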
pub fn save_stage_checkpoints<P>(
    provider: P,
    from: BlockNumber,
    to: BlockNumber,
    processed: u64,
    total: u64,
) -> ProviderResult<()>
where
    P: StageCheckpointWriter,
{
    provider.save_stage_checkpoint(
        StageId::Headers,
        StageCheckpoint::new(to).with_headers_stage_checkpoint(HeadersCheckpoint {
            block_range: CheckpointBlockRange { from, to },
            progress: EntitiesCheckpoint { processed, total },
        }),
    )?;
    provider.save_stage_checkpoint(
        StageId::Bodies,
        StageCheckpoint::new(to)
            .with_entities_stage_checkpoint(EntitiesCheckpoint { processed, total }),
    )?;
    Ok(())
}

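/// Extracts block headers and bodies from the ERA1 file behind `meta` and appends them
/// to storage.
///
/// Headers go to the static file `writer`, bodies to the database `provider`, and each
/// hash-to-number mapping is added to `hash_collector`. Blocks outside `block_numbers`
/// are skipped. Returns the number of the last block processed.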
pub fn process<Era, P, B, BB, BH>(
    meta: &Era,
    writer: &mut StaticFileProviderRWRefMut<'_, <P as NodePrimitivesProvider>::Primitives>,
    provider: &P,
    hash_collector: &mut Collector<BlockHash, BlockNumber>,
    block_numbers: impl RangeBounds<BlockNumber>,
) -> eyre::Result<BlockNumber>
where
    B: Block<Header = BH, Body = BB>,
    BH: FullBlockHeader + Value,
    BB: FullBlockBody<
        Transaction = <<P as NodePrimitivesProvider>::Primitives as NodePrimitives>::SignedTx,
        OmmerHeader = BH,
    >,
    Era: EraMeta + ?Sized,
    P: DBProvider<Tx: DbTxMut> + NodePrimitivesProvider + BlockWriter<Block = B>,
    <P as NodePrimitivesProvider>::Primitives: NodePrimitives<BlockHeader = BH, BlockBody = BB>,
{
    let reader = open(meta)?;
    let iter = reader.iter().map(decode as fn(_) -> _);
    let iter = ProcessIter { iter, era: meta };

    process_iter(iter, writer, provider, hash_collector, block_numbers)
}

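/// Iterator over an ERA1 file that decodes each raw [`BlockTuple`] into a header/body pair.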
type ProcessInnerIter<R, BH, BB> =
    Map<BlockTupleIterator<R>, fn(Result<BlockTuple, E2sError>) -> eyre::Result<(BH, BB)>>;

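/// Wraps the decoding iterator over an ERA1 file and marks the file as processed once the
/// iterator is exhausted.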
#[derive(Debug)]
pub struct ProcessIter<'a, Era: ?Sized, R: Read, BH, BB>
where
    BH: FullBlockHeader + Value,
    BB: FullBlockBody<OmmerHeader = BH>,
{
    iter: ProcessInnerIter<R, BH, BB>,
    era: &'a Era,
}

impl<'a, Era: EraMeta + ?Sized, R: Read, BH, BB> Display for ProcessIter<'a, Era, R, BH, BB>
where
    BH: FullBlockHeader + Value,
    BB: FullBlockBody<OmmerHeader = BH>,
{
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        Display::fmt(&self.era.path().to_string_lossy(), f)
    }
}

impl<'a, Era, R, BH, BB> Iterator for ProcessIter<'a, Era, R, BH, BB>
where
    R: Read + Seek,
    Era: EraMeta + ?Sized,
    BH: FullBlockHeader + Value,
    BB: FullBlockBody<OmmerHeader = BH>,
{
    type Item = eyre::Result<(BH, BB)>;

    fn next(&mut self) -> Option<Self::Item> {
        match self.iter.next() {
            Some(item) => Some(item),
            // Once the inner iterator is exhausted, mark the ERA1 file as processed.
            None => match self.era.mark_as_processed() {
                Ok(..) => None,
                Err(e) => Some(Err(e)),
            },
        }
    }
}

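/// Opens the ERA1 file referenced by `meta` for reading.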
pub fn open<Era>(meta: &Era) -> eyre::Result<Era1Reader<std::fs::File>>
where
    Era: EraMeta + ?Sized,
{
    let file = fs::open(meta.path())?;
    let reader = Era1Reader::new(file);

    Ok(reader)
}

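/// Decodes a compressed [`BlockTuple`] into a block header and body.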
pub fn decode<BH, BB, E>(block: Result<BlockTuple, E>) -> eyre::Result<(BH, BB)>
where
    BH: FullBlockHeader + Value,
    BB: FullBlockBody<OmmerHeader = BH>,
    E: From<E2sError> + Error + Send + Sync + 'static,
{
    let block = block?;
    let header: BH = block.header.decode()?;
    let body: BB = block.body.decode()?;

    Ok((header, body))
}

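/// Appends the blocks yielded by `iter` to storage, headers via the static file `writer`
/// and bodies via the database `provider`.
///
/// Blocks whose number falls at or below the start of `block_numbers` are skipped, and
/// iteration stops once a block past the end of the range is seen. Every accepted block's
/// hash is recorded in `hash_collector`. Returns the number of the last block written.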
pub fn process_iter<P, B, BB, BH>(
    mut iter: impl Iterator<Item = eyre::Result<(BH, BB)>>,
    writer: &mut StaticFileProviderRWRefMut<'_, <P as NodePrimitivesProvider>::Primitives>,
    provider: &P,
    hash_collector: &mut Collector<BlockHash, BlockNumber>,
    block_numbers: impl RangeBounds<BlockNumber>,
) -> eyre::Result<BlockNumber>
where
    B: Block<Header = BH, Body = BB>,
    BH: FullBlockHeader + Value,
    BB: FullBlockBody<
        Transaction = <<P as NodePrimitivesProvider>::Primitives as NodePrimitives>::SignedTx,
        OmmerHeader = BH,
    >,
    P: DBProvider<Tx: DbTxMut> + NodePrimitivesProvider + BlockWriter<Block = B>,
    <P as NodePrimitivesProvider>::Primitives: NodePrimitives<BlockHeader = BH, BlockBody = BB>,
{
    let mut last_header_number = match block_numbers.start_bound() {
        Bound::Included(&number) => number,
        Bound::Excluded(&number) => number.saturating_add(1),
        Bound::Unbounded => 0,
    };
    let target = match block_numbers.end_bound() {
        Bound::Included(&number) => Some(number),
        Bound::Excluded(&number) => Some(number.saturating_sub(1)),
        Bound::Unbounded => None,
    };

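    // Skip blocks already covered by `last_header_number` and stop once the target is passed.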
    for block in &mut iter {
        let (header, body) = block?;
        let number = header.number();

        if number <= last_header_number {
            continue;
        }
        if let Some(target) = target &&
            number > target
        {
            break;
        }

        let hash = header.hash_slow();
        last_header_number = number;

        writer.append_header(&header, &hash)?;

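        // Write the block body through the database provider.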
        provider.append_block_bodies(vec![(header.number(), Some(&body))])?;

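        // Record the hash-to-number mapping for the index built by `build_index`.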
        hash_collector.insert(hash, number)?;
    }

    Ok(last_header_number)
}

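/// Builds the [`tables::HeaderNumbers`] index from the hash-to-number pairs accumulated in
/// `hash_collector`.
///
/// On a first sync (only the genesis entry is present) the table is written with append-only
/// cursor writes; otherwise entries are upserted.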
pub fn build_index<P, B, BB, BH>(
    provider: &P,
    hash_collector: &mut Collector<BlockHash, BlockNumber>,
) -> eyre::Result<()>
where
    B: Block<Header = BH, Body = BB>,
    BH: FullBlockHeader + Value,
    BB: FullBlockBody<
        Transaction = <<P as NodePrimitivesProvider>::Primitives as NodePrimitives>::SignedTx,
        OmmerHeader = BH,
    >,
    P: DBProvider<Tx: DbTxMut> + NodePrimitivesProvider + BlockWriter<Block = B>,
    <P as NodePrimitivesProvider>::Primitives: NodePrimitives<BlockHeader = BH, BlockBody = BB>,
{
    let total_headers = hash_collector.len();
    info!(target: "era::history::import", total = total_headers, "Writing headers hash index");

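    // Cursor over the block hash to number table.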
    let mut cursor_header_numbers =
        provider.tx_ref().cursor_write::<RawTable<tables::HeaderNumbers>>()?;
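    // If the table holds only the genesis entry, this is a first sync: move that entry into
    // the collector and use append-only writes for the whole index below.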
    let first_sync = if provider.tx_ref().entries::<RawTable<tables::HeaderNumbers>>()? == 1 &&
        let Some((hash, block_number)) = cursor_header_numbers.last()? &&
        block_number.value()? == 0
    {
        hash_collector.insert(hash.key()?, 0)?;
        cursor_header_numbers.delete_current()?;
        true
    } else {
        false
    };

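    // Log progress roughly every 10% of the entries, but no more often than every 8192.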
    let interval = (total_headers / 10).max(8192);

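    // Write every collected (hash, number) pair into the header numbers table.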
    for (index, hash_to_number) in hash_collector.iter()?.enumerate() {
        let (hash, number) = hash_to_number?;

        if index != 0 && index.is_multiple_of(interval) {
            info!(target: "era::history::import", progress = %format!("{:.2}%", (index as f64 / total_headers as f64) * 100.0), "Writing headers hash index");
        }

        let hash = RawKey::<BlockHash>::from_vec(hash);
        let number = RawValue::<BlockNumber>::from_vec(number);

        if first_sync {
            cursor_header_numbers.append(hash, &number)?;
        } else {
            cursor_header_numbers.upsert(hash, &number)?;
        }
    }

    Ok(())
}

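/// Calculates the total difficulty up to and including block `num` by summing the difficulty
/// of every header from genesis onwards, reading headers in batches of 1000.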
pub fn calculate_td_by_number<P>(provider: &P, num: BlockNumber) -> eyre::Result<U256>
where
    P: BlockReader,
{
    let mut total_difficulty = U256::ZERO;
    let mut start = 0;

    // Read headers in chunks of 1000 and accumulate their difficulty.
    while start <= num {
        let end = (start + 1000 - 1).min(num);

        total_difficulty +=
            provider.headers_range(start..=end)?.iter().map(|h| h.difficulty()).sum::<U256>();

        start = end + 1;
    }

    Ok(total_difficulty)
}