reth_downloaders/
file_client.rs

1use std::{collections::HashMap, io, path::Path, sync::Arc};
2
3use alloy_consensus::BlockHeader;
4use alloy_eips::BlockHashOrNumber;
5use alloy_primitives::{BlockHash, BlockNumber, Sealable, B256};
6use futures::Future;
7use itertools::Either;
8use reth_consensus::{Consensus, ConsensusError};
9use reth_network_p2p::{
10    bodies::client::{BodiesClient, BodiesFut},
11    download::DownloadClient,
12    error::RequestError,
13    headers::client::{HeadersClient, HeadersDirection, HeadersFut, HeadersRequest},
14    priority::Priority,
15    BlockClient,
16};
17use reth_network_peers::PeerId;
18use reth_primitives::{SealedBlock, SealedHeader};
19use reth_primitives_traits::{Block, BlockBody, FullBlock};
20use thiserror::Error;
21use tokio::{fs::File, io::AsyncReadExt};
22use tokio_stream::StreamExt;
23use tokio_util::codec::FramedRead;
24use tracing::{debug, trace, warn};
25
26use super::file_codec::BlockFileCodec;
27use crate::receipt_file_client::FromReceiptReader;
28
/// Chunk size, in bytes, used when reading a chain file and no explicit size is given.
///
/// The value is 10^9 bytes, i.e. one (decimal) gigabyte.
pub const DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE: u64 = 1_000 * 1_000 * 1_000;
33
34/// Front-end API for fetching chain data from a file.
35///
36/// Blocks are assumed to be written one after another in a file, as rlp bytes.
37///
38/// For example, if the file contains 3 blocks, the file is assumed to be encoded as follows:
39/// rlp(block1) || rlp(block2) || rlp(block3)
40///
41/// Blocks are assumed to have populated transactions, so reading headers will also buffer
42/// transactions in memory for use in the bodies stage.
43///
44/// This reads the entire file into memory, so it is not suitable for large files.
45#[derive(Debug, Clone)]
46pub struct FileClient<B: Block = reth_primitives::Block> {
47    /// The buffered headers retrieved when fetching new bodies.
48    headers: HashMap<BlockNumber, B::Header>,
49
50    /// A mapping between block hash and number.
51    hash_to_number: HashMap<BlockHash, BlockNumber>,
52
53    /// The buffered bodies retrieved when fetching new headers.
54    bodies: HashMap<BlockHash, B::Body>,
55}
56
57/// An error that can occur when constructing and using a [`FileClient`].
58#[derive(Debug, Error)]
59pub enum FileClientError {
60    /// An error occurred when validating a header from file.
61    #[error(transparent)]
62    Consensus(#[from] ConsensusError),
63
64    /// An error occurred when opening or reading the file.
65    #[error(transparent)]
66    Io(#[from] std::io::Error),
67
68    /// An error occurred when decoding blocks, headers, or rlp headers from the file.
69    #[error("{0}")]
70    Rlp(alloy_rlp::Error, Vec<u8>),
71
72    /// Custom error message.
73    #[error("{0}")]
74    Custom(&'static str),
75}
76
77impl From<&'static str> for FileClientError {
78    fn from(value: &'static str) -> Self {
79        Self::Custom(value)
80    }
81}
82
83impl<B: FullBlock> FileClient<B> {
84    /// Create a new file client from a file path.
85    pub async fn new<P: AsRef<Path>>(
86        path: P,
87        consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
88    ) -> Result<Self, FileClientError> {
89        let file = File::open(path).await?;
90        Self::from_file(file, consensus).await
91    }
92
93    /// Initialize the [`FileClient`] with a file directly.
94    pub(crate) async fn from_file(
95        mut file: File,
96        consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
97    ) -> Result<Self, FileClientError> {
98        // get file len from metadata before reading
99        let metadata = file.metadata().await?;
100        let file_len = metadata.len();
101
102        let mut reader = vec![];
103        file.read_to_end(&mut reader).await?;
104
105        Ok(FileClientBuilder { consensus, parent_header: None }
106            .build(&reader[..], file_len)
107            .await?
108            .file_client)
109    }
110
111    /// Get the tip hash of the chain.
112    pub fn tip(&self) -> Option<B256> {
113        self.headers.get(&self.max_block()?).map(|h| h.hash_slow())
114    }
115
116    /// Get the start hash of the chain.
117    pub fn start(&self) -> Option<B256> {
118        self.headers.get(&self.min_block()?).map(|h| h.hash_slow())
119    }
120
121    /// Returns the highest block number of this client has or `None` if empty
122    pub fn max_block(&self) -> Option<u64> {
123        self.headers.keys().max().copied()
124    }
125
126    /// Returns the lowest block number of this client has or `None` if empty
127    pub fn min_block(&self) -> Option<u64> {
128        self.headers.keys().min().copied()
129    }
130
131    /// Clones and returns the highest header of this client has or `None` if empty. Seals header
132    /// before returning.
133    pub fn tip_header(&self) -> Option<SealedHeader<B::Header>> {
134        self.headers.get(&self.max_block()?).map(|h| SealedHeader::seal_slow(h.clone()))
135    }
136
137    /// Returns true if all blocks are canonical (no gaps)
138    pub fn has_canonical_blocks(&self) -> bool {
139        if self.headers.is_empty() {
140            return true
141        }
142        let mut nums = self.headers.keys().copied().collect::<Vec<_>>();
143        nums.sort_unstable();
144        let mut iter = nums.into_iter();
145        let mut lowest = iter.next().expect("not empty");
146        for next in iter {
147            if next != lowest + 1 {
148                return false
149            }
150            lowest = next;
151        }
152        true
153    }
154
155    /// Use the provided bodies as the file client's block body buffer.
156    pub fn with_bodies(mut self, bodies: HashMap<BlockHash, B::Body>) -> Self {
157        self.bodies = bodies;
158        self
159    }
160
161    /// Use the provided headers as the file client's block body buffer.
162    pub fn with_headers(mut self, headers: HashMap<BlockNumber, B::Header>) -> Self {
163        self.headers = headers;
164        for (number, header) in &self.headers {
165            self.hash_to_number.insert(header.hash_slow(), *number);
166        }
167        self
168    }
169
170    /// Returns the current number of headers in the client.
171    pub fn headers_len(&self) -> usize {
172        self.headers.len()
173    }
174
175    /// Returns the current number of bodies in the client.
176    pub fn bodies_len(&self) -> usize {
177        self.bodies.len()
178    }
179
180    /// Returns an iterator over headers in the client.
181    pub fn headers_iter(&self) -> impl Iterator<Item = &B::Header> {
182        self.headers.values()
183    }
184
185    /// Returns a mutable iterator over bodies in the client.
186    ///
187    /// Panics, if file client headers and bodies are not mapping 1-1.
188    pub fn bodies_iter_mut(&mut self) -> impl Iterator<Item = (u64, &mut B::Body)> {
189        let bodies = &mut self.bodies;
190        let numbers = &self.hash_to_number;
191        bodies.iter_mut().map(|(hash, body)| (numbers[hash], body))
192    }
193
194    /// Returns the current number of transactions in the client.
195    pub fn total_transactions(&self) -> usize {
196        self.bodies.iter().fold(0, |acc, (_, body)| acc + body.transactions().len())
197    }
198}
199
200struct FileClientBuilder<B: Block = reth_primitives::Block> {
201    pub consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
202    pub parent_header: Option<SealedHeader<B::Header>>,
203}
204
205impl<B: FullBlock<Header: reth_primitives_traits::BlockHeader>> FromReader
206    for FileClientBuilder<B>
207{
208    type Error = FileClientError;
209    type Output = FileClient<B>;
210
211    /// Initialize the [`FileClient`] from bytes that have been read from file.
212    fn build<R>(
213        &self,
214        reader: R,
215        num_bytes: u64,
216    ) -> impl Future<Output = Result<DecodedFileChunk<Self::Output>, Self::Error>>
217    where
218        R: AsyncReadExt + Unpin,
219    {
220        let mut headers = HashMap::default();
221        let mut hash_to_number = HashMap::default();
222        let mut bodies = HashMap::default();
223
224        // use with_capacity to make sure the internal buffer contains the entire chunk
225        let mut stream =
226            FramedRead::with_capacity(reader, BlockFileCodec::<B>::default(), num_bytes as usize);
227
228        trace!(target: "downloaders::file",
229            target_num_bytes=num_bytes,
230            capacity=stream.read_buffer().capacity(),
231            "init decode stream"
232        );
233
234        let mut remaining_bytes = vec![];
235
236        let mut log_interval = 0;
237        let mut log_interval_start_block = 0;
238
239        let mut parent_header = self.parent_header.clone();
240
241        async move {
242            while let Some(block_res) = stream.next().await {
243                let block = match block_res {
244                    Ok(block) => block,
245                    Err(FileClientError::Rlp(err, bytes)) => {
246                        trace!(target: "downloaders::file",
247                            %err,
248                            bytes_len=bytes.len(),
249                            "partial block returned from decoding chunk"
250                        );
251                        remaining_bytes = bytes;
252                        break
253                    }
254                    Err(err) => return Err(err),
255                };
256
257                let block = SealedBlock::seal_slow(block);
258
259                // Validate standalone header
260                self.consensus.validate_header(block.sealed_header())?;
261                if let Some(parent) = &parent_header {
262                    self.consensus.validate_header_against_parent(block.sealed_header(), parent)?;
263                    parent_header = Some(block.sealed_header().clone());
264                }
265
266                // Validate block against header
267                self.consensus.validate_block_pre_execution(&block)?;
268
269                // add to the internal maps
270                let block_hash = block.hash();
271                let block_number = block.number();
272                let (header, body) = block.split_sealed_header_body();
273                headers.insert(block_number, header.unseal());
274                hash_to_number.insert(block_hash, block_number);
275                bodies.insert(block_hash, body);
276
277                if log_interval == 0 {
278                    trace!(target: "downloaders::file",
279                        block_number,
280                        "read first block"
281                    );
282                    log_interval_start_block = block_number;
283                } else if log_interval % 100_000 == 0 {
284                    trace!(target: "downloaders::file",
285                        blocks=?log_interval_start_block..=block_number,
286                        "read blocks from file"
287                    );
288                    log_interval_start_block = block_number + 1;
289                }
290                log_interval += 1;
291            }
292
293            trace!(target: "downloaders::file", blocks = headers.len(), "Initialized file client");
294
295            Ok(DecodedFileChunk {
296                file_client: FileClient { headers, hash_to_number, bodies },
297                remaining_bytes,
298                highest_block: None,
299            })
300        }
301    }
302}
303
304impl<B: FullBlock> HeadersClient for FileClient<B> {
305    type Header = B::Header;
306    type Output = HeadersFut<B::Header>;
307
308    fn get_headers_with_priority(
309        &self,
310        request: HeadersRequest,
311        _priority: Priority,
312    ) -> Self::Output {
313        // this just searches the buffer, and fails if it can't find the header
314        let mut headers = Vec::new();
315        trace!(target: "downloaders::file", request=?request, "Getting headers");
316
317        let start_num = match request.start {
318            BlockHashOrNumber::Hash(hash) => match self.hash_to_number.get(&hash) {
319                Some(num) => *num,
320                None => {
321                    warn!(%hash, "Could not find starting block number for requested header hash");
322                    return Box::pin(async move { Err(RequestError::BadResponse) })
323                }
324            },
325            BlockHashOrNumber::Number(num) => num,
326        };
327
328        let range = if request.limit == 1 {
329            Either::Left(start_num..start_num + 1)
330        } else {
331            match request.direction {
332                HeadersDirection::Rising => Either::Left(start_num..start_num + request.limit),
333                HeadersDirection::Falling => {
334                    Either::Right((start_num - request.limit + 1..=start_num).rev())
335                }
336            }
337        };
338
339        trace!(target: "downloaders::file", range=?range, "Getting headers with range");
340
341        for block_number in range {
342            match self.headers.get(&block_number).cloned() {
343                Some(header) => headers.push(header),
344                None => {
345                    warn!(number=%block_number, "Could not find header");
346                    return Box::pin(async move { Err(RequestError::BadResponse) })
347                }
348            }
349        }
350
351        Box::pin(async move { Ok((PeerId::default(), headers).into()) })
352    }
353}
354
355impl<B: FullBlock> BodiesClient for FileClient<B> {
356    type Body = B::Body;
357    type Output = BodiesFut<B::Body>;
358
359    fn get_block_bodies_with_priority(
360        &self,
361        hashes: Vec<B256>,
362        _priority: Priority,
363    ) -> Self::Output {
364        // this just searches the buffer, and fails if it can't find the block
365        let mut bodies = Vec::new();
366
367        // check if any are an error
368        // could unwrap here
369        for hash in hashes {
370            match self.bodies.get(&hash).cloned() {
371                Some(body) => bodies.push(body),
372                None => return Box::pin(async move { Err(RequestError::BadResponse) }),
373            }
374        }
375
376        Box::pin(async move { Ok((PeerId::default(), bodies).into()) })
377    }
378}
379
380impl<B: FullBlock> DownloadClient for FileClient<B> {
381    fn report_bad_message(&self, _peer_id: PeerId) {
382        trace!("Reported a bad message on a file client, the file may be corrupted or invalid");
383        // noop
384    }
385
386    fn num_connected_peers(&self) -> usize {
387        // no such thing as connected peers when we are just using a file
388        1
389    }
390}
391
392impl<B: FullBlock> BlockClient for FileClient<B> {
393    type Block = B;
394}
395
396/// Chunks file into several [`FileClient`]s.
397#[derive(Debug)]
398pub struct ChunkedFileReader {
399    /// File to read from.
400    file: File,
401    /// Current file byte length.
402    file_byte_len: u64,
403    /// Bytes that have been read.
404    chunk: Vec<u8>,
405    /// Max bytes per chunk.
406    chunk_byte_len: u64,
407    /// Optionally, tracks highest decoded block number. Needed when decoding data that maps * to 1
408    /// with block number
409    highest_block: Option<u64>,
410}
411
412impl ChunkedFileReader {
413    /// Returns the remaining file length.
414    pub const fn file_len(&self) -> u64 {
415        self.file_byte_len
416    }
417
418    /// Opens the file to import from given path. Returns a new instance. If no chunk byte length
419    /// is passed, chunks have [`DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE`] (one static file).
420    pub async fn new<P: AsRef<Path>>(
421        path: P,
422        chunk_byte_len: Option<u64>,
423    ) -> Result<Self, FileClientError> {
424        let file = File::open(path).await?;
425        let chunk_byte_len = chunk_byte_len.unwrap_or(DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE);
426
427        Self::from_file(file, chunk_byte_len).await
428    }
429
430    /// Opens the file to import from given path. Returns a new instance.
431    pub async fn from_file(file: File, chunk_byte_len: u64) -> Result<Self, FileClientError> {
432        // get file len from metadata before reading
433        let metadata = file.metadata().await?;
434        let file_byte_len = metadata.len();
435
436        Ok(Self { file, file_byte_len, chunk: vec![], chunk_byte_len, highest_block: None })
437    }
438
439    /// Calculates the number of bytes to read from the chain file. Returns a tuple of the chunk
440    /// length and the remaining file length.
441    fn chunk_len(&self) -> u64 {
442        let Self { chunk_byte_len, file_byte_len, .. } = *self;
443        let file_byte_len = file_byte_len + self.chunk.len() as u64;
444
445        if chunk_byte_len > file_byte_len {
446            // last chunk
447            file_byte_len
448        } else {
449            chunk_byte_len
450        }
451    }
452
453    /// Reads bytes from file and buffers as next chunk to decode. Returns byte length of next
454    /// chunk to read.
455    async fn read_next_chunk(&mut self) -> Result<Option<u64>, io::Error> {
456        if self.file_byte_len == 0 && self.chunk.is_empty() {
457            // eof
458            return Ok(None)
459        }
460
461        let chunk_target_len = self.chunk_len();
462        let old_bytes_len = self.chunk.len() as u64;
463
464        // calculate reserved space in chunk
465        let new_read_bytes_target_len = chunk_target_len - old_bytes_len;
466
467        // read new bytes from file
468        let prev_read_bytes_len = self.chunk.len();
469        self.chunk.extend(std::iter::repeat_n(0, new_read_bytes_target_len as usize));
470        let reader = &mut self.chunk[prev_read_bytes_len..];
471
472        // actual bytes that have been read
473        let new_read_bytes_len = self.file.read_exact(reader).await? as u64;
474        let next_chunk_byte_len = self.chunk.len();
475
476        // update remaining file length
477        self.file_byte_len -= new_read_bytes_len;
478
479        debug!(target: "downloaders::file",
480            max_chunk_byte_len=self.chunk_byte_len,
481            prev_read_bytes_len,
482            new_read_bytes_target_len,
483            new_read_bytes_len,
484            next_chunk_byte_len,
485            remaining_file_byte_len=self.file_byte_len,
486            "new bytes were read from file"
487        );
488
489        Ok(Some(next_chunk_byte_len as u64))
490    }
491
492    /// Read next chunk from file. Returns [`FileClient`] containing decoded chunk.
493    pub async fn next_chunk<B: FullBlock>(
494        &mut self,
495        consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
496        parent_header: Option<SealedHeader<B::Header>>,
497    ) -> Result<Option<FileClient<B>>, FileClientError> {
498        let Some(next_chunk_byte_len) = self.read_next_chunk().await? else { return Ok(None) };
499
500        // make new file client from chunk
501        let DecodedFileChunk { file_client, remaining_bytes, .. } =
502            FileClientBuilder { consensus, parent_header }
503                .build(&self.chunk[..], next_chunk_byte_len)
504                .await?;
505
506        // save left over bytes
507        self.chunk = remaining_bytes;
508
509        Ok(Some(file_client))
510    }
511
512    /// Read next chunk from file. Returns [`FileClient`] containing decoded chunk.
513    pub async fn next_receipts_chunk<T>(&mut self) -> Result<Option<T>, T::Error>
514    where
515        T: FromReceiptReader,
516    {
517        let Some(next_chunk_byte_len) = self.read_next_chunk().await? else { return Ok(None) };
518
519        // make new file client from chunk
520        let DecodedFileChunk { file_client, remaining_bytes, highest_block } =
521            T::from_receipt_reader(&self.chunk[..], next_chunk_byte_len, self.highest_block)
522                .await?;
523
524        // save left over bytes
525        self.chunk = remaining_bytes;
526        // update highest block
527        self.highest_block = highest_block;
528
529        Ok(Some(file_client))
530    }
531}
532
533/// Constructs a file client from a reader.
534pub trait FromReader {
535    /// Error returned by file client type.
536    type Error: From<io::Error>;
537
538    /// Output returned by file client type.
539    type Output;
540
541    /// Returns a file client
542    fn build<R>(
543        &self,
544        reader: R,
545        num_bytes: u64,
546    ) -> impl Future<Output = Result<DecodedFileChunk<Self::Output>, Self::Error>>
547    where
548        Self: Sized,
549        R: AsyncReadExt + Unpin;
550}
551
/// Result of decoding one file chunk with [`FromReader::build`].
#[derive(Debug)]
pub struct DecodedFileChunk<T> {
    /// The file client holding the part of the chunk that was decoded.
    pub file_client: T,
    /// Bytes of the chunk that were not decoded, e.g. a partial block or a partial receipt.
    pub remaining_bytes: Vec<u8>,
    /// Highest block of the decoded chunk, if tracked. This is needed when decoding data that
    /// maps * to 1 with block number, like receipts.
    pub highest_block: Option<u64>,
}
563
564#[cfg(test)]
565mod tests {
566    use super::*;
567    use crate::{
568        bodies::{
569            bodies::BodiesDownloaderBuilder,
570            test_utils::{insert_headers, zip_blocks},
571        },
572        headers::{reverse_headers::ReverseHeadersDownloaderBuilder, test_utils::child_header},
573        test_utils::{generate_bodies, generate_bodies_file},
574    };
575    use assert_matches::assert_matches;
576    use futures_util::stream::StreamExt;
577    use rand::Rng;
578    use reth_consensus::{noop::NoopConsensus, test_utils::TestConsensus};
579    use reth_network_p2p::{
580        bodies::downloader::BodyDownloader,
581        headers::downloader::{HeaderDownloader, SyncTarget},
582    };
583    use reth_primitives::Block;
584    use reth_provider::test_utils::create_test_provider_factory;
585    use std::sync::Arc;
586
587    #[tokio::test]
588    async fn streams_bodies_from_buffer() {
589        // Generate some random blocks
590        let factory = create_test_provider_factory();
591        let (headers, mut bodies) = generate_bodies(0..=19);
592
593        insert_headers(factory.db_ref().db(), &headers);
594
595        // create an empty file
596        let file = tempfile::tempfile().unwrap();
597
598        let client: Arc<FileClient> = Arc::new(
599            FileClient::from_file(file.into(), NoopConsensus::arc())
600                .await
601                .unwrap()
602                .with_bodies(bodies.clone()),
603        );
604        let mut downloader = BodiesDownloaderBuilder::default()
605            .build::<reth_primitives::Block, _, _>(
606                client.clone(),
607                Arc::new(TestConsensus::default()),
608                factory,
609            );
610        downloader.set_download_range(0..=19).expect("failed to set download range");
611
612        assert_matches!(
613            downloader.next().await,
614            Some(Ok(res)) => assert_eq!(res, zip_blocks(headers.iter(), &mut bodies))
615        );
616    }
617
618    #[tokio::test]
619    async fn download_headers_at_fork_head() {
620        reth_tracing::init_test_tracing();
621
622        let p3 = SealedHeader::default();
623        let p2 = child_header(&p3);
624        let p1 = child_header(&p2);
625        let p0 = child_header(&p1);
626
627        let file = tempfile::tempfile().unwrap();
628        let client: Arc<FileClient> = Arc::new(
629            FileClient::from_file(file.into(), NoopConsensus::arc()).await.unwrap().with_headers(
630                HashMap::from([
631                    (0u64, p0.clone_header()),
632                    (1, p1.clone_header()),
633                    (2, p2.clone_header()),
634                    (3, p3.clone_header()),
635                ]),
636            ),
637        );
638
639        let mut downloader = ReverseHeadersDownloaderBuilder::default()
640            .stream_batch_size(3)
641            .request_limit(3)
642            .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
643        downloader.update_local_head(p3.clone());
644        downloader.update_sync_target(SyncTarget::Tip(p0.hash()));
645
646        let headers = downloader.next().await.unwrap();
647        assert_eq!(headers, Ok(vec![p0, p1, p2]));
648        assert!(downloader.next().await.is_none());
649        assert!(downloader.next().await.is_none());
650    }
651
652    #[tokio::test]
653    async fn test_download_headers_from_file() {
654        reth_tracing::init_test_tracing();
655
656        // Generate some random blocks
657        let (file, headers, _) = generate_bodies_file(0..=19).await;
658        // now try to read them back
659        let client: Arc<FileClient> =
660            Arc::new(FileClient::from_file(file, NoopConsensus::arc()).await.unwrap());
661
662        // construct headers downloader and use first header
663        let mut header_downloader = ReverseHeadersDownloaderBuilder::default()
664            .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
665        header_downloader.update_local_head(headers.first().unwrap().clone());
666        header_downloader.update_sync_target(SyncTarget::Tip(headers.last().unwrap().hash()));
667
668        // get headers first
669        let mut downloaded_headers = header_downloader.next().await.unwrap().unwrap();
670
671        // reverse to make sure it's in the right order before comparing
672        downloaded_headers.reverse();
673
674        // the first header is not included in the response
675        assert_eq!(downloaded_headers, headers[1..]);
676    }
677
678    #[tokio::test]
679    async fn test_download_bodies_from_file() {
680        // Generate some random blocks
681        let factory = create_test_provider_factory();
682        let (file, headers, mut bodies) = generate_bodies_file(0..=19).await;
683
684        // now try to read them back
685        let client: Arc<FileClient> =
686            Arc::new(FileClient::from_file(file, NoopConsensus::arc()).await.unwrap());
687
688        // insert headers in db for the bodies downloader
689        insert_headers(factory.db_ref().db(), &headers);
690
691        let mut downloader = BodiesDownloaderBuilder::default()
692            .build::<reth_primitives::Block, _, _>(
693                client.clone(),
694                Arc::new(TestConsensus::default()),
695                factory,
696            );
697        downloader.set_download_range(0..=19).expect("failed to set download range");
698
699        assert_matches!(
700            downloader.next().await,
701            Some(Ok(res)) => assert_eq!(res, zip_blocks(headers.iter(), &mut bodies))
702        );
703    }
704
705    #[tokio::test]
706    async fn test_chunk_download_headers_from_file() {
707        reth_tracing::init_test_tracing();
708
709        // Generate some random blocks
710        let (file, headers, _) = generate_bodies_file(0..=14).await;
711
712        // calculate min for chunk byte length range, pick a lower bound that guarantees at least
713        // one block will be read
714        let chunk_byte_len = rand::thread_rng().gen_range(2000..=10_000);
715        trace!(target: "downloaders::file::test", chunk_byte_len);
716
717        // init reader
718        let mut reader = ChunkedFileReader::from_file(file, chunk_byte_len as u64).await.unwrap();
719
720        let mut downloaded_headers: Vec<SealedHeader> = vec![];
721
722        let mut local_header = headers.first().unwrap().clone();
723
724        // test
725        while let Some(client) =
726            reader.next_chunk::<Block>(NoopConsensus::arc(), None).await.unwrap()
727        {
728            let sync_target = client.tip_header().unwrap();
729
730            let sync_target_hash = sync_target.hash();
731
732            // construct headers downloader and use first header
733            let mut header_downloader = ReverseHeadersDownloaderBuilder::default()
734                .build(Arc::clone(&Arc::new(client)), Arc::new(TestConsensus::default()));
735            header_downloader.update_local_head(local_header.clone());
736            header_downloader.update_sync_target(SyncTarget::Tip(sync_target_hash));
737
738            // get headers first
739            let mut downloaded_headers_chunk = header_downloader.next().await.unwrap().unwrap();
740
741            // export new local header to outer scope
742            local_header = sync_target;
743
744            // reverse to make sure it's in the right order before comparing
745            downloaded_headers_chunk.reverse();
746            downloaded_headers.extend_from_slice(&downloaded_headers_chunk);
747        }
748
749        // the first header is not included in the response
750        assert_eq!(headers[1..], downloaded_headers);
751    }
752}