reth_era_utils/
history.rs

1use alloy_consensus::BlockHeader;
2use alloy_primitives::{BlockHash, BlockNumber, U256};
3use futures_util::{Stream, StreamExt};
4use reth_db_api::{
5    cursor::{DbCursorRO, DbCursorRW},
6    table::Value,
7    tables,
8    transaction::{DbTx, DbTxMut},
9    RawKey, RawTable, RawValue,
10};
11use reth_era::{
12    e2s_types::E2sError,
13    era1_file::{BlockTupleIterator, Era1Reader},
14    era_file_ops::StreamReader,
15    execution_types::BlockTuple,
16    DecodeCompressed,
17};
18use reth_era_downloader::EraMeta;
19use reth_etl::Collector;
20use reth_fs_util as fs;
21use reth_primitives_traits::{Block, FullBlockBody, FullBlockHeader, NodePrimitives};
22use reth_provider::{
23    providers::StaticFileProviderRWRefMut, BlockReader, BlockWriter, StaticFileProviderFactory,
24    StaticFileSegment, StaticFileWriter,
25};
26use reth_stages_types::{
27    CheckpointBlockRange, EntitiesCheckpoint, HeadersCheckpoint, StageCheckpoint, StageId,
28};
29use reth_storage_api::{
30    errors::ProviderResult, DBProvider, DatabaseProviderFactory, NodePrimitivesProvider,
31    StageCheckpointWriter,
32};
33use std::{
34    collections::Bound,
35    error::Error,
36    fmt::{Display, Formatter},
37    io::{Read, Seek},
38    iter::Map,
39    ops::RangeBounds,
40    sync::mpsc,
41};
42use tracing::info;
43
/// Imports blocks from `downloader` using `provider`.
///
/// Downloading happens on a background tokio task while this (blocking) thread decodes the
/// files and writes them out, so network I/O and database writes overlap.
///
/// Returns current block height.
pub fn import<Downloader, Era, PF, B, BB, BH>(
    mut downloader: Downloader,
    provider_factory: &PF,
    hash_collector: &mut Collector<BlockHash, BlockNumber>,
) -> eyre::Result<BlockNumber>
where
    B: Block<Header = BH, Body = BB>,
    BH: FullBlockHeader + Value,
    BB: FullBlockBody<
        Transaction = <<<PF as DatabaseProviderFactory>::ProviderRW as NodePrimitivesProvider>::Primitives as NodePrimitives>::SignedTx,
        OmmerHeader = BH,
    >,
    Downloader: Stream<Item = eyre::Result<Era>> + Send + 'static + Unpin,
    Era: EraMeta + Send + 'static,
    PF: DatabaseProviderFactory<
        ProviderRW: BlockWriter<Block = B>
            + DBProvider
            + StaticFileProviderFactory<Primitives: NodePrimitives<Block = B, BlockHeader = BH, BlockBody = BB>>
            + StageCheckpointWriter,
    > + StaticFileProviderFactory<Primitives = <<PF as DatabaseProviderFactory>::ProviderRW as NodePrimitivesProvider>::Primitives>,
{
    let (tx, rx) = mpsc::channel();

    // Handle IO-bound async download in a background tokio task.
    // Channel protocol: every downloaded file is sent as `Some(file)`; a final `None` signals
    // end-of-stream so the blocking receiver loop below knows when to stop.
    tokio::spawn(async move {
        while let Some(file) = downloader.next().await {
            tx.send(Some(file))?;
        }
        tx.send(None)
    });

    let static_file_provider = provider_factory.static_file_provider();

    // Consistency check of expected headers in static files vs DB is done on provider::sync_gap
    // when poll_execute_ready is polled.
    let mut height = static_file_provider
        .get_highest_static_file_block(StaticFileSegment::Headers)
        .unwrap_or_default();

    // Process each downloaded file as it arrives, using one read-write transaction per file so
    // progress is durable file-by-file.
    while let Some(meta) = rx.recv()? {
        let from = height;
        let provider = provider_factory.database_provider_rw()?;

        // Append headers and bodies from this file, starting strictly above the current height.
        height = process(
            &meta?,
            &mut static_file_provider.latest_writer(StaticFileSegment::Headers)?,
            &provider,
            hash_collector,
            height..,
        )?;

        // Record Headers/Bodies stage checkpoints so the pipeline does not redo this work.
        save_stage_checkpoints(&provider, from, height, height, height)?;

        provider.commit()?;
    }

    let provider = provider_factory.database_provider_rw()?;

    // Flush the collected hash -> number mapping into the database index.
    build_index(&provider, hash_collector)?;

    provider.commit()?;

    Ok(height)
}
111
112/// Saves progress of ERA import into stages sync.
113///
114/// Since the ERA import does the same work as `HeaderStage` and `BodyStage`, it needs to inform
115/// these stages that this work has already been done. Otherwise, there might be some conflict with
116/// database integrity.
117pub fn save_stage_checkpoints<P>(
118    provider: &P,
119    from: BlockNumber,
120    to: BlockNumber,
121    processed: u64,
122    total: u64,
123) -> ProviderResult<()>
124where
125    P: StageCheckpointWriter,
126{
127    provider.save_stage_checkpoint(
128        StageId::Headers,
129        StageCheckpoint::new(to).with_headers_stage_checkpoint(HeadersCheckpoint {
130            block_range: CheckpointBlockRange { from, to },
131            progress: EntitiesCheckpoint { processed, total },
132        }),
133    )?;
134    provider.save_stage_checkpoint(
135        StageId::Bodies,
136        StageCheckpoint::new(to)
137            .with_entities_stage_checkpoint(EntitiesCheckpoint { processed, total }),
138    )?;
139    Ok(())
140}
141
142/// Extracts block headers and bodies from `meta` and appends them using `writer` and `provider`.
143///
144/// Collects hash to height using `hash_collector`.
145///
146/// Skips all blocks below the [`start_bound`] of `block_numbers` and stops when reaching past the
147/// [`end_bound`] or the end of the file.
148///
149/// Returns last block height.
150///
151/// [`start_bound`]: RangeBounds::start_bound
152/// [`end_bound`]: RangeBounds::end_bound
153pub fn process<Era, P, B, BB, BH>(
154    meta: &Era,
155    writer: &mut StaticFileProviderRWRefMut<'_, <P as NodePrimitivesProvider>::Primitives>,
156    provider: &P,
157    hash_collector: &mut Collector<BlockHash, BlockNumber>,
158    block_numbers: impl RangeBounds<BlockNumber>,
159) -> eyre::Result<BlockNumber>
160where
161    B: Block<Header = BH, Body = BB>,
162    BH: FullBlockHeader + Value,
163    BB: FullBlockBody<
164        Transaction = <<P as NodePrimitivesProvider>::Primitives as NodePrimitives>::SignedTx,
165        OmmerHeader = BH,
166    >,
167    Era: EraMeta + ?Sized,
168    P: DBProvider<Tx: DbTxMut> + NodePrimitivesProvider + BlockWriter<Block = B>,
169    <P as NodePrimitivesProvider>::Primitives: NodePrimitives<BlockHeader = BH, BlockBody = BB>,
170{
171    let reader = open(meta)?;
172    let iter =
173        reader
174            .iter()
175            .map(Box::new(decode)
176                as Box<dyn Fn(Result<BlockTuple, E2sError>) -> eyre::Result<(BH, BB)>>);
177    let iter = ProcessIter { iter, era: meta };
178
179    process_iter(iter, writer, provider, hash_collector, block_numbers)
180}
181
/// Iterator of decoded `(header, body)` pairs: the raw [`BlockTupleIterator`] of an ERA1 file
/// mapped through a boxed [`decode`] function (boxed so this alias can name the closure type).
type ProcessInnerIter<R, BH, BB> =
    Map<BlockTupleIterator<R>, Box<dyn Fn(Result<BlockTuple, E2sError>) -> eyre::Result<(BH, BB)>>>;
184
/// An iterator that wraps era file extraction. After the final item [`EraMeta::mark_as_processed`]
/// is called to ensure proper cleanup.
#[derive(Debug)]
pub struct ProcessIter<'a, Era: ?Sized, R: Read, BH, BB>
where
    BH: FullBlockHeader + Value,
    BB: FullBlockBody<OmmerHeader = BH>,
{
    // Underlying decoded `(header, body)` iterator over the era file contents.
    iter: ProcessInnerIter<R, BH, BB>,
    // Metadata of the era file being processed; used for cleanup and for `Display`.
    era: &'a Era,
}
196
197impl<'a, Era: EraMeta + ?Sized, R: Read, BH, BB> Display for ProcessIter<'a, Era, R, BH, BB>
198where
199    BH: FullBlockHeader + Value,
200    BB: FullBlockBody<OmmerHeader = BH>,
201{
202    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
203        Display::fmt(&self.era.path().to_string_lossy(), f)
204    }
205}
206
207impl<'a, Era, R, BH, BB> Iterator for ProcessIter<'a, Era, R, BH, BB>
208where
209    R: Read + Seek,
210    Era: EraMeta + ?Sized,
211    BH: FullBlockHeader + Value,
212    BB: FullBlockBody<OmmerHeader = BH>,
213{
214    type Item = eyre::Result<(BH, BB)>;
215
216    fn next(&mut self) -> Option<Self::Item> {
217        match self.iter.next() {
218            Some(item) => Some(item),
219            None => match self.era.mark_as_processed() {
220                Ok(..) => None,
221                Err(e) => Some(Err(e)),
222            },
223        }
224    }
225}
226
227/// Opens the era file described by `meta`.
228pub fn open<Era>(meta: &Era) -> eyre::Result<Era1Reader<std::fs::File>>
229where
230    Era: EraMeta + ?Sized,
231{
232    let file = fs::open(meta.path())?;
233    let reader = Era1Reader::new(file);
234
235    Ok(reader)
236}
237
238/// Extracts a pair of [`FullBlockHeader`] and [`FullBlockBody`] from [`BlockTuple`].
239pub fn decode<BH, BB, E>(block: Result<BlockTuple, E>) -> eyre::Result<(BH, BB)>
240where
241    BH: FullBlockHeader + Value,
242    BB: FullBlockBody<OmmerHeader = BH>,
243    E: From<E2sError> + Error + Send + Sync + 'static,
244{
245    let block = block?;
246    let header: BH = block.header.decode()?;
247    let body: BB = block.body.decode()?;
248
249    Ok((header, body))
250}
251
/// Extracts block headers and bodies from `iter` and appends them using `writer` and `provider`.
///
/// Collects hash to height using `hash_collector`.
///
/// Skips all blocks below the [`start_bound`] of `block_numbers` and stops when reaching past the
/// [`end_bound`] or the end of the file.
///
/// Returns last block height.
///
/// [`start_bound`]: RangeBounds::start_bound
/// [`end_bound`]: RangeBounds::end_bound
pub fn process_iter<P, B, BB, BH>(
    mut iter: impl Iterator<Item = eyre::Result<(BH, BB)>>,
    writer: &mut StaticFileProviderRWRefMut<'_, <P as NodePrimitivesProvider>::Primitives>,
    provider: &P,
    hash_collector: &mut Collector<BlockHash, BlockNumber>,
    block_numbers: impl RangeBounds<BlockNumber>,
) -> eyre::Result<BlockNumber>
where
    B: Block<Header = BH, Body = BB>,
    BH: FullBlockHeader + Value,
    BB: FullBlockBody<
        Transaction = <<P as NodePrimitivesProvider>::Primitives as NodePrimitives>::SignedTx,
        OmmerHeader = BH,
    >,
    P: DBProvider<Tx: DbTxMut> + NodePrimitivesProvider + BlockWriter<Block = B>,
    <P as NodePrimitivesProvider>::Primitives: NodePrimitives<BlockHeader = BH, BlockBody = BB>,
{
    // Height at/below which blocks are skipped. NOTE(review): an Included start bound `n..`
    // therefore skips block `n` itself — callers (e.g. `import`) pass the last already-imported
    // height here, so this is intentional; with an unbounded start, genesis (0) is skipped too.
    let mut last_header_number = match block_numbers.start_bound() {
        Bound::Included(&number) => number,
        Bound::Excluded(&number) => number.saturating_add(1),
        Bound::Unbounded => 0,
    };
    // Highest block number (inclusive) to append; `None` means no upper limit.
    let target = match block_numbers.end_bound() {
        Bound::Included(&number) => Some(number),
        Bound::Excluded(&number) => Some(number.saturating_sub(1)),
        Bound::Unbounded => None,
    };

    for block in &mut iter {
        let (header, body) = block?;
        let number = header.number();

        // Skip blocks at or below the current height (already imported).
        if number <= last_header_number {
            continue;
        }
        // Stop once past the requested end bound.
        if let Some(target) = target &&
            number > target
        {
            break;
        }

        let hash = header.hash_slow();
        last_header_number = number;

        // Append to Headers segment
        writer.append_header(&header, &hash)?;

        // Write bodies to database.
        provider.append_block_bodies(vec![(header.number(), Some(body))])?;

        // Remember hash -> height; flushed into `tables::HeaderNumbers` later by `build_index`.
        hash_collector.insert(hash, number)?;
    }

    Ok(last_header_number)
}
318
/// Dumps the contents of `hash_collector` into [`tables::HeaderNumbers`].
pub fn build_index<P, B, BB, BH>(
    provider: &P,
    hash_collector: &mut Collector<BlockHash, BlockNumber>,
) -> eyre::Result<()>
where
    B: Block<Header = BH, Body = BB>,
    BH: FullBlockHeader + Value,
    BB: FullBlockBody<
        Transaction = <<P as NodePrimitivesProvider>::Primitives as NodePrimitives>::SignedTx,
        OmmerHeader = BH,
    >,
    P: DBProvider<Tx: DbTxMut> + NodePrimitivesProvider + BlockWriter<Block = B>,
    <P as NodePrimitivesProvider>::Primitives: NodePrimitives<BlockHeader = BH, BlockBody = BB>,
{
    let total_headers = hash_collector.len();
    info!(target: "era::history::import", total = total_headers, "Writing headers hash index");

    // Database cursor for hash to number index
    let mut cursor_header_numbers =
        provider.tx_ref().cursor_write::<RawTable<tables::HeaderNumbers>>()?;
    // If we only have the genesis block hash, then we are at first sync, and we can remove it,
    // add it to the collector and use tx.append on all hashes.
    // NOTE(review): presumably `append` is faster but requires ascending key order (which the
    // collector's iteration provides), hence it is only used when the table starts out
    // (effectively) empty — confirm against the `DbCursorRW` docs.
    let first_sync = if provider.tx_ref().entries::<RawTable<tables::HeaderNumbers>>()? == 1 &&
        let Some((hash, block_number)) = cursor_header_numbers.last()? &&
        block_number.value()? == 0
    {
        hash_collector.insert(hash.key()?, 0)?;
        cursor_header_numbers.delete_current()?;
        true
    } else {
        false
    };

    // Log progress roughly every 10% of entries, but no more often than every 8192 entries.
    let interval = (total_headers / 10).max(8192);

    // Build block hash to block number index
    for (index, hash_to_number) in hash_collector.iter()?.enumerate() {
        let (hash, number) = hash_to_number?;

        if index != 0 && index.is_multiple_of(interval) {
            info!(target: "era::history::import", progress = %format!("{:.2}%", (index as f64 / total_headers as f64) * 100.0), "Writing headers hash index");
        }

        // The collector yields already-encoded byte vectors; wrap them as raw DB key/value.
        let hash = RawKey::<BlockHash>::from_vec(hash);
        let number = RawValue::<BlockNumber>::from_vec(number);

        if first_sync {
            cursor_header_numbers.append(hash, &number)?;
        } else {
            // Table already has entries; upsert handles pre-existing keys.
            cursor_header_numbers.upsert(hash, &number)?;
        }
    }

    Ok(())
}
375
376/// Calculates the total difficulty for a given block number by summing the difficulty
377/// of all blocks from genesis to the given block.
378///
379/// Very expensive - iterates through all blocks in batches of 1000.
380///
381/// Returns an error if any block is missing.
382pub fn calculate_td_by_number<P>(provider: &P, num: BlockNumber) -> eyre::Result<U256>
383where
384    P: BlockReader,
385{
386    let mut total_difficulty = U256::ZERO;
387    let mut start = 0;
388
389    while start <= num {
390        let end = (start + 1000 - 1).min(num);
391
392        total_difficulty +=
393            provider.headers_range(start..=end)?.iter().map(|h| h.difficulty()).sum::<U256>();
394
395        start = end + 1;
396    }
397
398    Ok(total_difficulty)
399}