reth_stages/stages/s3/downloader/
fetch.rs

use crate::stages::s3::downloader::{worker::spawn_workers, RemainingChunkRange};

use super::{
    error::DownloaderError,
    meta::Metadata,
    worker::{WorkerRequest, WorkerResponse},
};
use alloy_primitives::B256;
use reqwest::{header::CONTENT_LENGTH, Client};
use std::{
    collections::HashMap,
    fs::{File, OpenOptions},
    io::BufReader,
    path::Path,
};
use tracing::{debug, error, info};

/// Downloads a file from `url` to the data file path.
///
/// If a `file_hash` is passed, it is verified at the end of the download.
///
/// ## Details
///
/// 1) A [`Metadata`] file is created or opened at `{target_dir}/download/{filename}.metadata`. It
///    tracks the download progress, including the total file size, downloaded bytes, chunk sizes,
///    and the ranges that still need downloading, which makes the download resumable.
/// 2) The target file is preallocated with the total size of the file at
///    `{target_dir}/download/{filename}`.
/// 3) Multiple `workers` are spawned to download specific chunks of the file.
/// 4) The `Orchestrator` manages the workers, distributes chunk ranges, and keeps the download
///    progressing efficiently by dynamically assigning tasks to workers as they become available.
/// 5) Once the file is downloaded:
///     * If `file_hash` is `Some`, its blake3 hash is verified.
///     * The metadata file is deleted.
///     * The downloaded file is moved to the target directory.
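///
/// ## Example
///
/// A minimal sketch of a call site; the filename, target directory, URL, and concurrency below
/// are illustrative placeholders:
///
/// ```ignore
/// // Download `snapshot.dat` into `/data` with up to 8 concurrent workers, skipping hash
/// // verification by passing `None`.
/// fetch("snapshot.dat", Path::new("/data"), "https://example.com/snapshot.dat", 8, None).await?;
/// ```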
pub async fn fetch(
    filename: &str,
    target_dir: &Path,
    url: &str,
    mut concurrent: u64,
    file_hash: Option<B256>,
) -> Result<(), DownloaderError> {
    // Create a temporary directory to download files to, before moving them to target_dir.
    let download_dir = target_dir.join("download");
    reth_fs_util::create_dir_all(&download_dir)?;

    let data_file = download_dir.join(filename);
    let mut metadata = metadata(&data_file, url).await?;
    if metadata.is_done() {
        return Ok(())
    }

    // Ensure the file is preallocated so we can download it concurrently. The file must not be
    // truncated on open: a resumed download relies on the chunks already written to disk.
    {
        let file = OpenOptions::new()
            .create(true)
            .read(true)
            .write(true)
            .open(&data_file)?;

        if file.metadata()?.len() as usize != metadata.total_size {
            info!(target: "sync::stages::s3::downloader", ?filename, length = metadata.total_size, "Preallocating space.");
            file.set_len(metadata.total_size as u64)?;
        }
    }

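    // Each pass requests only the ranges still missing from the metadata, so an interrupted
    // download resumes where it left off.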
    while !metadata.is_done() {
        info!(target: "sync::stages::s3::downloader", ?filename, "Downloading.");

        // Find the missing file chunks and the minimum number of workers required
        let missing_chunks = metadata.needed_ranges();
        concurrent = concurrent
            .min(std::thread::available_parallelism()?.get() as u64)
            .min(missing_chunks.len() as u64);

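        // Spawn the workers; each reports readiness, downloaded chunks, and errors back through
        // the orchestrator channel.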
        let mut orchestrator_rx = spawn_workers(url, concurrent, &data_file);

        let mut workers = HashMap::new();
        let mut missing_chunks = missing_chunks.into_iter();

        // Distribute chunk ranges to workers when they free up
        while let Some(worker_msg) = orchestrator_rx.recv().await {
            debug!(target: "sync::stages::s3::downloader", ?worker_msg, "Received message from worker.");

            let available_worker = match worker_msg {
                WorkerResponse::Ready { worker_id, tx } => {
                    debug!(target: "sync::stages::s3::downloader", ?worker_id, "Worker ready.");
                    workers.insert(worker_id, tx);
                    worker_id
                }
                WorkerResponse::DownloadedChunk { worker_id, chunk_index, written_bytes } => {
                    metadata.update_chunk(chunk_index, written_bytes)?;
                    worker_id
                }
                WorkerResponse::Err { worker_id, error } => {
                    error!(target: "sync::stages::s3::downloader", ?worker_id, "Worker encountered an error: {:?}", error);
                    return Err(error)
                }
            };

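            // Hand the now-idle worker the next missing chunk, or tell it to finish if nothing
            // is left to assign.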
            let msg = if let Some(RemainingChunkRange { index, start, end }) = missing_chunks.next()
            {
                debug!(target: "sync::stages::s3::downloader", ?available_worker, start, end, "Worker download request.");
                WorkerRequest::Download { chunk_index: index, start, end }
            } else {
                debug!(target: "sync::stages::s3::downloader", ?available_worker, "Sent Finish command to worker.");
                WorkerRequest::Finish
            };

            let _ = workers.get(&available_worker).expect("should exist").send(msg);
        }
    }

    if let Some(file_hash) = file_hash {
        info!(target: "sync::stages::s3::downloader", ?filename, "Checking file integrity.");
        check_file_hash(&data_file, &file_hash)?;
    }

    // No longer need the metadata file.
    metadata.delete()?;

    // Move downloaded file to desired directory.
    let file_directory = target_dir.join(filename);
    reth_fs_util::rename(data_file, &file_directory)?;
    info!(target: "sync::stages::s3::downloader", ?file_directory, "Moved file from temporary to target directory.");

    Ok(())
}

/// Creates a metadata file used to keep track of the downloaded chunks. Useful when resuming
/// after a shutdown.
async fn metadata(data_file: &Path, url: &str) -> Result<Metadata, DownloaderError> {
    if Metadata::file_path(data_file).exists() {
        debug!(target: "sync::stages::s3::downloader", ?data_file, "Loading metadata.");
        return Metadata::load(data_file)
    }

    let client = Client::new();
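    // A HEAD request is enough to learn the total file size from the Content-Length header.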
    let resp = client.head(url).send().await?;
    let total_length: usize = resp
        .headers()
        .get(CONTENT_LENGTH)
        .and_then(|v| v.to_str().ok())
        .and_then(|s| s.parse().ok())
        .ok_or(DownloaderError::EmptyContentLength)?;

    debug!(target: "sync::stages::s3::downloader", ?data_file, "Creating metadata.");

    Metadata::builder(data_file).with_total_size(total_length).build()
}

/// Ensures the file at `path` has the expected blake3 hash.
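///
/// A minimal sketch of a call site (the path and `expected` value are illustrative
/// placeholders):
///
/// ```ignore
/// // `expected` holds the known-good blake3 digest of the file being verified.
/// check_file_hash(Path::new("/data/snapshot.dat"), &expected)?;
/// ```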
fn check_file_hash(path: &Path, expected: &B256) -> Result<(), DownloaderError> {
    let mut reader = BufReader::new(File::open(path)?);
    let mut hasher = blake3::Hasher::new();
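    // Stream the file through the hasher so it is never fully buffered in memory.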
    std::io::copy(&mut reader, &mut hasher)?;

    let file_hash = hasher.finalize();
    if file_hash.as_bytes() != expected {
        return Err(DownloaderError::InvalidFileHash(file_hash.as_bytes().into(), *expected))
    }

    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;
    use alloy_primitives::b256;

    #[tokio::test]
    async fn test_download() {
        reth_tracing::init_test_tracing();

        let b3sum = b256!("0xe9908f4992ae39c4d1fe9984dd743ae3f8e9a84a4a5af768128833605ff72723");
        let url = "https://link.testfile.org/15MB";

        let file = tempfile::NamedTempFile::new().unwrap();
        let filename = file.path().file_name().unwrap().to_str().unwrap();
        let target_dir = file.path().parent().unwrap();
        fetch(filename, target_dir, url, 4, Some(b3sum)).await.unwrap();
    }
}