reth_era_utils/history.rs

use alloy_consensus::BlockHeader;
use alloy_primitives::{BlockHash, BlockNumber, U256};
use futures_util::{Stream, StreamExt};
use reth_db_api::{
    cursor::{DbCursorRO, DbCursorRW},
    table::Value,
    tables,
    transaction::{DbTx, DbTxMut},
    RawKey, RawTable, RawValue,
};
use reth_era::{
    common::{decode::DecodeCompressedRlp, file_ops::StreamReader},
    e2s::error::E2sError,
    era1::{
        file::{BlockTupleIterator, Era1Reader},
        types::execution::BlockTuple,
    },
};
use reth_era_downloader::EraMeta;
use reth_etl::Collector;
use reth_fs_util as fs;
use reth_primitives_traits::{Block, FullBlockBody, FullBlockHeader, NodePrimitives};
use reth_provider::{
    providers::StaticFileProviderRWRefMut, BlockReader, BlockWriter, StaticFileProviderFactory,
    StaticFileSegment, StaticFileWriter,
};
use reth_stages_types::{
    CheckpointBlockRange, EntitiesCheckpoint, HeadersCheckpoint, StageCheckpoint, StageId,
};
use reth_storage_api::{
    errors::ProviderResult, DBProvider, DatabaseProviderFactory, NodePrimitivesProvider,
    StageCheckpointWriter,
};
use std::{
    collections::Bound,
    error::Error,
    fmt::{Display, Formatter},
    io::{Read, Seek},
    iter::Map,
    ops::RangeBounds,
    sync::mpsc,
};
use tracing::info;

/// Imports blocks from `downloader` using `provider_factory`.
///
/// Returns the current block height.
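///
/// # Example
///
/// A minimal usage sketch, not taken verbatim from the crate; the downloader stream and
/// the collector construction are assumptions that depend on the node setup:
///
/// ```ignore
/// // Any `Stream<Item = eyre::Result<impl EraMeta>>` works as the downloader, e.g.
/// // one built with `reth-era-downloader`. The collector spills hash-to-number
/// // mappings to disk (constructor arguments are hypothetical here).
/// let mut hash_collector = Collector::new(4096, std::env::temp_dir().into());
/// let height = import(downloader, &provider_factory, &mut hash_collector)?;
/// // `height` is the last block written to static files and the database.
/// ```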
pub fn import<Downloader, Era, PF, B, BB, BH>(
    mut downloader: Downloader,
    provider_factory: &PF,
    hash_collector: &mut Collector<BlockHash, BlockNumber>,
) -> eyre::Result<BlockNumber>
where
    B: Block<Header = BH, Body = BB>,
    BH: FullBlockHeader + Value,
    BB: FullBlockBody<
        Transaction = <<<PF as DatabaseProviderFactory>::ProviderRW as NodePrimitivesProvider>::Primitives as NodePrimitives>::SignedTx,
        OmmerHeader = BH,
    >,
    Downloader: Stream<Item = eyre::Result<Era>> + Send + 'static + Unpin,
    Era: EraMeta + Send + 'static,
    PF: DatabaseProviderFactory<
        ProviderRW: BlockWriter<Block = B>
            + DBProvider
            + StaticFileProviderFactory<Primitives: NodePrimitives<Block = B, BlockHeader = BH, BlockBody = BB>>
            + StageCheckpointWriter,
    > + StaticFileProviderFactory<Primitives = <<PF as DatabaseProviderFactory>::ProviderRW as NodePrimitivesProvider>::Primitives>,
{
    let (tx, rx) = mpsc::channel();

    // Handle IO-bound async download in a background tokio task
    tokio::spawn(async move {
        while let Some(file) = downloader.next().await {
            tx.send(Some(file))?;
        }
        tx.send(None)
    });

    let static_file_provider = provider_factory.static_file_provider();

    // Consistency check of expected headers in static files vs DB is done on provider::sync_gap
    // when poll_execute_ready is polled.
    let mut height = static_file_provider
        .get_highest_static_file_block(StaticFileSegment::Headers)
        .unwrap_or_default();

    while let Some(meta) = rx.recv()? {
        let from = height;
        let provider = provider_factory.database_provider_rw()?;

        height = process(
            &meta?,
            &mut static_file_provider.latest_writer(StaticFileSegment::Headers)?,
            &provider,
            hash_collector,
            height..,
        )?;

        save_stage_checkpoints(&provider, from, height, height, height)?;

        provider.commit()?;
    }

    let provider = provider_factory.database_provider_rw()?;

    build_index(&provider, hash_collector)?;

    provider.commit()?;

    Ok(height)
}

/// Saves the progress of the ERA import into the stages sync.
///
/// Since the ERA import does the same work as `HeaderStage` and `BodyStage`, it needs to inform
/// these stages that this work has already been done. Otherwise, the stages would repeat it,
/// risking conflicts with database integrity.
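///
/// # Example
///
/// A sketch mirroring how [`import`] calls this after each era1 file, where `height` is
/// the last imported block and also serves as both the processed and total entity count:
///
/// ```ignore
/// save_stage_checkpoints(&provider, from, height, height, height)?;
/// ```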
pub fn save_stage_checkpoints<P>(
    provider: &P,
    from: BlockNumber,
    to: BlockNumber,
    processed: u64,
    total: u64,
) -> ProviderResult<()>
where
    P: StageCheckpointWriter,
{
    provider.save_stage_checkpoint(
        StageId::Headers,
        StageCheckpoint::new(to).with_headers_stage_checkpoint(HeadersCheckpoint {
            block_range: CheckpointBlockRange { from, to },
            progress: EntitiesCheckpoint { processed, total },
        }),
    )?;
    provider.save_stage_checkpoint(
        StageId::Bodies,
        StageCheckpoint::new(to)
            .with_entities_stage_checkpoint(EntitiesCheckpoint { processed, total }),
    )?;
    Ok(())
}

/// Extracts block headers and bodies from `meta` and appends them using `writer` and `provider`.
///
/// Collects hash to height using `hash_collector`.
///
/// Skips all blocks below the [`start_bound`] of `block_numbers` and stops when reaching past the
/// [`end_bound`] or the end of the file.
///
/// Returns the last block height.
///
/// [`start_bound`]: RangeBounds::start_bound
/// [`end_bound`]: RangeBounds::end_bound
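///
/// # Example
///
/// A sketch, assuming `meta` describes a downloaded era1 file and `height` is the
/// highest block already present in static files (as in [`import`]):
///
/// ```ignore
/// let mut writer = static_file_provider.latest_writer(StaticFileSegment::Headers)?;
/// let height = process(&meta, &mut writer, &provider, &mut hash_collector, height..)?;
/// ```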
pub fn process<Era, P, B, BB, BH>(
    meta: &Era,
    writer: &mut StaticFileProviderRWRefMut<'_, <P as NodePrimitivesProvider>::Primitives>,
    provider: &P,
    hash_collector: &mut Collector<BlockHash, BlockNumber>,
    block_numbers: impl RangeBounds<BlockNumber>,
) -> eyre::Result<BlockNumber>
where
    B: Block<Header = BH, Body = BB>,
    BH: FullBlockHeader + Value,
    BB: FullBlockBody<
        Transaction = <<P as NodePrimitivesProvider>::Primitives as NodePrimitives>::SignedTx,
        OmmerHeader = BH,
    >,
    Era: EraMeta + ?Sized,
    P: DBProvider<Tx: DbTxMut> + NodePrimitivesProvider + BlockWriter<Block = B>,
    <P as NodePrimitivesProvider>::Primitives: NodePrimitives<BlockHeader = BH, BlockBody = BB>,
{
    let reader = open(meta)?;
    let iter =
        reader
            .iter()
            .map(Box::new(decode)
                as Box<dyn Fn(Result<BlockTuple, E2sError>) -> eyre::Result<(BH, BB)>>);
    let iter = ProcessIter { iter, era: meta };

    process_iter(iter, writer, provider, hash_collector, block_numbers)
}

type ProcessInnerIter<R, BH, BB> =
    Map<BlockTupleIterator<R>, Box<dyn Fn(Result<BlockTuple, E2sError>) -> eyre::Result<(BH, BB)>>>;

/// An iterator that wraps era file extraction. After the final item, [`EraMeta::mark_as_processed`]
/// is called to ensure proper cleanup.
#[derive(Debug)]
pub struct ProcessIter<'a, Era: ?Sized, R: Read, BH, BB>
where
    BH: FullBlockHeader + Value,
    BB: FullBlockBody<OmmerHeader = BH>,
{
    iter: ProcessInnerIter<R, BH, BB>,
    era: &'a Era,
}

impl<'a, Era: EraMeta + ?Sized, R: Read, BH, BB> Display for ProcessIter<'a, Era, R, BH, BB>
where
    BH: FullBlockHeader + Value,
    BB: FullBlockBody<OmmerHeader = BH>,
{
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        Display::fmt(&self.era.path().to_string_lossy(), f)
    }
}

impl<'a, Era, R, BH, BB> Iterator for ProcessIter<'a, Era, R, BH, BB>
where
    R: Read + Seek,
    Era: EraMeta + ?Sized,
    BH: FullBlockHeader + Value,
    BB: FullBlockBody<OmmerHeader = BH>,
{
    type Item = eyre::Result<(BH, BB)>;

    fn next(&mut self) -> Option<Self::Item> {
        match self.iter.next() {
            Some(item) => Some(item),
            None => match self.era.mark_as_processed() {
                Ok(..) => None,
                Err(e) => Some(Err(e)),
            },
        }
    }
}

/// Opens the era file described by `meta`.
pub fn open<Era>(meta: &Era) -> eyre::Result<Era1Reader<std::fs::File>>
where
    Era: EraMeta + ?Sized,
{
    let file = fs::open(meta.path())?;
    let reader = Era1Reader::new(file);

    Ok(reader)
}

/// Extracts a pair of [`FullBlockHeader`] and [`FullBlockBody`] from [`BlockTuple`].
pub fn decode<BH, BB, E>(block: Result<BlockTuple, E>) -> eyre::Result<(BH, BB)>
where
    BH: FullBlockHeader + Value,
    BB: FullBlockBody<OmmerHeader = BH>,
    E: From<E2sError> + Error + Send + Sync + 'static,
{
    let block = block?;
    let header: BH = block.header.decode()?;
    let body: BB = block.body.decode()?;

    Ok((header, body))
}

/// Extracts block headers and bodies from `iter` and appends them using `writer` and `provider`.
///
/// Collects hash to height using `hash_collector`.
///
/// Skips all blocks below the [`start_bound`] of `block_numbers` and stops when reaching past the
/// [`end_bound`] or the end of the file.
///
/// Returns the last block height.
///
/// [`start_bound`]: RangeBounds::start_bound
/// [`end_bound`]: RangeBounds::end_bound
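///
/// # Example
///
/// A sketch of the bound semantics: `100..` treats block `100` as already imported, so the
/// first block appended is `101` (matching how [`import`] passes `height..`):
///
/// ```ignore
/// // Blocks numbered <= 100 are skipped; without an end bound the loop runs to
/// // the end of `iter`.
/// let last = process_iter(iter, &mut writer, &provider, &mut hash_collector, 100..)?;
/// ```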
pub fn process_iter<P, B, BB, BH>(
    mut iter: impl Iterator<Item = eyre::Result<(BH, BB)>>,
    writer: &mut StaticFileProviderRWRefMut<'_, <P as NodePrimitivesProvider>::Primitives>,
    provider: &P,
    hash_collector: &mut Collector<BlockHash, BlockNumber>,
    block_numbers: impl RangeBounds<BlockNumber>,
) -> eyre::Result<BlockNumber>
where
    B: Block<Header = BH, Body = BB>,
    BH: FullBlockHeader + Value,
    BB: FullBlockBody<
        Transaction = <<P as NodePrimitivesProvider>::Primitives as NodePrimitives>::SignedTx,
        OmmerHeader = BH,
    >,
    P: DBProvider<Tx: DbTxMut> + NodePrimitivesProvider + BlockWriter<Block = B>,
    <P as NodePrimitivesProvider>::Primitives: NodePrimitives<BlockHeader = BH, BlockBody = BB>,
{
    // Interpret the start bound as the last block that is already imported.
    let mut last_header_number = match block_numbers.start_bound() {
        Bound::Included(&number) => number,
        Bound::Excluded(&number) => number.saturating_add(1),
        Bound::Unbounded => 0,
    };
    // Interpret the end bound as the last block to import.
    let target = match block_numbers.end_bound() {
        Bound::Included(&number) => Some(number),
        Bound::Excluded(&number) => Some(number.saturating_sub(1)),
        Bound::Unbounded => None,
    };

    for block in &mut iter {
        let (header, body) = block?;
        let number = header.number();

        if number <= last_header_number {
            continue;
        }
        if let Some(target) = target &&
            number > target
        {
            break;
        }

        let hash = header.hash_slow();
        last_header_number = number;

        // Append to Headers segment
        writer.append_header(&header, &hash)?;

        // Write bodies to database.
        provider.append_block_bodies(vec![(header.number(), Some(body))])?;

        hash_collector.insert(hash, number)?;
    }

    Ok(last_header_number)
}

/// Dumps the contents of `hash_collector` into [`tables::HeaderNumbers`].
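///
/// # Example
///
/// A sketch; the collector is assumed to have been filled by earlier [`process`] calls:
///
/// ```ignore
/// let provider = provider_factory.database_provider_rw()?;
/// build_index(&provider, &mut hash_collector)?;
/// provider.commit()?;
/// ```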
pub fn build_index<P, B, BB, BH>(
    provider: &P,
    hash_collector: &mut Collector<BlockHash, BlockNumber>,
) -> eyre::Result<()>
where
    B: Block<Header = BH, Body = BB>,
    BH: FullBlockHeader + Value,
    BB: FullBlockBody<
        Transaction = <<P as NodePrimitivesProvider>::Primitives as NodePrimitives>::SignedTx,
        OmmerHeader = BH,
    >,
    P: DBProvider<Tx: DbTxMut> + NodePrimitivesProvider + BlockWriter<Block = B>,
    <P as NodePrimitivesProvider>::Primitives: NodePrimitives<BlockHeader = BH, BlockBody = BB>,
{
    let total_headers = hash_collector.len();
    info!(target: "era::history::import", total = total_headers, "Writing headers hash index");

    // Database cursor for hash to number index
    let mut cursor_header_numbers =
        provider.tx_ref().cursor_write::<RawTable<tables::HeaderNumbers>>()?;
    // If we only have the genesis block hash, then we are at first sync, and we can remove it,
    // add it to the collector and use tx.append on all hashes.
    let first_sync = if provider.tx_ref().entries::<RawTable<tables::HeaderNumbers>>()? == 1 &&
        let Some((hash, block_number)) = cursor_header_numbers.last()? &&
        block_number.value()? == 0
    {
        hash_collector.insert(hash.key()?, 0)?;
        cursor_header_numbers.delete_current()?;
        true
    } else {
        false
    };

    let interval = (total_headers / 10).max(8192);

    // Build block hash to block number index
    for (index, hash_to_number) in hash_collector.iter()?.enumerate() {
        let (hash, number) = hash_to_number?;

        if index != 0 && index.is_multiple_of(interval) {
            info!(target: "era::history::import", progress = %format!("{:.2}%", (index as f64 / total_headers as f64) * 100.0), "Writing headers hash index");
        }

        let hash = RawKey::<BlockHash>::from_vec(hash);
        let number = RawValue::<BlockNumber>::from_vec(number);

        if first_sync {
            cursor_header_numbers.append(hash, &number)?;
        } else {
            cursor_header_numbers.upsert(hash, &number)?;
        }
    }

    Ok(())
}

/// Calculates the total difficulty for a given block number by summing the difficulty
/// of all blocks from genesis to the given block.
///
/// Very expensive: iterates over every header from genesis to `num` in batches of 1000.
///
/// Returns an error if any block is missing.
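///
/// # Example
///
/// A sketch; the block number is hypothetical:
///
/// ```ignore
/// // Sums header difficulties for blocks 0..=1_000_000 in 1000-block batches.
/// let td = calculate_td_by_number(&provider, 1_000_000)?;
/// ```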
pub fn calculate_td_by_number<P>(provider: &P, num: BlockNumber) -> eyre::Result<U256>
where
    P: BlockReader,
{
    let mut total_difficulty = U256::ZERO;
    let mut start = 0;

    while start <= num {
        let end = (start + 1000 - 1).min(num);

        total_difficulty +=
            provider.headers_range(start..=end)?.iter().map(|h| h.difficulty()).sum::<U256>();

        start = end + 1;
    }

    Ok(total_difficulty)
}