// reth_era_utils/history.rs

1use alloy_primitives::{BlockHash, BlockNumber, U256};
2use futures_util::{Stream, StreamExt};
3use reth_db_api::{
4    cursor::{DbCursorRO, DbCursorRW},
5    table::Value,
6    tables,
7    transaction::{DbTx, DbTxMut},
8    RawKey, RawTable, RawValue,
9};
10use reth_era::{
11    e2s_types::E2sError,
12    era1_file::{BlockTupleIterator, Era1Reader},
13    era_file_ops::StreamReader,
14    execution_types::BlockTuple,
15    DecodeCompressed,
16};
17use reth_era_downloader::EraMeta;
18use reth_etl::Collector;
19use reth_fs_util as fs;
20use reth_primitives_traits::{Block, FullBlockBody, FullBlockHeader, NodePrimitives};
21use reth_provider::{
22    providers::StaticFileProviderRWRefMut, writer::UnifiedStorageWriter, BlockWriter,
23    ProviderError, StaticFileProviderFactory, StaticFileSegment, StaticFileWriter,
24};
25use reth_stages_types::{
26    CheckpointBlockRange, EntitiesCheckpoint, HeadersCheckpoint, StageCheckpoint, StageId,
27};
28use reth_storage_api::{
29    errors::ProviderResult, DBProvider, DatabaseProviderFactory, HeaderProvider,
30    NodePrimitivesProvider, StageCheckpointWriter, StorageLocation,
31};
32use std::{
33    collections::Bound,
34    error::Error,
35    fmt::{Display, Formatter},
36    io::{Read, Seek},
37    iter::Map,
38    ops::RangeBounds,
39    sync::mpsc,
40};
41use tracing::info;
42
/// Imports blocks from `downloader` using `provider`.
///
/// Era files are downloaded on a background tokio task while this (blocking) function
/// decodes and writes them one file at a time, committing after each file.
///
/// Returns current block height.
pub fn import<Downloader, Era, PF, B, BB, BH>(
    mut downloader: Downloader,
    provider_factory: &PF,
    hash_collector: &mut Collector<BlockHash, BlockNumber>,
) -> eyre::Result<BlockNumber>
where
    B: Block<Header = BH, Body = BB>,
    BH: FullBlockHeader + Value,
    BB: FullBlockBody<
        Transaction = <<<PF as DatabaseProviderFactory>::ProviderRW as NodePrimitivesProvider>::Primitives as NodePrimitives>::SignedTx,
        OmmerHeader = BH,
    >,
    Downloader: Stream<Item = eyre::Result<Era>> + Send + 'static + Unpin,
    Era: EraMeta + Send + 'static,
    PF: DatabaseProviderFactory<
        ProviderRW: BlockWriter<Block = B>
            + DBProvider
            + StaticFileProviderFactory<Primitives: NodePrimitives<Block = B, BlockHeader = BH, BlockBody = BB>>
            + StageCheckpointWriter,
    > + StaticFileProviderFactory<Primitives = <<PF as DatabaseProviderFactory>::ProviderRW as NodePrimitivesProvider>::Primitives>,
{
    // Bridge the async download stream into this synchronous loop: each downloaded file is
    // sent as `Some(file)` over a std mpsc channel, and `None` marks end-of-stream.
    let (tx, rx) = mpsc::channel();

    // Handle IO-bound async download in a background tokio task
    tokio::spawn(async move {
        while let Some(file) = downloader.next().await {
            // Sending fails only if the receiver was dropped (e.g. the import loop errored);
            // the `?` then simply terminates the download task.
            tx.send(Some(file))?;
        }
        tx.send(None)
    });

    let static_file_provider = provider_factory.static_file_provider();

    // Consistency check of expected headers in static files vs DB is done on provider::sync_gap
    // when poll_execute_ready is polled.
    let mut height = static_file_provider
        .get_highest_static_file_block(StaticFileSegment::Headers)
        .unwrap_or_default();

    // Find the latest total difficulty
    let mut td = static_file_provider
        .header_td_by_number(height)?
        .ok_or(ProviderError::TotalDifficultyNotFound(height))?;

    // Blocks until the next file arrives; `None` from the downloader task ends the loop.
    while let Some(meta) = rx.recv()? {
        let from = height;
        let provider = provider_factory.database_provider_rw()?;

        // Append headers/bodies from this era file, resuming after the current `height`.
        height = process(
            &meta?,
            &mut static_file_provider.latest_writer(StaticFileSegment::Headers)?,
            &provider,
            hash_collector,
            &mut td,
            height..,
        )?;

        // Record progress so the Headers/Bodies pipeline stages won't redo this work.
        save_stage_checkpoints(&provider, from, height, height, height)?;

        // Commit DB changes for this file before pulling the next one.
        UnifiedStorageWriter::commit(provider)?;
    }

    // Build the hash -> number index in a final, separate transaction.
    let provider = provider_factory.database_provider_rw()?;

    build_index(&provider, hash_collector)?;

    UnifiedStorageWriter::commit(provider)?;

    Ok(height)
}
116
117/// Saves progress of ERA import into stages sync.
118///
119/// Since the ERA import does the same work as `HeaderStage` and `BodyStage`, it needs to inform
120/// these stages that this work has already been done. Otherwise, there might be some conflict with
121/// database integrity.
122pub fn save_stage_checkpoints<P>(
123    provider: &P,
124    from: BlockNumber,
125    to: BlockNumber,
126    processed: u64,
127    total: u64,
128) -> ProviderResult<()>
129where
130    P: StageCheckpointWriter,
131{
132    provider.save_stage_checkpoint(
133        StageId::Headers,
134        StageCheckpoint::new(to).with_headers_stage_checkpoint(HeadersCheckpoint {
135            block_range: CheckpointBlockRange { from, to },
136            progress: EntitiesCheckpoint { processed, total },
137        }),
138    )?;
139    provider.save_stage_checkpoint(
140        StageId::Bodies,
141        StageCheckpoint::new(to)
142            .with_entities_stage_checkpoint(EntitiesCheckpoint { processed, total }),
143    )?;
144    Ok(())
145}
146
147/// Extracts block headers and bodies from `meta` and appends them using `writer` and `provider`.
148///
149/// Adds on to `total_difficulty` and collects hash to height using `hash_collector`.
150///
151/// Skips all blocks below the [`start_bound`] of `block_numbers` and stops when reaching past the
152/// [`end_bound`] or the end of the file.
153///
154/// Returns last block height.
155///
156/// [`start_bound`]: RangeBounds::start_bound
157/// [`end_bound`]: RangeBounds::end_bound
158pub fn process<Era, P, B, BB, BH>(
159    meta: &Era,
160    writer: &mut StaticFileProviderRWRefMut<'_, <P as NodePrimitivesProvider>::Primitives>,
161    provider: &P,
162    hash_collector: &mut Collector<BlockHash, BlockNumber>,
163    total_difficulty: &mut U256,
164    block_numbers: impl RangeBounds<BlockNumber>,
165) -> eyre::Result<BlockNumber>
166where
167    B: Block<Header = BH, Body = BB>,
168    BH: FullBlockHeader + Value,
169    BB: FullBlockBody<
170        Transaction = <<P as NodePrimitivesProvider>::Primitives as NodePrimitives>::SignedTx,
171        OmmerHeader = BH,
172    >,
173    Era: EraMeta + ?Sized,
174    P: DBProvider<Tx: DbTxMut> + NodePrimitivesProvider + BlockWriter<Block = B>,
175    <P as NodePrimitivesProvider>::Primitives: NodePrimitives<BlockHeader = BH, BlockBody = BB>,
176{
177    let reader = open(meta)?;
178    let iter =
179        reader
180            .iter()
181            .map(Box::new(decode)
182                as Box<dyn Fn(Result<BlockTuple, E2sError>) -> eyre::Result<(BH, BB)>>);
183    let iter = ProcessIter { iter, era: meta };
184
185    process_iter(iter, writer, provider, hash_collector, total_difficulty, block_numbers)
186}
187
/// Inner iterator type of [`ProcessIter`]: raw era1 block tuples mapped through a boxed
/// decoding closure into `(header, body)` pairs.
type ProcessInnerIter<R, BH, BB> =
    Map<BlockTupleIterator<R>, Box<dyn Fn(Result<BlockTuple, E2sError>) -> eyre::Result<(BH, BB)>>>;
190
/// An iterator that wraps era file extraction. After the final item [`EraMeta::mark_as_processed`]
/// is called to ensure proper cleanup.
#[derive(Debug)]
pub struct ProcessIter<'a, Era: ?Sized, R: Read, BH, BB>
where
    BH: FullBlockHeader + Value,
    BB: FullBlockBody<OmmerHeader = BH>,
{
    // Decoded `(header, body)` iterator over the era file's block tuples.
    iter: ProcessInnerIter<R, BH, BB>,
    // Metadata of the era file being read; notified once iteration completes.
    era: &'a Era,
}
202
203impl<'a, Era: EraMeta + ?Sized, R: Read, BH, BB> Display for ProcessIter<'a, Era, R, BH, BB>
204where
205    BH: FullBlockHeader + Value,
206    BB: FullBlockBody<OmmerHeader = BH>,
207{
208    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
209        Display::fmt(&self.era.path().to_string_lossy(), f)
210    }
211}
212
213impl<'a, Era, R, BH, BB> Iterator for ProcessIter<'a, Era, R, BH, BB>
214where
215    R: Read + Seek,
216    Era: EraMeta + ?Sized,
217    BH: FullBlockHeader + Value,
218    BB: FullBlockBody<OmmerHeader = BH>,
219{
220    type Item = eyre::Result<(BH, BB)>;
221
222    fn next(&mut self) -> Option<Self::Item> {
223        match self.iter.next() {
224            Some(item) => Some(item),
225            None => match self.era.mark_as_processed() {
226                Ok(..) => None,
227                Err(e) => Some(Err(e)),
228            },
229        }
230    }
231}
232
233/// Opens the era file described by `meta`.
234pub fn open<Era>(meta: &Era) -> eyre::Result<Era1Reader<std::fs::File>>
235where
236    Era: EraMeta + ?Sized,
237{
238    let file = fs::open(meta.path())?;
239    let reader = Era1Reader::new(file);
240
241    Ok(reader)
242}
243
244/// Extracts a pair of [`FullBlockHeader`] and [`FullBlockBody`] from [`BlockTuple`].
245pub fn decode<BH, BB, E>(block: Result<BlockTuple, E>) -> eyre::Result<(BH, BB)>
246where
247    BH: FullBlockHeader + Value,
248    BB: FullBlockBody<OmmerHeader = BH>,
249    E: From<E2sError> + Error + Send + Sync + 'static,
250{
251    let block = block?;
252    let header: BH = block.header.decode()?;
253    let body: BB = block.body.decode()?;
254
255    Ok((header, body))
256}
257
258/// Extracts block headers and bodies from `iter` and appends them using `writer` and `provider`.
259///
260/// Adds on to `total_difficulty` and collects hash to height using `hash_collector`.
261///
262/// Skips all blocks below the [`start_bound`] of `block_numbers` and stops when reaching past the
263/// [`end_bound`] or the end of the file.
264///
265/// Returns last block height.
266///
267/// [`start_bound`]: RangeBounds::start_bound
268/// [`end_bound`]: RangeBounds::end_bound
269pub fn process_iter<P, B, BB, BH>(
270    mut iter: impl Iterator<Item = eyre::Result<(BH, BB)>>,
271    writer: &mut StaticFileProviderRWRefMut<'_, <P as NodePrimitivesProvider>::Primitives>,
272    provider: &P,
273    hash_collector: &mut Collector<BlockHash, BlockNumber>,
274    total_difficulty: &mut U256,
275    block_numbers: impl RangeBounds<BlockNumber>,
276) -> eyre::Result<BlockNumber>
277where
278    B: Block<Header = BH, Body = BB>,
279    BH: FullBlockHeader + Value,
280    BB: FullBlockBody<
281        Transaction = <<P as NodePrimitivesProvider>::Primitives as NodePrimitives>::SignedTx,
282        OmmerHeader = BH,
283    >,
284    P: DBProvider<Tx: DbTxMut> + NodePrimitivesProvider + BlockWriter<Block = B>,
285    <P as NodePrimitivesProvider>::Primitives: NodePrimitives<BlockHeader = BH, BlockBody = BB>,
286{
287    let mut last_header_number = match block_numbers.start_bound() {
288        Bound::Included(&number) => number,
289        Bound::Excluded(&number) => number.saturating_sub(1),
290        Bound::Unbounded => 0,
291    };
292    let target = match block_numbers.end_bound() {
293        Bound::Included(&number) => Some(number),
294        Bound::Excluded(&number) => Some(number.saturating_add(1)),
295        Bound::Unbounded => None,
296    };
297
298    for block in &mut iter {
299        let (header, body) = block?;
300        let number = header.number();
301
302        if number <= last_header_number {
303            continue;
304        }
305        if let Some(target) = target {
306            if number > target {
307                break;
308            }
309        }
310
311        let hash = header.hash_slow();
312        last_header_number = number;
313
314        // Increase total difficulty
315        *total_difficulty += header.difficulty();
316
317        // Append to Headers segment
318        writer.append_header(&header, *total_difficulty, &hash)?;
319
320        // Write bodies to database.
321        provider.append_block_bodies(
322            vec![(header.number(), Some(body))],
323            // We are writing transactions directly to static files.
324            StorageLocation::StaticFiles,
325        )?;
326
327        hash_collector.insert(hash, number)?;
328    }
329
330    Ok(last_header_number)
331}
332
/// Dumps the contents of `hash_collector` into [`tables::HeaderNumbers`].
pub fn build_index<P, B, BB, BH>(
    provider: &P,
    hash_collector: &mut Collector<BlockHash, BlockNumber>,
) -> eyre::Result<()>
where
    B: Block<Header = BH, Body = BB>,
    BH: FullBlockHeader + Value,
    BB: FullBlockBody<
        Transaction = <<P as NodePrimitivesProvider>::Primitives as NodePrimitives>::SignedTx,
        OmmerHeader = BH,
    >,
    P: DBProvider<Tx: DbTxMut> + NodePrimitivesProvider + BlockWriter<Block = B>,
    <P as NodePrimitivesProvider>::Primitives: NodePrimitives<BlockHeader = BH, BlockBody = BB>,
{
    let total_headers = hash_collector.len();
    info!(target: "era::history::import", total = total_headers, "Writing headers hash index");

    // Database cursor for hash to number index
    let mut cursor_header_numbers =
        provider.tx_ref().cursor_write::<RawTable<tables::HeaderNumbers>>()?;
    let mut first_sync = false;

    // If we only have the genesis block hash, then we are at first sync, and we can remove it,
    // add it to the collector and use tx.append on all hashes.
    if provider.tx_ref().entries::<RawTable<tables::HeaderNumbers>>()? == 1 {
        if let Some((hash, block_number)) = cursor_header_numbers.last()? {
            if block_number.value()? == 0 {
                // Route the genesis entry through the collector so the whole table can be
                // written with `append` below (assumes the collector yields keys in sorted
                // order — presumably guaranteed by `reth_etl`; TODO confirm).
                hash_collector.insert(hash.key()?, 0)?;
                cursor_header_numbers.delete_current()?;
                first_sync = true;
            }
        }
    }

    // Log progress roughly every 10% of entries, but no more often than every 8192.
    let interval = (total_headers / 10).max(8192);

    // Build block hash to block number index
    for (index, hash_to_number) in hash_collector.iter()?.enumerate() {
        let (hash, number) = hash_to_number?;

        if index != 0 && index.is_multiple_of(interval) {
            info!(target: "era::history::import", progress = %format!("{:.2}%", (index as f64 / total_headers as f64) * 100.0), "Writing headers hash index");
        }

        // Keys/values come out of the collector pre-encoded; wrap them as raw table entries.
        let hash = RawKey::<BlockHash>::from_vec(hash);
        let number = RawValue::<BlockNumber>::from_vec(number);

        if first_sync {
            // Fresh table: `append` is valid (and cheaper) since entries arrive in key order.
            cursor_header_numbers.append(hash, &number)?;
        } else {
            // Existing entries may be present; `upsert` overwrites or inserts as needed.
            cursor_header_numbers.upsert(hash, &number)?;
        }
    }

    Ok(())
}