1use blake3::Hasher;
2use eyre::Result;
3use rayon::prelude::*;
4use reqwest::Client;
5use serde::{Deserialize, Serialize};
6use std::{
7 collections::BTreeMap,
8 io::Read,
9 path::{Path, PathBuf},
10};
11use tracing::info;
12
/// Serde helper: reports whether a value is zero, used with
/// `skip_serializing_if` to omit unset numeric fields from the manifest.
fn is_zero(value: &u64) -> bool {
    value == &0
}
16
/// Top-level description of a published snapshot: which block it covers and
/// which component archives it contains.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SnapshotManifest {
    /// Highest block contained in the snapshot.
    pub block: u64,
    /// Chain the snapshot was taken from.
    pub chain_id: u64,
    /// Storage layout version of the packaged data.
    pub storage_version: u64,
    /// Unix timestamp (seconds) at which the manifest was generated.
    pub timestamp: u64,
    /// Base URL that archive file names are resolved against; omitted from
    /// serialized output when not set.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub base_url: Option<String>,
    /// Version of reth that produced the snapshot, when known.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub reth_version: Option<String>,
    /// Component archives keyed by component key (see
    /// `SnapshotComponentType::key`); `BTreeMap` keeps serialization order
    /// deterministic.
    pub components: BTreeMap<String, ComponentManifest>,
}
51
/// A component's archive layout: either one archive or a series of
/// block-range chunks.
///
/// `untagged` means deserialization selects the variant by field shape, so
/// the two variants must remain structurally distinguishable.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum ComponentManifest {
    /// One archive holding the whole component.
    Single(SingleArchive),
    /// Many archives, each covering a fixed block range.
    Chunked(ChunkedArchive),
}
61
/// Metadata for a component packaged as a single archive file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SingleArchive {
    /// Archive file name, resolved relative to the manifest's base URL.
    pub file: String,
    /// Compressed size in bytes.
    pub size: u64,
    /// Decompressed size in bytes; 0 means "not recorded" and is omitted
    /// from serialized output.
    #[serde(default, skip_serializing_if = "is_zero")]
    pub decompressed_size: u64,
    /// Hex BLAKE3 hash of the archive file, when recorded.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub blake3: Option<String>,
    /// Per-file checksums of the archive's extracted contents.
    #[serde(default)]
    pub output_files: Vec<OutputFileChecksum>,
}
84
/// Metadata for a component split into fixed-size block-range chunk archives.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChunkedArchive {
    /// Number of blocks covered by each chunk archive.
    pub blocks_per_file: u64,
    /// Total number of blocks covered by all chunks together.
    pub total_blocks: u64,
    /// Compressed size of each chunk, indexed by chunk number.
    #[serde(default)]
    pub chunk_sizes: Vec<u64>,
    /// Decompressed size of each chunk; may be empty in older manifests, in
    /// which case sizes are derived from `chunk_output_files`.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub chunk_decompressed_sizes: Vec<u64>,
    /// Per-chunk checksums of the extracted files.
    #[serde(default)]
    pub chunk_output_files: Vec<Vec<OutputFileChecksum>>,
}
108
/// Size and BLAKE3 hash of one file extracted from an archive.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct OutputFileChecksum {
    /// Path of the file inside the archive (and after extraction).
    pub path: String,
    /// File size in bytes.
    pub size: u64,
    /// Hex-encoded BLAKE3 digest of the file contents.
    pub blake3: String,
}
119
/// A downloadable archive resolved from the manifest: the full URL plus the
/// metadata needed to verify the download and its extracted contents.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SnapshotArchive {
    /// Fully-resolved download URL.
    pub url: String,
    /// Archive file name (the last URL segment).
    pub file_name: String,
    /// Compressed size in bytes.
    pub size: u64,
    /// Hex BLAKE3 of the archive file, when the manifest records one.
    pub blake3: Option<String>,
    /// Checksums of the files the archive extracts to.
    pub output_files: Vec<OutputFileChecksum>,
}
129
130impl SnapshotArchive {
131 pub fn output_size(&self) -> u64 {
133 self.output_files.iter().map(|file| file.size).sum()
134 }
135}
136
/// How much of a component the user chose to download.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ComponentSelection {
    /// Download the component in full.
    All,
    /// Download only the data covering the last N blocks.
    Distance(u64),
    /// Download data from the given block onward.
    Since(u64),
    /// Skip the component entirely.
    None,
}
152
153impl std::fmt::Display for ComponentSelection {
154 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
155 match self {
156 Self::All => write!(f, "All"),
157 Self::Distance(d) => write!(f, "Last {d} blocks"),
158 Self::Since(block) => write!(f, "Since block {block}"),
159 Self::None => write!(f, "None"),
160 }
161 }
162}
163
/// The kinds of data a snapshot can contain.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum SnapshotComponentType {
    /// The MDBX state database.
    State,
    /// Header static-file segment.
    Headers,
    /// Transaction static-file segment.
    Transactions,
    /// Transaction-sender static-file segment.
    TransactionSenders,
    /// Receipt static-file segment.
    Receipts,
    /// Account changeset static-file segment.
    AccountChangesets,
    /// Storage changeset static-file segment.
    StorageChangesets,
    /// RocksDB index databases.
    RocksdbIndices,
}
184
185impl SnapshotComponentType {
186 pub const ALL: [Self; 8] = [
188 Self::State,
189 Self::Headers,
190 Self::Transactions,
191 Self::TransactionSenders,
192 Self::Receipts,
193 Self::AccountChangesets,
194 Self::StorageChangesets,
195 Self::RocksdbIndices,
196 ];
197
198 pub const fn key(&self) -> &'static str {
200 match self {
201 Self::State => "state",
202 Self::Headers => "headers",
203 Self::Transactions => "transactions",
204 Self::TransactionSenders => "transaction_senders",
205 Self::Receipts => "receipts",
206 Self::AccountChangesets => "account_changesets",
207 Self::StorageChangesets => "storage_changesets",
208 Self::RocksdbIndices => "rocksdb_indices",
209 }
210 }
211
212 pub const fn display_name(&self) -> &'static str {
214 match self {
215 Self::State => "State (mdbx)",
216 Self::Headers => "Headers",
217 Self::Transactions => "Transactions",
218 Self::TransactionSenders => "Transaction Senders",
219 Self::Receipts => "Receipts",
220 Self::AccountChangesets => "Account Changesets",
221 Self::StorageChangesets => "Storage Changesets",
222 Self::RocksdbIndices => "RocksDB Indices",
223 }
224 }
225
226 pub const fn is_required(&self) -> bool {
230 matches!(self, Self::State | Self::Headers)
231 }
232
233 pub const fn minimal_selection(&self) -> ComponentSelection {
244 match self {
245 Self::State | Self::Headers => ComponentSelection::All,
246 Self::Transactions | Self::AccountChangesets | Self::StorageChangesets => {
247 ComponentSelection::Distance(10_064)
248 }
249 Self::Receipts => ComponentSelection::Distance(64),
250 Self::TransactionSenders => ComponentSelection::None,
251 Self::RocksdbIndices => ComponentSelection::None,
252 }
253 }
254
255 pub const fn is_chunked(&self) -> bool {
257 !matches!(self, Self::State | Self::RocksdbIndices)
258 }
259}
260
261impl SnapshotManifest {
262 fn base_url_or_empty(&self) -> &str {
263 self.base_url.as_deref().unwrap_or("")
264 }
265
266 pub fn component(&self, ty: SnapshotComponentType) -> Option<&ComponentManifest> {
268 self.components.get(ty.key())
269 }
270
271 pub fn total_size(&self, types: &[SnapshotComponentType]) -> u64 {
273 types.iter().filter_map(|ty| self.component(*ty).map(|c| c.total_size())).sum()
274 }
275
276 pub fn archive_urls(&self, ty: SnapshotComponentType) -> Vec<String> {
278 let Some(component) = self.component(ty) else {
279 return vec![];
280 };
281
282 match component {
283 ComponentManifest::Single(single) => {
284 vec![format!("{}/{}", self.base_url_or_empty(), single.file)]
285 }
286 ComponentManifest::Chunked(chunked) => {
287 let key = ty.key();
288 let num_chunks = chunked.num_chunks();
289 (0..num_chunks)
290 .map(|i| {
291 let start = i * chunked.blocks_per_file;
292 let end = (i + 1) * chunked.blocks_per_file - 1;
293 format!("{}/{key}-{start}-{end}.tar.zst", self.base_url_or_empty())
294 })
295 .collect()
296 }
297 }
298 }
299
300 pub fn archive_urls_for_distance(
303 &self,
304 ty: SnapshotComponentType,
305 distance: Option<u64>,
306 ) -> Vec<String> {
307 let Some(component) = self.component(ty) else {
308 return vec![];
309 };
310
311 match component {
312 ComponentManifest::Single(single) => {
313 vec![format!("{}/{}", self.base_url_or_empty(), single.file)]
314 }
315 ComponentManifest::Chunked(chunked) => {
316 let key = ty.key();
317 let num_chunks = chunked.num_chunks();
318
319 let start_chunk = match distance {
321 Some(dist) => {
322 let needed_blocks = dist.min(chunked.total_blocks);
324 let needed_chunks = needed_blocks.div_ceil(chunked.blocks_per_file);
325 num_chunks.saturating_sub(needed_chunks)
326 }
327 None => 0, };
329
330 (start_chunk..num_chunks)
331 .map(|i| {
332 let start = i * chunked.blocks_per_file;
333 let end = (i + 1) * chunked.blocks_per_file - 1;
334 format!("{}/{key}-{start}-{end}.tar.zst", self.base_url_or_empty())
335 })
336 .collect()
337 }
338 }
339 }
340
341 pub fn snapshot_archives_for_distance(
343 &self,
344 ty: SnapshotComponentType,
345 distance: Option<u64>,
346 ) -> Vec<SnapshotArchive> {
347 let Some(component) = self.component(ty) else {
348 return vec![];
349 };
350
351 match component {
352 ComponentManifest::Single(single) => {
353 vec![SnapshotArchive {
354 url: format!("{}/{}", self.base_url_or_empty(), single.file),
355 file_name: single.file.clone(),
356 size: single.size,
357 blake3: single.blake3.clone(),
358 output_files: single.output_files.clone(),
359 }]
360 }
361 ComponentManifest::Chunked(chunked) => {
362 let key = ty.key();
363 let num_chunks = chunked.num_chunks();
364
365 let start_chunk = match distance {
366 Some(dist) => {
367 let needed_blocks = dist.min(chunked.total_blocks);
368 let needed_chunks = needed_blocks.div_ceil(chunked.blocks_per_file);
369 num_chunks.saturating_sub(needed_chunks)
370 }
371 None => 0,
372 };
373
374 (start_chunk..num_chunks)
375 .map(|i| {
376 let start = i * chunked.blocks_per_file;
377 let end = (i + 1) * chunked.blocks_per_file - 1;
378 let file_name = format!("{key}-{start}-{end}.tar.zst");
379 let size = chunked.chunk_sizes.get(i as usize).copied().unwrap_or_default();
380 let output_files =
381 chunked.chunk_output_files.get(i as usize).cloned().unwrap_or_default();
382
383 SnapshotArchive {
384 url: format!("{}/{}", self.base_url_or_empty(), file_name),
385 file_name,
386 size,
387 blake3: None,
388 output_files,
389 }
390 })
391 .collect()
392 }
393 }
394 }
395
396 pub fn size_for_distance(&self, ty: SnapshotComponentType, distance: Option<u64>) -> u64 {
401 let Some(component) = self.component(ty) else {
402 return 0;
403 };
404 match component {
405 ComponentManifest::Single(s) => s.size,
406 ComponentManifest::Chunked(chunked) => {
407 if chunked.chunk_sizes.is_empty() {
408 return 0;
409 }
410 let num_chunks = chunked.chunk_sizes.len() as u64;
411 let start_chunk = match distance {
412 Some(dist) => {
413 let needed = dist.min(chunked.total_blocks);
414 let needed_chunks = needed.div_ceil(chunked.blocks_per_file);
415 num_chunks.saturating_sub(needed_chunks)
416 }
417 None => 0,
418 };
419 chunked.chunk_sizes[start_chunk as usize..].iter().sum()
420 }
421 }
422 }
423
424 pub fn output_size_for_distance(
426 &self,
427 ty: SnapshotComponentType,
428 distance: Option<u64>,
429 ) -> u64 {
430 let Some(component) = self.component(ty) else {
431 return 0;
432 };
433
434 match component {
435 ComponentManifest::Single(single) => single.output_size(),
436 ComponentManifest::Chunked(chunked) => {
437 let num_chunks = chunked.num_chunks();
438 let start_chunk = match distance {
439 Some(dist) => {
440 let needed = dist.min(chunked.total_blocks);
441 let needed_chunks = needed.div_ceil(chunked.blocks_per_file);
442 num_chunks.saturating_sub(needed_chunks)
443 }
444 None => 0,
445 };
446
447 (start_chunk..num_chunks)
448 .map(|index| chunked.chunk_output_size(index as usize))
449 .sum()
450 }
451 }
452 }
453
454 pub fn chunks_for_distance(&self, ty: SnapshotComponentType, distance: Option<u64>) -> u64 {
456 let Some(ComponentManifest::Chunked(chunked)) = self.component(ty) else {
457 return if self.component(ty).is_some() { 1 } else { 0 };
458 };
459 match distance {
460 Some(dist) => {
461 let needed = dist.min(chunked.total_blocks);
462 needed.div_ceil(chunked.blocks_per_file)
463 }
464 None => chunked.num_chunks(),
465 }
466 }
467}
468
469impl ComponentManifest {
470 pub fn total_size(&self) -> u64 {
472 match self {
473 Self::Single(s) => s.size,
474 Self::Chunked(c) => c.chunk_sizes.iter().sum(),
475 }
476 }
477
478 pub fn total_output_size(&self) -> u64 {
480 match self {
481 Self::Single(single) => single.output_size(),
482 Self::Chunked(chunked) => chunked.total_output_size(),
483 }
484 }
485}
486
487impl ChunkedArchive {
488 pub fn num_chunks(&self) -> u64 {
490 self.total_blocks.div_ceil(self.blocks_per_file)
491 }
492
493 pub fn chunk_output_size(&self, index: usize) -> u64 {
495 self.chunk_decompressed_sizes.get(index).copied().unwrap_or_else(|| {
496 self.chunk_output_files
497 .get(index)
498 .map(|files| files.iter().map(|file| file.size).sum())
499 .unwrap_or(0)
500 })
501 }
502
503 pub fn total_output_size(&self) -> u64 {
505 if !self.chunk_decompressed_sizes.is_empty() {
506 self.chunk_decompressed_sizes.iter().sum()
507 } else {
508 self.chunk_output_files
509 .iter()
510 .map(|files| files.iter().map(|file| file.size).sum::<u64>())
511 .sum()
512 }
513 }
514}
515
516impl SingleArchive {
517 pub fn output_size(&self) -> u64 {
519 if self.decompressed_size != 0 {
520 self.decompressed_size
521 } else {
522 self.output_files.iter().map(|file| file.size).sum()
523 }
524 }
525}
526
527pub async fn fetch_manifest(manifest_url: &str) -> Result<SnapshotManifest> {
529 let client = Client::new();
530 let manifest: SnapshotManifest =
531 client.get(manifest_url).send().await?.error_for_status()?.json().await?;
532 Ok(manifest)
533}
534
535pub fn generate_manifest(
537 source_datadir: &Path,
538 output_dir: &Path,
539 base_url: Option<&str>,
540 block: u64,
541 chain_id: u64,
542 blocks_per_file: u64,
543) -> Result<SnapshotManifest> {
544 std::fs::create_dir_all(output_dir)?;
545
546 let mut components = BTreeMap::new();
547
548 for ty in &[
550 SnapshotComponentType::Headers,
551 SnapshotComponentType::Transactions,
552 SnapshotComponentType::TransactionSenders,
553 SnapshotComponentType::Receipts,
554 SnapshotComponentType::AccountChangesets,
555 SnapshotComponentType::StorageChangesets,
556 ] {
557 let key = ty.key();
558 let num_chunks = block.div_ceil(blocks_per_file);
559 let mut planned_chunks = Vec::with_capacity(num_chunks as usize);
560 let mut found_any = false;
561
562 for i in 0..num_chunks {
563 let start = i * blocks_per_file;
564 let end = (i + 1) * blocks_per_file - 1;
565 let source_files = source_files_for_chunk(source_datadir, *ty, start, end)?;
566
567 if source_files.is_empty() {
568 if found_any {
569 eyre::bail!("Missing source files for {} chunk {}-{}", key, start, end);
570 }
571 continue;
572 }
573
574 found_any = true;
575 planned_chunks.push(PlannedChunk {
576 chunk_idx: i,
577 archive_path: output_dir.join(chunk_filename(key, start, end)),
578 source_files,
579 });
580 }
581
582 if found_any {
583 let mut packaged_chunks = planned_chunks
584 .into_par_iter()
585 .map(|planned| -> Result<PackagedChunk> {
586 let output_files =
587 write_chunk_archive(&planned.archive_path, &planned.source_files)?;
588 let size = std::fs::metadata(&planned.archive_path)?.len();
589 Ok(PackagedChunk { chunk_idx: planned.chunk_idx, size, output_files })
590 })
591 .collect::<Vec<_>>()
592 .into_iter()
593 .collect::<Result<Vec<_>>>()?;
594
595 packaged_chunks.sort_unstable_by_key(|chunk| chunk.chunk_idx);
596 let chunk_sizes = packaged_chunks.iter().map(|chunk| chunk.size).collect::<Vec<_>>();
597 let chunk_output_files =
598 packaged_chunks.into_iter().map(|chunk| chunk.output_files).collect::<Vec<_>>();
599 let total_size: u64 = chunk_sizes.iter().sum();
600 info!(target: "reth::cli",
601 component = ty.display_name(),
602 chunks = chunk_sizes.len(),
603 total_blocks = block,
604 size = %super::DownloadProgress::format_size(total_size),
605 "Found chunked component"
606 );
607 components.insert(
608 key.to_string(),
609 ComponentManifest::Chunked(ChunkedArchive {
610 blocks_per_file,
611 total_blocks: block,
612 chunk_sizes,
613 chunk_decompressed_sizes: chunk_output_files
614 .iter()
615 .map(|files| files.iter().map(|file| file.size).sum())
616 .collect(),
617 chunk_output_files,
618 }),
619 );
620 }
621 }
622
623 let (state_size, state_output_files) = package_single_component(
624 output_dir,
625 "state.tar.zst",
626 &state_source_files(source_datadir)?,
627 )?;
628 components.insert(
629 SnapshotComponentType::State.key().to_string(),
630 ComponentManifest::Single(SingleArchive {
631 file: "state.tar.zst".to_string(),
632 size: state_size,
633 decompressed_size: state_output_files.iter().map(|file| file.size).sum(),
634 blake3: None,
635 output_files: state_output_files,
636 }),
637 );
638
639 let rocksdb_files = rocksdb_source_files(source_datadir)?;
640 if !rocksdb_files.is_empty() {
641 let (rocksdb_size, rocksdb_output_files) =
642 package_single_component(output_dir, "rocksdb_indices.tar.zst", &rocksdb_files)?;
643 components.insert(
644 SnapshotComponentType::RocksdbIndices.key().to_string(),
645 ComponentManifest::Single(SingleArchive {
646 file: "rocksdb_indices.tar.zst".to_string(),
647 size: rocksdb_size,
648 decompressed_size: rocksdb_output_files.iter().map(|file| file.size).sum(),
649 blake3: None,
650 output_files: rocksdb_output_files,
651 }),
652 );
653 }
654
655 let timestamp = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH)?.as_secs();
656
657 Ok(SnapshotManifest {
658 block,
659 chain_id,
660 storage_version: 2,
661 timestamp,
662 base_url: base_url.map(str::to_owned),
663 reth_version: Some(reth_node_core::version::version_metadata().short_version.to_string()),
664 components,
665 })
666}
667
/// File name of the chunk archive covering blocks `start..=end` of the
/// component identified by `component_key`.
pub fn chunk_filename(component_key: &str, start: u64, end: u64) -> String {
    let mut name = String::new();
    name.push_str(component_key);
    name.push_str(&format!("-{start}-{end}"));
    name.push_str(".tar.zst");
    name
}
672
/// A chunk scheduled for archiving: where the archive goes and which source
/// files feed it.
#[derive(Debug)]
struct PlannedChunk {
    /// Zero-based chunk index within the component.
    chunk_idx: u64,
    /// Destination path of the `.tar.zst` archive.
    archive_path: PathBuf,
    /// Static files that belong to this chunk's block range.
    source_files: Vec<PathBuf>,
}
679
/// Result of archiving one chunk: its compressed size and content checksums.
#[derive(Debug)]
struct PackagedChunk {
    /// Zero-based chunk index, used to restore order after parallel packaging.
    chunk_idx: u64,
    /// Compressed archive size in bytes.
    size: u64,
    /// Checksums of the files the archive contains.
    output_files: Vec<OutputFileChecksum>,
}
686
/// Mapping from a file on disk to the path it will have inside an archive.
#[derive(Debug)]
struct PlannedFile {
    /// Absolute (or datadir-relative) path of the file to read.
    source_path: PathBuf,
    /// Path the file is stored under inside the archive.
    relative_path: PathBuf,
}
692
693fn source_files_for_chunk(
694 source_datadir: &Path,
695 component: SnapshotComponentType,
696 start: u64,
697 end: u64,
698) -> Result<Vec<PathBuf>> {
699 let Some(segment_name) = static_segment_name(component) else {
700 return Ok(Vec::new());
701 };
702
703 let static_files_dir = source_datadir.join("static_files");
704 let static_files_dir =
705 if static_files_dir.exists() { static_files_dir } else { source_datadir.to_path_buf() };
706 let prefix = format!("static_file_{segment_name}_{start}_{end}");
707
708 let mut files = Vec::new();
709 for entry in std::fs::read_dir(&static_files_dir)? {
710 let entry = entry?;
711 if !entry.file_type()?.is_file() {
712 continue;
713 }
714 if entry.file_name().to_string_lossy().starts_with(&prefix) {
715 files.push(entry.path());
716 }
717 }
718
719 files.sort_unstable();
720 Ok(files)
721}
722
723fn static_segment_name(component: SnapshotComponentType) -> Option<&'static str> {
724 match component {
725 SnapshotComponentType::Headers => Some("headers"),
726 SnapshotComponentType::Transactions => Some("transactions"),
727 SnapshotComponentType::TransactionSenders => Some("transaction-senders"),
728 SnapshotComponentType::Receipts => Some("receipts"),
729 SnapshotComponentType::AccountChangesets => Some("account-change-sets"),
730 SnapshotComponentType::StorageChangesets => Some("storage-change-sets"),
731 SnapshotComponentType::State | SnapshotComponentType::RocksdbIndices => None,
732 }
733}
734
735fn state_source_files(source_datadir: &Path) -> Result<Vec<PlannedFile>> {
736 let db_dir = source_datadir.join("db");
737 if db_dir.exists() {
738 return collect_files_recursive(&db_dir, Path::new("db"));
739 }
740
741 if looks_like_db_dir(source_datadir)? {
742 return collect_files_recursive(source_datadir, Path::new("db"));
743 }
744
745 eyre::bail!("Could not find source state DB directory under {}", source_datadir.display())
746}
747
748fn rocksdb_source_files(source_datadir: &Path) -> Result<Vec<PlannedFile>> {
749 let rocksdb_dir = source_datadir.join("rocksdb");
750 if !rocksdb_dir.exists() {
751 return Ok(Vec::new());
752 }
753
754 collect_files_recursive(&rocksdb_dir, Path::new("rocksdb"))
755}
756
757fn looks_like_db_dir(path: &Path) -> Result<bool> {
758 let entries = match std::fs::read_dir(path) {
759 Ok(entries) => entries,
760 Err(_) => return Ok(false),
761 };
762
763 for entry in entries {
764 let entry = entry?;
765 if !entry.file_type()?.is_file() {
766 continue;
767 }
768 let name = entry.file_name();
769 let name = name.to_string_lossy();
770 if name == "mdbx.dat" || name == "lock.mdb" || name == "data.mdb" {
771 return Ok(true);
772 }
773 }
774
775 Ok(false)
776}
777
778fn collect_files_recursive(root: &Path, output_prefix: &Path) -> Result<Vec<PlannedFile>> {
779 let mut files = Vec::new();
780 collect_files_recursive_inner(root, root, output_prefix, &mut files)?;
781 files.sort_unstable_by(|a, b| a.relative_path.cmp(&b.relative_path));
782 Ok(files)
783}
784
785fn collect_files_recursive_inner(
786 root: &Path,
787 dir: &Path,
788 output_prefix: &Path,
789 files: &mut Vec<PlannedFile>,
790) -> Result<()> {
791 for entry in std::fs::read_dir(dir)? {
792 let entry = entry?;
793 let path = entry.path();
794 let file_type = entry.file_type()?;
795 if file_type.is_dir() {
796 collect_files_recursive_inner(root, &path, output_prefix, files)?;
797 continue;
798 }
799 if !file_type.is_file() {
800 continue;
801 }
802
803 let relative = path.strip_prefix(root)?.to_path_buf();
804 files.push(PlannedFile { source_path: path, relative_path: output_prefix.join(relative) });
805 }
806
807 Ok(())
808}
809
810fn package_single_component(
811 output_dir: &Path,
812 archive_file_name: &str,
813 files: &[PlannedFile],
814) -> Result<(u64, Vec<OutputFileChecksum>)> {
815 if files.is_empty() {
816 eyre::bail!("Cannot package empty single archive: {}", archive_file_name);
817 }
818
819 let archive_path = output_dir.join(archive_file_name);
820 let output_files = write_archive_from_planned_files(&archive_path, files)?;
821 let size = std::fs::metadata(&archive_path)?.len();
822 Ok((size, output_files))
823}
824
825fn write_chunk_archive(path: &Path, source_files: &[PathBuf]) -> Result<Vec<OutputFileChecksum>> {
826 let planned_files = source_files
827 .iter()
828 .map(|source_path| {
829 let file_name = source_path.file_name().ok_or_else(|| {
830 eyre::eyre!("Invalid source file path: {}", source_path.display())
831 })?;
832 Ok::<_, eyre::Error>(PlannedFile {
833 source_path: source_path.clone(),
834 relative_path: PathBuf::from("static_files").join(file_name),
835 })
836 })
837 .collect::<Result<Vec<_>>>()?;
838
839 write_archive_from_planned_files(path, &planned_files)
840}
841
/// Streams `files` into a zstd-compressed tar archive at `path`, hashing each
/// file with BLAKE3 as it is read.
///
/// Returns one [`OutputFileChecksum`] per input file: its archive-relative
/// path, the number of bytes read, and the hex BLAKE3 digest.
fn write_archive_from_planned_files(
    path: &Path,
    files: &[PlannedFile],
) -> Result<Vec<OutputFileChecksum>> {
    let file = std::fs::File::create(path)?;
    // Compression level 0 selects zstd's default level.
    let mut encoder = zstd::Encoder::new(file, 0)?;
    encoder.include_checksum(true)?;
    let mut builder = tar::Builder::new(encoder);

    let mut output_files = Vec::with_capacity(files.len());
    for planned in files {
        let mut header = tar::Header::new_gnu();
        // NOTE(review): the tar entry size comes from metadata read here,
        // while the checksum below counts bytes actually streamed — these
        // diverge if a source file changes size mid-archive; confirm sources
        // are immutable while packaging.
        header.set_size(std::fs::metadata(&planned.source_path)?.len());
        header.set_mode(0o644);
        header.set_cksum();

        let source_file = std::fs::File::open(&planned.source_path)?;
        // HashingReader hashes and counts bytes transparently as tar reads.
        let mut reader = HashingReader::new(source_file);
        builder.append_data(&mut header, &planned.relative_path, &mut reader)?;

        output_files.push(OutputFileChecksum {
            path: planned.relative_path.to_string_lossy().to_string(),
            size: reader.bytes_read,
            blake3: reader.finalize(),
        });
    }

    // Finish the tar stream first, then the zstd frame, so trailing archive
    // metadata and checksums are flushed to disk.
    builder.finish()?;
    let encoder = builder.into_inner()?;
    encoder.finish()?;

    Ok(output_files)
}
877
/// A `Read` adapter that feeds every byte it produces into a BLAKE3 hasher
/// and counts the total bytes read.
struct HashingReader<R> {
    // Underlying reader being wrapped.
    inner: R,
    // Running BLAKE3 state over all bytes read so far.
    hasher: Hasher,
    // Total number of bytes read through this adapter.
    bytes_read: u64,
}
883
884impl<R: Read> HashingReader<R> {
885 fn new(inner: R) -> Self {
886 Self { inner, hasher: Hasher::new(), bytes_read: 0 }
887 }
888
889 fn finalize(self) -> String {
890 self.hasher.finalize().to_hex().to_string()
891 }
892}
893
894impl<R: Read> Read for HashingReader<R> {
895 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
896 let n = self.inner.read(buf)?;
897 if n > 0 {
898 self.bytes_read += n as u64;
899 self.hasher.update(&buf[..n]);
900 }
901 Ok(n)
902 }
903}
904
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Builds a manifest with one single-archive component (`state`) and two
    /// three-chunk components (`transactions`, `headers`) at 500k blocks per
    /// chunk, covering blocks 0..=1_499_999.
    fn test_manifest() -> SnapshotManifest {
        let mut components = BTreeMap::new();
        components.insert(
            "state".to_string(),
            ComponentManifest::Single(SingleArchive {
                file: "state.tar.zst".to_string(),
                size: 100,
                decompressed_size: 0,
                blake3: None,
                output_files: vec![],
            }),
        );
        components.insert(
            "transactions".to_string(),
            ComponentManifest::Chunked(ChunkedArchive {
                blocks_per_file: 500_000,
                total_blocks: 1_500_000,
                chunk_sizes: vec![80_000, 100_000, 120_000],
                chunk_decompressed_sizes: vec![],
                chunk_output_files: vec![vec![], vec![], vec![]],
            }),
        );
        components.insert(
            "headers".to_string(),
            ComponentManifest::Chunked(ChunkedArchive {
                blocks_per_file: 500_000,
                total_blocks: 1_500_000,
                chunk_sizes: vec![40_000, 50_000, 60_000],
                chunk_decompressed_sizes: vec![],
                chunk_output_files: vec![vec![], vec![], vec![]],
            }),
        );
        SnapshotManifest {
            block: 1_500_000,
            chain_id: 1,
            storage_version: 2,
            timestamp: 0,
            base_url: Some("https://example.com".to_string()),
            reth_version: None,
            components,
        }
    }

    #[test]
    fn archive_urls_for_distance_all() {
        let m = test_manifest();
        // `None` distance selects every chunk.
        let urls = m.archive_urls_for_distance(SnapshotComponentType::Transactions, None);
        assert_eq!(urls.len(), 3);
        assert_eq!(urls[0], "https://example.com/transactions-0-499999.tar.zst");
        assert_eq!(urls[2], "https://example.com/transactions-1000000-1499999.tar.zst");
    }

    #[test]
    fn archive_urls_for_distance_partial() {
        let m = test_manifest();
        // 600k blocks need two 500k chunks, so the first chunk is skipped.
        let urls = m.archive_urls_for_distance(SnapshotComponentType::Transactions, Some(600_000));
        assert_eq!(urls.len(), 2);
        assert_eq!(urls[0], "https://example.com/transactions-500000-999999.tar.zst");
        assert_eq!(urls[1], "https://example.com/transactions-1000000-1499999.tar.zst");
    }

    #[test]
    fn archive_urls_for_distance_single_component() {
        let m = test_manifest();
        // Single-archive components ignore the distance entirely.
        let urls = m.archive_urls_for_distance(SnapshotComponentType::State, Some(100));
        assert_eq!(urls.len(), 1);
        assert_eq!(urls[0], "https://example.com/state.tar.zst");
    }

    #[test]
    fn archive_urls_for_distance_rocksdb_indices_single_component() {
        let mut components = BTreeMap::new();
        components.insert(
            "rocksdb_indices".to_string(),
            ComponentManifest::Single(SingleArchive {
                file: "rocksdb_indices.tar.zst".to_string(),
                size: 777,
                decompressed_size: 0,
                blake3: None,
                output_files: vec![],
            }),
        );
        let m = SnapshotManifest {
            block: 1,
            chain_id: 1,
            storage_version: 2,
            timestamp: 0,
            base_url: Some("https://example.com".to_string()),
            reth_version: None,
            components,
        };

        let urls = m.archive_urls_for_distance(SnapshotComponentType::RocksdbIndices, Some(10));
        assert_eq!(urls.len(), 1);
        assert_eq!(urls[0], "https://example.com/rocksdb_indices.tar.zst");
        assert_eq!(m.size_for_distance(SnapshotComponentType::RocksdbIndices, Some(10)), 777);
    }

    #[test]
    fn archive_urls_for_distance_missing_component() {
        let m = test_manifest();
        let urls = m.archive_urls_for_distance(SnapshotComponentType::Receipts, None);
        assert!(urls.is_empty());
    }

    #[test]
    fn chunks_for_distance_all() {
        let m = test_manifest();
        assert_eq!(m.chunks_for_distance(SnapshotComponentType::Transactions, None), 3);
    }

    #[test]
    fn chunks_for_distance_partial() {
        let m = test_manifest();
        assert_eq!(m.chunks_for_distance(SnapshotComponentType::Transactions, Some(600_000)), 2);
        assert_eq!(m.chunks_for_distance(SnapshotComponentType::Transactions, Some(100_000)), 1);
    }

    #[test]
    fn chunks_for_distance_single() {
        let m = test_manifest();
        // Single-archive components always count as one chunk.
        assert_eq!(m.chunks_for_distance(SnapshotComponentType::State, None), 1);
        assert_eq!(m.chunks_for_distance(SnapshotComponentType::State, Some(100)), 1);
    }

    #[test]
    fn chunks_for_distance_missing() {
        let m = test_manifest();
        assert_eq!(m.chunks_for_distance(SnapshotComponentType::Receipts, None), 0);
    }

    #[test]
    fn component_selection_display() {
        assert_eq!(ComponentSelection::All.to_string(), "All");
        assert_eq!(ComponentSelection::Distance(10_064).to_string(), "Last 10064 blocks");
        assert_eq!(ComponentSelection::Since(15_537_394).to_string(), "Since block 15537394");
        assert_eq!(ComponentSelection::None.to_string(), "None");
    }

    #[test]
    fn archive_urls_aligned_to_blocks_per_file() {
        // `total_blocks` is not a multiple of `blocks_per_file`; the final
        // chunk's name must still use the nominal (rounded-up) range.
        let mut components = BTreeMap::new();
        components.insert(
            "storage_changesets".to_string(),
            ComponentManifest::Chunked(ChunkedArchive {
                blocks_per_file: 500_000,
                total_blocks: 24_396_822,
                chunk_sizes: vec![100; 49],
                chunk_decompressed_sizes: vec![],
                chunk_output_files: vec![vec![]; 49],
            }),
        );
        let m = SnapshotManifest {
            block: 24_396_822,
            chain_id: 1,
            storage_version: 2,
            timestamp: 0,
            base_url: Some("https://example.com".to_string()),
            reth_version: None,
            components,
        };
        let urls = m.archive_urls(SnapshotComponentType::StorageChangesets);
        assert_eq!(urls.len(), 49);
        assert_eq!(urls[0], "https://example.com/storage_changesets-0-499999.tar.zst");
        assert_eq!(urls[48], "https://example.com/storage_changesets-24000000-24499999.tar.zst");
    }

    #[test]
    fn size_for_distance_sums_tail_chunks() {
        let m = test_manifest();
        assert_eq!(m.size_for_distance(SnapshotComponentType::Transactions, None), 300_000);
        assert_eq!(
            m.size_for_distance(SnapshotComponentType::Transactions, Some(500_000)),
            120_000
        );
        assert_eq!(
            m.size_for_distance(SnapshotComponentType::Transactions, Some(600_000)),
            220_000
        );
        assert_eq!(m.size_for_distance(SnapshotComponentType::State, Some(100)), 100);
        assert_eq!(m.size_for_distance(SnapshotComponentType::Receipts, None), 0);
    }

    #[test]
    fn output_size_for_distance_uses_manifest_or_output_files() {
        // No decompressed sizes and no output files recorded -> 0.
        let m = test_manifest();
        assert_eq!(m.output_size_for_distance(SnapshotComponentType::Transactions, None), 0);

        let mut components = BTreeMap::new();
        components.insert(
            "state".to_string(),
            ComponentManifest::Single(SingleArchive {
                file: "state.tar.zst".to_string(),
                size: 100,
                decompressed_size: 1_000,
                blake3: None,
                output_files: vec![OutputFileChecksum {
                    path: "db/mdbx.dat".to_string(),
                    size: 1_000,
                    blake3: "h0".to_string(),
                }],
            }),
        );
        components.insert(
            "transactions".to_string(),
            ComponentManifest::Chunked(ChunkedArchive {
                blocks_per_file: 500_000,
                total_blocks: 1_000_000,
                chunk_sizes: vec![80_000, 120_000],
                chunk_decompressed_sizes: vec![111, 222],
                chunk_output_files: vec![
                    vec![OutputFileChecksum {
                        path: "static_files/static_file_transactions_0_499999.bin".to_string(),
                        size: 111,
                        blake3: "h0".to_string(),
                    }],
                    vec![OutputFileChecksum {
                        path: "static_files/static_file_transactions_500000_999999.bin".to_string(),
                        size: 222,
                        blake3: "h1".to_string(),
                    }],
                ],
            }),
        );
        let manifest = SnapshotManifest {
            block: 1_000_000,
            chain_id: 1,
            storage_version: 2,
            timestamp: 0,
            base_url: Some("https://example.com".to_string()),
            reth_version: None,
            components,
        };

        assert_eq!(manifest.output_size_for_distance(SnapshotComponentType::State, None), 1_000);
        assert_eq!(
            manifest.output_size_for_distance(SnapshotComponentType::Transactions, None),
            333
        );
        assert_eq!(
            manifest.output_size_for_distance(SnapshotComponentType::Transactions, Some(500_000)),
            222
        );
    }

    #[test]
    fn archive_descriptors_include_checksum_metadata() {
        let mut components = BTreeMap::new();
        components.insert(
            "state".to_string(),
            ComponentManifest::Single(SingleArchive {
                file: "state.tar.zst".to_string(),
                size: 100,
                decompressed_size: 1_000,
                blake3: Some("abc123".to_string()),
                output_files: vec![OutputFileChecksum {
                    path: "db/mdbx.dat".to_string(),
                    size: 1000,
                    blake3: "s0".to_string(),
                }],
            }),
        );
        components.insert(
            "transactions".to_string(),
            ComponentManifest::Chunked(ChunkedArchive {
                blocks_per_file: 500_000,
                total_blocks: 1_000_000,
                chunk_sizes: vec![80_000, 120_000],
                chunk_decompressed_sizes: vec![111, 222],
                chunk_output_files: vec![
                    vec![OutputFileChecksum {
                        path: "static_files/static_file_transactions_0_499999.bin".to_string(),
                        size: 111,
                        blake3: "h0".to_string(),
                    }],
                    vec![OutputFileChecksum {
                        path: "static_files/static_file_transactions_500000_999999.bin".to_string(),
                        size: 222,
                        blake3: "h1".to_string(),
                    }],
                ],
            }),
        );

        let m = SnapshotManifest {
            block: 1_000_000,
            chain_id: 1,
            storage_version: 2,
            timestamp: 0,
            base_url: Some("https://example.com".to_string()),
            reth_version: None,
            components,
        };

        // Single archives carry the recorded archive hash through.
        let state = m.snapshot_archives_for_distance(SnapshotComponentType::State, None);
        assert_eq!(state.len(), 1);
        assert_eq!(state[0].file_name, "state.tar.zst");
        assert_eq!(state[0].blake3.as_deref(), Some("abc123"));
        assert_eq!(state[0].output_files.len(), 1);

        // Chunked archives have no per-chunk archive hash, only output files.
        let tx = m.snapshot_archives_for_distance(SnapshotComponentType::Transactions, None);
        assert_eq!(tx.len(), 2);
        assert_eq!(tx[0].blake3, None);
        assert_eq!(tx[1].blake3, None);
        assert_eq!(tx[0].output_files[0].size, 111);
    }

    #[test]
    fn generate_manifest_includes_state_single_archive() {
        let source = tempdir().unwrap();
        let output = tempdir().unwrap();
        let db_dir = source.path().join("db");
        std::fs::create_dir_all(&db_dir).unwrap();
        std::fs::write(db_dir.join("mdbx.dat"), b"state-data").unwrap();

        let manifest =
            generate_manifest(source.path(), output.path(), None, 0, 1, 500_000).unwrap();

        let state = manifest.component(SnapshotComponentType::State).unwrap();
        let ComponentManifest::Single(state) = state else {
            panic!("state should be a single archive")
        };
        assert_eq!(state.file, "state.tar.zst");
        assert!(state.decompressed_size > 0);
        assert!(!state.output_files.is_empty());
        assert_eq!(state.output_files[0].path, "db/mdbx.dat");
        assert!(output.path().join("state.tar.zst").exists());
    }

    #[test]
    fn generate_manifest_includes_rocksdb_single_archive_when_present() {
        let source = tempdir().unwrap();
        let output = tempdir().unwrap();
        let db_dir = source.path().join("db");
        std::fs::create_dir_all(&db_dir).unwrap();
        std::fs::write(db_dir.join("mdbx.dat"), b"state-data").unwrap();
        let rocksdb_dir = source.path().join("rocksdb");
        std::fs::create_dir_all(&rocksdb_dir).unwrap();
        std::fs::write(rocksdb_dir.join("CURRENT"), b"MANIFEST-000001").unwrap();

        let manifest =
            generate_manifest(source.path(), output.path(), None, 0, 1, 500_000).unwrap();

        let rocksdb = manifest.component(SnapshotComponentType::RocksdbIndices).unwrap();
        let ComponentManifest::Single(rocksdb) = rocksdb else {
            panic!("rocksdb indices should be a single archive")
        };
        assert_eq!(rocksdb.file, "rocksdb_indices.tar.zst");
        assert!(rocksdb.decompressed_size > 0);
        assert!(!rocksdb.output_files.is_empty());
        assert_eq!(rocksdb.output_files[0].path, "rocksdb/CURRENT");
        assert!(output.path().join("rocksdb_indices.tar.zst").exists());
    }
}