reth_era_utils/
history.rs

1use alloy_consensus::BlockHeader;
2use alloy_primitives::{BlockHash, BlockNumber, U256};
3use futures_util::{Stream, StreamExt};
4use reth_db_api::{
5    cursor::{DbCursorRO, DbCursorRW},
6    table::Value,
7    tables,
8    transaction::{DbTx, DbTxMut},
9    RawKey, RawTable, RawValue,
10};
11use reth_era::{
12    common::{decode::DecodeCompressedRlp, file_ops::StreamReader},
13    e2s::error::E2sError,
14    era1::{
15        file::{BlockTupleIterator, Era1Reader},
16        types::execution::BlockTuple,
17    },
18};
19use reth_era_downloader::EraMeta;
20use reth_etl::Collector;
21use reth_fs_util as fs;
22use reth_primitives_traits::{Block, FullBlockBody, FullBlockHeader, NodePrimitives};
23use reth_provider::{
24    providers::StaticFileProviderRWRefMut, BlockReader, BlockWriter, StaticFileProviderFactory,
25    StaticFileSegment, StaticFileWriter,
26};
27use reth_stages_types::{
28    CheckpointBlockRange, EntitiesCheckpoint, HeadersCheckpoint, StageCheckpoint, StageId,
29};
30use reth_storage_api::{
31    errors::ProviderResult, DBProvider, DatabaseProviderFactory, NodePrimitivesProvider,
32    StageCheckpointWriter,
33};
34use std::{
35    collections::Bound,
36    error::Error,
37    fmt::{Display, Formatter},
38    io::{Read, Seek},
39    iter::Map,
40    ops::RangeBounds,
41    sync::mpsc,
42};
43use tracing::info;
44
45/// Imports blocks from `downloader` using `provider`.
46///
47/// Returns current block height.
48pub fn import<Downloader, Era, PF, B, BB, BH>(
49    mut downloader: Downloader,
50    provider_factory: &PF,
51    hash_collector: &mut Collector<BlockHash, BlockNumber>,
52) -> eyre::Result<BlockNumber>
53where
54    B: Block<Header = BH, Body = BB>,
55    BH: FullBlockHeader + Value,
56    BB: FullBlockBody<
57        Transaction = <<<PF as DatabaseProviderFactory>::ProviderRW as NodePrimitivesProvider>::Primitives as NodePrimitives>::SignedTx,
58        OmmerHeader = BH,
59    >,
60    Downloader: Stream<Item = eyre::Result<Era>> + Send + 'static + Unpin,
61    Era: EraMeta + Send + 'static,
62    PF: DatabaseProviderFactory<
63        ProviderRW: BlockWriter<Block = B>
64            + DBProvider
65            + StaticFileProviderFactory<Primitives: NodePrimitives<Block = B, BlockHeader = BH, BlockBody = BB>>
66            + StageCheckpointWriter,
67    > + StaticFileProviderFactory<Primitives = <<PF as DatabaseProviderFactory>::ProviderRW as NodePrimitivesProvider>::Primitives>,
68{
69    let (tx, rx) = mpsc::channel();
70
71    // Handle IO-bound async download in a background tokio task
72    tokio::spawn(async move {
73        while let Some(file) = downloader.next().await {
74            tx.send(Some(file))?;
75        }
76        tx.send(None)
77    });
78
79    let static_file_provider = provider_factory.static_file_provider();
80
81    // Consistency check of expected headers in static files vs DB is done on provider::sync_gap
82    // when poll_execute_ready is polled.
83    let mut height = static_file_provider
84        .get_highest_static_file_block(StaticFileSegment::Headers)
85        .unwrap_or_default();
86
87    while let Some(meta) = rx.recv()? {
88        let from = height;
89        let provider = provider_factory.database_provider_rw()?;
90
91        height = process(
92            &meta?,
93            &mut static_file_provider.latest_writer(StaticFileSegment::Headers)?,
94            &provider,
95            hash_collector,
96            height..,
97        )?;
98
99        save_stage_checkpoints(&provider, from, height, height, height)?;
100
101        provider.commit()?;
102    }
103
104    let provider = provider_factory.database_provider_rw()?;
105
106    build_index(&provider, hash_collector)?;
107
108    provider.commit()?;
109
110    Ok(height)
111}
112
113/// Saves progress of ERA import into stages sync.
114///
115/// Since the ERA import does the same work as `HeaderStage` and `BodyStage`, it needs to inform
116/// these stages that this work has already been done. Otherwise, there might be some conflict with
117/// database integrity.
118pub fn save_stage_checkpoints<P>(
119    provider: P,
120    from: BlockNumber,
121    to: BlockNumber,
122    processed: u64,
123    total: u64,
124) -> ProviderResult<()>
125where
126    P: StageCheckpointWriter,
127{
128    provider.save_stage_checkpoint(
129        StageId::Headers,
130        StageCheckpoint::new(to).with_headers_stage_checkpoint(HeadersCheckpoint {
131            block_range: CheckpointBlockRange { from, to },
132            progress: EntitiesCheckpoint { processed, total },
133        }),
134    )?;
135    provider.save_stage_checkpoint(
136        StageId::Bodies,
137        StageCheckpoint::new(to)
138            .with_entities_stage_checkpoint(EntitiesCheckpoint { processed, total }),
139    )?;
140    Ok(())
141}
142
143/// Extracts block headers and bodies from `meta` and appends them using `writer` and `provider`.
144///
145/// Collects hash to height using `hash_collector`.
146///
147/// Skips all blocks below the [`start_bound`] of `block_numbers` and stops when reaching past the
148/// [`end_bound`] or the end of the file.
149///
150/// Returns last block height.
151///
152/// [`start_bound`]: RangeBounds::start_bound
153/// [`end_bound`]: RangeBounds::end_bound
154pub fn process<Era, P, B, BB, BH>(
155    meta: &Era,
156    writer: &mut StaticFileProviderRWRefMut<'_, <P as NodePrimitivesProvider>::Primitives>,
157    provider: &P,
158    hash_collector: &mut Collector<BlockHash, BlockNumber>,
159    block_numbers: impl RangeBounds<BlockNumber>,
160) -> eyre::Result<BlockNumber>
161where
162    B: Block<Header = BH, Body = BB>,
163    BH: FullBlockHeader + Value,
164    BB: FullBlockBody<
165        Transaction = <<P as NodePrimitivesProvider>::Primitives as NodePrimitives>::SignedTx,
166        OmmerHeader = BH,
167    >,
168    Era: EraMeta + ?Sized,
169    P: DBProvider<Tx: DbTxMut> + NodePrimitivesProvider + BlockWriter<Block = B>,
170    <P as NodePrimitivesProvider>::Primitives: NodePrimitives<BlockHeader = BH, BlockBody = BB>,
171{
172    let reader = open(meta)?;
173    let iter = reader.iter().map(decode as fn(_) -> _);
174    let iter = ProcessIter { iter, era: meta };
175
176    process_iter(iter, writer, provider, hash_collector, block_numbers)
177}
178
179type ProcessInnerIter<R, BH, BB> =
180    Map<BlockTupleIterator<R>, fn(Result<BlockTuple, E2sError>) -> eyre::Result<(BH, BB)>>;
181
182/// An iterator that wraps era file extraction. After the final item [`EraMeta::mark_as_processed`]
183/// is called to ensure proper cleanup.
184#[derive(Debug)]
185pub struct ProcessIter<'a, Era: ?Sized, R: Read, BH, BB>
186where
187    BH: FullBlockHeader + Value,
188    BB: FullBlockBody<OmmerHeader = BH>,
189{
190    iter: ProcessInnerIter<R, BH, BB>,
191    era: &'a Era,
192}
193
194impl<'a, Era: EraMeta + ?Sized, R: Read, BH, BB> Display for ProcessIter<'a, Era, R, BH, BB>
195where
196    BH: FullBlockHeader + Value,
197    BB: FullBlockBody<OmmerHeader = BH>,
198{
199    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
200        Display::fmt(&self.era.path().to_string_lossy(), f)
201    }
202}
203
204impl<'a, Era, R, BH, BB> Iterator for ProcessIter<'a, Era, R, BH, BB>
205where
206    R: Read + Seek,
207    Era: EraMeta + ?Sized,
208    BH: FullBlockHeader + Value,
209    BB: FullBlockBody<OmmerHeader = BH>,
210{
211    type Item = eyre::Result<(BH, BB)>;
212
213    fn next(&mut self) -> Option<Self::Item> {
214        match self.iter.next() {
215            Some(item) => Some(item),
216            None => match self.era.mark_as_processed() {
217                Ok(..) => None,
218                Err(e) => Some(Err(e)),
219            },
220        }
221    }
222}
223
224/// Opens the era file described by `meta`.
225pub fn open<Era>(meta: &Era) -> eyre::Result<Era1Reader<std::fs::File>>
226where
227    Era: EraMeta + ?Sized,
228{
229    let file = fs::open(meta.path())?;
230    let reader = Era1Reader::new(file);
231
232    Ok(reader)
233}
234
235/// Extracts a pair of [`FullBlockHeader`] and [`FullBlockBody`] from [`BlockTuple`].
236pub fn decode<BH, BB, E>(block: Result<BlockTuple, E>) -> eyre::Result<(BH, BB)>
237where
238    BH: FullBlockHeader + Value,
239    BB: FullBlockBody<OmmerHeader = BH>,
240    E: From<E2sError> + Error + Send + Sync + 'static,
241{
242    let block = block?;
243    let header: BH = block.header.decode()?;
244    let body: BB = block.body.decode()?;
245
246    Ok((header, body))
247}
248
249/// Extracts block headers and bodies from `iter` and appends them using `writer` and `provider`.
250///
251/// Collects hash to height using `hash_collector`.
252///
253/// Skips all blocks below the [`start_bound`] of `block_numbers` and stops when reaching past the
254/// [`end_bound`] or the end of the file.
255///
256/// Returns last block height.
257///
258/// [`start_bound`]: RangeBounds::start_bound
259/// [`end_bound`]: RangeBounds::end_bound
260pub fn process_iter<P, B, BB, BH>(
261    mut iter: impl Iterator<Item = eyre::Result<(BH, BB)>>,
262    writer: &mut StaticFileProviderRWRefMut<'_, <P as NodePrimitivesProvider>::Primitives>,
263    provider: &P,
264    hash_collector: &mut Collector<BlockHash, BlockNumber>,
265    block_numbers: impl RangeBounds<BlockNumber>,
266) -> eyre::Result<BlockNumber>
267where
268    B: Block<Header = BH, Body = BB>,
269    BH: FullBlockHeader + Value,
270    BB: FullBlockBody<
271        Transaction = <<P as NodePrimitivesProvider>::Primitives as NodePrimitives>::SignedTx,
272        OmmerHeader = BH,
273    >,
274    P: DBProvider<Tx: DbTxMut> + NodePrimitivesProvider + BlockWriter<Block = B>,
275    <P as NodePrimitivesProvider>::Primitives: NodePrimitives<BlockHeader = BH, BlockBody = BB>,
276{
277    let mut last_header_number = match block_numbers.start_bound() {
278        Bound::Included(&number) => number,
279        Bound::Excluded(&number) => number.saturating_add(1),
280        Bound::Unbounded => 0,
281    };
282    let target = match block_numbers.end_bound() {
283        Bound::Included(&number) => Some(number),
284        Bound::Excluded(&number) => Some(number.saturating_sub(1)),
285        Bound::Unbounded => None,
286    };
287
288    for block in &mut iter {
289        let (header, body) = block?;
290        let number = header.number();
291
292        if number <= last_header_number {
293            continue;
294        }
295        if let Some(target) = target &&
296            number > target
297        {
298            break;
299        }
300
301        let hash = header.hash_slow();
302        last_header_number = number;
303
304        // Append to Headers segment
305        writer.append_header(&header, &hash)?;
306
307        // Write bodies to database.
308        provider.append_block_bodies(vec![(header.number(), Some(&body))])?;
309
310        hash_collector.insert(hash, number)?;
311    }
312
313    Ok(last_header_number)
314}
315
316/// Dumps the contents of `hash_collector` into [`tables::HeaderNumbers`].
317pub fn build_index<P, B, BB, BH>(
318    provider: &P,
319    hash_collector: &mut Collector<BlockHash, BlockNumber>,
320) -> eyre::Result<()>
321where
322    B: Block<Header = BH, Body = BB>,
323    BH: FullBlockHeader + Value,
324    BB: FullBlockBody<
325        Transaction = <<P as NodePrimitivesProvider>::Primitives as NodePrimitives>::SignedTx,
326        OmmerHeader = BH,
327    >,
328    P: DBProvider<Tx: DbTxMut> + NodePrimitivesProvider + BlockWriter<Block = B>,
329    <P as NodePrimitivesProvider>::Primitives: NodePrimitives<BlockHeader = BH, BlockBody = BB>,
330{
331    let total_headers = hash_collector.len();
332    info!(target: "era::history::import", total = total_headers, "Writing headers hash index");
333
334    // Database cursor for hash to number index
335    let mut cursor_header_numbers =
336        provider.tx_ref().cursor_write::<RawTable<tables::HeaderNumbers>>()?;
337    // If we only have the genesis block hash, then we are at first sync, and we can remove it,
338    // add it to the collector and use tx.append on all hashes.
339    let first_sync = if provider.tx_ref().entries::<RawTable<tables::HeaderNumbers>>()? == 1 &&
340        let Some((hash, block_number)) = cursor_header_numbers.last()? &&
341        block_number.value()? == 0
342    {
343        hash_collector.insert(hash.key()?, 0)?;
344        cursor_header_numbers.delete_current()?;
345        true
346    } else {
347        false
348    };
349
350    let interval = (total_headers / 10).max(8192);
351
352    // Build block hash to block number index
353    for (index, hash_to_number) in hash_collector.iter()?.enumerate() {
354        let (hash, number) = hash_to_number?;
355
356        if index != 0 && index.is_multiple_of(interval) {
357            info!(target: "era::history::import", progress = %format!("{:.2}%", (index as f64 / total_headers as f64) * 100.0), "Writing headers hash index");
358        }
359
360        let hash = RawKey::<BlockHash>::from_vec(hash);
361        let number = RawValue::<BlockNumber>::from_vec(number);
362
363        if first_sync {
364            cursor_header_numbers.append(hash, &number)?;
365        } else {
366            cursor_header_numbers.upsert(hash, &number)?;
367        }
368    }
369
370    Ok(())
371}
372
373/// Calculates the total difficulty for a given block number by summing the difficulty
374/// of all blocks from genesis to the given block.
375///
376/// Very expensive - iterates through all blocks in batches of 1000.
377///
378/// Returns an error if any block is missing.
379pub fn calculate_td_by_number<P>(provider: &P, num: BlockNumber) -> eyre::Result<U256>
380where
381    P: BlockReader,
382{
383    let mut total_difficulty = U256::ZERO;
384    let mut start = 0;
385
386    while start <= num {
387        let end = (start + 1000 - 1).min(num);
388
389        total_difficulty +=
390            provider.headers_range(start..=end)?.iter().map(|h| h.difficulty()).sum::<U256>();
391
392        start = end + 1;
393    }
394
395    Ok(total_difficulty)
396}