use alloy_consensus::BlockHeader;
use alloy_primitives::{BlockNumber, B256, U256};
use eyre::{eyre, Result};
use reth_era::{
    e2s_types::IndexEntry,
    era1_file::Era1Writer,
    era1_types::{BlockIndex, Era1Id},
    era_file_ops::{EraFileId, StreamWriter},
    execution_types::{
        Accumulator, BlockTuple, CompressedBody, CompressedHeader, CompressedReceipts,
        TotalDifficulty, MAX_BLOCKS_PER_ERA1,
    },
};
use reth_fs_util as fs;
use reth_storage_api::{BlockNumReader, BlockReader, HeaderProvider};
use std::{
    path::PathBuf,
    time::{Duration, Instant},
};
use tracing::{debug, info, warn};

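/// Interval between progress log reports, in seconds.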
const REPORT_INTERVAL_SECS: u64 = 10;
const ENTRY_HEADER_SIZE: usize = 8;
const VERSION_ENTRY_SIZE: usize = ENTRY_HEADER_SIZE;

/// Configuration for exporting block history to ERA1 files.
#[derive(Clone, Debug)]
pub struct ExportConfig {
    /// Directory in which to write the exported ERA1 files.
    pub dir: PathBuf,
    /// First block to export.
    pub first_block_number: BlockNumber,
    /// Last block to export.
    pub last_block_number: BlockNumber,
    /// Number of blocks per file; cannot exceed `MAX_BLOCKS_PER_ERA1`.
    pub max_blocks_per_file: u64,
    /// Network name embedded in the file names, e.g. "mainnet".
    pub network: String,
}

impl Default for ExportConfig {
    fn default() -> Self {
        Self {
            dir: PathBuf::new(),
            first_block_number: 0,
            last_block_number: (MAX_BLOCKS_PER_ERA1 - 1) as u64,
            max_blocks_per_file: MAX_BLOCKS_PER_ERA1 as u64,
            network: "mainnet".to_string(),
        }
    }
}

impl ExportConfig {
    /// Validates the export configuration parameters.
    pub fn validate(&self) -> Result<()> {
        if self.max_blocks_per_file > MAX_BLOCKS_PER_ERA1 as u64 {
            return Err(eyre!(
                "Max blocks per file ({}) exceeds ERA1 limit ({})",
                self.max_blocks_per_file,
                MAX_BLOCKS_PER_ERA1
            ));
        }

        if self.max_blocks_per_file == 0 {
            return Err(eyre!("Max blocks per file cannot be zero"));
        }

        Ok(())
    }
}

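/// Fetches block history from the provider and writes it to ERA1 files in the
/// configured directory, one file per chunk of up to `max_blocks_per_file`
/// blocks. Returns the paths of the files created.
///
/// A minimal usage sketch (the `provider` here is any `BlockReader`
/// implementation; names are illustrative):
///
/// ```ignore
/// let config = ExportConfig { dir: "./era1-export".into(), ..Default::default() };
/// let files = export(&provider, &config)?;
/// println!("exported {} ERA1 files", files.len());
/// ```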
pub fn export<P>(provider: &P, config: &ExportConfig) -> Result<Vec<PathBuf>>
where
    P: BlockReader,
{
    config.validate()?;
    info!(
        "Exporting blockchain history from block {} to block {}, with at most {} blocks per file",
        config.first_block_number, config.last_block_number, config.max_blocks_per_file
    );

    // Determine the last block to export, bounded by the current chain head.
    let last_block_number = determine_export_range(provider, config)?;

    info!(
        target: "era::history::export",
        first = config.first_block_number,
        last = last_block_number,
        max_blocks_per_file = config.max_blocks_per_file,
        "Preparing era1 export data"
    );

    if !config.dir.exists() {
        fs::create_dir_all(&config.dir)
            .map_err(|e| eyre!("Failed to create output directory: {}", e))?;
    }

    let start_time = Instant::now();
    let mut last_report_time = Instant::now();
    let report_interval = Duration::from_secs(REPORT_INTERVAL_SECS);

    let mut created_files = Vec::new();
    let mut total_blocks_processed = 0;

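    // Seed the running total difficulty from the block just before the first exported one.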
    let mut total_difficulty = if config.first_block_number > 0 {
        let prev_block_number = config.first_block_number - 1;
        provider
            .header_td_by_number(prev_block_number)?
            .ok_or_else(|| eyre!("Total difficulty not found for block {prev_block_number}"))?
    } else {
        U256::ZERO
    };

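    // Process the range in chunks of at most `max_blocks_per_file` blocks, one ERA1 file each.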
    for start_block in
        (config.first_block_number..=last_block_number).step_by(config.max_blocks_per_file as usize)
    {
        let end_block = (start_block + config.max_blocks_per_file - 1).min(last_block_number);
        let block_count = (end_block - start_block + 1) as usize;

        info!(
            target: "era::history::export",
            "Processing blocks {start_block} to {end_block} ({block_count} blocks)"
        );

        let headers = provider.headers_range(start_block..=end_block)?;

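        // Use the first four bytes of the last header's state root as the short hash
        // embedded in the file name.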
        let historical_root = headers
            .last()
            .map(|header| {
                let state_root = header.state_root();
                [state_root[0], state_root[1], state_root[2], state_root[3]]
            })
            .unwrap_or([0u8; 4]);

        let era1_id = Era1Id::new(&config.network, start_block, block_count as u32)
            .with_hash(historical_root);

        debug!("Final file name {}", era1_id.to_file_name());
        let file_path = config.dir.join(era1_id.to_file_name());
        let file = std::fs::File::create(&file_path)?;
        let mut writer = Era1Writer::new(file);
        writer.write_version()?;

        let mut offsets = Vec::<i64>::with_capacity(block_count);
        let mut position = VERSION_ENTRY_SIZE as i64;
        let mut blocks_written = 0;
        let mut final_header_data = Vec::new();

        for (i, header) in headers.into_iter().enumerate() {
            let expected_block_number = start_block + i as u64;

            let (compressed_header, compressed_body, compressed_receipts) = compress_block_data(
                provider,
                header,
                expected_block_number,
                &mut total_difficulty,
            )?;

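            // Remember the final block's compressed header; its leading bytes
            // form the accumulator hash written at the end of the file.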
            if expected_block_number == end_block {
                final_header_data = compressed_header.data.clone();
            }

            let difficulty = TotalDifficulty::new(total_difficulty);

            let header_size = compressed_header.data.len() + ENTRY_HEADER_SIZE;
            let body_size = compressed_body.data.len() + ENTRY_HEADER_SIZE;
            let receipts_size = compressed_receipts.data.len() + ENTRY_HEADER_SIZE;
            let difficulty_size = 32 + ENTRY_HEADER_SIZE; // U256 is 32 bytes
            let total_size = (header_size + body_size + receipts_size + difficulty_size) as i64;

            let block_tuple = BlockTuple::new(
                compressed_header,
                compressed_body,
                compressed_receipts,
                difficulty,
            );

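            // Record where this block starts in the file, then advance the write
            // position by its total encoded size.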
            offsets.push(position);
            position += total_size;

            writer.write_block(&block_tuple)?;
            blocks_written += 1;
            total_blocks_processed += 1;

            if last_report_time.elapsed() >= report_interval {
                info!(
                    target: "era::history::export",
                    "Export progress: block {expected_block_number}/{last_block_number} ({:.2}%) - elapsed: {:?}",
                    (total_blocks_processed as f64) /
                        ((last_block_number - config.first_block_number + 1) as f64) *
                        100.0,
                    start_time.elapsed()
                );
                last_report_time = Instant::now();
            }
        }
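        // Finalize the file: write the accumulator and block index entries, then flush.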
        if blocks_written > 0 {
            // Assumes the compressed header is at least 32 bytes;
            // `B256::from_slice` panics on a shorter slice.
            let accumulator_hash =
                B256::from_slice(&final_header_data[0..32.min(final_header_data.len())]);
            let accumulator = Accumulator::new(accumulator_hash);
            let block_index = BlockIndex::new(start_block, offsets);

            writer.write_accumulator(&accumulator)?;
            writer.write_block_index(&block_index)?;
            writer.flush()?;
            created_files.push(file_path.clone());

            info!(
                target: "era::history::export",
                "Wrote ERA1 file: {file_path:?} with {blocks_written} blocks"
            );
        }
    }

    info!(
        target: "era::history::export",
        "Successfully wrote {} ERA1 files in {:?}",
        created_files.len(),
        start_time.elapsed()
    );

    Ok(created_files)
}

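/// Determines the last block number to export, capping the configured range
/// at the highest block actually available from the provider.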
fn determine_export_range<P>(provider: &P, config: &ExportConfig) -> Result<BlockNumber>
where
    P: HeaderProvider + BlockNumReader,
{
    let best_block_number = provider.best_block_number()?;

    let last_block_number = if best_block_number < config.last_block_number {
        warn!(
            "Last block {} is beyond current head {}, setting last = head",
            config.last_block_number, best_block_number
        );

        // Probe whether headers beyond the reported head are available before clamping.
        if let Ok(headers) = provider.headers_range(best_block_number..=config.last_block_number) {
            if let Some(last_header) = headers.last() {
                let highest_block = last_header.number();
                info!("Found highest available block {} via headers_range", highest_block);
                highest_block
            } else {
                warn!("No headers found in range, using best_block_number {}", best_block_number);
                best_block_number
            }
        } else {
            warn!("headers_range failed, using best_block_number {}", best_block_number);
            best_block_number
        }
    } else {
        config.last_block_number
    };

    Ok(last_block_number)
}

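/// Compresses a single block's header, body, and receipts for ERA1 storage,
/// adding the block's difficulty to the running total difficulty.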
fn compress_block_data<P>(
    provider: &P,
    header: P::Header,
    expected_block_number: BlockNumber,
    total_difficulty: &mut U256,
) -> Result<(CompressedHeader, CompressedBody, CompressedReceipts)>
where
    P: BlockReader,
{
    let actual_block_number = header.number();

    if expected_block_number != actual_block_number {
        return Err(eyre!("Expected block {expected_block_number}, got {actual_block_number}"));
    }

    let body = provider
        .block_by_number(actual_block_number)?
        .ok_or_else(|| eyre!("Block body not found for block {}", actual_block_number))?;

    let receipts = provider
        .receipts_by_block(actual_block_number.into())?
        .ok_or_else(|| eyre!("Receipts not found for block {}", actual_block_number))?;

    *total_difficulty += header.difficulty();

    let compressed_header = CompressedHeader::from_header(&header)?;
    let compressed_body = CompressedBody::from_body(&body)?;
    let compressed_receipts = CompressedReceipts::from_encodable_list(&receipts)
        .map_err(|e| eyre!("Failed to compress receipts: {}", e))?;

    Ok((compressed_header, compressed_body, compressed_receipts))
}

#[cfg(test)]
mod tests {
    use crate::ExportConfig;
    use reth_era::execution_types::MAX_BLOCKS_PER_ERA1;
    use tempfile::tempdir;

    #[test]
    fn test_export_config_validation() {
        let temp_dir = tempdir().unwrap();

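        // The default configuration must pass validation.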
        let default_config = ExportConfig::default();
        assert!(default_config.validate().is_ok(), "Default config should be valid");

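        // A config exactly at the ERA1 limit is allowed.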
        let limit_config =
            ExportConfig { max_blocks_per_file: MAX_BLOCKS_PER_ERA1 as u64, ..Default::default() };
        assert!(limit_config.validate().is_ok(), "Config at ERA1 limit should pass validation");

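        // A typical valid config with a custom directory.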
        let valid_config = ExportConfig {
            dir: temp_dir.path().to_path_buf(),
            max_blocks_per_file: 1000,
            ..Default::default()
        };
        assert!(valid_config.validate().is_ok(), "Valid config should pass validation");

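        // Zero blocks per file must be rejected.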
        let zero_blocks_config = ExportConfig {
            max_blocks_per_file: 0,
            ..Default::default()
        };
        let result = zero_blocks_config.validate();
        assert!(result.is_err(), "Zero blocks per file should fail validation");
        assert!(result.unwrap_err().to_string().contains("cannot be zero"));

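        // Exceeding MAX_BLOCKS_PER_ERA1 must be rejected.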
        let oversized_config = ExportConfig {
            max_blocks_per_file: MAX_BLOCKS_PER_ERA1 as u64 + 1,
            ..Default::default()
        };
        let result = oversized_config.validate();
        assert!(result.is_err(), "Oversized blocks per file should fail validation");
        assert!(result.unwrap_err().to_string().contains("exceeds ERA1 limit"));
    }
}