1mod era1;
11mod ere;
12
13use crate::calculate_td_by_number;
14use alloy_consensus::{BlockHeader, Sealable};
15use alloy_primitives::{BlockNumber, B256, U256};
16use alloy_rlp::Encodable;
17use eyre::{eyre, Result};
18use reth_era::era1::types::execution::MAX_BLOCKS_PER_ERA1;
19use reth_fs_util as fs;
20use reth_primitives_traits::{Block, Receipt};
21use reth_storage_api::{BlockNumReader, BlockReader, HeaderProvider, ReceiptProvider};
22use std::{
23 path::{Path, PathBuf},
24 time::{Duration, Instant},
25};
26use tracing::{info, warn};
27
28const REPORT_INTERVAL_SECS: u64 = 10;
31
32#[derive(Clone, Debug)]
37pub struct ExportConfig {
38 pub dir: PathBuf,
40 pub first_block_number: BlockNumber,
42 pub last_block_number: BlockNumber,
44 pub max_blocks_per_file: u64,
48 pub network: String,
50}
51
52impl Default for ExportConfig {
53 fn default() -> Self {
54 Self {
55 dir: PathBuf::new(),
56 first_block_number: 0,
57 last_block_number: (MAX_BLOCKS_PER_ERA1 - 1) as u64,
58 max_blocks_per_file: MAX_BLOCKS_PER_ERA1 as u64,
59 network: "mainnet".to_string(),
60 }
61 }
62}
63
64impl ExportConfig {
65 pub fn validate(&self) -> Result<()> {
67 if self.max_blocks_per_file > MAX_BLOCKS_PER_ERA1 as u64 {
68 return Err(eyre!(
69 "Max blocks per file ({}) exceeds ERA1 limit ({})",
70 self.max_blocks_per_file,
71 MAX_BLOCKS_PER_ERA1
72 ));
73 }
74
75 if self.max_blocks_per_file == 0 {
76 return Err(eyre!("Max blocks per file cannot be zero"));
77 }
78
79 Ok(())
80 }
81}
82
83#[derive(Debug)]
85pub struct ExportBlock<H, B, R> {
86 pub header: H,
88 pub block_hash: B256,
90 pub body: B,
92 pub receipts: Vec<R>,
95 pub total_difficulty: U256,
97}
98
99impl<H, B, R> ExportBlock<H, B, R> {
100 pub const fn header_record(&self) -> (B256, U256) {
106 (self.block_hash, self.total_difficulty)
107 }
108}
109
110pub trait EraBlockWriter {
115 fn write_file<H, B, R>(
120 network: &str,
121 max_blocks_per_file: u64,
122 blocks: &[ExportBlock<H, B, R>],
123 dir: &Path,
124 ) -> Result<PathBuf>
125 where
126 H: BlockHeader + Encodable,
127 B: Encodable,
128 R: Receipt;
129}
130
131pub(crate) trait ChunkAccumulator: Sized {
137 fn from_pairs(records: &[(B256, U256)]) -> Result<Self>;
139}
140
141fn accumulator<A, H, B, R>(blocks: &[ExportBlock<H, B, R>]) -> Result<A>
144where
145 A: ChunkAccumulator,
146{
147 let records: Vec<(B256, U256)> = blocks.iter().map(ExportBlock::header_record).collect();
148 A::from_pairs(&records)
149}
150
151type Chunk<P> =
153 Vec<ExportBlock<<P as HeaderProvider>::Header, BodyOf<P>, <P as ReceiptProvider>::Receipt>>;
154
155type BodyOf<P> = <<P as BlockReader>::Block as Block>::Body;
157
158pub fn export<W, P>(provider: &P, config: &ExportConfig) -> Result<Vec<PathBuf>>
163where
164 W: EraBlockWriter,
165 P: BlockReader,
166 P::Header: BlockHeader + Sealable + Encodable,
167 BodyOf<P>: Encodable,
168 P::Receipt: Receipt,
169{
170 config.validate()?;
171
172 let last_block = determine_export_range(provider, config)?;
175
176 info!(
177 target: "era::history::export",
178 first = config.first_block_number,
179 last = last_block,
180 max_blocks_per_file = config.max_blocks_per_file,
181 "Preparing ERA export data"
182 );
183
184 if !config.dir.exists() {
185 fs::create_dir_all(&config.dir)
186 .map_err(|e| eyre!("Failed to create output directory: {}", e))?;
187 }
188
189 let mut progress = ExportProgress::new(last_block - config.first_block_number + 1);
190 let mut total_difficulty = seed_total_difficulty(provider, config)?;
191 let mut created_files = Vec::new();
192
193 for start_block in
194 (config.first_block_number..=last_block).step_by(config.max_blocks_per_file as usize)
195 {
196 let end_block = (start_block + config.max_blocks_per_file - 1).min(last_block);
197
198 let blocks = gather_chunk(
199 provider,
200 start_block..=end_block,
201 last_block,
202 &mut total_difficulty,
203 &mut progress,
204 )?;
205 if blocks.is_empty() {
206 continue;
207 }
208
209 let file_path =
210 W::write_file(&config.network, config.max_blocks_per_file, &blocks, &config.dir)?;
211
212 info!(target: "era::history::export", "Wrote ERA file: {file_path:?} with {} blocks", blocks.len());
213 created_files.push(file_path);
214 }
215
216 info!(
217 target: "era::history::export",
218 "Successfully wrote {} ERA files in {:?}",
219 created_files.len(),
220 progress.elapsed()
221 );
222
223 Ok(created_files)
224}
225
226fn short_hash(root: B256) -> [u8; 4] {
228 root[..4].try_into().expect("root is 32 bytes")
229}
230
231fn seed_total_difficulty<P: BlockReader>(provider: &P, config: &ExportConfig) -> Result<U256> {
234 if config.first_block_number > 0 {
235 calculate_td_by_number(provider, config.first_block_number - 1)
236 } else {
237 Ok(U256::ZERO)
238 }
239}
240
241fn gather_chunk<P>(
244 provider: &P,
245 range: std::ops::RangeInclusive<BlockNumber>,
246 last_block: BlockNumber,
247 total_difficulty: &mut U256,
248 progress: &mut ExportProgress,
249) -> Result<Chunk<P>>
250where
251 P: BlockReader,
252 P::Header: BlockHeader + Sealable,
253{
254 let start_block = *range.start();
255 let headers = provider.headers_range(range)?;
256 let mut blocks = Vec::with_capacity(headers.len());
257
258 for (i, header) in headers.into_iter().enumerate() {
259 let expected = start_block + i as u64;
260 let actual = header.number();
261 if expected != actual {
262 return Err(eyre!("Expected block {expected}, got {actual}"));
263 }
264
265 let body = provider
267 .block_by_number(actual)?
268 .ok_or_else(|| eyre!("Block not found for block {actual}"))?
269 .into_body();
270 let receipts = provider
271 .receipts_by_block(actual.into())?
272 .ok_or_else(|| eyre!("Receipts not found for block {actual}"))?;
273
274 let block_hash = header.hash_slow();
275 *total_difficulty += header.difficulty();
276
277 blocks.push(ExportBlock {
278 header,
279 block_hash,
280 body,
281 receipts,
282 total_difficulty: *total_difficulty,
283 });
284 progress.record(actual, last_block);
285 }
286
287 Ok(blocks)
288}
289
290fn determine_export_range<P>(provider: &P, config: &ExportConfig) -> Result<BlockNumber>
294where
295 P: HeaderProvider + BlockNumReader,
296{
297 let best_block_number = provider.best_block_number()?;
298
299 if best_block_number >= config.last_block_number {
300 return Ok(config.last_block_number);
301 }
302
303 warn!(
304 "Last block {} is beyond current head {}, setting last = head",
305 config.last_block_number, best_block_number
306 );
307
308 match provider.headers_range(best_block_number..=config.last_block_number) {
310 Ok(headers) => match headers.last() {
311 Some(last_header) => {
312 let highest_block = last_header.number();
313 info!("Found highest available block {} via headers_range", highest_block);
314 Ok(highest_block)
315 }
316 None => {
317 warn!("No headers found in range, using best_block_number {}", best_block_number);
318 Ok(best_block_number)
319 }
320 },
321 Err(_) => {
322 warn!("headers_range failed, using best_block_number {}", best_block_number);
323 Ok(best_block_number)
324 }
325 }
326}
327
328struct ExportProgress {
330 start: Instant,
331 last_report: Instant,
332 interval: Duration,
333 total_blocks: u64,
334 processed: u64,
335}
336
337impl ExportProgress {
338 fn new(total_blocks: u64) -> Self {
339 let now = Instant::now();
340 Self {
341 start: now,
342 last_report: now,
343 interval: Duration::from_secs(REPORT_INTERVAL_SECS),
344 total_blocks,
345 processed: 0,
346 }
347 }
348
349 fn record(&mut self, current_block: BlockNumber, last_block: BlockNumber) {
351 self.processed += 1;
352 if self.last_report.elapsed() >= self.interval {
353 info!(
354 target: "era::history::export",
355 "Export progress: block {current_block}/{last_block} ({:.2}%) - elapsed: {:?}",
356 self.processed as f64 / self.total_blocks as f64 * 100.0,
357 self.start.elapsed()
358 );
359 self.last_report = Instant::now();
360 }
361 }
362
363 fn elapsed(&self) -> Duration {
364 self.start.elapsed()
365 }
366}
367
368#[cfg(test)]
369mod tests {
370 use super::ExportConfig;
371 use reth_era::era1::types::execution::MAX_BLOCKS_PER_ERA1;
372 use tempfile::tempdir;
373
374 #[test]
375 fn test_export_config_validation() {
376 let temp_dir = tempdir().unwrap();
377
378 let default_config = ExportConfig::default();
380 assert!(default_config.validate().is_ok(), "Default config should be valid");
381
382 let limit_config =
384 ExportConfig { max_blocks_per_file: MAX_BLOCKS_PER_ERA1 as u64, ..Default::default() };
385 assert!(limit_config.validate().is_ok(), "Config at ERA1 limit should pass validation");
386
387 let valid_config = ExportConfig {
389 dir: temp_dir.path().to_path_buf(),
390 max_blocks_per_file: 1000,
391 ..Default::default()
392 };
393 assert!(valid_config.validate().is_ok(), "Valid config should pass validation");
394
395 let zero_blocks_config = ExportConfig {
397 max_blocks_per_file: 0, ..Default::default()
399 };
400 let result = zero_blocks_config.validate();
401 assert!(result.is_err(), "Zero blocks per file should fail validation");
402 assert!(result.unwrap_err().to_string().contains("cannot be zero"));
403
404 let oversized_config = ExportConfig {
406 max_blocks_per_file: MAX_BLOCKS_PER_ERA1 as u64 + 1, ..Default::default()
408 };
409 let result = oversized_config.validate();
410 assert!(result.is_err(), "Oversized blocks per file should fail validation");
411 assert!(result.unwrap_err().to_string().contains("exceeds ERA1 limit"));
412 }
413}