Skip to main content

reth_era_utils/
history.rs

1use alloy_consensus::BlockHeader;
2use alloy_primitives::{BlockHash, BlockNumber, U256};
3use futures_util::{Stream, StreamExt};
4use reth_db_api::{
5    cursor::{DbCursorRO, DbCursorRW},
6    table::Value,
7    tables,
8    transaction::{DbTx, DbTxMut},
9    RawKey, RawTable, RawValue,
10};
11use reth_era::{
12    common::{decode::DecodeCompressedRlp, file_ops::StreamReader},
13    e2s::error::E2sError,
14    era1::{file::Era1Reader, types::execution::BlockTuple},
15    ere::{file::EreReader, types::execution::BlockTuple as EreBlockTuple},
16};
17use reth_era_downloader::EraMeta;
18use reth_etl::Collector;
19use reth_fs_util as fs;
20use reth_primitives_traits::{Block, FullBlockBody, FullBlockHeader, NodePrimitives};
21use reth_provider::{
22    providers::StaticFileProviderRWRefMut, BlockReader, BlockWriter, StaticFileProviderFactory,
23    StaticFileSegment, StaticFileWriter,
24};
25use reth_stages_types::{
26    CheckpointBlockRange, EntitiesCheckpoint, HeadersCheckpoint, StageCheckpoint, StageId,
27};
28use reth_storage_api::{
29    errors::ProviderResult, DBProvider, DatabaseProviderFactory, NodePrimitivesProvider,
30    StageCheckpointWriter,
31};
32use std::{collections::Bound, error::Error, ops::RangeBounds, sync::mpsc};
33use tracing::info;
34
35/// Reads execution `(header, body)` pairs out of an ERA file.
36///
37/// Per-format seam of the import pipeline.
38pub trait EraBlockReader<BH, BB> {
39    /// Opens the ERA file at `meta` and iterates its execution blocks.
40    fn blocks<M: EraMeta + ?Sized>(
41        meta: &M,
42    ) -> eyre::Result<impl Iterator<Item = eyre::Result<(BH, BB)>>>;
43}
44
45/// [`EraBlockReader`] for `.era1` files.
46#[derive(Debug)]
47pub struct Era1;
48
49impl<BH, BB> EraBlockReader<BH, BB> for Era1
50where
51    BH: FullBlockHeader + Value,
52    BB: FullBlockBody<OmmerHeader = BH>,
53{
54    fn blocks<M: EraMeta + ?Sized>(
55        meta: &M,
56    ) -> eyre::Result<impl Iterator<Item = eyre::Result<(BH, BB)>>> {
57        let reader: Era1Reader<std::fs::File> = open(meta)?;
58        Ok(reader.iter().map(decode::<BH, BB, E2sError>))
59    }
60}
61
62impl<BH, BB> EraBlockReader<BH, BB> for Ere
63where
64    BH: FullBlockHeader + Value,
65    BB: FullBlockBody<OmmerHeader = BH>,
66{
67    fn blocks<M: EraMeta + ?Sized>(
68        meta: &M,
69    ) -> eyre::Result<impl Iterator<Item = eyre::Result<(BH, BB)>>> {
70        let reader: EreReader<std::fs::File> = open(meta)?;
71        Ok(reader.iter().map(Self::decode))
72    }
73}
74
75/// [`EraBlockReader`] for `.ere`/`.erae` files.
76#[derive(Debug)]
77pub struct Ere;
78
79impl Ere {
80    /// Extracts a pair of [`FullBlockHeader`] and [`FullBlockBody`] from an ERE block tuple, whose
81    /// header and body are RLP-compressed.
82    pub fn decode<BH, BB, E>(block: Result<EreBlockTuple, E>) -> eyre::Result<(BH, BB)>
83    where
84        BH: FullBlockHeader + Value,
85        BB: FullBlockBody<OmmerHeader = BH>,
86        E: From<E2sError> + Error + Send + Sync + 'static,
87    {
88        let block = block?;
89        let header: BH = block.header.decode()?;
90        let body: BB = block.body.decode()?;
91        Ok((header, body))
92    }
93}
94
95/// Opens the ERA file at `meta` with the format's [`StreamReader`].
96pub fn open<Reader>(meta: &(impl EraMeta + ?Sized)) -> eyre::Result<Reader>
97where
98    Reader: StreamReader<std::fs::File>,
99{
100    Ok(Reader::new(fs::open(meta.path())?))
101}
102
103/// Imports blocks from `downloader`, decoding each file with the [`EraBlockReader`] `S`.
104///
105/// When `to_block` is set, the import stops after reaching that block height; otherwise it
106/// continues until the source has no more files.
107///
108/// Returns current block height.
109pub fn import<S, Downloader, Era, PF, B, BB, BH>(
110    mut downloader: Downloader,
111    provider_factory: &PF,
112    hash_collector: &mut Collector<BlockHash, BlockNumber>,
113    to_block: Option<BlockNumber>,
114) -> eyre::Result<BlockNumber>
115where
116    S: EraBlockReader<BH, BB>,
117    B: Block<Header = BH, Body = BB>,
118    BH: FullBlockHeader + Value,
119    BB: FullBlockBody<
120        Transaction = <<<PF as DatabaseProviderFactory>::ProviderRW as NodePrimitivesProvider>::Primitives as NodePrimitives>::SignedTx,
121        OmmerHeader = BH,
122    >,
123    Downloader: Stream<Item = eyre::Result<Era>> + Send + 'static + Unpin,
124    Era: EraMeta + Send + 'static,
125    PF: DatabaseProviderFactory<
126        ProviderRW: BlockWriter<Block = B>
127            + DBProvider
128            + StaticFileProviderFactory<Primitives: NodePrimitives<Block = B, BlockHeader = BH, BlockBody = BB>>
129            + StageCheckpointWriter,
130    > + StaticFileProviderFactory<Primitives = <<PF as DatabaseProviderFactory>::ProviderRW as NodePrimitivesProvider>::Primitives>,
131{
132    let (tx, rx) = mpsc::channel();
133
134    // Handle IO-bound async download in a background tokio task
135    tokio::spawn(async move {
136        while let Some(file) = downloader.next().await {
137            tx.send(Some(file))?;
138        }
139        tx.send(None)
140    });
141
142    let static_file_provider = provider_factory.static_file_provider();
143
144    // Consistency check of expected headers in static files vs DB is done on provider::sync_gap
145    // when poll_execute_ready is polled.
146    let mut height = static_file_provider
147        .get_highest_static_file_block(StaticFileSegment::Headers)
148        .unwrap_or_default();
149
150    let end = to_block.map_or(Bound::Unbounded, Bound::Included);
151
152    while let Some(meta) = rx.recv()? {
153        let meta = meta?;
154        let from = height;
155        let provider = provider_factory.database_provider_rw()?;
156
157        height = process::<S, _, _, _, _>(
158            &meta,
159            &mut static_file_provider.latest_writer(StaticFileSegment::Headers)?,
160            &provider,
161            hash_collector,
162            (Bound::Included(height), end),
163        )?;
164
165        save_stage_checkpoints(&provider, from, height, height, height)?;
166
167        provider.commit()?;
168
169        info!(target: "era::history::import", first = from, last = height, file = %meta.path().display(), "Imported ERA file");
170
171        if to_block.is_some_and(|to| height >= to) {
172            break;
173        }
174    }
175
176    let provider = provider_factory.database_provider_rw()?;
177
178    build_index(&provider, hash_collector)?;
179
180    provider.commit()?;
181
182    Ok(height)
183}
184
185/// Saves progress of ERA import into stages sync.
186///
187/// Since the ERA import does the same work as `HeaderStage` and `BodyStage`, it needs to inform
188/// these stages that this work has already been done. Otherwise, there might be some conflict with
189/// database integrity.
190pub fn save_stage_checkpoints<P>(
191    provider: P,
192    from: BlockNumber,
193    to: BlockNumber,
194    processed: u64,
195    total: u64,
196) -> ProviderResult<()>
197where
198    P: StageCheckpointWriter,
199{
200    provider.save_stage_checkpoint(
201        StageId::Headers,
202        StageCheckpoint::new(to).with_headers_stage_checkpoint(HeadersCheckpoint {
203            block_range: CheckpointBlockRange { from, to },
204            progress: EntitiesCheckpoint { processed, total },
205        }),
206    )?;
207    provider.save_stage_checkpoint(
208        StageId::Bodies,
209        StageCheckpoint::new(to)
210            .with_entities_stage_checkpoint(EntitiesCheckpoint { processed, total }),
211    )?;
212    Ok(())
213}
214
215/// Reads `meta` with the [`EraBlockReader`] `S`, appends its blocks within `block_numbers`, and
216/// marks `meta` processed if the file was fully consumed. Returns last block height.
217pub fn process<S, P, B, BB, BH>(
218    meta: &(impl EraMeta + ?Sized),
219    writer: &mut StaticFileProviderRWRefMut<'_, <P as NodePrimitivesProvider>::Primitives>,
220    provider: &P,
221    hash_collector: &mut Collector<BlockHash, BlockNumber>,
222    block_numbers: impl RangeBounds<BlockNumber>,
223) -> eyre::Result<BlockNumber>
224where
225    S: EraBlockReader<BH, BB>,
226    B: Block<Header = BH, Body = BB>,
227    BH: FullBlockHeader + Value,
228    BB: FullBlockBody<
229        Transaction = <<P as NodePrimitivesProvider>::Primitives as NodePrimitives>::SignedTx,
230        OmmerHeader = BH,
231    >,
232    P: DBProvider<Tx: DbTxMut> + NodePrimitivesProvider + BlockWriter<Block = B>,
233    <P as NodePrimitivesProvider>::Primitives: NodePrimitives<BlockHeader = BH, BlockBody = BB>,
234{
235    let iter = S::blocks(meta)?
236        .map(Some)
237        .chain(std::iter::once_with(|| match meta.mark_as_processed() {
238            Ok(()) => None,
239            Err(error) => Some(Err(error)),
240        }))
241        .flatten();
242
243    process_iter(iter, writer, provider, hash_collector, block_numbers)
244}
245
246/// Extracts a pair of [`FullBlockHeader`] and [`FullBlockBody`] from [`BlockTuple`].
247pub fn decode<BH, BB, E>(block: Result<BlockTuple, E>) -> eyre::Result<(BH, BB)>
248where
249    BH: FullBlockHeader + Value,
250    BB: FullBlockBody<OmmerHeader = BH>,
251    E: From<E2sError> + Error + Send + Sync + 'static,
252{
253    let block = block?;
254    let header: BH = block.header.decode()?;
255    let body: BB = block.body.decode()?;
256
257    Ok((header, body))
258}
259
260/// Extracts block headers and bodies from `iter` and appends them using `writer` and `provider`.
261///
262/// Collects hash to height using `hash_collector`.
263///
264/// Skips all blocks below the [`start_bound`] of `block_numbers` and stops when reaching past the
265/// [`end_bound`] or the end of the file.
266///
267/// Returns last block height.
268///
269/// [`start_bound`]: RangeBounds::start_bound
270/// [`end_bound`]: RangeBounds::end_bound
271pub fn process_iter<P, B, BB, BH>(
272    mut iter: impl Iterator<Item = eyre::Result<(BH, BB)>>,
273    writer: &mut StaticFileProviderRWRefMut<'_, <P as NodePrimitivesProvider>::Primitives>,
274    provider: &P,
275    hash_collector: &mut Collector<BlockHash, BlockNumber>,
276    block_numbers: impl RangeBounds<BlockNumber>,
277) -> eyre::Result<BlockNumber>
278where
279    B: Block<Header = BH, Body = BB>,
280    BH: FullBlockHeader + Value,
281    BB: FullBlockBody<
282        Transaction = <<P as NodePrimitivesProvider>::Primitives as NodePrimitives>::SignedTx,
283        OmmerHeader = BH,
284    >,
285    P: DBProvider<Tx: DbTxMut> + NodePrimitivesProvider + BlockWriter<Block = B>,
286    <P as NodePrimitivesProvider>::Primitives: NodePrimitives<BlockHeader = BH, BlockBody = BB>,
287{
288    let mut last_header_number = match block_numbers.start_bound() {
289        Bound::Included(&number) => number,
290        Bound::Excluded(&number) => number.saturating_add(1),
291        Bound::Unbounded => 0,
292    };
293    let target = match block_numbers.end_bound() {
294        Bound::Included(&number) => Some(number),
295        Bound::Excluded(&number) => Some(number.saturating_sub(1)),
296        Bound::Unbounded => None,
297    };
298
299    for block in &mut iter {
300        let (header, body) = block?;
301        let number = header.number();
302
303        if number <= last_header_number {
304            continue;
305        }
306        if let Some(target) = target &&
307            number > target
308        {
309            break;
310        }
311
312        let hash = header.hash_slow();
313        last_header_number = number;
314
315        // Append to Headers segment
316        writer.append_header(&header, &hash)?;
317
318        // Write bodies to database.
319        provider.append_block_bodies(vec![(header.number(), Some(&body))])?;
320
321        hash_collector.insert(hash, number)?;
322    }
323
324    Ok(last_header_number)
325}
326
327/// Dumps the contents of `hash_collector` into [`tables::HeaderNumbers`].
328pub fn build_index<P>(
329    provider: &P,
330    hash_collector: &mut Collector<BlockHash, BlockNumber>,
331) -> eyre::Result<()>
332where
333    P: DBProvider<Tx: DbTxMut>,
334{
335    let total_headers = hash_collector.len();
336    info!(target: "era::history::import", total = total_headers, "Writing headers hash index");
337
338    // Database cursor for hash to number index
339    let mut cursor_header_numbers =
340        provider.tx_ref().cursor_write::<RawTable<tables::HeaderNumbers>>()?;
341    // If we only have the genesis block hash, then we are at first sync, and we can remove it,
342    // add it to the collector and use tx.append on all hashes.
343    let first_sync = if provider.tx_ref().entries::<RawTable<tables::HeaderNumbers>>()? == 1 &&
344        let Some((hash, block_number)) = cursor_header_numbers.last()? &&
345        block_number.value()? == 0
346    {
347        hash_collector.insert(hash.key()?, 0)?;
348        cursor_header_numbers.delete_current()?;
349        true
350    } else {
351        false
352    };
353
354    let interval = (total_headers / 10).max(8192);
355
356    // Build block hash to block number index
357    for (index, hash_to_number) in hash_collector.iter()?.enumerate() {
358        let (hash, number) = hash_to_number?;
359
360        if index != 0 && index.is_multiple_of(interval) {
361            info!(target: "era::history::import", progress = %format!("{:.2}%", (index as f64 / total_headers as f64) * 100.0), "Writing headers hash index");
362        }
363
364        let hash = RawKey::<BlockHash>::from_vec(hash);
365        let number = RawValue::<BlockNumber>::from_vec(number);
366
367        if first_sync {
368            cursor_header_numbers.append(hash, &number)?;
369        } else {
370            cursor_header_numbers.upsert(hash, &number)?;
371        }
372    }
373
374    Ok(())
375}
376
377/// Calculates the total difficulty for a given block number by summing the difficulty
378/// of all blocks from genesis to the given block.
379///
380/// Very expensive - iterates through all blocks in batches of 1000.
381///
382/// Returns an error if any block is missing.
383pub fn calculate_td_by_number<P>(provider: &P, num: BlockNumber) -> eyre::Result<U256>
384where
385    P: BlockReader,
386{
387    let mut total_difficulty = U256::ZERO;
388    let mut start = 0;
389
390    while start <= num {
391        let end = (start + 1000 - 1).min(num);
392
393        total_difficulty +=
394            provider.headers_range(start..=end)?.iter().map(|h| h.difficulty()).sum::<U256>();
395
396        start = end + 1;
397    }
398
399    Ok(total_difficulty)
400}
401
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_consensus::Header;
    use reth_db_common::init::init_genesis;
    use reth_ethereum_primitives::{Block, BlockBody};
    use reth_provider::{
        test_utils::create_test_provider_factory, DatabaseProviderFactory,
        StaticFileProviderFactory, StaticFileSegment, StaticFileWriter,
    };
    use std::{cell::Cell, path::Path};
    use tempfile::tempdir;

    /// Stand-in reader: whatever file it is given, it yields empty blocks numbered 1 and 2.
    struct TestEra;

    impl EraBlockReader<Header, BlockBody> for TestEra {
        fn blocks<M: EraMeta + ?Sized>(
            _meta: &M,
        ) -> eyre::Result<impl Iterator<Item = eyre::Result<(Header, BlockBody)>>> {
            let fake_block = |number: u64| -> eyre::Result<(Header, BlockBody)> {
                let header = Header { number, ..Default::default() };
                Ok((header, BlockBody::default()))
            };
            Ok([1, 2].into_iter().map(fake_block))
        }
    }

    /// [`EraMeta`] double that only records whether it was marked as processed.
    #[derive(Debug)]
    struct TestMeta {
        marked: Cell<bool>,
    }

    impl TestMeta {
        /// A meta that has not been marked processed yet.
        fn unmarked() -> Self {
            Self { marked: Cell::new(false) }
        }
    }

    impl EraMeta for TestMeta {
        fn mark_as_processed(&self) -> eyre::Result<()> {
            self.marked.set(true);
            Ok(())
        }

        fn path(&self) -> &Path {
            Path::new("test.era1")
        }
    }

    // `import` blocks the calling thread on a channel, so the download task needs another
    // worker thread to make progress.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn import_stops_at_to_block() {
        let factory = create_test_provider_factory();
        init_genesis(&factory).unwrap();

        let etl_dir = tempdir().unwrap();
        let mut collector = Collector::new(4096, Some(etl_dir.path().to_owned()));

        // Both files hold blocks 1 and 2, so an unbounded import would end at 2; `to_block`
        // of 1 has to stop it earlier.
        let files =
            futures_util::stream::iter([Ok(TestMeta::unmarked()), Ok(TestMeta::unmarked())]);

        let height =
            import::<TestEra, _, _, _, Block, _, _>(files, &factory, &mut collector, Some(1))
                .unwrap();

        assert_eq!(height, 1);
    }

    #[test]
    fn process_does_not_mark_partially_consumed_file_processed() {
        let factory = create_test_provider_factory();
        init_genesis(&factory).unwrap();

        let static_files = factory.static_file_provider();
        let mut headers_writer = static_files.latest_writer(StaticFileSegment::Headers).unwrap();
        let provider = factory.database_provider_rw().unwrap();
        let etl_dir = tempdir().unwrap();
        let mut collector = Collector::new(4096, Some(etl_dir.path().to_owned()));
        let meta = TestMeta::unmarked();

        // Only block 1 fits the range; block 2 ends iteration before the file is exhausted.
        let height = process::<TestEra, _, Block, _, _>(
            &meta,
            &mut headers_writer,
            &provider,
            &mut collector,
            0..=1,
        )
        .unwrap();

        assert_eq!(height, 1);
        assert!(!meta.marked.get());
    }
}