1use alloy_primitives::{BlockHash, BlockNumber, U256};
2use futures_util::{Stream, StreamExt};
3use reth_db_api::{
4 cursor::{DbCursorRO, DbCursorRW},
5 table::Value,
6 tables,
7 transaction::{DbTx, DbTxMut},
8 RawKey, RawTable, RawValue,
9};
10use reth_era::{
11 e2s_types::E2sError,
12 era1_file::{BlockTupleIterator, Era1Reader},
13 era_file_ops::StreamReader,
14 execution_types::BlockTuple,
15 DecodeCompressed,
16};
17use reth_era_downloader::EraMeta;
18use reth_etl::Collector;
19use reth_fs_util as fs;
20use reth_primitives_traits::{Block, FullBlockBody, FullBlockHeader, NodePrimitives};
21use reth_provider::{
22 providers::StaticFileProviderRWRefMut, writer::UnifiedStorageWriter, BlockWriter,
23 ProviderError, StaticFileProviderFactory, StaticFileSegment, StaticFileWriter,
24};
25use reth_stages_types::{
26 CheckpointBlockRange, EntitiesCheckpoint, HeadersCheckpoint, StageCheckpoint, StageId,
27};
28use reth_storage_api::{
29 errors::ProviderResult, DBProvider, DatabaseProviderFactory, HeaderProvider,
30 NodePrimitivesProvider, StageCheckpointWriter, StorageLocation,
31};
32use std::{
33 collections::Bound,
34 error::Error,
35 fmt::{Display, Formatter},
36 io::{Read, Seek},
37 iter::Map,
38 ops::RangeBounds,
39 sync::mpsc,
40};
41use tracing::info;
42
43pub fn import<Downloader, Era, PF, B, BB, BH>(
47 mut downloader: Downloader,
48 provider_factory: &PF,
49 hash_collector: &mut Collector<BlockHash, BlockNumber>,
50) -> eyre::Result<BlockNumber>
51where
52 B: Block<Header = BH, Body = BB>,
53 BH: FullBlockHeader + Value,
54 BB: FullBlockBody<
55 Transaction = <<<PF as DatabaseProviderFactory>::ProviderRW as NodePrimitivesProvider>::Primitives as NodePrimitives>::SignedTx,
56 OmmerHeader = BH,
57 >,
58 Downloader: Stream<Item = eyre::Result<Era>> + Send + 'static + Unpin,
59 Era: EraMeta + Send + 'static,
60 PF: DatabaseProviderFactory<
61 ProviderRW: BlockWriter<Block = B>
62 + DBProvider
63 + StaticFileProviderFactory<Primitives: NodePrimitives<Block = B, BlockHeader = BH, BlockBody = BB>>
64 + StageCheckpointWriter,
65 > + StaticFileProviderFactory<Primitives = <<PF as DatabaseProviderFactory>::ProviderRW as NodePrimitivesProvider>::Primitives>,
66{
67 let (tx, rx) = mpsc::channel();
68
69 tokio::spawn(async move {
71 while let Some(file) = downloader.next().await {
72 tx.send(Some(file))?;
73 }
74 tx.send(None)
75 });
76
77 let static_file_provider = provider_factory.static_file_provider();
78
79 let mut height = static_file_provider
82 .get_highest_static_file_block(StaticFileSegment::Headers)
83 .unwrap_or_default();
84
85 let mut td = static_file_provider
87 .header_td_by_number(height)?
88 .ok_or(ProviderError::TotalDifficultyNotFound(height))?;
89
90 while let Some(meta) = rx.recv()? {
91 let from = height;
92 let provider = provider_factory.database_provider_rw()?;
93
94 height = process(
95 &meta?,
96 &mut static_file_provider.latest_writer(StaticFileSegment::Headers)?,
97 &provider,
98 hash_collector,
99 &mut td,
100 height..,
101 )?;
102
103 save_stage_checkpoints(&provider, from, height, height, height)?;
104
105 UnifiedStorageWriter::commit(provider)?;
106 }
107
108 let provider = provider_factory.database_provider_rw()?;
109
110 build_index(&provider, hash_collector)?;
111
112 UnifiedStorageWriter::commit(provider)?;
113
114 Ok(height)
115}
116
117pub fn save_stage_checkpoints<P>(
123 provider: &P,
124 from: BlockNumber,
125 to: BlockNumber,
126 processed: u64,
127 total: u64,
128) -> ProviderResult<()>
129where
130 P: StageCheckpointWriter,
131{
132 provider.save_stage_checkpoint(
133 StageId::Headers,
134 StageCheckpoint::new(to).with_headers_stage_checkpoint(HeadersCheckpoint {
135 block_range: CheckpointBlockRange { from, to },
136 progress: EntitiesCheckpoint { processed, total },
137 }),
138 )?;
139 provider.save_stage_checkpoint(
140 StageId::Bodies,
141 StageCheckpoint::new(to)
142 .with_entities_stage_checkpoint(EntitiesCheckpoint { processed, total }),
143 )?;
144 Ok(())
145}
146
147pub fn process<Era, P, B, BB, BH>(
159 meta: &Era,
160 writer: &mut StaticFileProviderRWRefMut<'_, <P as NodePrimitivesProvider>::Primitives>,
161 provider: &P,
162 hash_collector: &mut Collector<BlockHash, BlockNumber>,
163 total_difficulty: &mut U256,
164 block_numbers: impl RangeBounds<BlockNumber>,
165) -> eyre::Result<BlockNumber>
166where
167 B: Block<Header = BH, Body = BB>,
168 BH: FullBlockHeader + Value,
169 BB: FullBlockBody<
170 Transaction = <<P as NodePrimitivesProvider>::Primitives as NodePrimitives>::SignedTx,
171 OmmerHeader = BH,
172 >,
173 Era: EraMeta + ?Sized,
174 P: DBProvider<Tx: DbTxMut> + NodePrimitivesProvider + BlockWriter<Block = B>,
175 <P as NodePrimitivesProvider>::Primitives: NodePrimitives<BlockHeader = BH, BlockBody = BB>,
176{
177 let reader = open(meta)?;
178 let iter =
179 reader
180 .iter()
181 .map(Box::new(decode)
182 as Box<dyn Fn(Result<BlockTuple, E2sError>) -> eyre::Result<(BH, BB)>>);
183 let iter = ProcessIter { iter, era: meta };
184
185 process_iter(iter, writer, provider, hash_collector, total_difficulty, block_numbers)
186}
187
188type ProcessInnerIter<R, BH, BB> =
189 Map<BlockTupleIterator<R>, Box<dyn Fn(Result<BlockTuple, E2sError>) -> eyre::Result<(BH, BB)>>>;
190
191#[derive(Debug)]
194pub struct ProcessIter<'a, Era: ?Sized, R: Read, BH, BB>
195where
196 BH: FullBlockHeader + Value,
197 BB: FullBlockBody<OmmerHeader = BH>,
198{
199 iter: ProcessInnerIter<R, BH, BB>,
200 era: &'a Era,
201}
202
203impl<'a, Era: EraMeta + ?Sized, R: Read, BH, BB> Display for ProcessIter<'a, Era, R, BH, BB>
204where
205 BH: FullBlockHeader + Value,
206 BB: FullBlockBody<OmmerHeader = BH>,
207{
208 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
209 Display::fmt(&self.era.path().to_string_lossy(), f)
210 }
211}
212
213impl<'a, Era, R, BH, BB> Iterator for ProcessIter<'a, Era, R, BH, BB>
214where
215 R: Read + Seek,
216 Era: EraMeta + ?Sized,
217 BH: FullBlockHeader + Value,
218 BB: FullBlockBody<OmmerHeader = BH>,
219{
220 type Item = eyre::Result<(BH, BB)>;
221
222 fn next(&mut self) -> Option<Self::Item> {
223 match self.iter.next() {
224 Some(item) => Some(item),
225 None => match self.era.mark_as_processed() {
226 Ok(..) => None,
227 Err(e) => Some(Err(e)),
228 },
229 }
230 }
231}
232
233pub fn open<Era>(meta: &Era) -> eyre::Result<Era1Reader<std::fs::File>>
235where
236 Era: EraMeta + ?Sized,
237{
238 let file = fs::open(meta.path())?;
239 let reader = Era1Reader::new(file);
240
241 Ok(reader)
242}
243
244pub fn decode<BH, BB, E>(block: Result<BlockTuple, E>) -> eyre::Result<(BH, BB)>
246where
247 BH: FullBlockHeader + Value,
248 BB: FullBlockBody<OmmerHeader = BH>,
249 E: From<E2sError> + Error + Send + Sync + 'static,
250{
251 let block = block?;
252 let header: BH = block.header.decode()?;
253 let body: BB = block.body.decode()?;
254
255 Ok((header, body))
256}
257
258pub fn process_iter<P, B, BB, BH>(
270 mut iter: impl Iterator<Item = eyre::Result<(BH, BB)>>,
271 writer: &mut StaticFileProviderRWRefMut<'_, <P as NodePrimitivesProvider>::Primitives>,
272 provider: &P,
273 hash_collector: &mut Collector<BlockHash, BlockNumber>,
274 total_difficulty: &mut U256,
275 block_numbers: impl RangeBounds<BlockNumber>,
276) -> eyre::Result<BlockNumber>
277where
278 B: Block<Header = BH, Body = BB>,
279 BH: FullBlockHeader + Value,
280 BB: FullBlockBody<
281 Transaction = <<P as NodePrimitivesProvider>::Primitives as NodePrimitives>::SignedTx,
282 OmmerHeader = BH,
283 >,
284 P: DBProvider<Tx: DbTxMut> + NodePrimitivesProvider + BlockWriter<Block = B>,
285 <P as NodePrimitivesProvider>::Primitives: NodePrimitives<BlockHeader = BH, BlockBody = BB>,
286{
287 let mut last_header_number = match block_numbers.start_bound() {
288 Bound::Included(&number) => number,
289 Bound::Excluded(&number) => number.saturating_sub(1),
290 Bound::Unbounded => 0,
291 };
292 let target = match block_numbers.end_bound() {
293 Bound::Included(&number) => Some(number),
294 Bound::Excluded(&number) => Some(number.saturating_add(1)),
295 Bound::Unbounded => None,
296 };
297
298 for block in &mut iter {
299 let (header, body) = block?;
300 let number = header.number();
301
302 if number <= last_header_number {
303 continue;
304 }
305 if let Some(target) = target {
306 if number > target {
307 break;
308 }
309 }
310
311 let hash = header.hash_slow();
312 last_header_number = number;
313
314 *total_difficulty += header.difficulty();
316
317 writer.append_header(&header, *total_difficulty, &hash)?;
319
320 provider.append_block_bodies(
322 vec![(header.number(), Some(body))],
323 StorageLocation::StaticFiles,
325 )?;
326
327 hash_collector.insert(hash, number)?;
328 }
329
330 Ok(last_header_number)
331}
332
333pub fn build_index<P, B, BB, BH>(
335 provider: &P,
336 hash_collector: &mut Collector<BlockHash, BlockNumber>,
337) -> eyre::Result<()>
338where
339 B: Block<Header = BH, Body = BB>,
340 BH: FullBlockHeader + Value,
341 BB: FullBlockBody<
342 Transaction = <<P as NodePrimitivesProvider>::Primitives as NodePrimitives>::SignedTx,
343 OmmerHeader = BH,
344 >,
345 P: DBProvider<Tx: DbTxMut> + NodePrimitivesProvider + BlockWriter<Block = B>,
346 <P as NodePrimitivesProvider>::Primitives: NodePrimitives<BlockHeader = BH, BlockBody = BB>,
347{
348 let total_headers = hash_collector.len();
349 info!(target: "era::history::import", total = total_headers, "Writing headers hash index");
350
351 let mut cursor_header_numbers =
353 provider.tx_ref().cursor_write::<RawTable<tables::HeaderNumbers>>()?;
354 let mut first_sync = false;
355
356 if provider.tx_ref().entries::<RawTable<tables::HeaderNumbers>>()? == 1 {
359 if let Some((hash, block_number)) = cursor_header_numbers.last()? {
360 if block_number.value()? == 0 {
361 hash_collector.insert(hash.key()?, 0)?;
362 cursor_header_numbers.delete_current()?;
363 first_sync = true;
364 }
365 }
366 }
367
368 let interval = (total_headers / 10).max(8192);
369
370 for (index, hash_to_number) in hash_collector.iter()?.enumerate() {
372 let (hash, number) = hash_to_number?;
373
374 if index != 0 && index.is_multiple_of(interval) {
375 info!(target: "era::history::import", progress = %format!("{:.2}%", (index as f64 / total_headers as f64) * 100.0), "Writing headers hash index");
376 }
377
378 let hash = RawKey::<BlockHash>::from_vec(hash);
379 let number = RawValue::<BlockNumber>::from_vec(number);
380
381 if first_sync {
382 cursor_header_numbers.append(hash, &number)?;
383 } else {
384 cursor_header_numbers.upsert(hash, &number)?;
385 }
386 }
387
388 Ok(())
389}