reth_stages/stages/s3/downloader/fetch.rs
use crate::stages::s3::downloader::{worker::spawn_workers, RemainingChunkRange};

use super::{
    error::DownloaderError,
    meta::Metadata,
    worker::{WorkerRequest, WorkerResponse},
};
use alloy_primitives::B256;
use reqwest::{header::CONTENT_LENGTH, Client};
use std::{
    collections::HashMap,
    fs::{File, OpenOptions},
    io::BufReader,
    path::Path,
};
use tracing::{debug, error, info};

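/// Downloads a file from `url` into `{target_dir}/download/{filename}`, fetching chunk
/// ranges concurrently with up to `concurrent` workers, and moves it to
/// `{target_dir}/{filename}` once complete.
///
/// Progress is tracked in a [`Metadata`] file kept next to the data file, so an
/// interrupted download resumes from the chunk ranges it still needs. If `file_hash` is
/// provided, the file's blake3 hash is verified before the final move.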
pub async fn fetch(
    filename: &str,
    target_dir: &Path,
    url: &str,
    mut concurrent: u64,
    file_hash: Option<B256>,
) -> Result<(), DownloaderError> {
    // Stage the file in a temporary `download` directory until it is fully downloaded.
    let download_dir = target_dir.join("download");
    reth_fs_util::create_dir_all(&download_dir)?;

    let data_file = download_dir.join(filename);
    let mut metadata = metadata(&data_file, url).await?;
    if metadata.is_done() {
        return Ok(())
    }

    // Ensure the file exists and is preallocated to its final size so that workers
    // can write to their assigned byte ranges concurrently. The file must not be
    // truncated here: truncating would wipe chunks that a resumed download has
    // already written.
    {
        let file = OpenOptions::new()
            .create(true)
            .truncate(false)
            .read(true)
            .write(true)
            .open(&data_file)?;

        if file.metadata()?.len() as usize != metadata.total_size {
            info!(target: "sync::stages::s3::downloader", ?filename, length = metadata.total_size, "Preallocating space.");
            file.set_len(metadata.total_size as u64)?;
        }
    }

    while !metadata.is_done() {
        info!(target: "sync::stages::s3::downloader", ?filename, "Downloading.");

        // Cap concurrency to the available parallelism and to the number of chunks
        // that still need downloading.
        let missing_chunks = metadata.needed_ranges();
        concurrent = concurrent
            .min(std::thread::available_parallelism()?.get() as u64)
            .min(missing_chunks.len() as u64);

        let mut orchestrator_rx = spawn_workers(url, concurrent, &data_file);

        let mut workers = HashMap::new();
        let mut missing_chunks = missing_chunks.into_iter();

        // Hand out chunk ranges as workers report in. When no ranges remain, each
        // worker is told to finish.
        while let Some(worker_msg) = orchestrator_rx.recv().await {
            debug!(target: "sync::stages::s3::downloader", ?worker_msg, "Received message from worker.");

            let available_worker = match worker_msg {
                WorkerResponse::Ready { worker_id, tx } => {
                    debug!(target: "sync::stages::s3::downloader", ?worker_id, "Worker ready.");
                    workers.insert(worker_id, tx);
                    worker_id
                }
                WorkerResponse::DownloadedChunk { worker_id, chunk_index, written_bytes } => {
                    metadata.update_chunk(chunk_index, written_bytes)?;
                    worker_id
                }
                WorkerResponse::Err { worker_id, error } => {
                    error!(target: "sync::stages::s3::downloader", ?worker_id, "Worker found an error: {:?}", error);
                    return Err(error)
                }
            };

            let msg = if let Some(RemainingChunkRange { index, start, end }) = missing_chunks.next()
            {
                debug!(target: "sync::stages::s3::downloader", ?available_worker, start, end, "Worker download request.");
                WorkerRequest::Download { chunk_index: index, start, end }
            } else {
                debug!(target: "sync::stages::s3::downloader", ?available_worker, "Sent Finish command to worker.");
                WorkerRequest::Finish
            };

            let _ = workers.get(&available_worker).expect("should exist").send(msg);
        }
    }

    if let Some(file_hash) = file_hash {
        info!(target: "sync::stages::s3::downloader", ?filename, "Checking file integrity.");
        check_file_hash(&data_file, &file_hash)?;
    }

    // The metadata file is no longer needed once the download is complete.
    metadata.delete()?;

    // Move the file out of the temporary download directory into the target directory.
    let file_directory = target_dir.join(filename);
    reth_fs_util::rename(data_file, &file_directory)?;
    info!(target: "sync::stages::s3::downloader", ?file_directory, "Moved file from temporary to target directory.");

    Ok(())
}

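/// Returns the [`Metadata`] for `data_file`, loading it from disk if it already exists,
/// otherwise creating it with the total file size obtained from a HEAD request to `url`.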
async fn metadata(data_file: &Path, url: &str) -> Result<Metadata, DownloaderError> {
    if Metadata::file_path(data_file).exists() {
        debug!(target: "sync::stages::s3::downloader", ?data_file, "Loading metadata.");
        return Metadata::load(data_file)
    }

    // Query the total file size from the `Content-Length` header.
    let client = Client::new();
    let resp = client.head(url).send().await?;
    let total_length: usize = resp
        .headers()
        .get(CONTENT_LENGTH)
        .and_then(|v| v.to_str().ok())
        .and_then(|s| s.parse().ok())
        .ok_or(DownloaderError::EmptyContentLength)?;

    debug!(target: "sync::stages::s3::downloader", ?data_file, "Creating metadata.");

    Metadata::builder(data_file).with_total_size(total_length).build()
}

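/// Verifies that the blake3 hash of the file at `path` matches `expected`, streaming the
/// file through the hasher rather than reading it into memory at once.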
fn check_file_hash(path: &Path, expected: &B256) -> Result<(), DownloaderError> {
    let mut reader = BufReader::new(File::open(path)?);
    let mut hasher = blake3::Hasher::new();
    std::io::copy(&mut reader, &mut hasher)?;

    let file_hash = hasher.finalize();
    if file_hash.as_bytes() != expected {
        return Err(DownloaderError::InvalidFileHash(file_hash.as_bytes().into(), *expected))
    }

    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;
    use alloy_primitives::b256;

    // Downloads a 15MB test file over the network and verifies its blake3 checksum,
    // so this test requires internet access.
    #[tokio::test]
    async fn test_download() {
        reth_tracing::init_test_tracing();

        let b3sum = b256!("0xe9908f4992ae39c4d1fe9984dd743ae3f8e9a84a4a5af768128833605ff72723");
        let url = "https://link.testfile.org/15MB";

        let file = tempfile::NamedTempFile::new().unwrap();
        let filename = file.path().file_name().unwrap().to_str().unwrap();
        let target_dir = file.path().parent().unwrap();
        fetch(filename, target_dir, url, 4, Some(b3sum)).await.unwrap();
    }
}