Skip to main content

reth_era_utils/export/
mod.rs

1//! Exports node block history from storage into ERA files.
2//!
3//! [`export`] is the format-agnostic driver: it resolves the export range, walks the requested
4//! blocks in `max_blocks_per_file` chunks, and hands each chunk to an [`EraBlockWriter`]. A writer
5//! turns one chunk of [`ExportBlock`]s into one on-disk file and owns every format-specific detail
6//! (receipt encoding, accumulator, block index, record layout, file naming).
7//!
8//! [`Era1`](crate::Era1) writes `.era1` files and [`Ere`](crate::Ere) writes `.ere` files.
9
10mod era1;
11mod ere;
12
13use crate::calculate_td_by_number;
14use alloy_consensus::{BlockHeader, Sealable};
15use alloy_primitives::{BlockNumber, B256, U256};
16use alloy_rlp::Encodable;
17use eyre::{eyre, Result};
18use reth_era::era1::types::execution::MAX_BLOCKS_PER_ERA1;
19use reth_fs_util as fs;
20use reth_primitives_traits::{Block, Receipt};
21use reth_storage_api::{BlockNumReader, BlockReader, HeaderProvider, ReceiptProvider};
22use std::{
23    path::{Path, PathBuf},
24    time::{Duration, Instant},
25};
26use tracing::{info, warn};
27
/// Throttle for export progress logging, in seconds: two consecutive progress lines are emitted at
/// least this far apart, so long-running exports stay observable without spamming the log.
const REPORT_INTERVAL_SECS: u64 = 10;
31
32/// Configuration to export block history to ERA files.
33///
34/// Shared by every [`EraBlockWriter`]; the per-file block ceiling is the same `8192` for all
35/// formats.
36#[derive(Clone, Debug)]
37pub struct ExportConfig {
38    /// Directory to export ERA files to
39    pub dir: PathBuf,
40    /// First block to export
41    pub first_block_number: BlockNumber,
42    /// Last block to export
43    pub last_block_number: BlockNumber,
44    /// Number of blocks per ERA file.
45    /// It can never be larger than `MAX_BLOCKS_PER_ERA1 = 8192`
46    /// See also <`https://github.com/eth-clients/e2store-format-specs/blob/main/formats/era1.md`>
47    pub max_blocks_per_file: u64,
48    /// Network name.
49    pub network: String,
50}
51
52impl Default for ExportConfig {
53    fn default() -> Self {
54        Self {
55            dir: PathBuf::new(),
56            first_block_number: 0,
57            last_block_number: (MAX_BLOCKS_PER_ERA1 - 1) as u64,
58            max_blocks_per_file: MAX_BLOCKS_PER_ERA1 as u64,
59            network: "mainnet".to_string(),
60        }
61    }
62}
63
64impl ExportConfig {
65    /// Validates the export configuration parameters
66    pub fn validate(&self) -> Result<()> {
67        if self.max_blocks_per_file > MAX_BLOCKS_PER_ERA1 as u64 {
68            return Err(eyre!(
69                "Max blocks per file ({}) exceeds ERA1 limit ({})",
70                self.max_blocks_per_file,
71                MAX_BLOCKS_PER_ERA1
72            ));
73        }
74
75        if self.max_blocks_per_file == 0 {
76            return Err(eyre!("Max blocks per file cannot be zero"));
77        }
78
79        Ok(())
80    }
81}
82
83/// One block's data, gathered by [`export`] and handed to an [`EraBlockWriter`].
84#[derive(Debug)]
85pub struct ExportBlock<H, B, R> {
86    /// Block header.
87    pub header: H,
88    /// Hash of `header`, feeding the file's accumulator.
89    pub block_hash: B256,
90    /// Block body.
91    pub body: B,
92    /// Block receipts in the provider's native, bloom-bearing form. A writer re-encodes them into
93    /// its own form (`era1` keeps the bloom, `ere` stores the slim variant).
94    pub receipts: Vec<R>,
95    /// Total difficulty up to and including this block.
96    pub total_difficulty: U256,
97}
98
99impl<H, B, R> ExportBlock<H, B, R> {
100    /// The `(block_hash, total_difficulty)` pair that feeds an ERA file's header-record
101    /// accumulator.
102    ///
103    /// The accumulator record type differs per format, so each writer wraps this pair in its own
104    /// `HeaderRecord`.
105    pub const fn header_record(&self) -> (B256, U256) {
106        (self.block_hash, self.total_difficulty)
107    }
108}
109
110/// Writes a chunk of consecutive blocks as a single ERA file.
111///
112/// One implementor exists per ERA format. A chunk is ordered, non-empty, and at most
113/// [`ExportConfig::max_blocks_per_file`] blocks long.
114pub trait EraBlockWriter {
115    /// Writes `blocks` as a single ERA file in `dir`, returning the created file's path.
116    ///
117    /// `max_blocks_per_file` is the configured per-file ceiling; a writer compares it against its
118    /// own format limit to decide whether the filename carries an era-count segment.
119    fn write_file<H, B, R>(
120        network: &str,
121        max_blocks_per_file: u64,
122        blocks: &[ExportBlock<H, B, R>],
123        dir: &Path,
124    ) -> Result<PathBuf>
125    where
126        H: BlockHeader + Encodable,
127        B: Encodable,
128        R: Receipt;
129}
130
131/// Per-format accumulator over a chunk's header records.
132///
133/// Each ERA format defines its own accumulator and header-record types in [`reth_era`], yet both
134/// build the accumulator from the same `(block_hash, total_difficulty)` pairs. This trait is the
135/// seam that keeps [`accumulator`] format-agnostic.
136pub(crate) trait ChunkAccumulator: Sized {
137    /// Builds the accumulator from the chunk's `(block_hash, total_difficulty)` header records.
138    fn from_pairs(records: &[(B256, U256)]) -> Result<Self>;
139}
140
141/// Computes the chunk's accumulator from each block's `(block_hash, total_difficulty)` header
142/// record, in the format `A`.
143fn accumulator<A, H, B, R>(blocks: &[ExportBlock<H, B, R>]) -> Result<A>
144where
145    A: ChunkAccumulator,
146{
147    let records: Vec<(B256, U256)> = blocks.iter().map(ExportBlock::header_record).collect();
148    A::from_pairs(&records)
149}
150
151/// A chunk of [`ExportBlock`]s sourced from provider `P`.
152type Chunk<P> =
153    Vec<ExportBlock<<P as HeaderProvider>::Header, BodyOf<P>, <P as ReceiptProvider>::Receipt>>;
154
155/// The block body type produced by provider `P`.
156type BodyOf<P> = <<P as BlockReader>::Block as Block>::Body;
157
158/// Fetches block history from `provider` and writes it to ERA files in the `W` format, chunked by
159/// [`ExportConfig::max_blocks_per_file`].
160///
161/// Returns the paths of the files that were created.
162pub fn export<W, P>(provider: &P, config: &ExportConfig) -> Result<Vec<PathBuf>>
163where
164    W: EraBlockWriter,
165    P: BlockReader,
166    P::Header: BlockHeader + Sealable + Encodable,
167    BodyOf<P>: Encodable,
168    P::Receipt: Receipt,
169{
170    config.validate()?;
171
172    // `best_block_number()` can be stale behind static files, so reconcile against what is actually
173    // available.
174    let last_block = determine_export_range(provider, config)?;
175
176    info!(
177        target: "era::history::export",
178        first = config.first_block_number,
179        last = last_block,
180        max_blocks_per_file = config.max_blocks_per_file,
181        "Preparing ERA export data"
182    );
183
184    if !config.dir.exists() {
185        fs::create_dir_all(&config.dir)
186            .map_err(|e| eyre!("Failed to create output directory: {}", e))?;
187    }
188
189    let mut progress = ExportProgress::new(last_block - config.first_block_number + 1);
190    let mut total_difficulty = seed_total_difficulty(provider, config)?;
191    let mut created_files = Vec::new();
192
193    for start_block in
194        (config.first_block_number..=last_block).step_by(config.max_blocks_per_file as usize)
195    {
196        let end_block = (start_block + config.max_blocks_per_file - 1).min(last_block);
197
198        let blocks = gather_chunk(
199            provider,
200            start_block..=end_block,
201            last_block,
202            &mut total_difficulty,
203            &mut progress,
204        )?;
205        if blocks.is_empty() {
206            continue;
207        }
208
209        let file_path =
210            W::write_file(&config.network, config.max_blocks_per_file, &blocks, &config.dir)?;
211
212        info!(target: "era::history::export", "Wrote ERA file: {file_path:?} with {} blocks", blocks.len());
213        created_files.push(file_path);
214    }
215
216    info!(
217        target: "era::history::export",
218        "Successfully wrote {} ERA files in {:?}",
219        created_files.len(),
220        progress.elapsed()
221    );
222
223    Ok(created_files)
224}
225
226/// The four-byte short hash an ERA file name carries, taken from its accumulator root.
227fn short_hash(root: B256) -> [u8; 4] {
228    root[..4].try_into().expect("root is 32 bytes")
229}
230
231/// Total difficulty up to the block preceding the export range, the starting point for the running
232/// total threaded through every chunk.
233fn seed_total_difficulty<P: BlockReader>(provider: &P, config: &ExportConfig) -> Result<U256> {
234    if config.first_block_number > 0 {
235        calculate_td_by_number(provider, config.first_block_number - 1)
236    } else {
237        Ok(U256::ZERO)
238    }
239}
240
241/// Loads the headers, bodies and receipts for `range` into a [`Chunk`], advancing
242/// `total_difficulty` by each header's difficulty.
243fn gather_chunk<P>(
244    provider: &P,
245    range: std::ops::RangeInclusive<BlockNumber>,
246    last_block: BlockNumber,
247    total_difficulty: &mut U256,
248    progress: &mut ExportProgress,
249) -> Result<Chunk<P>>
250where
251    P: BlockReader,
252    P::Header: BlockHeader + Sealable,
253{
254    let start_block = *range.start();
255    let headers = provider.headers_range(range)?;
256    let mut blocks = Vec::with_capacity(headers.len());
257
258    for (i, header) in headers.into_iter().enumerate() {
259        let expected = start_block + i as u64;
260        let actual = header.number();
261        if expected != actual {
262            return Err(eyre!("Expected block {expected}, got {actual}"));
263        }
264
265        // `CompressedBody` holds rlp(body), not rlp(block), so take the body off the full block.
266        let body = provider
267            .block_by_number(actual)?
268            .ok_or_else(|| eyre!("Block not found for block {actual}"))?
269            .into_body();
270        let receipts = provider
271            .receipts_by_block(actual.into())?
272            .ok_or_else(|| eyre!("Receipts not found for block {actual}"))?;
273
274        let block_hash = header.hash_slow();
275        *total_difficulty += header.difficulty();
276
277        blocks.push(ExportBlock {
278            header,
279            block_hash,
280            body,
281            receipts,
282            total_difficulty: *total_difficulty,
283        });
284        progress.record(actual, last_block);
285    }
286
287    Ok(blocks)
288}
289
290/// Determines the actual last block number that can be exported.
291///
292/// Uses a `headers_range` fallback when `best_block_number` is stale due to static file storage.
293fn determine_export_range<P>(provider: &P, config: &ExportConfig) -> Result<BlockNumber>
294where
295    P: HeaderProvider + BlockNumReader,
296{
297    let best_block_number = provider.best_block_number()?;
298
299    if best_block_number >= config.last_block_number {
300        return Ok(config.last_block_number);
301    }
302
303    warn!(
304        "Last block {} is beyond current head {}, setting last = head",
305        config.last_block_number, best_block_number
306    );
307
308    // Check if more blocks are actually available beyond what `best_block_number()` reports.
309    match provider.headers_range(best_block_number..=config.last_block_number) {
310        Ok(headers) => match headers.last() {
311            Some(last_header) => {
312                let highest_block = last_header.number();
313                info!("Found highest available block {} via headers_range", highest_block);
314                Ok(highest_block)
315            }
316            None => {
317                warn!("No headers found in range, using best_block_number {}", best_block_number);
318                Ok(best_block_number)
319            }
320        },
321        Err(_) => {
322            warn!("headers_range failed, using best_block_number {}", best_block_number);
323            Ok(best_block_number)
324        }
325    }
326}
327
328/// Throttled progress reporter for a running export.
329struct ExportProgress {
330    start: Instant,
331    last_report: Instant,
332    interval: Duration,
333    total_blocks: u64,
334    processed: u64,
335}
336
337impl ExportProgress {
338    fn new(total_blocks: u64) -> Self {
339        let now = Instant::now();
340        Self {
341            start: now,
342            last_report: now,
343            interval: Duration::from_secs(REPORT_INTERVAL_SECS),
344            total_blocks,
345            processed: 0,
346        }
347    }
348
349    /// Counts one processed block and logs progress at most once per [`REPORT_INTERVAL_SECS`].
350    fn record(&mut self, current_block: BlockNumber, last_block: BlockNumber) {
351        self.processed += 1;
352        if self.last_report.elapsed() >= self.interval {
353            info!(
354                target: "era::history::export",
355                "Export progress: block {current_block}/{last_block} ({:.2}%) - elapsed: {:?}",
356                self.processed as f64 / self.total_blocks as f64 * 100.0,
357                self.start.elapsed()
358            );
359            self.last_report = Instant::now();
360        }
361    }
362
363    fn elapsed(&self) -> Duration {
364        self.start.elapsed()
365    }
366}
367
#[cfg(test)]
mod tests {
    use super::ExportConfig;
    use reth_era::era1::types::execution::MAX_BLOCKS_PER_ERA1;
    use tempfile::tempdir;

    /// Exercises `ExportConfig::validate` with accepted and rejected `max_blocks_per_file`
    /// values.
    #[test]
    fn test_export_config_validation() {
        let temp_dir = tempdir().unwrap();
        let era1_limit = MAX_BLOCKS_PER_ERA1 as u64;
        // Default configuration with only the per-file block count overridden.
        let with_max_blocks =
            |max_blocks_per_file: u64| ExportConfig { max_blocks_per_file, ..Default::default() };

        // Accepted: the defaults.
        assert!(ExportConfig::default().validate().is_ok(), "Default config should be valid");

        // Accepted: exactly at the limit.
        assert!(
            with_max_blocks(era1_limit).validate().is_ok(),
            "Config at ERA1 limit should pass validation"
        );

        // Accepted: an ordinary value together with a real output directory.
        let valid_config =
            ExportConfig { dir: temp_dir.path().to_path_buf(), ..with_max_blocks(1000) };
        assert!(valid_config.validate().is_ok(), "Valid config should pass validation");

        // Rejected: zero blocks per file.
        let zero_result = with_max_blocks(0).validate();
        assert!(zero_result.is_err(), "Zero blocks per file should fail validation");
        assert!(zero_result.unwrap_err().to_string().contains("cannot be zero"));

        // Rejected: one past the era1 limit.
        let oversized_result = with_max_blocks(era1_limit + 1).validate();
        assert!(oversized_result.is_err(), "Oversized blocks per file should fail validation");
        assert!(oversized_result.unwrap_err().to_string().contains("exceeds ERA1 limit"));
    }
}
413}