reth_era_utils/
export.rs

//! Logic to export block history from the database
//! and inject it into era1 files with `Era1Writer`.
3
4use crate::calculate_td_by_number;
5use alloy_consensus::BlockHeader;
6use alloy_primitives::{BlockNumber, B256, U256};
7use eyre::{eyre, Result};
8use reth_era::{
9    common::file_ops::{EraFileId, StreamWriter},
10    e2s::types::IndexEntry,
11    era1::{
12        file::Era1Writer,
13        types::{
14            execution::{
15                Accumulator, BlockTuple, CompressedBody, CompressedHeader, CompressedReceipts,
16                TotalDifficulty, MAX_BLOCKS_PER_ERA1,
17            },
18            group::{BlockIndex, Era1Id},
19        },
20    },
21};
22use reth_fs_util as fs;
23use reth_storage_api::{BlockNumReader, BlockReader, HeaderProvider};
24use std::{
25    path::PathBuf,
26    time::{Duration, Instant},
27};
28use tracing::{debug, info, warn};
29
/// How often (in seconds) export progress is logged.
const REPORT_INTERVAL_SECS: u64 = 10;
/// Per-entry framing overhead (in bytes) added to each compressed payload
/// when computing offsets into the era1 file.
const ENTRY_HEADER_SIZE: usize = 8;
/// Size of the leading version entry; it carries no payload, so it is
/// exactly one entry header.
const VERSION_ENTRY_SIZE: usize = ENTRY_HEADER_SIZE;
33
/// Configuration to export block history
/// to era1 files
#[derive(Clone, Debug)]
pub struct ExportConfig {
    /// Directory to export era1 files to.
    /// Created (with parents) on export if it does not exist yet.
    pub dir: PathBuf,
    /// First block to export (inclusive)
    pub first_block_number: BlockNumber,
    /// Last block to export (inclusive); capped to the highest available
    /// block when it is beyond the current head
    pub last_block_number: BlockNumber,
    /// Number of blocks per era1 file
    /// It can never be larger than `MAX_BLOCKS_PER_ERA1 = 8192`
    /// See also <`https://github.com/eth-clients/e2store-format-specs/blob/main/formats/era1.md`>
    pub max_blocks_per_file: u64,
    /// Network name, used as the file-name prefix (e.g. "mainnet").
    pub network: String,
}
51
52impl Default for ExportConfig {
53    fn default() -> Self {
54        Self {
55            dir: PathBuf::new(),
56            first_block_number: 0,
57            last_block_number: (MAX_BLOCKS_PER_ERA1 - 1) as u64,
58            max_blocks_per_file: MAX_BLOCKS_PER_ERA1 as u64,
59            network: "mainnet".to_string(),
60        }
61    }
62}
63
64impl ExportConfig {
65    /// Validates the export configuration parameters
66    pub fn validate(&self) -> Result<()> {
67        if self.max_blocks_per_file > MAX_BLOCKS_PER_ERA1 as u64 {
68            return Err(eyre!(
69                "Max blocks per file ({}) exceeds ERA1 limit ({})",
70                self.max_blocks_per_file,
71                MAX_BLOCKS_PER_ERA1
72            ));
73        }
74
75        if self.max_blocks_per_file == 0 {
76            return Err(eyre!("Max blocks per file cannot be zero"));
77        }
78
79        Ok(())
80    }
81}
82
/// Fetches block history data from the provider
/// and prepares it for export to era1 files
/// for a given number of blocks then writes them to disk.
///
/// Returns the paths of all era1 files created.
///
/// # Errors
/// Fails if the configuration is invalid, the output directory cannot be
/// created, or any provider read / era1 write fails.
pub fn export<P>(provider: &P, config: &ExportConfig) -> Result<Vec<PathBuf>>
where
    P: BlockReader,
{
    config.validate()?;
    info!(
        "Exporting blockchain history from block {} to {} with this max of blocks per file of {}",
        config.first_block_number, config.last_block_number, config.max_blocks_per_file
    );

    // Determine the actual last block to export
    // best_block_number() might be outdated, so check actual block availability
    let last_block_number = determine_export_range(provider, config)?;

    info!(
        target: "era::history::export",
        first = config.first_block_number,
        last = last_block_number,
        max_blocks_per_file = config.max_blocks_per_file,
        "Preparing era1 export data"
    );

    if !config.dir.exists() {
        fs::create_dir_all(&config.dir)
            .map_err(|e| eyre!("Failed to create output directory: {}", e))?;
    }

    let start_time = Instant::now();
    let mut last_report_time = Instant::now();
    let report_interval = Duration::from_secs(REPORT_INTERVAL_SECS);

    let mut created_files = Vec::new();
    let mut total_blocks_processed = 0;

    // Seed the running total difficulty with the TD of the block just before
    // the export range, so per-block TD entries are absolute, not relative.
    let mut total_difficulty = if config.first_block_number > 0 {
        let prev_block_number = config.first_block_number - 1;
        calculate_td_by_number(provider, prev_block_number)?
    } else {
        U256::ZERO
    };

    // Process blocks in chunks according to `max_blocks_per_file`
    for start_block in
        (config.first_block_number..=last_block_number).step_by(config.max_blocks_per_file as usize)
    {
        // The final chunk may be shorter than `max_blocks_per_file`.
        let end_block = (start_block + config.max_blocks_per_file - 1).min(last_block_number);
        let block_count = (end_block - start_block + 1) as usize;

        info!(
            target: "era::history::export",
            "Processing blocks {start_block} to {end_block} ({block_count} blocks)"
        );

        let headers = provider.headers_range(start_block..=end_block)?;

        // Extract first 4 bytes of last block's state root as historical identifier
        let historical_root = headers
            .last()
            .map(|header| {
                let state_root = header.state_root();
                [state_root[0], state_root[1], state_root[2], state_root[3]]
            })
            .unwrap_or([0u8; 4]);

        let era1_id = Era1Id::new(&config.network, start_block, block_count as u32)
            .with_hash(historical_root);

        // Full-size files use the plain era1 naming; non-standard chunk sizes
        // get an explicit era count in the file name — NOTE(review): inferred
        // from `with_era_count`'s name, confirm against `Era1Id`.
        let era1_id = if config.max_blocks_per_file == MAX_BLOCKS_PER_ERA1 as u64 {
            era1_id
        } else {
            era1_id.with_era_count()
        };

        debug!("Final file name {}", era1_id.to_file_name());
        let file_path = config.dir.join(era1_id.to_file_name());
        let file = std::fs::File::create(&file_path)?;
        let mut writer = Era1Writer::new(file);
        writer.write_version()?;

        // Byte offsets of each block tuple, tracked manually so the block
        // index can be written after all blocks.
        let mut offsets = Vec::<i64>::with_capacity(block_count);
        let mut position = VERSION_ENTRY_SIZE as i64;
        let mut blocks_written = 0;
        let mut final_header_data = Vec::new();

        for (i, header) in headers.into_iter().enumerate() {
            let expected_block_number = start_block + i as u64;

            let (compressed_header, compressed_body, compressed_receipts) = compress_block_data(
                provider,
                header,
                expected_block_number,
                &mut total_difficulty,
            )?;

            // Save last block's header data for accumulator
            if expected_block_number == end_block {
                final_header_data = compressed_header.data.clone();
            }

            // `total_difficulty` was already advanced by this block's
            // difficulty inside `compress_block_data`.
            let difficulty = TotalDifficulty::new(total_difficulty);

            let header_size = compressed_header.data.len() + ENTRY_HEADER_SIZE;
            let body_size = compressed_body.data.len() + ENTRY_HEADER_SIZE;
            let receipts_size = compressed_receipts.data.len() + ENTRY_HEADER_SIZE;
            let difficulty_size = 32 + ENTRY_HEADER_SIZE; // U256 is 32 + 8 bytes header overhead
            let total_size = (header_size + body_size + receipts_size + difficulty_size) as i64;

            let block_tuple = BlockTuple::new(
                compressed_header,
                compressed_body,
                compressed_receipts,
                difficulty,
            );

            // Record the offset of this tuple, then advance past it.
            offsets.push(position);
            position += total_size;

            writer.write_block(&block_tuple)?;
            blocks_written += 1;
            total_blocks_processed += 1;

            if last_report_time.elapsed() >= report_interval {
                info!(
                    target: "era::history::export",
                    "Export progress: block {expected_block_number}/{last_block_number} ({:.2}%) - elapsed: {:?}",
                    (total_blocks_processed as f64) /
                        ((last_block_number - config.first_block_number + 1) as f64) *
                        100.0,
                    start_time.elapsed()
                );
                last_report_time = Instant::now();
            }
        }
        if blocks_written > 0 {
            // NOTE(review): `B256::from_slice` panics unless given exactly 32
            // bytes, so this assumes the compressed header is >= 32 bytes —
            // TODO confirm that invariant holds for all networks.
            let accumulator_hash =
                B256::from_slice(&final_header_data[0..32.min(final_header_data.len())]);
            let accumulator = Accumulator::new(accumulator_hash);
            let block_index = BlockIndex::new(start_block, offsets);

            // Trailing entries: accumulator, then block index, then flush.
            writer.write_accumulator(&accumulator)?;
            writer.write_block_index(&block_index)?;
            writer.flush()?;

            info!(
                target: "era::history::export",
                "Wrote ERA1 file: {file_path:?} with {blocks_written} blocks"
            );
            created_files.push(file_path);
        }
    }

    info!(
        target: "era::history::export",
        "Successfully wrote {} ERA1 files in {:?}",
        created_files.len(),
        start_time.elapsed()
    );

    Ok(created_files)
}
246
247// Determines the actual last block number that can be exported,
248// Uses `headers_range` fallback when `best_block_number` is stale due to static file storage.
249fn determine_export_range<P>(provider: &P, config: &ExportConfig) -> Result<BlockNumber>
250where
251    P: HeaderProvider + BlockNumReader,
252{
253    let best_block_number = provider.best_block_number()?;
254
255    let last_block_number = if best_block_number < config.last_block_number {
256        warn!(
257            "Last block {} is beyond current head {}, setting last = head",
258            config.last_block_number, best_block_number
259        );
260
261        // Check if more blocks are actually available beyond what `best_block_number()` reports
262        if let Ok(headers) = provider.headers_range(best_block_number..=config.last_block_number) {
263            if let Some(last_header) = headers.last() {
264                let highest_block = last_header.number();
265                info!("Found highest available block {} via headers_range", highest_block);
266                highest_block
267            } else {
268                warn!("No headers found in range, using best_block_number {}", best_block_number);
269                best_block_number
270            }
271        } else {
272            warn!("headers_range failed, using best_block_number {}", best_block_number);
273            best_block_number
274        }
275    } else {
276        config.last_block_number
277    };
278
279    Ok(last_block_number)
280}
281
282// Compresses block data and returns compressed components with metadata
283fn compress_block_data<P>(
284    provider: &P,
285    header: P::Header,
286    expected_block_number: BlockNumber,
287    total_difficulty: &mut U256,
288) -> Result<(CompressedHeader, CompressedBody, CompressedReceipts)>
289where
290    P: BlockReader,
291{
292    let actual_block_number = header.number();
293
294    if expected_block_number != actual_block_number {
295        return Err(eyre!("Expected block {expected_block_number}, got {actual_block_number}"));
296    }
297
298    let body = provider
299        .block_by_number(actual_block_number)?
300        .ok_or_else(|| eyre!("Block body not found for block {}", actual_block_number))?;
301
302    let receipts = provider
303        .receipts_by_block(actual_block_number.into())?
304        .ok_or_else(|| eyre!("Receipts not found for block {}", actual_block_number))?;
305
306    *total_difficulty += header.difficulty();
307
308    let compressed_header = CompressedHeader::from_header(&header)?;
309    let compressed_body = CompressedBody::from_body(&body)?;
310    let compressed_receipts = CompressedReceipts::from_encodable_list(&receipts)
311        .map_err(|e| eyre!("Failed to compress receipts: {}", e))?;
312
313    Ok((compressed_header, compressed_body, compressed_receipts))
314}
315
#[cfg(test)]
mod tests {
    use crate::ExportConfig;
    use reth_era::era1::types::execution::MAX_BLOCKS_PER_ERA1;
    use tempfile::tempdir;

    #[test]
    fn test_export_config_validation() {
        let temp_dir = tempdir().unwrap();
        let era1_limit = MAX_BLOCKS_PER_ERA1 as u64;

        // The stock configuration must validate cleanly.
        let default_config = ExportConfig::default();
        assert!(default_config.validate().is_ok(), "Default config should be valid");

        // Sitting exactly on the era1 limit is still legal.
        let limit_config = ExportConfig { max_blocks_per_file: era1_limit, ..Default::default() };
        assert!(limit_config.validate().is_ok(), "Config at ERA1 limit should pass validation");

        // An in-range value with a real output directory is legal too.
        let valid_config = ExportConfig {
            dir: temp_dir.path().to_path_buf(),
            max_blocks_per_file: 1000,
            ..Default::default()
        };
        assert!(valid_config.validate().is_ok(), "Valid config should pass validation");

        // A zero chunk size must be rejected with its dedicated message.
        let zero_blocks_config = ExportConfig { max_blocks_per_file: 0, ..Default::default() };
        let result = zero_blocks_config.validate();
        assert!(result.is_err(), "Zero blocks per file should fail validation");
        assert!(result.unwrap_err().to_string().contains("cannot be zero"));

        // One past the era1 limit must be rejected with the limit message.
        let oversized_config =
            ExportConfig { max_blocks_per_file: era1_limit + 1, ..Default::default() };
        let result = oversized_config.validate();
        assert!(result.is_err(), "Oversized blocks per file should fail validation");
        assert!(result.unwrap_err().to_string().contains("exceeds ERA1 limit"));
    }
}
361}