reth_era_utils/
export.rs

//! Logic to export era1 block history from the database
//! and inject it into era1 files with `Era1Writer`.
3
4use crate::calculate_td_by_number;
5use alloy_consensus::BlockHeader;
6use alloy_primitives::{BlockNumber, B256, U256};
7use eyre::{eyre, Result};
8use reth_era::{
9    e2s_types::IndexEntry,
10    era1_file::Era1Writer,
11    era1_types::{BlockIndex, Era1Id},
12    era_file_ops::{EraFileId, StreamWriter},
13    execution_types::{
14        Accumulator, BlockTuple, CompressedBody, CompressedHeader, CompressedReceipts,
15        TotalDifficulty, MAX_BLOCKS_PER_ERA1,
16    },
17};
18use reth_fs_util as fs;
19use reth_storage_api::{BlockNumReader, BlockReader, HeaderProvider};
20use std::{
21    path::PathBuf,
22    time::{Duration, Instant},
23};
24use tracing::{debug, info, warn};
25
/// How often (in seconds) a progress report is emitted during export.
const REPORT_INTERVAL_SECS: u64 = 10;
/// Per-entry framing overhead added to each compressed payload when sizing
/// entries for the block index.
/// NOTE(review): assumed to be the 8-byte e2store entry header (type +
/// length) — confirm against `e2s_types`.
const ENTRY_HEADER_SIZE: usize = 8;
/// The version entry carries no payload, so it occupies exactly one entry
/// header; used as the starting write position for offset bookkeeping.
const VERSION_ENTRY_SIZE: usize = ENTRY_HEADER_SIZE;
29
/// Configuration to export block history
/// to era1 files
#[derive(Clone, Debug)]
pub struct ExportConfig {
    /// Directory to export era1 files to
    pub dir: PathBuf,
    /// First block to export (inclusive)
    pub first_block_number: BlockNumber,
    /// Last block to export (inclusive)
    pub last_block_number: BlockNumber,
    /// Number of blocks per era1 file
    /// It can never be larger than `MAX_BLOCKS_PER_ERA1 = 8192`
    /// See also <`https://github.com/eth-clients/e2store-format-specs/blob/main/formats/era1.md`>
    pub max_blocks_per_file: u64,
    /// Network name, used when building the `Era1Id` / output file name.
    pub network: String,
}
47
48impl Default for ExportConfig {
49    fn default() -> Self {
50        Self {
51            dir: PathBuf::new(),
52            first_block_number: 0,
53            last_block_number: (MAX_BLOCKS_PER_ERA1 - 1) as u64,
54            max_blocks_per_file: MAX_BLOCKS_PER_ERA1 as u64,
55            network: "mainnet".to_string(),
56        }
57    }
58}
59
60impl ExportConfig {
61    /// Validates the export configuration parameters
62    pub fn validate(&self) -> Result<()> {
63        if self.max_blocks_per_file > MAX_BLOCKS_PER_ERA1 as u64 {
64            return Err(eyre!(
65                "Max blocks per file ({}) exceeds ERA1 limit ({})",
66                self.max_blocks_per_file,
67                MAX_BLOCKS_PER_ERA1
68            ));
69        }
70
71        if self.max_blocks_per_file == 0 {
72            return Err(eyre!("Max blocks per file cannot be zero"));
73        }
74
75        Ok(())
76    }
77}
78
/// Fetches block history data from the provider
/// and prepares it for export to era1 files
/// for a given number of blocks then writes them to disk.
///
/// Returns the paths of all era1 files created, in block order.
///
/// # Errors
/// Fails if the configuration is invalid, the output directory cannot be
/// created, or any provider read / file write fails.
pub fn export<P>(provider: &P, config: &ExportConfig) -> Result<Vec<PathBuf>>
where
    P: BlockReader,
{
    config.validate()?;
    info!(
        "Exporting blockchain history from block {} to {} with this max of blocks per file of {}",
        config.first_block_number, config.last_block_number, config.max_blocks_per_file
    );

    // Determine the actual last block to export
    // best_block_number() might be outdated, so check actual block availability
    let last_block_number = determine_export_range(provider, config)?;

    info!(
        target: "era::history::export",
        first = config.first_block_number,
        last = last_block_number,
        max_blocks_per_file = config.max_blocks_per_file,
        "Preparing era1 export data"
    );

    if !config.dir.exists() {
        fs::create_dir_all(&config.dir)
            .map_err(|e| eyre!("Failed to create output directory: {}", e))?;
    }

    let start_time = Instant::now();
    let mut last_report_time = Instant::now();
    let report_interval = Duration::from_secs(REPORT_INTERVAL_SECS);

    let mut created_files = Vec::new();
    let mut total_blocks_processed = 0;

    // Seed the running total difficulty with the cumulative difficulty of the
    // block just before the export range, so every `TotalDifficulty` written
    // below is cumulative from genesis rather than from the range start.
    let mut total_difficulty = if config.first_block_number > 0 {
        let prev_block_number = config.first_block_number - 1;
        calculate_td_by_number(provider, prev_block_number)?
    } else {
        U256::ZERO
    };

    // Process blocks in chunks according to `max_blocks_per_file`;
    // each chunk becomes one era1 file (the final chunk may be short).
    for start_block in
        (config.first_block_number..=last_block_number).step_by(config.max_blocks_per_file as usize)
    {
        let end_block = (start_block + config.max_blocks_per_file - 1).min(last_block_number);
        let block_count = (end_block - start_block + 1) as usize;

        info!(
            target: "era::history::export",
            "Processing blocks {start_block} to {end_block} ({block_count} blocks)"
        );

        let headers = provider.headers_range(start_block..=end_block)?;

        // Extract first 4 bytes of last block's state root as historical identifier
        // (zeroed when the range returned no headers at all).
        let historical_root = headers
            .last()
            .map(|header| {
                let state_root = header.state_root();
                [state_root[0], state_root[1], state_root[2], state_root[3]]
            })
            .unwrap_or([0u8; 4]);

        let era1_id = Era1Id::new(&config.network, start_block, block_count as u32)
            .with_hash(historical_root);

        debug!("Final file name {}", era1_id.to_file_name());
        let file_path = config.dir.join(era1_id.to_file_name());
        let file = std::fs::File::create(&file_path)?;
        let mut writer = Era1Writer::new(file);
        writer.write_version()?;

        // Byte offsets of each block tuple, tracked manually so the trailing
        // `BlockIndex` can be built without re-reading the file.
        // NOTE(review): offsets are accumulated from the start of the file
        // (beginning just after the version entry) — confirm this matches the
        // basis `BlockIndex::new` expects.
        let mut offsets = Vec::<i64>::with_capacity(block_count);
        let mut position = VERSION_ENTRY_SIZE as i64;
        let mut blocks_written = 0;
        let mut final_header_data = Vec::new();

        for (i, header) in headers.into_iter().enumerate() {
            let expected_block_number = start_block + i as u64;

            let (compressed_header, compressed_body, compressed_receipts) = compress_block_data(
                provider,
                header,
                expected_block_number,
                &mut total_difficulty,
            )?;

            // Save last block's header data for accumulator
            if expected_block_number == end_block {
                final_header_data = compressed_header.data.clone();
            }

            let difficulty = TotalDifficulty::new(total_difficulty);

            // Entry sizes include the per-entry framing overhead so `position`
            // tracks the actual on-disk offset of the next block tuple.
            let header_size = compressed_header.data.len() + ENTRY_HEADER_SIZE;
            let body_size = compressed_body.data.len() + ENTRY_HEADER_SIZE;
            let receipts_size = compressed_receipts.data.len() + ENTRY_HEADER_SIZE;
            let difficulty_size = 32 + ENTRY_HEADER_SIZE; // U256 is 32 + 8 bytes header overhead
            let total_size = (header_size + body_size + receipts_size + difficulty_size) as i64;

            let block_tuple = BlockTuple::new(
                compressed_header,
                compressed_body,
                compressed_receipts,
                difficulty,
            );

            offsets.push(position);
            position += total_size;

            writer.write_block(&block_tuple)?;
            blocks_written += 1;
            total_blocks_processed += 1;

            // Periodic progress report, throttled to `REPORT_INTERVAL_SECS`.
            if last_report_time.elapsed() >= report_interval {
                info!(
                    target: "era::history::export",
                    "Export progress: block {expected_block_number}/{last_block_number} ({:.2}%) - elapsed: {:?}",
                    (total_blocks_processed as f64) /
                        ((last_block_number - config.first_block_number + 1) as f64) *
                        100.0,
                    start_time.elapsed()
                );
                last_report_time = Instant::now();
            }
        }
        // Only finalize files that actually received blocks; an empty chunk
        // leaves behind a version-only file with no accumulator or index.
        if blocks_written > 0 {
            // NOTE(review): the accumulator is derived from the first 32 bytes
            // of the last block's *compressed* header data, and
            // `B256::from_slice` panics when given fewer than 32 bytes — so a
            // compressed header shorter than 32 bytes would panic here.
            // Confirm this is the intended accumulator value.
            let accumulator_hash =
                B256::from_slice(&final_header_data[0..32.min(final_header_data.len())]);
            let accumulator = Accumulator::new(accumulator_hash);
            let block_index = BlockIndex::new(start_block, offsets);

            writer.write_accumulator(&accumulator)?;
            writer.write_block_index(&block_index)?;
            writer.flush()?;
            created_files.push(file_path.clone());

            info!(
                target: "era::history::export",
                "Wrote ERA1 file: {file_path:?} with {blocks_written} blocks"
            );
        }
    }

    info!(
        target: "era::history::export",
        "Successfully wrote {} ERA1 files in {:?}",
        created_files.len(),
        start_time.elapsed()
    );

    Ok(created_files)
}
236
237// Determines the actual last block number that can be exported,
238// Uses `headers_range` fallback when `best_block_number` is stale due to static file storage.
239fn determine_export_range<P>(provider: &P, config: &ExportConfig) -> Result<BlockNumber>
240where
241    P: HeaderProvider + BlockNumReader,
242{
243    let best_block_number = provider.best_block_number()?;
244
245    let last_block_number = if best_block_number < config.last_block_number {
246        warn!(
247            "Last block {} is beyond current head {}, setting last = head",
248            config.last_block_number, best_block_number
249        );
250
251        // Check if more blocks are actually available beyond what `best_block_number()` reports
252        if let Ok(headers) = provider.headers_range(best_block_number..=config.last_block_number) {
253            if let Some(last_header) = headers.last() {
254                let highest_block = last_header.number();
255                info!("Found highest available block {} via headers_range", highest_block);
256                highest_block
257            } else {
258                warn!("No headers found in range, using best_block_number {}", best_block_number);
259                best_block_number
260            }
261        } else {
262            warn!("headers_range failed, using best_block_number {}", best_block_number);
263            best_block_number
264        }
265    } else {
266        config.last_block_number
267    };
268
269    Ok(last_block_number)
270}
271
272// Compresses block data and returns compressed components with metadata
273fn compress_block_data<P>(
274    provider: &P,
275    header: P::Header,
276    expected_block_number: BlockNumber,
277    total_difficulty: &mut U256,
278) -> Result<(CompressedHeader, CompressedBody, CompressedReceipts)>
279where
280    P: BlockReader,
281{
282    let actual_block_number = header.number();
283
284    if expected_block_number != actual_block_number {
285        return Err(eyre!("Expected block {expected_block_number}, got {actual_block_number}"));
286    }
287
288    let body = provider
289        .block_by_number(actual_block_number)?
290        .ok_or_else(|| eyre!("Block body not found for block {}", actual_block_number))?;
291
292    let receipts = provider
293        .receipts_by_block(actual_block_number.into())?
294        .ok_or_else(|| eyre!("Receipts not found for block {}", actual_block_number))?;
295
296    *total_difficulty += header.difficulty();
297
298    let compressed_header = CompressedHeader::from_header(&header)?;
299    let compressed_body = CompressedBody::from_body(&body)?;
300    let compressed_receipts = CompressedReceipts::from_encodable_list(&receipts)
301        .map_err(|e| eyre!("Failed to compress receipts: {}", e))?;
302
303    Ok((compressed_header, compressed_body, compressed_receipts))
304}
305
#[cfg(test)]
mod tests {
    use crate::ExportConfig;
    use reth_era::execution_types::MAX_BLOCKS_PER_ERA1;
    use tempfile::tempdir;

    #[test]
    fn test_export_config_validation() {
        let tmp = tempdir().unwrap();

        // The default configuration must always validate.
        let defaults = ExportConfig::default();
        assert!(defaults.validate().is_ok(), "Default config should be valid");

        // Sitting exactly on the era1 limit is allowed.
        let at_limit =
            ExportConfig { max_blocks_per_file: MAX_BLOCKS_PER_ERA1 as u64, ..Default::default() };
        assert!(at_limit.validate().is_ok(), "Config at ERA1 limit should pass validation");

        // An ordinary in-range configuration is accepted.
        let ordinary = ExportConfig {
            dir: tmp.path().to_path_buf(),
            max_blocks_per_file: 1000,
            ..Default::default()
        };
        assert!(ordinary.validate().is_ok(), "Valid config should pass validation");

        // A zero chunk size must be rejected with a descriptive error.
        let zero_chunk = ExportConfig {
            max_blocks_per_file: 0, // Invalid
            ..Default::default()
        };
        let outcome = zero_chunk.validate();
        assert!(outcome.is_err(), "Zero blocks per file should fail validation");
        assert!(outcome.unwrap_err().to_string().contains("cannot be zero"));

        // One past the era1 limit must also be rejected.
        let oversized = ExportConfig {
            max_blocks_per_file: MAX_BLOCKS_PER_ERA1 as u64 + 1, // Invalid
            ..Default::default()
        };
        let outcome = oversized.validate();
        assert!(outcome.is_err(), "Oversized blocks per file should fail validation");
        assert!(outcome.unwrap_err().to_string().contains("exceeds ERA1 limit"));
    }
}
351}