1use crate::calculate_td_by_number;
5use alloy_consensus::BlockHeader;
6use alloy_primitives::{BlockNumber, B256, U256};
7use eyre::{eyre, Result};
8use reth_era::{
9 common::file_ops::{EraFileId, StreamWriter},
10 e2s::types::IndexEntry,
11 era1::{
12 file::Era1Writer,
13 types::{
14 execution::{
15 Accumulator, BlockTuple, CompressedBody, CompressedHeader, CompressedReceipts,
16 TotalDifficulty, MAX_BLOCKS_PER_ERA1,
17 },
18 group::{BlockIndex, Era1Id},
19 },
20 },
21};
22use reth_fs_util as fs;
23use reth_storage_api::{BlockNumReader, BlockReader, HeaderProvider};
24use std::{
25 path::PathBuf,
26 time::{Duration, Instant},
27};
28use tracing::{debug, info, warn};
29
/// Minimum number of seconds between two progress log lines emitted by `export`.
const REPORT_INTERVAL_SECS: u64 = 10;
/// Per-entry overhead in bytes that `export` adds to every payload length when it
/// computes block offsets.
// NOTE(review): presumably the size of the e2store entry header — confirm against the format.
const ENTRY_HEADER_SIZE: usize = 8;
/// Starting write position used by `export` right after `write_version()`; defined as
/// exactly one entry header.
const VERSION_ENTRY_SIZE: usize = ENTRY_HEADER_SIZE;
33
/// Settings that control an ERA1 history export performed by `export`.
#[derive(Clone, Debug)]
pub struct ExportConfig {
    /// Output directory; `export` creates it when it does not exist and joins every
    /// generated file name onto it.
    pub dir: PathBuf,
    /// First block number of the export range (inclusive).
    pub first_block_number: BlockNumber,
    /// Requested last block number of the export range (inclusive). The value actually
    /// used may be lower, see `determine_export_range`.
    pub last_block_number: BlockNumber,
    /// Upper bound on the number of blocks written to a single file. `validate` rejects
    /// zero and anything above `MAX_BLOCKS_PER_ERA1`.
    pub max_blocks_per_file: u64,
    /// Network name handed to `Era1Id::new` when the file identifier is built.
    pub network: String,
}
51
52impl Default for ExportConfig {
53 fn default() -> Self {
54 Self {
55 dir: PathBuf::new(),
56 first_block_number: 0,
57 last_block_number: (MAX_BLOCKS_PER_ERA1 - 1) as u64,
58 max_blocks_per_file: MAX_BLOCKS_PER_ERA1 as u64,
59 network: "mainnet".to_string(),
60 }
61 }
62}
63
64impl ExportConfig {
65 pub fn validate(&self) -> Result<()> {
67 if self.max_blocks_per_file > MAX_BLOCKS_PER_ERA1 as u64 {
68 return Err(eyre!(
69 "Max blocks per file ({}) exceeds ERA1 limit ({})",
70 self.max_blocks_per_file,
71 MAX_BLOCKS_PER_ERA1
72 ));
73 }
74
75 if self.max_blocks_per_file == 0 {
76 return Err(eyre!("Max blocks per file cannot be zero"));
77 }
78
79 Ok(())
80 }
81}
82
83pub fn export<P>(provider: &P, config: &ExportConfig) -> Result<Vec<PathBuf>>
87where
88 P: BlockReader,
89{
90 config.validate()?;
91 info!(
92 "Exporting blockchain history from block {} to {} with this max of blocks per file of {}",
93 config.first_block_number, config.last_block_number, config.max_blocks_per_file
94 );
95
96 let last_block_number = determine_export_range(provider, config)?;
99
100 info!(
101 target: "era::history::export",
102 first = config.first_block_number,
103 last = last_block_number,
104 max_blocks_per_file = config.max_blocks_per_file,
105 "Preparing era1 export data"
106 );
107
108 if !config.dir.exists() {
109 fs::create_dir_all(&config.dir)
110 .map_err(|e| eyre!("Failed to create output directory: {}", e))?;
111 }
112
113 let start_time = Instant::now();
114 let mut last_report_time = Instant::now();
115 let report_interval = Duration::from_secs(REPORT_INTERVAL_SECS);
116
117 let mut created_files = Vec::new();
118 let mut total_blocks_processed = 0;
119
120 let mut total_difficulty = if config.first_block_number > 0 {
121 let prev_block_number = config.first_block_number - 1;
122 calculate_td_by_number(provider, prev_block_number)?
123 } else {
124 U256::ZERO
125 };
126
127 for start_block in
129 (config.first_block_number..=last_block_number).step_by(config.max_blocks_per_file as usize)
130 {
131 let end_block = (start_block + config.max_blocks_per_file - 1).min(last_block_number);
132 let block_count = (end_block - start_block + 1) as usize;
133
134 info!(
135 target: "era::history::export",
136 "Processing blocks {start_block} to {end_block} ({block_count} blocks)"
137 );
138
139 let headers = provider.headers_range(start_block..=end_block)?;
140
141 let historical_root = headers
143 .last()
144 .map(|header| {
145 let state_root = header.state_root();
146 [state_root[0], state_root[1], state_root[2], state_root[3]]
147 })
148 .unwrap_or([0u8; 4]);
149
150 let era1_id = Era1Id::new(&config.network, start_block, block_count as u32)
151 .with_hash(historical_root);
152
153 let era1_id = if config.max_blocks_per_file == MAX_BLOCKS_PER_ERA1 as u64 {
154 era1_id
155 } else {
156 era1_id.with_era_count()
157 };
158
159 debug!("Final file name {}", era1_id.to_file_name());
160 let file_path = config.dir.join(era1_id.to_file_name());
161 let file = std::fs::File::create(&file_path)?;
162 let mut writer = Era1Writer::new(file);
163 writer.write_version()?;
164
165 let mut offsets = Vec::<i64>::with_capacity(block_count);
166 let mut position = VERSION_ENTRY_SIZE as i64;
167 let mut blocks_written = 0;
168 let mut final_header_data = Vec::new();
169
170 for (i, header) in headers.into_iter().enumerate() {
171 let expected_block_number = start_block + i as u64;
172
173 let (compressed_header, compressed_body, compressed_receipts) = compress_block_data(
174 provider,
175 header,
176 expected_block_number,
177 &mut total_difficulty,
178 )?;
179
180 if expected_block_number == end_block {
182 final_header_data = compressed_header.data.clone();
183 }
184
185 let difficulty = TotalDifficulty::new(total_difficulty);
186
187 let header_size = compressed_header.data.len() + ENTRY_HEADER_SIZE;
188 let body_size = compressed_body.data.len() + ENTRY_HEADER_SIZE;
189 let receipts_size = compressed_receipts.data.len() + ENTRY_HEADER_SIZE;
190 let difficulty_size = 32 + ENTRY_HEADER_SIZE; let total_size = (header_size + body_size + receipts_size + difficulty_size) as i64;
192
193 let block_tuple = BlockTuple::new(
194 compressed_header,
195 compressed_body,
196 compressed_receipts,
197 difficulty,
198 );
199
200 offsets.push(position);
201 position += total_size;
202
203 writer.write_block(&block_tuple)?;
204 blocks_written += 1;
205 total_blocks_processed += 1;
206
207 if last_report_time.elapsed() >= report_interval {
208 info!(
209 target: "era::history::export",
210 "Export progress: block {expected_block_number}/{last_block_number} ({:.2}%) - elapsed: {:?}",
211 (total_blocks_processed as f64) /
212 ((last_block_number - config.first_block_number + 1) as f64) *
213 100.0,
214 start_time.elapsed()
215 );
216 last_report_time = Instant::now();
217 }
218 }
219 if blocks_written > 0 {
220 let accumulator_hash =
221 B256::from_slice(&final_header_data[0..32.min(final_header_data.len())]);
222 let accumulator = Accumulator::new(accumulator_hash);
223 let block_index = BlockIndex::new(start_block, offsets);
224
225 writer.write_accumulator(&accumulator)?;
226 writer.write_block_index(&block_index)?;
227 writer.flush()?;
228
229 info!(
230 target: "era::history::export",
231 "Wrote ERA1 file: {file_path:?} with {blocks_written} blocks"
232 );
233 created_files.push(file_path);
234 }
235 }
236
237 info!(
238 target: "era::history::export",
239 "Successfully wrote {} ERA1 files in {:?}",
240 created_files.len(),
241 start_time.elapsed()
242 );
243
244 Ok(created_files)
245}
246
247fn determine_export_range<P>(provider: &P, config: &ExportConfig) -> Result<BlockNumber>
250where
251 P: HeaderProvider + BlockNumReader,
252{
253 let best_block_number = provider.best_block_number()?;
254
255 let last_block_number = if best_block_number < config.last_block_number {
256 warn!(
257 "Last block {} is beyond current head {}, setting last = head",
258 config.last_block_number, best_block_number
259 );
260
261 if let Ok(headers) = provider.headers_range(best_block_number..=config.last_block_number) {
263 if let Some(last_header) = headers.last() {
264 let highest_block = last_header.number();
265 info!("Found highest available block {} via headers_range", highest_block);
266 highest_block
267 } else {
268 warn!("No headers found in range, using best_block_number {}", best_block_number);
269 best_block_number
270 }
271 } else {
272 warn!("headers_range failed, using best_block_number {}", best_block_number);
273 best_block_number
274 }
275 } else {
276 config.last_block_number
277 };
278
279 Ok(last_block_number)
280}
281
282fn compress_block_data<P>(
284 provider: &P,
285 header: P::Header,
286 expected_block_number: BlockNumber,
287 total_difficulty: &mut U256,
288) -> Result<(CompressedHeader, CompressedBody, CompressedReceipts)>
289where
290 P: BlockReader,
291{
292 let actual_block_number = header.number();
293
294 if expected_block_number != actual_block_number {
295 return Err(eyre!("Expected block {expected_block_number}, got {actual_block_number}"));
296 }
297
298 let body = provider
299 .block_by_number(actual_block_number)?
300 .ok_or_else(|| eyre!("Block body not found for block {}", actual_block_number))?;
301
302 let receipts = provider
303 .receipts_by_block(actual_block_number.into())?
304 .ok_or_else(|| eyre!("Receipts not found for block {}", actual_block_number))?;
305
306 *total_difficulty += header.difficulty();
307
308 let compressed_header = CompressedHeader::from_header(&header)?;
309 let compressed_body = CompressedBody::from_body(&body)?;
310 let compressed_receipts = CompressedReceipts::from_encodable_list(&receipts)
311 .map_err(|e| eyre!("Failed to compress receipts: {}", e))?;
312
313 Ok((compressed_header, compressed_body, compressed_receipts))
314}
315
#[cfg(test)]
mod tests {
    use crate::ExportConfig;
    use reth_era::era1::types::execution::MAX_BLOCKS_PER_ERA1;
    use tempfile::tempdir;

    /// Exercises `ExportConfig::validate` with the default config, the exact ERA1 limit,
    /// a small custom value, zero, and one above the limit.
    #[test]
    fn test_export_config_validation() {
        let temp_dir = tempdir().unwrap();
        let era1_limit = MAX_BLOCKS_PER_ERA1 as u64;

        // Default config overridden with a specific per-file block limit.
        let with_max_blocks =
            |max_blocks_per_file: u64| ExportConfig { max_blocks_per_file, ..Default::default() };

        assert!(ExportConfig::default().validate().is_ok(), "Default config should be valid");

        assert!(
            with_max_blocks(era1_limit).validate().is_ok(),
            "Config at ERA1 limit should pass validation"
        );

        let custom_dir_config =
            ExportConfig { dir: temp_dir.path().to_path_buf(), ..with_max_blocks(1000) };
        assert!(custom_dir_config.validate().is_ok(), "Valid config should pass validation");

        let zero_outcome = with_max_blocks(0).validate();
        assert!(zero_outcome.is_err(), "Zero blocks per file should fail validation");
        assert!(zero_outcome.unwrap_err().to_string().contains("cannot be zero"));

        let oversized_outcome = with_max_blocks(era1_limit + 1).validate();
        assert!(oversized_outcome.is_err(), "Oversized blocks per file should fail validation");
        assert!(oversized_outcome.unwrap_err().to_string().contains("exceeds ERA1 limit"));
    }
}
361}