// reth_era_utils/export.rs

//! Logic to export era1 block history from the database
//! and inject it into era1 files with `Era1Writer`.
3
4use alloy_consensus::BlockHeader;
5use alloy_primitives::{BlockNumber, B256, U256};
6use eyre::{eyre, Result};
7use reth_era::{
8    e2s_types::IndexEntry,
9    era1_file::Era1Writer,
10    era1_types::{BlockIndex, Era1Id},
11    era_file_ops::{EraFileId, StreamWriter},
12    execution_types::{
13        Accumulator, BlockTuple, CompressedBody, CompressedHeader, CompressedReceipts,
14        TotalDifficulty, MAX_BLOCKS_PER_ERA1,
15    },
16};
17use reth_fs_util as fs;
18use reth_storage_api::{BlockNumReader, BlockReader, HeaderProvider};
19use std::{
20    path::PathBuf,
21    time::{Duration, Instant},
22};
23use tracing::{debug, info, warn};
24
/// How often (in seconds) export progress is logged.
const REPORT_INTERVAL_SECS: u64 = 10;
/// Size in bytes of an e2store entry header (type + length prefix)
/// that precedes every entry's payload.
const ENTRY_HEADER_SIZE: usize = 8;
/// The version entry is a bare entry header with no payload,
/// so it occupies exactly one entry header.
const VERSION_ENTRY_SIZE: usize = ENTRY_HEADER_SIZE;
28
/// Configuration to export block history
/// to era1 files
#[derive(Clone, Debug)]
pub struct ExportConfig {
    /// Directory to export era1 files to.
    /// Created by [`export`] if it does not exist yet.
    pub dir: PathBuf,
    /// First block to export (inclusive)
    pub first_block_number: BlockNumber,
    /// Last block to export (inclusive); may be clamped down at export
    /// time if it exceeds the highest block available from the provider
    pub last_block_number: BlockNumber,
    /// Number of blocks per era1 file
    /// It can never be larger than `MAX_BLOCKS_PER_ERA1 = 8192`
    /// See also <https://github.com/eth-clients/e2store-format-specs/blob/main/formats/era1.md>
    pub max_blocks_per_file: u64,
    /// Network name, used as the file name prefix (e.g. "mainnet").
    pub network: String,
}
46
47impl Default for ExportConfig {
48    fn default() -> Self {
49        Self {
50            dir: PathBuf::new(),
51            first_block_number: 0,
52            last_block_number: (MAX_BLOCKS_PER_ERA1 - 1) as u64,
53            max_blocks_per_file: MAX_BLOCKS_PER_ERA1 as u64,
54            network: "mainnet".to_string(),
55        }
56    }
57}
58
59impl ExportConfig {
60    /// Validates the export configuration parameters
61    pub fn validate(&self) -> Result<()> {
62        if self.max_blocks_per_file > MAX_BLOCKS_PER_ERA1 as u64 {
63            return Err(eyre!(
64                "Max blocks per file ({}) exceeds ERA1 limit ({})",
65                self.max_blocks_per_file,
66                MAX_BLOCKS_PER_ERA1
67            ));
68        }
69
70        if self.max_blocks_per_file == 0 {
71            return Err(eyre!("Max blocks per file cannot be zero"));
72        }
73
74        Ok(())
75    }
76}
77
78/// Fetches block history data from the provider
79/// and prepares it for export to era1 files
80/// for a given number of blocks then writes them to disk.
81pub fn export<P>(provider: &P, config: &ExportConfig) -> Result<Vec<PathBuf>>
82where
83    P: BlockReader,
84{
85    config.validate()?;
86    info!(
87        "Exporting blockchain history from block {} to {} with this max of blocks per file of {}",
88        config.first_block_number, config.last_block_number, config.max_blocks_per_file
89    );
90
91    // Determine the actual last block to export
92    // best_block_number() might be outdated, so check actual block availability
93    let last_block_number = determine_export_range(provider, config)?;
94
95    info!(
96        target: "era::history::export",
97        first = config.first_block_number,
98        last = last_block_number,
99        max_blocks_per_file = config.max_blocks_per_file,
100        "Preparing era1 export data"
101    );
102
103    if !config.dir.exists() {
104        fs::create_dir_all(&config.dir)
105            .map_err(|e| eyre!("Failed to create output directory: {}", e))?;
106    }
107
108    let start_time = Instant::now();
109    let mut last_report_time = Instant::now();
110    let report_interval = Duration::from_secs(REPORT_INTERVAL_SECS);
111
112    let mut created_files = Vec::new();
113    let mut total_blocks_processed = 0;
114
115    let mut total_difficulty = if config.first_block_number > 0 {
116        let prev_block_number = config.first_block_number - 1;
117        provider
118            .header_td_by_number(prev_block_number)?
119            .ok_or_else(|| eyre!("Total difficulty not found for block {prev_block_number}"))?
120    } else {
121        U256::ZERO
122    };
123
124    // Process blocks in chunks according to `max_blocks_per_file`
125    for start_block in
126        (config.first_block_number..=last_block_number).step_by(config.max_blocks_per_file as usize)
127    {
128        let end_block = (start_block + config.max_blocks_per_file - 1).min(last_block_number);
129        let block_count = (end_block - start_block + 1) as usize;
130
131        info!(
132            target: "era::history::export",
133            "Processing blocks {start_block} to {end_block} ({block_count} blocks)"
134        );
135
136        let headers = provider.headers_range(start_block..=end_block)?;
137
138        // Extract first 4 bytes of last block's state root as historical identifier
139        let historical_root = headers
140            .last()
141            .map(|header| {
142                let state_root = header.state_root();
143                [state_root[0], state_root[1], state_root[2], state_root[3]]
144            })
145            .unwrap_or([0u8; 4]);
146
147        let era1_id = Era1Id::new(&config.network, start_block, block_count as u32)
148            .with_hash(historical_root);
149
150        debug!("Final file name {}", era1_id.to_file_name());
151        let file_path = config.dir.join(era1_id.to_file_name());
152        let file = std::fs::File::create(&file_path)?;
153        let mut writer = Era1Writer::new(file);
154        writer.write_version()?;
155
156        let mut offsets = Vec::<i64>::with_capacity(block_count);
157        let mut position = VERSION_ENTRY_SIZE as i64;
158        let mut blocks_written = 0;
159        let mut final_header_data = Vec::new();
160
161        for (i, header) in headers.into_iter().enumerate() {
162            let expected_block_number = start_block + i as u64;
163
164            let (compressed_header, compressed_body, compressed_receipts) = compress_block_data(
165                provider,
166                header,
167                expected_block_number,
168                &mut total_difficulty,
169            )?;
170
171            // Save last block's header data for accumulator
172            if expected_block_number == end_block {
173                final_header_data = compressed_header.data.clone();
174            }
175
176            let difficulty = TotalDifficulty::new(total_difficulty);
177
178            let header_size = compressed_header.data.len() + ENTRY_HEADER_SIZE;
179            let body_size = compressed_body.data.len() + ENTRY_HEADER_SIZE;
180            let receipts_size = compressed_receipts.data.len() + ENTRY_HEADER_SIZE;
181            let difficulty_size = 32 + ENTRY_HEADER_SIZE; // U256 is 32 + 8 bytes header overhead
182            let total_size = (header_size + body_size + receipts_size + difficulty_size) as i64;
183
184            let block_tuple = BlockTuple::new(
185                compressed_header,
186                compressed_body,
187                compressed_receipts,
188                difficulty,
189            );
190
191            offsets.push(position);
192            position += total_size;
193
194            writer.write_block(&block_tuple)?;
195            blocks_written += 1;
196            total_blocks_processed += 1;
197
198            if last_report_time.elapsed() >= report_interval {
199                info!(
200                    target: "era::history::export",
201                    "Export progress: block {expected_block_number}/{last_block_number} ({:.2}%) - elapsed: {:?}",
202                    (total_blocks_processed as f64) /
203                        ((last_block_number - config.first_block_number + 1) as f64) *
204                        100.0,
205                    start_time.elapsed()
206                );
207                last_report_time = Instant::now();
208            }
209        }
210        if blocks_written > 0 {
211            let accumulator_hash =
212                B256::from_slice(&final_header_data[0..32.min(final_header_data.len())]);
213            let accumulator = Accumulator::new(accumulator_hash);
214            let block_index = BlockIndex::new(start_block, offsets);
215
216            writer.write_accumulator(&accumulator)?;
217            writer.write_block_index(&block_index)?;
218            writer.flush()?;
219            created_files.push(file_path.clone());
220
221            info!(
222                target: "era::history::export",
223                "Wrote ERA1 file: {file_path:?} with {blocks_written} blocks"
224            );
225        }
226    }
227
228    info!(
229        target: "era::history::export",
230        "Successfully wrote {} ERA1 files in {:?}",
231        created_files.len(),
232        start_time.elapsed()
233    );
234
235    Ok(created_files)
236}
237
238// Determines the actual last block number that can be exported,
239// Uses `headers_range` fallback when `best_block_number` is stale due to static file storage.
240fn determine_export_range<P>(provider: &P, config: &ExportConfig) -> Result<BlockNumber>
241where
242    P: HeaderProvider + BlockNumReader,
243{
244    let best_block_number = provider.best_block_number()?;
245
246    let last_block_number = if best_block_number < config.last_block_number {
247        warn!(
248            "Last block {} is beyond current head {}, setting last = head",
249            config.last_block_number, best_block_number
250        );
251
252        // Check if more blocks are actually available beyond what `best_block_number()` reports
253        if let Ok(headers) = provider.headers_range(best_block_number..=config.last_block_number) {
254            if let Some(last_header) = headers.last() {
255                let highest_block = last_header.number();
256                info!("Found highest available block {} via headers_range", highest_block);
257                highest_block
258            } else {
259                warn!("No headers found in range, using best_block_number {}", best_block_number);
260                best_block_number
261            }
262        } else {
263            warn!("headers_range failed, using best_block_number {}", best_block_number);
264            best_block_number
265        }
266    } else {
267        config.last_block_number
268    };
269
270    Ok(last_block_number)
271}
272
273// Compresses block data and returns compressed components with metadata
274fn compress_block_data<P>(
275    provider: &P,
276    header: P::Header,
277    expected_block_number: BlockNumber,
278    total_difficulty: &mut U256,
279) -> Result<(CompressedHeader, CompressedBody, CompressedReceipts)>
280where
281    P: BlockReader,
282{
283    let actual_block_number = header.number();
284
285    if expected_block_number != actual_block_number {
286        return Err(eyre!("Expected block {expected_block_number}, got {actual_block_number}"));
287    }
288
289    let body = provider
290        .block_by_number(actual_block_number)?
291        .ok_or_else(|| eyre!("Block body not found for block {}", actual_block_number))?;
292
293    let receipts = provider
294        .receipts_by_block(actual_block_number.into())?
295        .ok_or_else(|| eyre!("Receipts not found for block {}", actual_block_number))?;
296
297    *total_difficulty += header.difficulty();
298
299    let compressed_header = CompressedHeader::from_header(&header)?;
300    let compressed_body = CompressedBody::from_body(&body)?;
301    let compressed_receipts = CompressedReceipts::from_encodable_list(&receipts)
302        .map_err(|e| eyre!("Failed to compress receipts: {}", e))?;
303
304    Ok((compressed_header, compressed_body, compressed_receipts))
305}
306
#[cfg(test)]
mod tests {
    use crate::ExportConfig;
    use reth_era::execution_types::MAX_BLOCKS_PER_ERA1;
    use tempfile::tempdir;

    #[test]
    fn test_export_config_validation() {
        let temp_dir = tempdir().unwrap();
        let era1_limit = MAX_BLOCKS_PER_ERA1 as u64;

        // Default config should pass
        assert!(ExportConfig::default().validate().is_ok(), "Default config should be valid");

        // Exactly at the limit should pass
        let at_limit = ExportConfig { max_blocks_per_file: era1_limit, ..Default::default() };
        assert!(at_limit.validate().is_ok(), "Config at ERA1 limit should pass validation");

        // Valid config should pass
        let valid = ExportConfig {
            dir: temp_dir.path().to_path_buf(),
            max_blocks_per_file: 1000,
            ..Default::default()
        };
        assert!(valid.validate().is_ok(), "Valid config should pass validation");

        // Zero blocks per file should fail
        let zero_blocks = ExportConfig {
            max_blocks_per_file: 0, // Invalid
            ..Default::default()
        };
        let result = zero_blocks.validate();
        assert!(result.is_err(), "Zero blocks per file should fail validation");
        assert!(result.unwrap_err().to_string().contains("cannot be zero"));

        // Exceeding era1 limit should fail
        let oversized = ExportConfig {
            max_blocks_per_file: era1_limit + 1, // Invalid
            ..Default::default()
        };
        let result = oversized.validate();
        assert!(result.is_err(), "Oversized blocks per file should fail validation");
        assert!(result.unwrap_err().to_string().contains("exceeds ERA1 limit"));
    }
}