// reth_downloaders/file_client.rs

1use alloy_consensus::BlockHeader;
2use alloy_eips::BlockHashOrNumber;
3use alloy_primitives::{BlockHash, BlockNumber, Sealable, B256};
4use async_compression::tokio::bufread::GzipDecoder;
5use futures::Future;
6use itertools::Either;
7use reth_consensus::{Consensus, ConsensusError};
8use reth_network_p2p::{
9    bodies::client::{BodiesClient, BodiesFut},
10    download::DownloadClient,
11    error::RequestError,
12    headers::client::{HeadersClient, HeadersDirection, HeadersFut, HeadersRequest},
13    priority::Priority,
14    BlockClient,
15};
16use reth_network_peers::PeerId;
17use reth_primitives_traits::{Block, BlockBody, FullBlock, SealedBlock, SealedHeader};
18use std::{collections::HashMap, io, ops::RangeInclusive, path::Path, sync::Arc};
19use thiserror::Error;
20use tokio::{
21    fs::File,
22    io::{AsyncReadExt, BufReader},
23};
24use tokio_stream::StreamExt;
25use tokio_util::codec::FramedRead;
26use tracing::{debug, trace, warn};
27
28use super::file_codec::BlockFileCodec;
29use crate::receipt_file_client::FromReceiptReader;
30
/// Default maximum size, in bytes, of one chunk read from a chain file.
///
/// Equals one decimal gigabyte (10^9 bytes).
pub const DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE: u64 = 1_000 * 1_000 * 1_000;
35
36/// Front-end API for fetching chain data from a file.
37///
38/// Blocks are assumed to be written one after another in a file, as rlp bytes.
39///
40/// For example, if the file contains 3 blocks, the file is assumed to be encoded as follows:
41/// rlp(block1) || rlp(block2) || rlp(block3)
42///
43/// Blocks are assumed to have populated transactions, so reading headers will also buffer
44/// transactions in memory for use in the bodies stage.
45///
46/// This reads the entire file into memory, so it is not suitable for large files.
47#[derive(Debug, Clone)]
48pub struct FileClient<B: Block> {
49    /// The buffered headers retrieved when fetching new bodies.
50    headers: HashMap<BlockNumber, B::Header>,
51
52    /// A mapping between block hash and number.
53    hash_to_number: HashMap<BlockHash, BlockNumber>,
54
55    /// The buffered bodies retrieved when fetching new headers.
56    bodies: HashMap<BlockHash, B::Body>,
57}
58
59/// An error that can occur when constructing and using a [`FileClient`].
60#[derive(Debug, Error)]
61pub enum FileClientError {
62    /// An error occurred when validating a header from file.
63    #[error(transparent)]
64    Consensus(#[from] ConsensusError),
65
66    /// An error occurred when opening or reading the file.
67    #[error(transparent)]
68    Io(#[from] std::io::Error),
69
70    /// An error occurred when decoding blocks, headers, or rlp headers from the file.
71    #[error("{0}")]
72    Rlp(alloy_rlp::Error, Vec<u8>),
73
74    /// Custom error message.
75    #[error("{0}")]
76    Custom(&'static str),
77}
78
79impl From<&'static str> for FileClientError {
80    fn from(value: &'static str) -> Self {
81        Self::Custom(value)
82    }
83}
84
85impl<B: FullBlock> FileClient<B> {
86    /// Create a new file client from a file path.
87    pub async fn new<P: AsRef<Path>>(
88        path: P,
89        consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
90    ) -> Result<Self, FileClientError> {
91        let file = File::open(path).await?;
92        Self::from_file(file, consensus).await
93    }
94
95    /// Initialize the [`FileClient`] with a file directly.
96    pub(crate) async fn from_file(
97        mut file: File,
98        consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
99    ) -> Result<Self, FileClientError> {
100        // get file len from metadata before reading
101        let metadata = file.metadata().await?;
102        let file_len = metadata.len();
103
104        let mut reader = vec![];
105        file.read_to_end(&mut reader).await?;
106
107        Ok(FileClientBuilder { consensus, parent_header: None }
108            .build(&reader[..], file_len)
109            .await?
110            .file_client)
111    }
112
113    /// Get the tip hash of the chain.
114    pub fn tip(&self) -> Option<B256> {
115        self.headers.get(&self.max_block()?).map(|h| h.hash_slow())
116    }
117
118    /// Get the start hash of the chain.
119    pub fn start(&self) -> Option<B256> {
120        self.headers.get(&self.min_block()?).map(|h| h.hash_slow())
121    }
122
123    /// Returns the highest block number of this client has or `None` if empty
124    pub fn max_block(&self) -> Option<u64> {
125        self.headers.keys().max().copied()
126    }
127
128    /// Returns the lowest block number of this client has or `None` if empty
129    pub fn min_block(&self) -> Option<u64> {
130        self.headers.keys().min().copied()
131    }
132
133    /// Clones and returns the highest header of this client has or `None` if empty. Seals header
134    /// before returning.
135    pub fn tip_header(&self) -> Option<SealedHeader<B::Header>> {
136        self.headers.get(&self.max_block()?).map(|h| SealedHeader::seal_slow(h.clone()))
137    }
138
139    /// Returns true if all blocks are canonical (no gaps)
140    pub fn has_canonical_blocks(&self) -> bool {
141        if self.headers.is_empty() {
142            return true
143        }
144        let mut nums = self.headers.keys().copied().collect::<Vec<_>>();
145        nums.sort_unstable();
146        let mut iter = nums.into_iter();
147        let mut lowest = iter.next().expect("not empty");
148        for next in iter {
149            if next != lowest + 1 {
150                return false
151            }
152            lowest = next;
153        }
154        true
155    }
156
157    /// Use the provided bodies as the file client's block body buffer.
158    pub fn with_bodies(mut self, bodies: HashMap<BlockHash, B::Body>) -> Self {
159        self.bodies = bodies;
160        self
161    }
162
163    /// Use the provided headers as the file client's block body buffer.
164    pub fn with_headers(mut self, headers: HashMap<BlockNumber, B::Header>) -> Self {
165        self.headers = headers;
166        for (number, header) in &self.headers {
167            self.hash_to_number.insert(header.hash_slow(), *number);
168        }
169        self
170    }
171
172    /// Returns the current number of headers in the client.
173    pub fn headers_len(&self) -> usize {
174        self.headers.len()
175    }
176
177    /// Returns the current number of bodies in the client.
178    pub fn bodies_len(&self) -> usize {
179        self.bodies.len()
180    }
181
182    /// Returns an iterator over headers in the client.
183    pub fn headers_iter(&self) -> impl Iterator<Item = &B::Header> {
184        self.headers.values()
185    }
186
187    /// Returns a mutable iterator over bodies in the client.
188    ///
189    /// Panics, if file client headers and bodies are not mapping 1-1.
190    pub fn bodies_iter_mut(&mut self) -> impl Iterator<Item = (u64, &mut B::Body)> {
191        let bodies = &mut self.bodies;
192        let numbers = &self.hash_to_number;
193        bodies.iter_mut().map(|(hash, body)| (numbers[hash], body))
194    }
195
196    /// Returns the current number of transactions in the client.
197    pub fn total_transactions(&self) -> usize {
198        self.bodies.iter().fold(0, |acc, (_, body)| acc + body.transactions().len())
199    }
200}
201
202struct FileClientBuilder<B: Block> {
203    pub consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
204    pub parent_header: Option<SealedHeader<B::Header>>,
205}
206
207impl<B: FullBlock<Header: reth_primitives_traits::BlockHeader>> FromReader
208    for FileClientBuilder<B>
209{
210    type Error = FileClientError;
211    type Output = FileClient<B>;
212
213    /// Initialize the [`FileClient`] from bytes that have been read from file.
214    fn build<R>(
215        &self,
216        reader: R,
217        num_bytes: u64,
218    ) -> impl Future<Output = Result<DecodedFileChunk<Self::Output>, Self::Error>>
219    where
220        R: AsyncReadExt + Unpin,
221    {
222        let mut headers = HashMap::default();
223        let mut hash_to_number = HashMap::default();
224        let mut bodies = HashMap::default();
225
226        // use with_capacity to make sure the internal buffer contains the entire chunk
227        let mut stream =
228            FramedRead::with_capacity(reader, BlockFileCodec::<B>::default(), num_bytes as usize);
229
230        trace!(target: "downloaders::file",
231            target_num_bytes=num_bytes,
232            capacity=stream.read_buffer().capacity(),
233            "init decode stream"
234        );
235
236        let mut remaining_bytes = vec![];
237
238        let mut log_interval = 0;
239        let mut log_interval_start_block = 0;
240
241        let mut parent_header = self.parent_header.clone();
242
243        async move {
244            while let Some(block_res) = stream.next().await {
245                let block = match block_res {
246                    Ok(block) => block,
247                    Err(FileClientError::Rlp(err, bytes)) => {
248                        trace!(target: "downloaders::file",
249                            %err,
250                            bytes_len=bytes.len(),
251                            "partial block returned from decoding chunk"
252                        );
253                        remaining_bytes = bytes;
254                        break
255                    }
256                    Err(err) => return Err(err),
257                };
258
259                let block = SealedBlock::seal_slow(block);
260
261                // Validate standalone header
262                self.consensus.validate_header(block.sealed_header())?;
263                if let Some(parent) = &parent_header {
264                    self.consensus.validate_header_against_parent(block.sealed_header(), parent)?;
265                    parent_header = Some(block.sealed_header().clone());
266                }
267
268                // Validate block against header
269                self.consensus.validate_block_pre_execution(&block)?;
270
271                // add to the internal maps
272                let block_hash = block.hash();
273                let block_number = block.number();
274                let (header, body) = block.split_sealed_header_body();
275                headers.insert(block_number, header.unseal());
276                hash_to_number.insert(block_hash, block_number);
277                bodies.insert(block_hash, body);
278
279                if log_interval == 0 {
280                    trace!(target: "downloaders::file",
281                        block_number,
282                        "read first block"
283                    );
284                    log_interval_start_block = block_number;
285                } else if log_interval % 100_000 == 0 {
286                    trace!(target: "downloaders::file",
287                        blocks=?log_interval_start_block..=block_number,
288                        "read blocks from file"
289                    );
290                    log_interval_start_block = block_number + 1;
291                }
292                log_interval += 1;
293            }
294
295            trace!(target: "downloaders::file", blocks = headers.len(), "Initialized file client");
296
297            Ok(DecodedFileChunk {
298                file_client: FileClient { headers, hash_to_number, bodies },
299                remaining_bytes,
300                highest_block: None,
301            })
302        }
303    }
304}
305
306impl<B: FullBlock> HeadersClient for FileClient<B> {
307    type Header = B::Header;
308    type Output = HeadersFut<B::Header>;
309
310    fn get_headers_with_priority(
311        &self,
312        request: HeadersRequest,
313        _priority: Priority,
314    ) -> Self::Output {
315        // this just searches the buffer, and fails if it can't find the header
316        let mut headers = Vec::new();
317        trace!(target: "downloaders::file", request=?request, "Getting headers");
318
319        let start_num = match request.start {
320            BlockHashOrNumber::Hash(hash) => match self.hash_to_number.get(&hash) {
321                Some(num) => *num,
322                None => {
323                    warn!(%hash, "Could not find starting block number for requested header hash");
324                    return Box::pin(async move { Err(RequestError::BadResponse) })
325                }
326            },
327            BlockHashOrNumber::Number(num) => num,
328        };
329
330        let range = if request.limit == 1 {
331            Either::Left(start_num..start_num + 1)
332        } else {
333            match request.direction {
334                HeadersDirection::Rising => Either::Left(start_num..start_num + request.limit),
335                HeadersDirection::Falling => {
336                    Either::Right((start_num - request.limit + 1..=start_num).rev())
337                }
338            }
339        };
340
341        trace!(target: "downloaders::file", range=?range, "Getting headers with range");
342
343        for block_number in range {
344            match self.headers.get(&block_number).cloned() {
345                Some(header) => headers.push(header),
346                None => {
347                    warn!(number=%block_number, "Could not find header");
348                    return Box::pin(async move { Err(RequestError::BadResponse) })
349                }
350            }
351        }
352
353        Box::pin(async move { Ok((PeerId::default(), headers).into()) })
354    }
355}
356
357impl<B: FullBlock> BodiesClient for FileClient<B> {
358    type Body = B::Body;
359    type Output = BodiesFut<B::Body>;
360
361    fn get_block_bodies_with_priority_and_range_hint(
362        &self,
363        hashes: Vec<B256>,
364        _priority: Priority,
365        _range_hint: Option<RangeInclusive<u64>>,
366    ) -> Self::Output {
367        // this just searches the buffer, and fails if it can't find the block
368        let mut bodies = Vec::new();
369
370        // check if any are an error
371        // could unwrap here
372        for hash in hashes {
373            match self.bodies.get(&hash).cloned() {
374                Some(body) => bodies.push(body),
375                None => return Box::pin(async move { Err(RequestError::BadResponse) }),
376            }
377        }
378
379        Box::pin(async move { Ok((PeerId::default(), bodies).into()) })
380    }
381}
382
383impl<B: FullBlock> DownloadClient for FileClient<B> {
384    fn report_bad_message(&self, _peer_id: PeerId) {
385        trace!("Reported a bad message on a file client, the file may be corrupted or invalid");
386        // noop
387    }
388
389    fn num_connected_peers(&self) -> usize {
390        // no such thing as connected peers when we are just using a file
391        1
392    }
393}
394
395impl<B: FullBlock> BlockClient for FileClient<B> {
396    type Block = B;
397}
398
399/// File reader type for handling different compression formats.
400#[derive(Debug)]
401enum FileReader {
402    /// Regular uncompressed file with remaining byte tracking.
403    Plain { file: File, remaining_bytes: u64 },
404    /// Gzip compressed file.
405    Gzip(GzipDecoder<BufReader<File>>),
406}
407
408impl FileReader {
409    /// Read some data into the provided buffer, returning the number of bytes read.
410    async fn read(&mut self, buf: &mut [u8]) -> Result<usize, io::Error> {
411        match self {
412            Self::Plain { file, .. } => file.read(buf).await,
413            Self::Gzip(decoder) => decoder.read(buf).await,
414        }
415    }
416
417    /// Read next chunk from file. Returns the number of bytes read for plain files,
418    /// or a boolean indicating if data is available for gzip files.
419    async fn read_next_chunk(
420        &mut self,
421        chunk: &mut Vec<u8>,
422        chunk_byte_len: u64,
423    ) -> Result<Option<u64>, FileClientError> {
424        match self {
425            Self::Plain { .. } => self.read_plain_chunk(chunk, chunk_byte_len).await,
426            Self::Gzip(_) => {
427                Ok((self.read_gzip_chunk(chunk, chunk_byte_len).await?)
428                    .then_some(chunk.len() as u64))
429            }
430        }
431    }
432
433    async fn read_plain_chunk(
434        &mut self,
435        chunk: &mut Vec<u8>,
436        chunk_byte_len: u64,
437    ) -> Result<Option<u64>, FileClientError> {
438        let Self::Plain { file, remaining_bytes } = self else {
439            unreachable!("read_plain_chunk should only be called on Plain variant")
440        };
441
442        if *remaining_bytes == 0 && chunk.is_empty() {
443            // eof
444            return Ok(None)
445        }
446
447        let chunk_target_len = chunk_byte_len.min(*remaining_bytes + chunk.len() as u64);
448        let old_bytes_len = chunk.len() as u64;
449
450        // calculate reserved space in chunk
451        let new_read_bytes_target_len = chunk_target_len - old_bytes_len;
452
453        // read new bytes from file
454        let prev_read_bytes_len = chunk.len();
455        chunk.extend(std::iter::repeat_n(0, new_read_bytes_target_len as usize));
456        let reader = &mut chunk[prev_read_bytes_len..];
457
458        // actual bytes that have been read
459        let new_read_bytes_len = file.read_exact(reader).await? as u64;
460        let next_chunk_byte_len = chunk.len();
461
462        // update remaining file length
463        *remaining_bytes -= new_read_bytes_len;
464
465        debug!(target: "downloaders::file",
466            max_chunk_byte_len=chunk_byte_len,
467            prev_read_bytes_len,
468            new_read_bytes_target_len,
469            new_read_bytes_len,
470            next_chunk_byte_len,
471            remaining_file_byte_len=*remaining_bytes,
472            "new bytes were read from file"
473        );
474
475        Ok(Some(next_chunk_byte_len as u64))
476    }
477
478    /// Read next chunk from gzipped file.
479    async fn read_gzip_chunk(
480        &mut self,
481        chunk: &mut Vec<u8>,
482        chunk_byte_len: u64,
483    ) -> Result<bool, FileClientError> {
484        loop {
485            if chunk.len() >= chunk_byte_len as usize {
486                return Ok(true)
487            }
488
489            let mut buffer = vec![0u8; 64 * 1024];
490
491            match self.read(&mut buffer).await {
492                Ok(0) => return Ok(!chunk.is_empty()),
493                Ok(n) => {
494                    buffer.truncate(n);
495                    chunk.extend_from_slice(&buffer);
496                }
497                Err(e) => return Err(e.into()),
498            }
499        }
500    }
501}
502
503/// Chunks file into several [`FileClient`]s.
504#[derive(Debug)]
505pub struct ChunkedFileReader {
506    /// File reader (either plain or gzip).
507    file: FileReader,
508    /// Bytes that have been read.
509    chunk: Vec<u8>,
510    /// Max bytes per chunk.
511    chunk_byte_len: u64,
512    /// Optionally, tracks highest decoded block number. Needed when decoding data that maps * to 1
513    /// with block number
514    highest_block: Option<u64>,
515}
516
517impl ChunkedFileReader {
518    /// Opens the file to import from given path. Returns a new instance. If no chunk byte length
519    /// is passed, chunks have [`DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE`] (one static file).
520    /// Automatically detects gzip files by extension (.gz, .gzip).
521    pub async fn new<P: AsRef<Path>>(
522        path: P,
523        chunk_byte_len: Option<u64>,
524    ) -> Result<Self, FileClientError> {
525        let path = path.as_ref();
526        let file = File::open(path).await?;
527        let chunk_byte_len = chunk_byte_len.unwrap_or(DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE);
528
529        Self::from_file(
530            file,
531            chunk_byte_len,
532            path.extension()
533                .and_then(|ext| ext.to_str())
534                .is_some_and(|ext| ["gz", "gzip"].contains(&ext)),
535        )
536        .await
537    }
538
539    /// Opens the file to import from given path. Returns a new instance.
540    pub async fn from_file(
541        file: File,
542        chunk_byte_len: u64,
543        is_gzip: bool,
544    ) -> Result<Self, FileClientError> {
545        let file_reader = if is_gzip {
546            FileReader::Gzip(GzipDecoder::new(BufReader::new(file)))
547        } else {
548            let remaining_bytes = file.metadata().await?.len();
549            FileReader::Plain { file, remaining_bytes }
550        };
551
552        Ok(Self { file: file_reader, chunk: vec![], chunk_byte_len, highest_block: None })
553    }
554
555    /// Reads bytes from file and buffers as next chunk to decode. Returns byte length of next
556    /// chunk to read.
557    async fn read_next_chunk(&mut self) -> Result<Option<u64>, FileClientError> {
558        self.file.read_next_chunk(&mut self.chunk, self.chunk_byte_len).await
559    }
560
561    /// Read next chunk from file. Returns [`FileClient`] containing decoded chunk.
562    ///
563    /// For gzipped files, this method accumulates data until at least `chunk_byte_len` bytes
564    /// are available before processing. For plain files, it uses the original chunking logic.
565    pub async fn next_chunk<B: FullBlock>(
566        &mut self,
567        consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
568        parent_header: Option<SealedHeader<B::Header>>,
569    ) -> Result<Option<FileClient<B>>, FileClientError> {
570        let Some(chunk_len) = self.read_next_chunk().await? else { return Ok(None) };
571
572        // make new file client from chunk
573        let DecodedFileChunk { file_client, remaining_bytes, .. } =
574            FileClientBuilder { consensus, parent_header }
575                .build(&self.chunk[..], chunk_len)
576                .await?;
577
578        // save left over bytes
579        self.chunk = remaining_bytes;
580
581        Ok(Some(file_client))
582    }
583
584    /// Read next chunk from file. Returns [`FileClient`] containing decoded chunk.
585    pub async fn next_receipts_chunk<T>(&mut self) -> Result<Option<T>, T::Error>
586    where
587        T: FromReceiptReader,
588    {
589        let Some(next_chunk_byte_len) = self.read_next_chunk().await.map_err(|e| {
590            T::Error::from(match e {
591                FileClientError::Io(io_err) => io_err,
592                _ => io::Error::other(e.to_string()),
593            })
594        })?
595        else {
596            return Ok(None)
597        };
598
599        // make new file client from chunk
600        let DecodedFileChunk { file_client, remaining_bytes, highest_block } =
601            T::from_receipt_reader(&self.chunk[..], next_chunk_byte_len, self.highest_block)
602                .await?;
603
604        // save left over bytes
605        self.chunk = remaining_bytes;
606        // update highest block
607        self.highest_block = highest_block;
608
609        Ok(Some(file_client))
610    }
611}
612
613/// Constructs a file client from a reader.
614pub trait FromReader {
615    /// Error returned by file client type.
616    type Error: From<io::Error>;
617
618    /// Output returned by file client type.
619    type Output;
620
621    /// Returns a file client
622    fn build<R>(
623        &self,
624        reader: R,
625        num_bytes: u64,
626    ) -> impl Future<Output = Result<DecodedFileChunk<Self::Output>, Self::Error>>
627    where
628        Self: Sized,
629        R: AsyncReadExt + Unpin;
630}
631
/// Result of decoding a single file chunk with [`FromReader::build`].
#[derive(Debug)]
pub struct DecodedFileChunk<T> {
    /// The decoded portion of the chunk, e.g. a file client.
    pub file_client: T,
    /// Trailing bytes that were not decoded, such as a partial block or a partial receipt.
    /// These are carried over and prepended to the next chunk.
    pub remaining_bytes: Vec<u8>,
    /// Highest block number covered by the decoded chunk, if tracked. Needed for data that maps
    /// many-to-one onto block numbers, like receipts.
    pub highest_block: Option<u64>,
}
643
// Tests for `FileClient` and `ChunkedFileReader`: serving buffered bodies and headers to the
// downloaders, and reading chunked plain and gzip-compressed chain files.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        bodies::{
            bodies::BodiesDownloaderBuilder,
            test_utils::{insert_headers, zip_blocks},
        },
        headers::{reverse_headers::ReverseHeadersDownloaderBuilder, test_utils::child_header},
        test_utils::{generate_bodies, generate_bodies_file},
    };
    use assert_matches::assert_matches;
    use async_compression::tokio::write::GzipEncoder;
    use futures_util::stream::StreamExt;
    use rand::Rng;
    use reth_consensus::{noop::NoopConsensus, test_utils::TestConsensus};
    use reth_ethereum_primitives::Block;
    use reth_network_p2p::{
        bodies::downloader::BodyDownloader,
        headers::downloader::{HeaderDownloader, SyncTarget},
    };
    use reth_provider::test_utils::create_test_provider_factory;
    use std::sync::Arc;
    use tokio::{
        fs::File,
        io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom},
    };

    /// Bodies injected with `with_bodies` into a client built from an empty file are streamed by
    /// the bodies downloader, paired with the headers stored in the provider.
    #[tokio::test]
    async fn streams_bodies_from_buffer() {
        // Generate some random blocks
        let factory = create_test_provider_factory();
        let (headers, mut bodies) = generate_bodies(0..=19);

        insert_headers(&factory, &headers);

        // create an empty file
        let file = tempfile::tempfile().unwrap();

        let client: Arc<FileClient<Block>> = Arc::new(
            FileClient::from_file(file.into(), NoopConsensus::arc())
                .await
                .unwrap()
                .with_bodies(bodies.clone()),
        );
        let mut downloader = BodiesDownloaderBuilder::default().build::<Block, _, _>(
            client.clone(),
            Arc::new(TestConsensus::default()),
            factory,
        );
        downloader.set_download_range(0..=19).expect("failed to set download range");

        assert_matches!(
            downloader.next().await,
            Some(Ok(res)) => assert_eq!(res, zip_blocks(headers.iter(), &mut bodies))
        );
    }

    /// Reverse header download from a tip using headers injected with `with_headers`.
    #[tokio::test]
    async fn download_headers_at_fork_head() {
        reth_tracing::init_test_tracing();

        // Chain p3 <- p2 <- p1 <- p0: p3 is used as the local head and p0 as the sync target.
        let p3 = SealedHeader::default();
        let p2 = child_header(&p3);
        let p1 = child_header(&p2);
        let p0 = child_header(&p1);

        let file = tempfile::tempfile().unwrap();
        // NOTE(review): the map keys run opposite to the chain order (the tip p0 is stored under
        // key 0); confirm this keying is intentional for this test.
        let client: Arc<FileClient<Block>> = Arc::new(
            FileClient::from_file(file.into(), NoopConsensus::arc()).await.unwrap().with_headers(
                HashMap::from([
                    (0u64, p0.clone_header()),
                    (1, p1.clone_header()),
                    (2, p2.clone_header()),
                    (3, p3.clone_header()),
                ]),
            ),
        );

        let mut downloader = ReverseHeadersDownloaderBuilder::default()
            .stream_batch_size(3)
            .request_limit(3)
            .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
        downloader.update_local_head(p3.clone());
        downloader.update_sync_target(SyncTarget::Tip(p0.hash()));

        // One batch with everything above the local head, then the stream is exhausted.
        let headers = downloader.next().await.unwrap();
        assert_eq!(headers, Ok(vec![p0, p1, p2]));
        assert!(downloader.next().await.is_none());
        assert!(downloader.next().await.is_none());
    }

    /// Headers decoded from a generated chain file can be downloaded back in full.
    #[tokio::test]
    async fn test_download_headers_from_file() {
        reth_tracing::init_test_tracing();

        // Generate some random blocks
        let (file, headers, _) = generate_bodies_file(0..=19).await;
        // now try to read them back
        let client: Arc<FileClient<Block>> =
            Arc::new(FileClient::from_file(file, NoopConsensus::arc()).await.unwrap());

        // construct headers downloader and use first header
        let mut header_downloader = ReverseHeadersDownloaderBuilder::default()
            .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
        header_downloader.update_local_head(headers.first().unwrap().clone());
        header_downloader.update_sync_target(SyncTarget::Tip(headers.last().unwrap().hash()));

        // get headers first
        let mut downloaded_headers = header_downloader.next().await.unwrap().unwrap();

        // reverse to make sure it's in the right order before comparing
        downloaded_headers.reverse();

        // the first header is not included in the response
        assert_eq!(downloaded_headers, headers[1..]);
    }

    /// Bodies decoded from a generated chain file can be downloaded back in full.
    #[tokio::test]
    async fn test_download_bodies_from_file() {
        // Generate some random blocks
        let factory = create_test_provider_factory();
        let (file, headers, mut bodies) = generate_bodies_file(0..=19).await;

        // now try to read them back
        let client: Arc<FileClient<Block>> =
            Arc::new(FileClient::from_file(file, NoopConsensus::arc()).await.unwrap());

        // insert headers in db for the bodies downloader
        insert_headers(&factory, &headers);

        let mut downloader = BodiesDownloaderBuilder::default().build::<Block, _, _>(
            client.clone(),
            Arc::new(TestConsensus::default()),
            factory,
        );
        downloader.set_download_range(0..=19).expect("failed to set download range");

        assert_matches!(
            downloader.next().await,
            Some(Ok(res)) => assert_eq!(res, zip_blocks(headers.iter(), &mut bodies))
        );
    }

    /// Reading a plain chain file in randomly sized chunks yields every header exactly once,
    /// using each chunk's tip as the local head for the next chunk.
    #[tokio::test]
    async fn test_chunk_download_headers_from_file() {
        reth_tracing::init_test_tracing();

        // Generate some random blocks
        let (file, headers, _) = generate_bodies_file(0..=14).await;

        // calculate min for chunk byte length range, pick a lower bound that guarantees at least
        // one block will be read
        let chunk_byte_len = rand::rng().random_range(2000..=10_000);
        trace!(target: "downloaders::file::test", chunk_byte_len);

        // init reader
        let mut reader =
            ChunkedFileReader::from_file(file, chunk_byte_len as u64, false).await.unwrap();

        let mut downloaded_headers: Vec<SealedHeader> = vec![];

        let mut local_header = headers.first().unwrap().clone();

        // test
        while let Some(client) =
            reader.next_chunk::<Block>(NoopConsensus::arc(), None).await.unwrap()
        {
            let sync_target = client.tip_header().unwrap();

            let sync_target_hash = sync_target.hash();

            // construct headers downloader and use first header
            let mut header_downloader = ReverseHeadersDownloaderBuilder::default()
                .build(Arc::clone(&Arc::new(client)), Arc::new(TestConsensus::default()));
            header_downloader.update_local_head(local_header.clone());
            header_downloader.update_sync_target(SyncTarget::Tip(sync_target_hash));

            // get headers first
            let mut downloaded_headers_chunk = header_downloader.next().await.unwrap().unwrap();

            // export new local header to outer scope
            local_header = sync_target;

            // reverse to make sure it's in the right order before comparing
            downloaded_headers_chunk.reverse();
            downloaded_headers.extend_from_slice(&downloaded_headers_chunk);
        }

        // the first header is not included in the response
        assert_eq!(headers[1..], downloaded_headers);
    }

    /// Same as `test_chunk_download_headers_from_file`, but the chain file is gzip-compressed
    /// first and read with gzip decoding enabled.
    #[tokio::test]
    async fn test_chunk_download_headers_from_gzip_file() {
        reth_tracing::init_test_tracing();

        // Generate some random blocks
        let (file, headers, _) = generate_bodies_file(0..=14).await;

        // Create a gzipped version of the file
        let gzip_temp_file = tempfile::NamedTempFile::new().unwrap();
        let gzip_path = gzip_temp_file.path().to_owned();
        // NOTE(review): `NamedTempFile` presumably deletes its file on drop, so the file recreated
        // at `gzip_path` below would not be cleaned up automatically — confirm and consider
        // keeping the handle alive until the end of the test.
        drop(gzip_temp_file); // Close the file so we can write to it

        // Read original file content first
        let mut original_file = file;
        original_file.seek(SeekFrom::Start(0)).await.unwrap();
        let mut original_content = Vec::new();
        original_file.read_to_end(&mut original_content).await.unwrap();

        let mut gzip_file = File::create(&gzip_path).await.unwrap();
        let mut encoder = GzipEncoder::new(&mut gzip_file);

        // Write the original content through the gzip encoder
        encoder.write_all(&original_content).await.unwrap();
        encoder.shutdown().await.unwrap();
        drop(gzip_file);

        // Reopen the gzipped file for reading
        let gzip_file = File::open(&gzip_path).await.unwrap();

        // calculate min for chunk byte length range, pick a lower bound that guarantees at least
        // one block will be read
        let chunk_byte_len = rand::rng().random_range(2000..=10_000);
        trace!(target: "downloaders::file::test", chunk_byte_len);

        // init reader with gzip=true
        let mut reader =
            ChunkedFileReader::from_file(gzip_file, chunk_byte_len as u64, true).await.unwrap();

        let mut downloaded_headers: Vec<SealedHeader> = vec![];

        let mut local_header = headers.first().unwrap().clone();

        // test
        while let Some(client) =
            reader.next_chunk::<Block>(NoopConsensus::arc(), None).await.unwrap()
        {
            // skip chunks that decoded no complete block
            if client.headers_len() == 0 {
                continue;
            }

            let sync_target = client.tip_header().expect("tip_header should not be None");

            let sync_target_hash = sync_target.hash();

            // construct headers downloader and use first header
            let mut header_downloader = ReverseHeadersDownloaderBuilder::default()
                .build(Arc::clone(&Arc::new(client)), Arc::new(TestConsensus::default()));
            header_downloader.update_local_head(local_header.clone());
            header_downloader.update_sync_target(SyncTarget::Tip(sync_target_hash));

            // get headers first
            let mut downloaded_headers_chunk = header_downloader.next().await.unwrap().unwrap();

            // export new local header to outer scope
            local_header = sync_target;

            // reverse to make sure it's in the right order before comparing
            downloaded_headers_chunk.reverse();
            downloaded_headers.extend_from_slice(&downloaded_headers_chunk);
        }

        // the first header is not included in the response
        assert_eq!(headers[1..], downloaded_headers);
    }
}