use alloy_consensus::BlockHeader;
use alloy_eips::BlockHashOrNumber;
use alloy_primitives::{BlockHash, BlockNumber, Sealable, B256};
use async_compression::tokio::bufread::GzipDecoder;
use futures::Future;
use itertools::Either;
use reth_consensus::{Consensus, ConsensusError};
use reth_network_p2p::{
    bodies::client::{BodiesClient, BodiesFut},
    download::DownloadClient,
    error::RequestError,
    headers::client::{HeadersClient, HeadersDirection, HeadersFut, HeadersRequest},
    priority::Priority,
    BlockClient,
};
use reth_network_peers::PeerId;
use reth_primitives_traits::{Block, BlockBody, FullBlock, SealedBlock, SealedHeader};
use std::{collections::HashMap, io, ops::RangeInclusive, path::Path, sync::Arc};
use thiserror::Error;
use tokio::{
    fs::File,
    io::{AsyncReadExt, BufReader},
};
use tokio_stream::StreamExt;
use tokio_util::codec::FramedRead;
use tracing::{debug, trace, warn};

use super::file_codec::BlockFileCodec;
use crate::receipt_file_client::FromReceiptReader;

/// Default byte length of chunk to read from chain file.
///
/// Default is 1 GB.
pub const DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE: u64 = 1_000_000_000;

/// Front-end API for fetching chain data from a file.
///
/// Blocks are assumed to be written one after another in a file, as rlp bytes.
///
/// For example, if the file contains 3 blocks, the file is assumed to be encoded as follows:
/// rlp(block1) || rlp(block2) || rlp(block3)
///
/// Blocks are assumed to have populated transactions, so reading headers will also buffer
/// transactions in memory for use in the bodies stage.
///
/// This reads the entire file into memory, so it is not suitable for large files.
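///
/// # Example
///
/// A minimal usage sketch. The file path is illustrative, `NoopConsensus` skips real validation,
/// and the ethereum `Block` type is used only as an example block type:
///
/// ```ignore
/// use reth_consensus::noop::NoopConsensus;
/// use reth_ethereum_primitives::Block;
///
/// async fn import(path: &str) -> Result<(), FileClientError> {
///     // Decode every block in the file and buffer headers and bodies in memory.
///     let client: FileClient<Block> = FileClient::new(path, NoopConsensus::arc()).await?;
///
///     // The buffered chain can then answer header and body requests.
///     let _tip_hash = client.tip();
///     let _highest_block = client.max_block();
///     Ok(())
/// }
/// ```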
#[derive(Debug, Clone)]
pub struct FileClient<B: Block> {
    /// The buffered headers retrieved when fetching new bodies.
    headers: HashMap<BlockNumber, B::Header>,

    /// A mapping between block hash and number.
    hash_to_number: HashMap<BlockHash, BlockNumber>,

    /// The buffered bodies retrieved when fetching new headers.
    bodies: HashMap<BlockHash, B::Body>,
}

/// An error that can occur when constructing and using a [`FileClient`].
#[derive(Debug, Error)]
pub enum FileClientError {
    /// An error occurred when validating a header from file.
    #[error(transparent)]
    Consensus(#[from] ConsensusError),

    /// An error occurred when opening or reading the file.
    #[error(transparent)]
    Io(#[from] std::io::Error),

    /// An error occurred when decoding blocks, headers, or rlp headers from the file.
    #[error("{0}")]
    Rlp(alloy_rlp::Error, Vec<u8>),

    /// Custom error message.
    #[error("{0}")]
    Custom(&'static str),
}

impl From<&'static str> for FileClientError {
    fn from(value: &'static str) -> Self {
        Self::Custom(value)
    }
}

impl<B: FullBlock> FileClient<B> {
    /// Create a new file client from a slice of sealed blocks.
    pub fn from_blocks(blocks: impl IntoIterator<Item = SealedBlock<B>>) -> Self {
        let blocks: Vec<_> = blocks.into_iter().collect();
        let capacity = blocks.len();

        let mut headers = HashMap::with_capacity(capacity);
        let mut hash_to_number = HashMap::with_capacity(capacity);
        let mut bodies = HashMap::with_capacity(capacity);

        for block in blocks {
            let number = block.number();
            let hash = block.hash();
            let (header, body) = block.split_sealed_header_body();

            headers.insert(number, header.into_header());
            hash_to_number.insert(hash, number);
            bodies.insert(hash, body);
        }

        Self { headers, hash_to_number, bodies }
    }

    /// Create a new file client from a file path.
    pub async fn new<P: AsRef<Path>>(
        path: P,
        consensus: Arc<dyn Consensus<B>>,
    ) -> Result<Self, FileClientError> {
        let file = File::open(path).await?;
        Self::from_file(file, consensus).await
    }

    /// Initialize the [`FileClient`] with a file directly.
    pub(crate) async fn from_file(
        mut file: File,
        consensus: Arc<dyn Consensus<B>>,
    ) -> Result<Self, FileClientError> {
        // get file len from metadata before reading
        let metadata = file.metadata().await?;
        let file_len = metadata.len();

        let mut reader = vec![];
        file.read_to_end(&mut reader).await?;

        Ok(FileClientBuilder { consensus, parent_header: None }
            .build(&reader[..], file_len)
            .await?
            .file_client)
    }

    /// Get the tip hash of the chain.
    pub fn tip(&self) -> Option<B256> {
        self.headers.get(&self.max_block()?).map(|h| h.hash_slow())
    }

    /// Get the start hash of the chain.
    pub fn start(&self) -> Option<B256> {
        self.headers.get(&self.min_block()?).map(|h| h.hash_slow())
    }

    /// Returns the highest block number this client has, or `None` if empty.
    pub fn max_block(&self) -> Option<u64> {
        self.headers.keys().max().copied()
    }

    /// Returns the lowest block number this client has, or `None` if empty.
    pub fn min_block(&self) -> Option<u64> {
        self.headers.keys().min().copied()
    }

    /// Clones and returns the highest header this client has, or `None` if empty. Seals the
    /// header before returning.
    pub fn tip_header(&self) -> Option<SealedHeader<B::Header>> {
        self.headers.get(&self.max_block()?).map(|h| SealedHeader::seal_slow(h.clone()))
    }

    /// Returns true if all blocks are canonical (no gaps)
    pub fn has_canonical_blocks(&self) -> bool {
        if self.headers.is_empty() {
            return true
        }
        let mut nums = self.headers.keys().copied().collect::<Vec<_>>();
        nums.sort_unstable();
        let mut iter = nums.into_iter();
        let mut lowest = iter.next().expect("not empty");
        for next in iter {
            if next != lowest + 1 {
                return false
            }
            lowest = next;
        }
        true
    }

    /// Use the provided bodies as the file client's block body buffer.
    pub fn with_bodies(mut self, bodies: HashMap<BlockHash, B::Body>) -> Self {
        self.bodies = bodies;
        self
    }

    /// Use the provided headers as the file client's block header buffer.
    pub fn with_headers(mut self, headers: HashMap<BlockNumber, B::Header>) -> Self {
        self.headers = headers;
        for (number, header) in &self.headers {
            self.hash_to_number.insert(header.hash_slow(), *number);
        }
        self
    }

    /// Returns the current number of headers in the client.
    pub fn headers_len(&self) -> usize {
        self.headers.len()
    }

    /// Returns the current number of bodies in the client.
    pub fn bodies_len(&self) -> usize {
        self.bodies.len()
    }

    /// Returns an iterator over headers in the client.
    pub fn headers_iter(&self) -> impl Iterator<Item = &B::Header> {
        self.headers.values()
    }

    /// Returns a mutable iterator over bodies in the client.
    ///
    /// Panics if the file client's headers and bodies do not map one-to-one.
    pub fn bodies_iter_mut(&mut self) -> impl Iterator<Item = (u64, &mut B::Body)> {
        let bodies = &mut self.bodies;
        let numbers = &self.hash_to_number;
        bodies.iter_mut().map(|(hash, body)| (numbers[hash], body))
    }

    /// Returns the current number of transactions in the client.
    pub fn total_transactions(&self) -> usize {
        self.bodies.iter().fold(0, |acc, (_, body)| acc + body.transactions().len())
    }
}

struct FileClientBuilder<B: Block> {
    pub consensus: Arc<dyn Consensus<B>>,
    pub parent_header: Option<SealedHeader<B::Header>>,
}

impl<B: FullBlock<Header: reth_primitives_traits::BlockHeader>> FromReader
    for FileClientBuilder<B>
{
    type Error = FileClientError;
    type Output = FileClient<B>;

    /// Initialize the [`FileClient`] from bytes that have been read from file.
    fn build<R>(
        &self,
        reader: R,
        num_bytes: u64,
    ) -> impl Future<Output = Result<DecodedFileChunk<Self::Output>, Self::Error>>
    where
        R: AsyncReadExt + Unpin,
    {
        let mut headers = HashMap::default();
        let mut hash_to_number = HashMap::default();
        let mut bodies = HashMap::default();

        // use with_capacity to make sure the internal buffer contains the entire chunk
        let mut stream =
            FramedRead::with_capacity(reader, BlockFileCodec::<B>::default(), num_bytes as usize);

        trace!(target: "downloaders::file",
            target_num_bytes=num_bytes,
            capacity=stream.read_buffer().capacity(),
            "init decode stream"
        );

        let mut remaining_bytes = vec![];

        let mut log_interval = 0;
        let mut log_interval_start_block = 0;

        let mut parent_header = self.parent_header.clone();

        async move {
            while let Some(block_res) = stream.next().await {
                let block = match block_res {
                    Ok(block) => block,
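                    // Decoding may fail at the chunk boundary with a partially read block. The
                    // undecoded bytes are saved so the caller can prepend them to the next chunk.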
                    Err(FileClientError::Rlp(err, bytes)) => {
                        trace!(target: "downloaders::file",
                            %err,
                            bytes_len=bytes.len(),
                            "partial block returned from decoding chunk"
                        );
                        remaining_bytes = bytes;
                        break
                    }
                    Err(err) => return Err(err),
                };

                let block = SealedBlock::seal_slow(block);

                // Validate standalone header
                self.consensus.validate_header(block.sealed_header())?;
                if let Some(parent) = &parent_header {
                    self.consensus.validate_header_against_parent(block.sealed_header(), parent)?;
                    parent_header = Some(block.sealed_header().clone());
                }

                // Validate block against header
                self.consensus.validate_block_pre_execution(&block)?;

                // add to the internal maps
                let block_hash = block.hash();
                let block_number = block.number();
                let (header, body) = block.split_sealed_header_body();
                headers.insert(block_number, header.unseal());
                hash_to_number.insert(block_hash, block_number);
                bodies.insert(block_hash, body);

                if log_interval == 0 {
                    trace!(target: "downloaders::file",
                        block_number,
                        "read first block"
                    );
                    log_interval_start_block = block_number;
                } else if log_interval % 100_000 == 0 {
                    trace!(target: "downloaders::file",
                        blocks=?log_interval_start_block..=block_number,
                        "read blocks from file"
                    );
                    log_interval_start_block = block_number + 1;
                }
                log_interval += 1;
            }

            trace!(target: "downloaders::file", blocks = headers.len(), "Initialized file client");

            Ok(DecodedFileChunk {
                file_client: FileClient { headers, hash_to_number, bodies },
                remaining_bytes,
                highest_block: None,
            })
        }
    }
}

impl<B: FullBlock> HeadersClient for FileClient<B> {
    type Header = B::Header;
    type Output = HeadersFut<B::Header>;

    fn get_headers_with_priority(
        &self,
        request: HeadersRequest,
        _priority: Priority,
    ) -> Self::Output {
        // this just searches the buffer, and fails if it can't find the header
        let mut headers = Vec::new();
        trace!(target: "downloaders::file", request=?request, "Getting headers");

        let start_num = match request.start {
            BlockHashOrNumber::Hash(hash) => match self.hash_to_number.get(&hash) {
                Some(num) => *num,
                None => {
                    warn!(%hash, "Could not find starting block number for requested header hash");
                    return Box::pin(async move { Err(RequestError::BadResponse) })
                }
            },
            BlockHashOrNumber::Number(num) => num,
        };

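        // A rising request walks forward from the start block; a falling request walks backwards,
        // so its inclusive range ends at `start_num` and is iterated in reverse.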
        let range = if request.limit == 1 {
            Either::Left(start_num..start_num + 1)
        } else {
            match request.direction {
                HeadersDirection::Rising => Either::Left(start_num..start_num + request.limit),
                HeadersDirection::Falling => {
                    Either::Right((start_num - request.limit + 1..=start_num).rev())
                }
            }
        };

        trace!(target: "downloaders::file", range=?range, "Getting headers with range");

        for block_number in range {
            match self.headers.get(&block_number).cloned() {
                Some(header) => headers.push(header),
                None => {
                    warn!(number=%block_number, "Could not find header");
                    return Box::pin(async move { Err(RequestError::BadResponse) })
                }
            }
        }

        Box::pin(async move { Ok((PeerId::default(), headers).into()) })
    }
}

impl<B: FullBlock> BodiesClient for FileClient<B> {
    type Body = B::Body;
    type Output = BodiesFut<B::Body>;

    fn get_block_bodies_with_priority_and_range_hint(
        &self,
        hashes: Vec<B256>,
        _priority: Priority,
        _range_hint: Option<RangeInclusive<u64>>,
    ) -> Self::Output {
        // this just searches the buffer, and fails if it can't find the block
        let mut bodies = Vec::new();

        // check if any are an error
        // could unwrap here
        for hash in hashes {
            match self.bodies.get(&hash).cloned() {
                Some(body) => bodies.push(body),
                None => return Box::pin(async move { Err(RequestError::BadResponse) }),
            }
        }

        Box::pin(async move { Ok((PeerId::default(), bodies).into()) })
    }
}

impl<B: FullBlock> DownloadClient for FileClient<B> {
    fn report_bad_message(&self, _peer_id: PeerId) {
        trace!("Reported a bad message on a file client, the file may be corrupted or invalid");
        // noop
    }

    fn num_connected_peers(&self) -> usize {
        // no such thing as connected peers when we are just using a file
        1
    }
}

impl<B: FullBlock> BlockClient for FileClient<B> {
    type Block = B;
}

/// File reader type for handling different compression formats.
#[derive(Debug)]
enum FileReader {
    /// Regular uncompressed file with remaining byte tracking.
    Plain { file: File, remaining_bytes: u64 },
    /// Gzip compressed file.
    Gzip(GzipDecoder<BufReader<File>>),
}

impl FileReader {
    /// Read some data into the provided buffer, returning the number of bytes read.
    async fn read(&mut self, buf: &mut [u8]) -> Result<usize, io::Error> {
        match self {
            Self::Plain { file, .. } => file.read(buf).await,
            Self::Gzip(decoder) => decoder.read(buf).await,
        }
    }

    /// Read next chunk from file. Returns the total byte length of the buffered chunk, or `None`
    /// once the underlying file (or compressed stream) is exhausted.
    async fn read_next_chunk(
        &mut self,
        chunk: &mut Vec<u8>,
        chunk_byte_len: u64,
    ) -> Result<Option<u64>, FileClientError> {
        match self {
            Self::Plain { .. } => self.read_plain_chunk(chunk, chunk_byte_len).await,
            Self::Gzip(_) => {
                Ok((self.read_gzip_chunk(chunk, chunk_byte_len).await?)
                    .then_some(chunk.len() as u64))
            }
        }
    }

    async fn read_plain_chunk(
        &mut self,
        chunk: &mut Vec<u8>,
        chunk_byte_len: u64,
    ) -> Result<Option<u64>, FileClientError> {
        let Self::Plain { file, remaining_bytes } = self else {
            unreachable!("read_plain_chunk should only be called on Plain variant")
        };

        if *remaining_bytes == 0 && chunk.is_empty() {
            // eof
            return Ok(None)
        }

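        // Fill the chunk up to `chunk_byte_len`, but never beyond the bytes already buffered from
        // the previous chunk plus what is actually left in the file.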
        let chunk_target_len = chunk_byte_len.min(*remaining_bytes + chunk.len() as u64);
        let old_bytes_len = chunk.len() as u64;

        // calculate reserved space in chunk
        let new_read_bytes_target_len = chunk_target_len - old_bytes_len;

        // read new bytes from file
        let prev_read_bytes_len = chunk.len();
        chunk.extend(std::iter::repeat_n(0, new_read_bytes_target_len as usize));
        let reader = &mut chunk[prev_read_bytes_len..];

        // actual bytes that have been read
        let new_read_bytes_len = file.read_exact(reader).await? as u64;
        let next_chunk_byte_len = chunk.len();

        // update remaining file length
        *remaining_bytes -= new_read_bytes_len;

        debug!(target: "downloaders::file",
            max_chunk_byte_len=chunk_byte_len,
            prev_read_bytes_len,
            new_read_bytes_target_len,
            new_read_bytes_len,
            next_chunk_byte_len,
            remaining_file_byte_len=*remaining_bytes,
            "new bytes were read from file"
        );

        Ok(Some(next_chunk_byte_len as u64))
    }

    /// Read next chunk from gzipped file. Returns `true` if the chunk buffer holds data to
    /// decode, or `false` once the decompressed stream is exhausted and the buffer is empty.
    async fn read_gzip_chunk(
        &mut self,
        chunk: &mut Vec<u8>,
        chunk_byte_len: u64,
    ) -> Result<bool, FileClientError> {
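        // The decompressed length is not known up front, so read fixed-size slices from the
        // decoder and accumulate them until the chunk reaches the target size or the stream ends.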
        let mut buffer = vec![0u8; 64 * 1024];
        loop {
            if chunk.len() >= chunk_byte_len as usize {
                return Ok(true)
            }

            match self.read(&mut buffer).await {
                Ok(0) => return Ok(!chunk.is_empty()),
                Ok(n) => {
                    chunk.extend_from_slice(&buffer[..n]);
                }
                Err(e) => return Err(e.into()),
            }
        }
    }
}

/// Chunks file into several [`FileClient`]s.
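///
/// # Example
///
/// A sketch of a chunked import loop. The chunk size here is illustrative, and `NoopConsensus`
/// stands in for a real consensus implementation:
///
/// ```ignore
/// use reth_consensus::noop::NoopConsensus;
/// use reth_ethereum_primitives::Block;
///
/// async fn import_chunked(path: &str) -> Result<(), FileClientError> {
///     // Read the file in ~100 MB chunks instead of loading it whole.
///     let mut reader = ChunkedFileReader::new(path, Some(100_000_000)).await?;
///
///     // Each chunk decodes into its own `FileClient` that can drive the download stages.
///     while let Some(client) = reader.next_chunk::<Block>(NoopConsensus::arc(), None).await? {
///         let _tip = client.tip_header();
///     }
///     Ok(())
/// }
/// ```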
#[derive(Debug)]
pub struct ChunkedFileReader {
    /// File reader (either plain or gzip).
    file: FileReader,
    /// Bytes that have been read.
    chunk: Vec<u8>,
    /// Max bytes per chunk.
    chunk_byte_len: u64,
    /// Optionally tracks the highest decoded block number. Needed when decoding data that maps
    /// many-to-one with block number, e.g. receipts.
    highest_block: Option<u64>,
}

impl ChunkedFileReader {
    /// Opens the file to import from the given path and returns a new instance. If no chunk byte
    /// length is passed, chunks default to [`DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE`] bytes (the size
    /// of one static file). Automatically detects gzip files by the `.gz` and `.gzip` extensions.
    pub async fn new<P: AsRef<Path>>(
        path: P,
        chunk_byte_len: Option<u64>,
    ) -> Result<Self, FileClientError> {
        let path = path.as_ref();
        let file = File::open(path).await?;
        let chunk_byte_len = chunk_byte_len.unwrap_or(DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE);

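        // A `.gz` or `.gzip` extension selects the gzip reader; any other file is read as plain,
        // uncompressed data.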
        Self::from_file(
            file,
            chunk_byte_len,
            path.extension()
                .and_then(|ext| ext.to_str())
                .is_some_and(|ext| ["gz", "gzip"].contains(&ext)),
        )
        .await
    }

    /// Creates a new instance from an already opened file to import from.
    pub async fn from_file(
        file: File,
        chunk_byte_len: u64,
        is_gzip: bool,
    ) -> Result<Self, FileClientError> {
        let file_reader = if is_gzip {
            FileReader::Gzip(GzipDecoder::new(BufReader::new(file)))
        } else {
            let remaining_bytes = file.metadata().await?.len();
            FileReader::Plain { file, remaining_bytes }
        };

        Ok(Self { file: file_reader, chunk: vec![], chunk_byte_len, highest_block: None })
    }

    /// Reads bytes from file and buffers as next chunk to decode. Returns byte length of next
    /// chunk to read.
    async fn read_next_chunk(&mut self) -> Result<Option<u64>, FileClientError> {
        self.file.read_next_chunk(&mut self.chunk, self.chunk_byte_len).await
    }

    /// Read next chunk from file. Returns [`FileClient`] containing decoded chunk.
    ///
    /// For gzipped files, this method accumulates data until at least `chunk_byte_len` bytes
    /// are available before processing. For plain files, it uses the original chunking logic.
    pub async fn next_chunk<B: FullBlock>(
        &mut self,
        consensus: Arc<dyn Consensus<B>>,
        parent_header: Option<SealedHeader<B::Header>>,
    ) -> Result<Option<FileClient<B>>, FileClientError> {
        let Some(chunk_len) = self.read_next_chunk().await? else { return Ok(None) };

        // make new file client from chunk
        let DecodedFileChunk { file_client, remaining_bytes, .. } =
            FileClientBuilder { consensus, parent_header }
                .build(&self.chunk[..], chunk_len)
                .await?;

        // save left over bytes
        self.chunk = remaining_bytes;

        Ok(Some(file_client))
    }

    /// Read next chunk from file. Returns the decoded chunk as a receipt file client of type `T`.
    pub async fn next_receipts_chunk<T>(&mut self) -> Result<Option<T>, T::Error>
    where
        T: FromReceiptReader,
    {
        let Some(next_chunk_byte_len) = self.read_next_chunk().await.map_err(|e| {
            T::Error::from(match e {
                FileClientError::Io(io_err) => io_err,
                _ => io::Error::other(e.to_string()),
            })
        })?
        else {
            return Ok(None)
        };

        // make new file client from chunk
        let DecodedFileChunk { file_client, remaining_bytes, highest_block } =
            T::from_receipt_reader(&self.chunk[..], next_chunk_byte_len, self.highest_block)
                .await?;

        // save left over bytes
        self.chunk = remaining_bytes;
        // update highest block
        self.highest_block = highest_block;

        Ok(Some(file_client))
    }
}

/// Constructs a file client from a reader.
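///
/// A minimal call-site sketch, where `builder` stands for any implementor (e.g. the internal
/// `FileClientBuilder`) and `bytes` is a hypothetical buffer read from the file:
///
/// ```ignore
/// let DecodedFileChunk { file_client, remaining_bytes, .. } =
///     builder.build(&bytes[..], bytes.len() as u64).await?;
/// // `remaining_bytes` holds any trailing, partially decoded input for the next chunk.
/// ```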
pub trait FromReader {
    /// Error returned by file client type.
    type Error: From<io::Error>;

    /// Output returned by file client type.
    type Output;

    /// Returns a file client
    fn build<R>(
        &self,
        reader: R,
        num_bytes: u64,
    ) -> impl Future<Output = Result<DecodedFileChunk<Self::Output>, Self::Error>>
    where
        Self: Sized,
        R: AsyncReadExt + Unpin;
}

/// Output from decoding a file chunk with [`FromReader::build`].
#[derive(Debug)]
pub struct DecodedFileChunk<T> {
    /// File client, i.e. the decoded part of chunk.
    pub file_client: T,
    /// Remaining bytes that have not been decoded, e.g. a partial block or a partial receipt.
    pub remaining_bytes: Vec<u8>,
    /// Highest block of decoded chunk. This is needed when decoding data that maps many-to-one
    /// with block number, like receipts.
    pub highest_block: Option<u64>,
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        bodies::{
            bodies::BodiesDownloaderBuilder,
            test_utils::{insert_headers, zip_blocks},
        },
        headers::{reverse_headers::ReverseHeadersDownloaderBuilder, test_utils::child_header},
        test_utils::{generate_bodies, generate_bodies_file},
    };
    use assert_matches::assert_matches;
    use async_compression::tokio::write::GzipEncoder;
    use futures_util::stream::StreamExt;
    use rand::Rng;
    use reth_consensus::{noop::NoopConsensus, test_utils::TestConsensus};
    use reth_ethereum_primitives::Block;
    use reth_network_p2p::{
        bodies::downloader::BodyDownloader,
        headers::downloader::{HeaderDownloader, SyncTarget},
    };
    use reth_provider::test_utils::create_test_provider_factory;
    use std::sync::Arc;
    use tokio::{
        fs::File,
        io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom},
    };

    #[tokio::test]
    async fn streams_bodies_from_buffer() {
        // Generate some random blocks
        let factory = create_test_provider_factory();
        let (headers, mut bodies) = generate_bodies(0..=19);

        insert_headers(&factory, &headers);

        // create an empty file
        let file = tempfile::tempfile().unwrap();

        let client: Arc<FileClient<Block>> = Arc::new(
            FileClient::from_file(file.into(), NoopConsensus::arc())
                .await
                .unwrap()
                .with_bodies(bodies.clone().into_iter().collect()),
        );
        let mut downloader = BodiesDownloaderBuilder::default().build::<Block, _, _>(
            client.clone(),
            Arc::new(TestConsensus::default()),
            factory,
        );
        downloader.set_download_range(0..=19).expect("failed to set download range");

        assert_matches!(
            downloader.next().await,
            Some(Ok(res)) => assert_eq!(res, zip_blocks(headers.iter(), &mut bodies))
        );
    }

    #[tokio::test]
    async fn download_headers_at_fork_head() {
        reth_tracing::init_test_tracing();

        let p3 = SealedHeader::default();
        let p2 = child_header(&p3);
        let p1 = child_header(&p2);
        let p0 = child_header(&p1);

        let file = tempfile::tempfile().unwrap();
        let client: Arc<FileClient<Block>> = Arc::new(
            FileClient::from_file(file.into(), NoopConsensus::arc()).await.unwrap().with_headers(
                HashMap::from([
                    (0u64, p0.clone_header()),
                    (1, p1.clone_header()),
                    (2, p2.clone_header()),
                    (3, p3.clone_header()),
                ]),
            ),
        );

        let mut downloader = ReverseHeadersDownloaderBuilder::default()
            .stream_batch_size(3)
            .request_limit(3)
            .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
        downloader.update_local_head(p3.clone());
        downloader.update_sync_target(SyncTarget::Tip(p0.hash()));

        let headers = downloader.next().await.unwrap();
        assert_eq!(headers.unwrap(), vec![p0, p1, p2]);
        assert!(downloader.next().await.is_none());
        assert!(downloader.next().await.is_none());
    }

    #[tokio::test]
    async fn test_download_headers_from_file() {
        reth_tracing::init_test_tracing();

        // Generate some random blocks
        let (file, headers, _) = generate_bodies_file(0..=19).await;
        // now try to read them back
        let client: Arc<FileClient<Block>> =
            Arc::new(FileClient::from_file(file, NoopConsensus::arc()).await.unwrap());

        // construct headers downloader and use first header
        let mut header_downloader = ReverseHeadersDownloaderBuilder::default()
            .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
        header_downloader.update_local_head(headers.first().unwrap().clone());
        header_downloader.update_sync_target(SyncTarget::Tip(headers.last().unwrap().hash()));

        // get headers first
        let mut downloaded_headers = header_downloader.next().await.unwrap().unwrap();

        // reverse to make sure it's in the right order before comparing
        downloaded_headers.reverse();

        // the first header is not included in the response
        assert_eq!(downloaded_headers, headers[1..]);
    }

    #[tokio::test]
    async fn test_download_bodies_from_file() {
        // Generate some random blocks
        let factory = create_test_provider_factory();
        let (file, headers, mut bodies) = generate_bodies_file(0..=19).await;

        // now try to read them back
        let client: Arc<FileClient<Block>> =
            Arc::new(FileClient::from_file(file, NoopConsensus::arc()).await.unwrap());

        // insert headers in db for the bodies downloader
        insert_headers(&factory, &headers);

        let mut downloader = BodiesDownloaderBuilder::default().build::<Block, _, _>(
            client.clone(),
            Arc::new(TestConsensus::default()),
            factory,
        );
        downloader.set_download_range(0..=19).expect("failed to set download range");

        assert_matches!(
            downloader.next().await,
            Some(Ok(res)) => assert_eq!(res, zip_blocks(headers.iter(), &mut bodies))
        );
    }

    #[tokio::test]
    async fn test_chunk_download_headers_from_file() {
        reth_tracing::init_test_tracing();

        // Generate some random blocks
        let (file, headers, _) = generate_bodies_file(0..=14).await;

        // calculate min for chunk byte length range, pick a lower bound that guarantees at least
        // one block will be read
        let chunk_byte_len = rand::rng().random_range(2000..=10_000);
        trace!(target: "downloaders::file::test", chunk_byte_len);

        // init reader
        let mut reader =
            ChunkedFileReader::from_file(file, chunk_byte_len as u64, false).await.unwrap();

        let mut downloaded_headers: Vec<SealedHeader> = vec![];

        let mut local_header = headers.first().unwrap().clone();

        // test
        while let Some(client) =
            reader.next_chunk::<Block>(NoopConsensus::arc(), None).await.unwrap()
        {
            let sync_target = client.tip_header().unwrap();

            let sync_target_hash = sync_target.hash();

            // construct headers downloader and use first header
            let mut header_downloader = ReverseHeadersDownloaderBuilder::default()
                .build(Arc::new(client), Arc::new(TestConsensus::default()));
            header_downloader.update_local_head(local_header.clone());
            header_downloader.update_sync_target(SyncTarget::Tip(sync_target_hash));

            // get headers first
            let mut downloaded_headers_chunk = header_downloader.next().await.unwrap().unwrap();

            // export new local header to outer scope
            local_header = sync_target;

            // reverse to make sure it's in the right order before comparing
            downloaded_headers_chunk.reverse();
            downloaded_headers.extend_from_slice(&downloaded_headers_chunk);
        }

        // the first header is not included in the response
        assert_eq!(headers[1..], downloaded_headers);
    }

    #[tokio::test]
    async fn test_chunk_download_headers_from_gzip_file() {
        reth_tracing::init_test_tracing();

        // Generate some random blocks
        let (file, headers, _) = generate_bodies_file(0..=14).await;

        // Create a gzipped version of the file
        let gzip_temp_file = tempfile::NamedTempFile::new().unwrap();
        let gzip_path = gzip_temp_file.path().to_owned();
        drop(gzip_temp_file); // Close the file so we can write to it

        // Read original file content first
        let mut original_file = file;
        original_file.seek(SeekFrom::Start(0)).await.unwrap();
        let mut original_content = Vec::new();
        original_file.read_to_end(&mut original_content).await.unwrap();

        let mut gzip_file = File::create(&gzip_path).await.unwrap();
        let mut encoder = GzipEncoder::new(&mut gzip_file);

        // Write the original content through the gzip encoder
        encoder.write_all(&original_content).await.unwrap();
        encoder.shutdown().await.unwrap();
        drop(gzip_file);

        // Reopen the gzipped file for reading
        let gzip_file = File::open(&gzip_path).await.unwrap();

        // calculate min for chunk byte length range, pick a lower bound that guarantees at least
        // one block will be read
        let chunk_byte_len = rand::rng().random_range(2000..=10_000);
        trace!(target: "downloaders::file::test", chunk_byte_len);

        // init reader with gzip=true
        let mut reader =
            ChunkedFileReader::from_file(gzip_file, chunk_byte_len as u64, true).await.unwrap();

        let mut downloaded_headers: Vec<SealedHeader> = vec![];

        let mut local_header = headers.first().unwrap().clone();

        // test
        while let Some(client) =
            reader.next_chunk::<Block>(NoopConsensus::arc(), None).await.unwrap()
        {
            if client.headers_len() == 0 {
                continue;
            }

            let sync_target = client.tip_header().expect("tip_header should not be None");

            let sync_target_hash = sync_target.hash();

            // construct headers downloader and use first header
            let mut header_downloader = ReverseHeadersDownloaderBuilder::default()
                .build(Arc::new(client), Arc::new(TestConsensus::default()));
            header_downloader.update_local_head(local_header.clone());
            header_downloader.update_sync_target(SyncTarget::Tip(sync_target_hash));

            // get headers first
            let mut downloaded_headers_chunk = header_downloader.next().await.unwrap().unwrap();

            // export new local header to outer scope
            local_header = sync_target;

            // reverse to make sure it's in the right order before comparing
            downloaded_headers_chunk.reverse();
            downloaded_headers.extend_from_slice(&downloaded_headers_chunk);
        }

        // the first header is not included in the response
        assert_eq!(headers[1..], downloaded_headers);
    }
}
931}