1use crate::calculate_td_by_number;
5use alloy_consensus::BlockHeader;
6use alloy_primitives::{BlockNumber, B256, U256};
7use eyre::{eyre, Result};
8use reth_era::{
9 common::file_ops::{EraFileId, StreamWriter},
10 e2s::types::IndexEntry,
11 era1::{
12 file::Era1Writer,
13 types::{
14 execution::{
15 Accumulator, BlockTuple, CompressedBody, CompressedHeader, CompressedReceipts,
16 TotalDifficulty, MAX_BLOCKS_PER_ERA1,
17 },
18 group::{BlockIndex, Era1Id},
19 },
20 },
21};
22use reth_fs_util as fs;
23use reth_primitives_traits::Block;
24use reth_storage_api::{BlockNumReader, BlockReader, HeaderProvider};
25use std::{
26 path::PathBuf,
27 time::{Duration, Instant},
28};
29use tracing::{debug, info, warn};
30
/// How often (in seconds) export progress is logged.
const REPORT_INTERVAL_SECS: u64 = 10;
/// Byte size of an e2store entry header prepended to every written entry
/// (accounted for when computing per-block offsets below).
const ENTRY_HEADER_SIZE: usize = 8;
/// Byte size of the version entry at the start of every ERA1 file; it is a
/// bare entry header with no payload, hence equal to `ENTRY_HEADER_SIZE`.
const VERSION_ENTRY_SIZE: usize = ENTRY_HEADER_SIZE;
34
/// Configuration for exporting blockchain history into ERA1 files.
#[derive(Clone, Debug)]
pub struct ExportConfig {
    /// Directory the generated ERA1 files are written into (created if missing).
    pub dir: PathBuf,
    /// First block number (inclusive) of the export range.
    pub first_block_number: BlockNumber,
    /// Last block number (inclusive) of the export range; clamped to the
    /// provider's head at export time.
    pub last_block_number: BlockNumber,
    /// Maximum number of blocks written per ERA1 file;
    /// must be in `1..=MAX_BLOCKS_PER_ERA1` (see [`ExportConfig::validate`]).
    pub max_blocks_per_file: u64,
    /// Network name embedded in generated file names (e.g. "mainnet").
    pub network: String,
}
52
53impl Default for ExportConfig {
54 fn default() -> Self {
55 Self {
56 dir: PathBuf::new(),
57 first_block_number: 0,
58 last_block_number: (MAX_BLOCKS_PER_ERA1 - 1) as u64,
59 max_blocks_per_file: MAX_BLOCKS_PER_ERA1 as u64,
60 network: "mainnet".to_string(),
61 }
62 }
63}
64
65impl ExportConfig {
66 pub fn validate(&self) -> Result<()> {
68 if self.max_blocks_per_file > MAX_BLOCKS_PER_ERA1 as u64 {
69 return Err(eyre!(
70 "Max blocks per file ({}) exceeds ERA1 limit ({})",
71 self.max_blocks_per_file,
72 MAX_BLOCKS_PER_ERA1
73 ));
74 }
75
76 if self.max_blocks_per_file == 0 {
77 return Err(eyre!("Max blocks per file cannot be zero"));
78 }
79
80 Ok(())
81 }
82}
83
/// Exports blockchain history from `provider` as a sequence of ERA1 files in
/// `config.dir`, one file per chunk of at most `config.max_blocks_per_file`
/// blocks, and returns the paths of the files created.
///
/// Each file contains, per block: compressed header, compressed body,
/// compressed receipts, and the running total difficulty, followed by an
/// accumulator entry and a block index.
///
/// # Errors
/// Fails if the config is invalid, the output directory cannot be created,
/// or any provider read / file write fails.
pub fn export<P>(provider: &P, config: &ExportConfig) -> Result<Vec<PathBuf>>
where
    P: BlockReader,
{
    config.validate()?;
    info!(
        "Exporting blockchain history from block {} to {} with this max of blocks per file of {}",
        config.first_block_number, config.last_block_number, config.max_blocks_per_file
    );

    // Clamp the requested end of the range to what the provider actually has.
    let last_block_number = determine_export_range(provider, config)?;

    info!(
        target: "era::history::export",
        first = config.first_block_number,
        last = last_block_number,
        max_blocks_per_file = config.max_blocks_per_file,
        "Preparing era1 export data"
    );

    if !config.dir.exists() {
        fs::create_dir_all(&config.dir)
            .map_err(|e| eyre!("Failed to create output directory: {}", e))?;
    }

    let start_time = Instant::now();
    let mut last_report_time = Instant::now();
    let report_interval = Duration::from_secs(REPORT_INTERVAL_SECS);

    let mut created_files = Vec::new();
    let mut total_blocks_processed = 0;

    // Seed the running total difficulty with the TD of the block just before
    // the export range, so the per-block accumulation below stays correct even
    // when the export does not start at genesis.
    let mut total_difficulty = if config.first_block_number > 0 {
        let prev_block_number = config.first_block_number - 1;
        calculate_td_by_number(provider, prev_block_number)?
    } else {
        U256::ZERO
    };

    // One iteration per output file.
    for start_block in
        (config.first_block_number..=last_block_number).step_by(config.max_blocks_per_file as usize)
    {
        let end_block = (start_block + config.max_blocks_per_file - 1).min(last_block_number);
        let block_count = (end_block - start_block + 1) as usize;

        info!(
            target: "era::history::export",
            "Processing blocks {start_block} to {end_block} ({block_count} blocks)"
        );

        let headers = provider.headers_range(start_block..=end_block)?;

        // First 4 bytes of the last header's state root, used as the short
        // "hash" component of the ERA1 file name (all-zero if the range is
        // empty).
        let historical_root = headers
            .last()
            .map(|header| {
                let state_root = header.state_root();
                [state_root[0], state_root[1], state_root[2], state_root[3]]
            })
            .unwrap_or([0u8; 4]);

        let era1_id = Era1Id::new(&config.network, start_block, block_count as u32)
            .with_hash(historical_root);

        // Non-standard chunk sizes get the era-count naming variant so the
        // file name reflects that it is not a full-size ERA1 file.
        let era1_id = if config.max_blocks_per_file == MAX_BLOCKS_PER_ERA1 as u64 {
            era1_id
        } else {
            era1_id.with_era_count()
        };

        debug!("Final file name {}", era1_id.to_file_name());
        let file_path = config.dir.join(era1_id.to_file_name());
        let file = std::fs::File::create(&file_path)?;
        let mut writer = Era1Writer::new(file);
        writer.write_version()?;

        // Byte offsets of each block tuple within the file, for the trailing
        // block index; starts just past the version entry.
        let mut offsets = Vec::<i64>::with_capacity(block_count);
        let mut position = VERSION_ENTRY_SIZE as i64;
        let mut blocks_written = 0;
        let mut final_header_data = Vec::new();

        for (i, header) in headers.into_iter().enumerate() {
            let expected_block_number = start_block + i as u64;

            let (compressed_header, compressed_body, compressed_receipts) = compress_block_data(
                provider,
                header,
                expected_block_number,
                &mut total_difficulty,
            )?;

            // Keep the last block's compressed header around for the
            // accumulator entry written after the loop.
            if expected_block_number == end_block {
                final_header_data = compressed_header.data.clone();
            }

            let difficulty = TotalDifficulty::new(total_difficulty);

            // Each entry's on-disk size is its payload plus the e2store entry
            // header; the TD payload is a fixed 32-byte U256.
            let header_size = compressed_header.data.len() + ENTRY_HEADER_SIZE;
            let body_size = compressed_body.data.len() + ENTRY_HEADER_SIZE;
            let receipts_size = compressed_receipts.data.len() + ENTRY_HEADER_SIZE;
            let difficulty_size = 32 + ENTRY_HEADER_SIZE;
            let total_size = (header_size + body_size + receipts_size + difficulty_size) as i64;

            let block_tuple = BlockTuple::new(
                compressed_header,
                compressed_body,
                compressed_receipts,
                difficulty,
            );

            offsets.push(position);
            position += total_size;

            writer.write_block(&block_tuple)?;
            blocks_written += 1;
            total_blocks_processed += 1;

            // Periodic progress report, throttled to one per REPORT_INTERVAL_SECS.
            if last_report_time.elapsed() >= report_interval {
                info!(
                    target: "era::history::export",
                    "Export progress: block {expected_block_number}/{last_block_number} ({:.2}%) - elapsed: {:?}",
                    (total_blocks_processed as f64) /
                        ((last_block_number - config.first_block_number + 1) as f64) *
                        100.0,
                    start_time.elapsed()
                );
                last_report_time = Instant::now();
            }
        }
        if blocks_written > 0 {
            // NOTE(review): the accumulator hash is taken from the first 32
            // bytes of the last block's *compressed* header data — confirm
            // this matches the intended accumulator semantics. Also,
            // `B256::from_slice` panics if the slice is shorter than 32
            // bytes, which the `.min()` clamp here would allow — verify
            // compressed headers are always >= 32 bytes.
            let accumulator_hash =
                B256::from_slice(&final_header_data[0..32.min(final_header_data.len())]);
            let accumulator = Accumulator::new(accumulator_hash);
            let block_index = BlockIndex::new(start_block, offsets);

            writer.write_accumulator(&accumulator)?;
            writer.write_block_index(&block_index)?;
            writer.flush()?;

            info!(
                target: "era::history::export",
                "Wrote ERA1 file: {file_path:?} with {blocks_written} blocks"
            );
            created_files.push(file_path);
        }
    }

    info!(
        target: "era::history::export",
        "Successfully wrote {} ERA1 files in {:?}",
        created_files.len(),
        start_time.elapsed()
    );

    Ok(created_files)
}
247
248fn determine_export_range<P>(provider: &P, config: &ExportConfig) -> Result<BlockNumber>
251where
252 P: HeaderProvider + BlockNumReader,
253{
254 let best_block_number = provider.best_block_number()?;
255
256 let last_block_number = if best_block_number < config.last_block_number {
257 warn!(
258 "Last block {} is beyond current head {}, setting last = head",
259 config.last_block_number, best_block_number
260 );
261
262 if let Ok(headers) = provider.headers_range(best_block_number..=config.last_block_number) {
264 if let Some(last_header) = headers.last() {
265 let highest_block = last_header.number();
266 info!("Found highest available block {} via headers_range", highest_block);
267 highest_block
268 } else {
269 warn!("No headers found in range, using best_block_number {}", best_block_number);
270 best_block_number
271 }
272 } else {
273 warn!("headers_range failed, using best_block_number {}", best_block_number);
274 best_block_number
275 }
276 } else {
277 config.last_block_number
278 };
279
280 Ok(last_block_number)
281}
282
283fn compress_block_data<P>(
285 provider: &P,
286 header: P::Header,
287 expected_block_number: BlockNumber,
288 total_difficulty: &mut U256,
289) -> Result<(CompressedHeader, CompressedBody, CompressedReceipts)>
290where
291 P: BlockReader,
292{
293 let actual_block_number = header.number();
294
295 if expected_block_number != actual_block_number {
296 return Err(eyre!("Expected block {expected_block_number}, got {actual_block_number}"));
297 }
298
299 let body = provider
301 .block_by_number(actual_block_number)?
302 .ok_or_else(|| eyre!("Block not found for block {}", actual_block_number))?
303 .into_body();
304
305 let receipts = provider
306 .receipts_by_block(actual_block_number.into())?
307 .ok_or_else(|| eyre!("Receipts not found for block {}", actual_block_number))?;
308
309 *total_difficulty += header.difficulty();
310
311 let compressed_header = CompressedHeader::from_header(&header)?;
312 let compressed_body = CompressedBody::from_body(&body)?;
313 let compressed_receipts = CompressedReceipts::from_encodable_list(&receipts)
314 .map_err(|e| eyre!("Failed to compress receipts: {}", e))?;
315
316 Ok((compressed_header, compressed_body, compressed_receipts))
317}
318
#[cfg(test)]
mod tests {
    use crate::ExportConfig;
    use reth_era::era1::types::execution::MAX_BLOCKS_PER_ERA1;
    use tempfile::tempdir;

    #[test]
    fn test_export_config_validation() {
        let temp_dir = tempdir().unwrap();
        let era1_limit = MAX_BLOCKS_PER_ERA1 as u64;

        // The default configuration must be accepted.
        assert!(ExportConfig::default().validate().is_ok(), "Default config should be valid");

        // Exactly at the ERA1 per-file limit is still allowed.
        let at_limit = ExportConfig { max_blocks_per_file: era1_limit, ..Default::default() };
        assert!(at_limit.validate().is_ok(), "Config at ERA1 limit should pass validation");

        // A custom directory with a smaller chunk size is fine too.
        let custom = ExportConfig {
            dir: temp_dir.path().to_path_buf(),
            max_blocks_per_file: 1000,
            ..Default::default()
        };
        assert!(custom.validate().is_ok(), "Valid config should pass validation");

        // Zero blocks per file must be rejected with a descriptive message.
        let zero = ExportConfig { max_blocks_per_file: 0, ..Default::default() };
        let result = zero.validate();
        assert!(result.is_err(), "Zero blocks per file should fail validation");
        assert!(result.unwrap_err().to_string().contains("cannot be zero"));

        // One past the limit must be rejected as well.
        let oversized = ExportConfig { max_blocks_per_file: era1_limit + 1, ..Default::default() };
        let result = oversized.validate();
        assert!(result.is_err(), "Oversized blocks per file should fail validation");
        assert!(result.unwrap_err().to_string().contains("exceeds ERA1 limit"));
    }
}
364}